Skip to main content

braid_blob/
sync.rs

1use braid_http::client::BraidClient;
2use braid_http::error::{BraidError, Result};
3use tokio::sync::mpsc;
4
/// Blob sync configuration.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly
/// (for example in tests or when detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobSyncConfig {
    /// Remote URL to sync with.
    pub url: String,
    /// Local peer ID, sent as the `Peer` request header.
    pub peer: String,
    /// Additional headers (e.g., cookies) attached to every request.
    pub headers: std::collections::HashMap<String, String>,
    /// Reconnect delay in milliseconds.
    pub reconnect_delay_ms: u64,
}
17
/// Blob sync client.
///
/// Holds the sync configuration, the HTTP client used for requests, and the
/// sending half of the channel on which received updates are published.
pub struct BlobSyncClient {
    /// Connection settings (URL, peer ID, extra headers).
    config: BlobSyncConfig,
    /// Braid HTTP client used to issue requests.
    client: BraidClient,
    /// Sending half of the blob update channel; the matching receiver is
    /// handed to the caller by `BlobSyncClient::new`.
    update_tx: mpsc::Sender<BlobUpdate>,
}
25
/// A blob update received from the server.
///
/// Derives `PartialEq`/`Eq` so updates taken off the channel can be compared
/// directly against expected values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobUpdate {
    /// Key identifying the blob this update applies to.
    pub key: String,
    /// Version identifiers associated with the update (may be empty).
    pub version: Vec<String>,
    /// Content type of `data`, when known.
    pub content_type: Option<String>,
    /// Raw bytes of the update.
    pub data: Vec<u8>,
}
34
35impl BlobSyncClient {
36    /// Create a new blob sync client.
37    pub fn new(config: BlobSyncConfig) -> (Self, mpsc::Receiver<BlobUpdate>) {
38        let (update_tx, update_rx) = mpsc::channel(100);
39
40        (
41            Self {
42                config,
43                client: BraidClient::new().expect("Failed to create BraidClient"),
44                update_tx,
45            },
46            update_rx,
47        )
48    }
49
50    /// Start syncing with the remote blob server.
51    pub async fn sync<F, W, D>(&self, _on_read: F, on_write: W, _on_delete: D) -> Result<()>
52    where
53        F: Fn(
54                &str,
55            ) -> std::pin::Pin<
56                Box<dyn std::future::Future<Output = Result<Option<Vec<u8>>>> + Send>,
57            > + Send
58            + Sync,
59        W: Fn(
60                &str,
61                Vec<u8>,
62            )
63                -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
64            + Send
65            + Sync,
66        D: Fn(&str) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
67            + Send
68            + Sync,
69    {
70        tracing::info!("Starting blob sync with {}", self.config.url);
71
72        // Build request with headers
73        let mut req = self
74            .client
75            .client()
76            .get(&self.config.url)
77            .header("Subscribe", "true")
78            .header("Peer", &self.config.peer);
79
80        for (key, value) in &self.config.headers {
81            req = req.header(key, value);
82        }
83
84        let response = req
85            .send()
86            .await
87            .map_err(|e| BraidError::Http(e.to_string()))?;
88        if !response.status().is_success() {
89            return Err(BraidError::Http(format!(
90                "Blob sync failed: {}",
91                response.status()
92            )));
93        }
94
95        let mut stream = response.bytes_stream();
96        use futures::StreamExt;
97
98        while let Some(chunk) = stream.next().await {
99            let chunk = chunk
100                .map_err(|e| BraidError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
101
102            let update = BlobUpdate {
103                key: self.config.url.clone(),
104                version: vec![],
105                content_type: None,
106                data: chunk.to_vec(),
107            };
108
109            on_write(&update.key, update.data.clone()).await?;
110            let _ = self.update_tx.send(update).await;
111        }
112
113        Ok(())
114    }
115
116    pub async fn put(
117        &self,
118        key: &str,
119        data: Vec<u8>,
120        version: Option<Vec<String>>,
121    ) -> Result<Vec<String>> {
122        let mut req = self
123            .client
124            .client()
125            .put(&format!("{}/{}", self.config.url, key))
126            .header("Peer", &self.config.peer)
127            .body(data);
128
129        if let Some(v) = &version {
130            let parents_str = v
131                .iter()
132                .map(|p| format!("\"{}\"", p))
133                .collect::<Vec<_>>()
134                .join(", ");
135            req = req.header("Parents", parents_str);
136        }
137
138        for (k, v) in &self.config.headers {
139            req = req.header(k, v);
140        }
141
142        let response = req
143            .send()
144            .await
145            .map_err(|e| BraidError::Http(e.to_string()))?;
146        if !response.status().is_success() {
147            return Err(BraidError::Http(format!(
148                "Blob put failed: {}",
149                response.status()
150            )));
151        }
152
153        let new_version = response
154            .headers()
155            .get("Version")
156            .and_then(|v| v.to_str().ok())
157            .map(|s| vec![s.to_string()])
158            .unwrap_or_default();
159
160        Ok(new_version)
161    }
162
163    pub async fn delete(&self, key: &str) -> Result<()> {
164        let mut req = self
165            .client
166            .client()
167            .delete(&format!("{}/{}", self.config.url, key))
168            .header("Peer", &self.config.peer);
169
170        for (k, v) in &self.config.headers {
171            req = req.header(k, v);
172        }
173
174        let response = req
175            .send()
176            .await
177            .map_err(|e| BraidError::Http(e.to_string()))?;
178        if !response.status().is_success() && response.status().as_u16() != 404 {
179            return Err(BraidError::Http(format!(
180                "Blob delete failed: {}",
181                response.status()
182            )));
183        }
184
185        Ok(())
186    }
187}
188
/// Retry helper that re-invokes an async operation until it succeeds.
pub struct Reconnector<F> {
    /// Computes the delay in milliseconds before the next attempt, given the
    /// last error message (if any) and the number of consecutive failures.
    get_delay: Box<dyn Fn(Option<&str>, u32) -> u64 + Send + Sync>,
    /// Operation to run; it is called again after each failure.
    func: F,
    /// Number of consecutive failed attempts; reset to zero on success.
    retry_count: u32,
}
194
195impl<F, Fut> Reconnector<F>
196where
197    F: Fn() -> Fut + Send + Sync,
198    Fut: std::future::Future<Output = Result<()>> + Send,
199{
200    pub fn new<D>(get_delay: D, func: F) -> Self
201    where
202        D: Fn(Option<&str>, u32) -> u64 + Send + Sync + 'static,
203    {
204        Self {
205            get_delay: Box::new(get_delay),
206            func,
207            retry_count: 0,
208        }
209    }
210
211    pub async fn run(&mut self) -> Result<()> {
212        loop {
213            match (self.func)().await {
214                Ok(()) => {
215                    self.retry_count = 0;
216                    return Ok(());
217                }
218                Err(e) => {
219                    self.retry_count += 1;
220                    let delay = (self.get_delay)(Some(&e.to_string()), self.retry_count);
221                    tracing::warn!(
222                        "Reconnecting in {}ms after error: {} (attempt {})",
223                        delay,
224                        e,
225                        self.retry_count
226                    );
227                    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
228                }
229            }
230        }
231    }
232}