Skip to main content

pulsedb/sync/
transport_http.rs

1//! HTTP sync transport implementation.
2//!
3//! [`HttpSyncTransport`] implements [`SyncTransport`] using `reqwest` for
4//! HTTP communication. Uses bincode serialization for compact payloads.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use pulsedb::sync::transport_http::HttpSyncTransport;
10//!
11//! let transport = HttpSyncTransport::new("http://server:3000");
12//! // or with authentication:
13//! let transport = HttpSyncTransport::with_auth("https://server:3000", "my-secret-token");
14//! ```
15
16use async_trait::async_trait;
17use reqwest::Client;
18use tracing::debug;
19
20use super::error::SyncError;
21use super::transport::SyncTransport;
22use super::types::{
23    HandshakeRequest, HandshakeResponse, PullRequest, PullResponse, PushResponse, SyncChange,
24};
25
26/// HTTP-based sync transport using reqwest.
27///
28/// Communicates with a remote PulseDB sync server over HTTP using
29/// bincode-serialized request/response bodies.
30///
31/// # Endpoints
32///
33/// | Method | Path | Request | Response |
34/// |--------|------|---------|----------|
35/// | POST | `/sync/handshake` | `HandshakeRequest` | `HandshakeResponse` |
36/// | POST | `/sync/push` | `Vec<SyncChange>` | `PushResponse` |
37/// | POST | `/sync/pull` | `PullRequest` | `PullResponse` |
38/// | GET | `/sync/health` | (none) | 200 OK |
39pub struct HttpSyncTransport {
40    client: Client,
41    base_url: String,
42    auth_token: Option<String>,
43}
44
45impl HttpSyncTransport {
46    /// Creates a new HTTP transport pointing at the given base URL.
47    ///
48    /// The URL should not include a trailing slash.
49    /// Example: `"http://localhost:3000"` or `"https://api.example.com"`
50    pub fn new(base_url: impl Into<String>) -> Self {
51        Self {
52            client: Client::new(),
53            base_url: base_url.into(),
54            auth_token: None,
55        }
56    }
57
58    /// Creates a new HTTP transport with Bearer token authentication.
59    ///
60    /// The token is sent as `Authorization: Bearer {token}` on every request.
61    pub fn with_auth(base_url: impl Into<String>, token: impl Into<String>) -> Self {
62        Self {
63            client: Client::new(),
64            base_url: base_url.into(),
65            auth_token: Some(token.into()),
66        }
67    }
68
69    /// Sends a POST request with a bincode body and deserializes the response.
70    async fn post_bincode<Req, Resp>(&self, path: &str, request: &Req) -> Result<Resp, SyncError>
71    where
72        Req: serde::Serialize,
73        Resp: serde::de::DeserializeOwned,
74    {
75        let url = format!("{}{}", self.base_url, path);
76        let body =
77            bincode::serialize(request).map_err(|e| SyncError::serialization(e.to_string()))?;
78
79        let mut req = self
80            .client
81            .post(&url)
82            .header("Content-Type", "application/octet-stream")
83            .body(body);
84
85        if let Some(ref token) = self.auth_token {
86            req = req.header("Authorization", format!("Bearer {}", token));
87        }
88
89        let response = req.send().await.map_err(|e| {
90            if e.is_timeout() {
91                SyncError::Timeout
92            } else if e.is_connect() {
93                SyncError::ConnectionLost
94            } else {
95                SyncError::transport(e.to_string())
96            }
97        })?;
98
99        let status = response.status();
100        if !status.is_success() {
101            let body_text = response.text().await.unwrap_or_else(|_| "unknown".into());
102            return Err(if status.is_client_error() {
103                SyncError::invalid_payload(format!("HTTP {}: {}", status, body_text))
104            } else {
105                SyncError::transport(format!("HTTP {}: {}", status, body_text))
106            });
107        }
108
109        let response_bytes = response
110            .bytes()
111            .await
112            .map_err(|e| SyncError::transport(format!("Failed to read response body: {}", e)))?;
113
114        bincode::deserialize(&response_bytes)
115            .map_err(|e| SyncError::serialization(format!("Response deserialization: {}", e)))
116    }
117}
118
119#[async_trait]
120impl SyncTransport for HttpSyncTransport {
121    async fn handshake(&self, request: HandshakeRequest) -> Result<HandshakeResponse, SyncError> {
122        debug!(url = %self.base_url, "HTTP sync handshake");
123        self.post_bincode("/sync/handshake", &request).await
124    }
125
126    async fn push_changes(&self, changes: Vec<SyncChange>) -> Result<PushResponse, SyncError> {
127        let count = changes.len();
128        debug!(count, "HTTP sync push");
129        self.post_bincode("/sync/push", &changes).await
130    }
131
132    async fn pull_changes(&self, request: PullRequest) -> Result<PullResponse, SyncError> {
133        debug!("HTTP sync pull");
134        self.post_bincode("/sync/pull", &request).await
135    }
136
137    async fn health_check(&self) -> Result<(), SyncError> {
138        let url = format!("{}/sync/health", self.base_url);
139
140        let mut req = self.client.get(&url);
141        if let Some(ref token) = self.auth_token {
142            req = req.header("Authorization", format!("Bearer {}", token));
143        }
144
145        let response = req.send().await.map_err(|e| {
146            if e.is_timeout() {
147                SyncError::Timeout
148            } else if e.is_connect() {
149                SyncError::ConnectionLost
150            } else {
151                SyncError::transport(e.to_string())
152            }
153        })?;
154
155        if response.status().is_success() {
156            Ok(())
157        } else {
158            Err(SyncError::transport(format!(
159                "Health check failed: HTTP {}",
160                response.status()
161            )))
162        }
163    }
164}
165
166// HttpSyncTransport is Send + Sync (reqwest::Client is Send + Sync)