1use braid_http::client::BraidClient;
2use braid_http::error::{BraidError, Result};
3use tokio::sync::mpsc;
4
5#[derive(Debug, Clone)]
7pub struct BlobSyncConfig {
8 pub url: String,
10 pub peer: String,
12 pub headers: std::collections::HashMap<String, String>,
14 pub reconnect_delay_ms: u64,
16}
17
18pub struct BlobSyncClient {
20 config: BlobSyncConfig,
21 client: BraidClient,
22 update_tx: mpsc::Sender<BlobUpdate>,
24}
25
26#[derive(Debug, Clone)]
28pub struct BlobUpdate {
29 pub key: String,
30 pub version: Vec<String>,
31 pub content_type: Option<String>,
32 pub data: Vec<u8>,
33}
34
35impl BlobSyncClient {
36 pub fn new(config: BlobSyncConfig) -> (Self, mpsc::Receiver<BlobUpdate>) {
38 let (update_tx, update_rx) = mpsc::channel(100);
39
40 (
41 Self {
42 config,
43 client: BraidClient::new().expect("Failed to create BraidClient"),
44 update_tx,
45 },
46 update_rx,
47 )
48 }
49
50 pub async fn sync<F, W, D>(&self, _on_read: F, on_write: W, _on_delete: D) -> Result<()>
52 where
53 F: Fn(
54 &str,
55 ) -> std::pin::Pin<
56 Box<dyn std::future::Future<Output = Result<Option<Vec<u8>>>> + Send>,
57 > + Send
58 + Sync,
59 W: Fn(
60 &str,
61 Vec<u8>,
62 )
63 -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
64 + Send
65 + Sync,
66 D: Fn(&str) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
67 + Send
68 + Sync,
69 {
70 tracing::info!("Starting blob sync with {}", self.config.url);
71
72 let mut req = self
74 .client
75 .client()
76 .get(&self.config.url)
77 .header("Subscribe", "true")
78 .header("Peer", &self.config.peer);
79
80 for (key, value) in &self.config.headers {
81 req = req.header(key, value);
82 }
83
84 let response = req
85 .send()
86 .await
87 .map_err(|e| BraidError::Http(e.to_string()))?;
88 if !response.status().is_success() {
89 return Err(BraidError::Http(format!(
90 "Blob sync failed: {}",
91 response.status()
92 )));
93 }
94
95 let mut stream = response.bytes_stream();
96 use futures::StreamExt;
97
98 while let Some(chunk) = stream.next().await {
99 let chunk = chunk
100 .map_err(|e| BraidError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
101
102 let update = BlobUpdate {
103 key: self.config.url.clone(),
104 version: vec![],
105 content_type: None,
106 data: chunk.to_vec(),
107 };
108
109 on_write(&update.key, update.data.clone()).await?;
110 let _ = self.update_tx.send(update).await;
111 }
112
113 Ok(())
114 }
115
116 pub async fn put(
117 &self,
118 key: &str,
119 data: Vec<u8>,
120 version: Option<Vec<String>>,
121 ) -> Result<Vec<String>> {
122 let mut req = self
123 .client
124 .client()
125 .put(&format!("{}/{}", self.config.url, key))
126 .header("Peer", &self.config.peer)
127 .body(data);
128
129 if let Some(v) = &version {
130 let parents_str = v
131 .iter()
132 .map(|p| format!("\"{}\"", p))
133 .collect::<Vec<_>>()
134 .join(", ");
135 req = req.header("Parents", parents_str);
136 }
137
138 for (k, v) in &self.config.headers {
139 req = req.header(k, v);
140 }
141
142 let response = req
143 .send()
144 .await
145 .map_err(|e| BraidError::Http(e.to_string()))?;
146 if !response.status().is_success() {
147 return Err(BraidError::Http(format!(
148 "Blob put failed: {}",
149 response.status()
150 )));
151 }
152
153 let new_version = response
154 .headers()
155 .get("Version")
156 .and_then(|v| v.to_str().ok())
157 .map(|s| vec![s.to_string()])
158 .unwrap_or_default();
159
160 Ok(new_version)
161 }
162
163 pub async fn delete(&self, key: &str) -> Result<()> {
164 let mut req = self
165 .client
166 .client()
167 .delete(&format!("{}/{}", self.config.url, key))
168 .header("Peer", &self.config.peer);
169
170 for (k, v) in &self.config.headers {
171 req = req.header(k, v);
172 }
173
174 let response = req
175 .send()
176 .await
177 .map_err(|e| BraidError::Http(e.to_string()))?;
178 if !response.status().is_success() && response.status().as_u16() != 404 {
179 return Err(BraidError::Http(format!(
180 "Blob delete failed: {}",
181 response.status()
182 )));
183 }
184
185 Ok(())
186 }
187}
188
189pub struct Reconnector<F> {
190 get_delay: Box<dyn Fn(Option<&str>, u32) -> u64 + Send + Sync>,
191 func: F,
192 retry_count: u32,
193}
194
195impl<F, Fut> Reconnector<F>
196where
197 F: Fn() -> Fut + Send + Sync,
198 Fut: std::future::Future<Output = Result<()>> + Send,
199{
200 pub fn new<D>(get_delay: D, func: F) -> Self
201 where
202 D: Fn(Option<&str>, u32) -> u64 + Send + Sync + 'static,
203 {
204 Self {
205 get_delay: Box::new(get_delay),
206 func,
207 retry_count: 0,
208 }
209 }
210
211 pub async fn run(&mut self) -> Result<()> {
212 loop {
213 match (self.func)().await {
214 Ok(()) => {
215 self.retry_count = 0;
216 return Ok(());
217 }
218 Err(e) => {
219 self.retry_count += 1;
220 let delay = (self.get_delay)(Some(&e.to_string()), self.retry_count);
221 tracing::warn!(
222 "Reconnecting in {}ms after error: {} (attempt {})",
223 delay,
224 e,
225 self.retry_count
226 );
227 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
228 }
229 }
230 }
231 }
232}