pulsedb/sync/
transport_http.rs1use async_trait::async_trait;
17use reqwest::Client;
18use tracing::debug;
19
20use super::error::SyncError;
21use super::transport::SyncTransport;
22use super::types::{
23 HandshakeRequest, HandshakeResponse, PullRequest, PullResponse, PushResponse, SyncChange,
24};
25
26pub struct HttpSyncTransport {
40 client: Client,
41 base_url: String,
42 auth_token: Option<String>,
43}
44
45impl HttpSyncTransport {
46 pub fn new(base_url: impl Into<String>) -> Self {
51 Self {
52 client: Client::new(),
53 base_url: base_url.into(),
54 auth_token: None,
55 }
56 }
57
58 pub fn with_auth(base_url: impl Into<String>, token: impl Into<String>) -> Self {
62 Self {
63 client: Client::new(),
64 base_url: base_url.into(),
65 auth_token: Some(token.into()),
66 }
67 }
68
69 async fn post_bincode<Req, Resp>(&self, path: &str, request: &Req) -> Result<Resp, SyncError>
71 where
72 Req: serde::Serialize,
73 Resp: serde::de::DeserializeOwned,
74 {
75 let url = format!("{}{}", self.base_url, path);
76 let body =
77 bincode::serialize(request).map_err(|e| SyncError::serialization(e.to_string()))?;
78
79 let mut req = self
80 .client
81 .post(&url)
82 .header("Content-Type", "application/octet-stream")
83 .body(body);
84
85 if let Some(ref token) = self.auth_token {
86 req = req.header("Authorization", format!("Bearer {}", token));
87 }
88
89 let response = req.send().await.map_err(|e| {
90 if e.is_timeout() {
91 SyncError::Timeout
92 } else if e.is_connect() {
93 SyncError::ConnectionLost
94 } else {
95 SyncError::transport(e.to_string())
96 }
97 })?;
98
99 let status = response.status();
100 if !status.is_success() {
101 let body_text = response.text().await.unwrap_or_else(|_| "unknown".into());
102 return Err(if status.is_client_error() {
103 SyncError::invalid_payload(format!("HTTP {}: {}", status, body_text))
104 } else {
105 SyncError::transport(format!("HTTP {}: {}", status, body_text))
106 });
107 }
108
109 let response_bytes = response
110 .bytes()
111 .await
112 .map_err(|e| SyncError::transport(format!("Failed to read response body: {}", e)))?;
113
114 bincode::deserialize(&response_bytes)
115 .map_err(|e| SyncError::serialization(format!("Response deserialization: {}", e)))
116 }
117}
118
119#[async_trait]
120impl SyncTransport for HttpSyncTransport {
121 async fn handshake(&self, request: HandshakeRequest) -> Result<HandshakeResponse, SyncError> {
122 debug!(url = %self.base_url, "HTTP sync handshake");
123 self.post_bincode("/sync/handshake", &request).await
124 }
125
126 async fn push_changes(&self, changes: Vec<SyncChange>) -> Result<PushResponse, SyncError> {
127 let count = changes.len();
128 debug!(count, "HTTP sync push");
129 self.post_bincode("/sync/push", &changes).await
130 }
131
132 async fn pull_changes(&self, request: PullRequest) -> Result<PullResponse, SyncError> {
133 debug!("HTTP sync pull");
134 self.post_bincode("/sync/pull", &request).await
135 }
136
137 async fn health_check(&self) -> Result<(), SyncError> {
138 let url = format!("{}/sync/health", self.base_url);
139
140 let mut req = self.client.get(&url);
141 if let Some(ref token) = self.auth_token {
142 req = req.header("Authorization", format!("Bearer {}", token));
143 }
144
145 let response = req.send().await.map_err(|e| {
146 if e.is_timeout() {
147 SyncError::Timeout
148 } else if e.is_connect() {
149 SyncError::ConnectionLost
150 } else {
151 SyncError::transport(e.to_string())
152 }
153 })?;
154
155 if response.status().is_success() {
156 Ok(())
157 } else {
158 Err(SyncError::transport(format!(
159 "Health check failed: HTTP {}",
160 response.status()
161 )))
162 }
163 }
164}
165
166