Skip to main content

braid_core/fs/
binary_sync.rs

1//! Binary file synchronization for BraidFS.
2//!
3//! Implements binary file sync using braid-blob for non-text files.
4//! Matches JS `init_binary_sync()` from braidfs/index.js.
5
6use crate::core::Result;
7use crate::fs::config::{get_root_dir, is_binary};
8use crate::fs::rate_limiter::ReconnectRateLimiter;
9use braid_blob::BlobStore;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// Per-URL bookkeeping for a single binary sync.
#[derive(Debug)]
pub struct BinarySyncState {
    /// URL this state belongs to.
    pub url: String,
    /// Identifier of the peer taking part in this sync.
    pub peer: String,
    /// Name of the merge strategy; binary syncs are expected to use `"aww"`.
    pub merge_type: String,
    /// File modification time as last recorded, in nanoseconds, stored as a
    /// decimal string. `None` until a value has been recorded.
    pub file_mtime_ns_str: Option<String>,
    /// Read-only status of the file, when known.
    pub file_read_only: Option<bool>,
    /// Flag marking the sync as aborted.
    pub aborted: bool,
}
31
32impl BinarySyncState {
33    pub fn new(url: String) -> Self {
34        Self {
35            url,
36            peer: uuid::Uuid::new_v4().to_string()[..12].to_string(),
37            merge_type: "aww".to_string(),
38            file_mtime_ns_str: None,
39            file_read_only: None,
40            aborted: false,
41        }
42    }
43}
44
45/// Binary sync manager for multiple URLs.
46#[derive(Debug)]
47pub struct BinarySyncManager {
48    /// Active sync states.
49    syncs: Arc<RwLock<HashMap<String, BinarySyncState>>>,
50    /// Rate limiter for reconnections.
51    rate_limiter: Arc<ReconnectRateLimiter>,
52    /// Blob store for persistence.
53    blob_store: Option<Arc<BlobStore>>,
54    /// Temp folder for atomic writes.
55    temp_folder: PathBuf,
56    /// Meta folder for sync metadata.
57    meta_folder: PathBuf,
58}
59
60impl BinarySyncManager {
61    /// Create a new binary sync manager.
62    pub fn new(
63        rate_limiter: Arc<ReconnectRateLimiter>,
64        blob_store: Arc<BlobStore>,
65    ) -> Result<Self> {
66        let root = get_root_dir().map_err(|e| crate::core::BraidError::Config(e.to_string()))?;
67        let braidfs_dir = root.join(".braidfs");
68
69        Ok(Self {
70            syncs: Arc::new(RwLock::new(HashMap::new())),
71            rate_limiter,
72            blob_store: Some(blob_store),
73            temp_folder: braidfs_dir.join("temp"),
74            meta_folder: braidfs_dir.join("braid-blob-meta"),
75        })
76    }
77
78    /// Initialize a binary sync for a URL.
79    pub async fn init_binary_sync(&self, url: &str, fullpath: &Path) -> Result<()> {
80        tracing::info!("init_binary_sync: {}", url);
81
82        // Create state
83        let mut state = BinarySyncState::new(url.to_string());
84
85        // Try to load existing metadata
86        let meta_path = self.get_meta_path(url);
87        if let Ok(content) = tokio::fs::read_to_string(&meta_path).await {
88            if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&content) {
89                if let Some(peer) = meta.get("peer").and_then(|v| v.as_str()) {
90                    state.peer = peer.to_string();
91                }
92                if let Some(mtime) = meta.get("file_mtime_ns_str").and_then(|v| v.as_str()) {
93                    state.file_mtime_ns_str = Some(mtime.to_string());
94                }
95            }
96        }
97
98        // Save metadata
99        self.save_meta(url, &state).await?;
100
101        // Signal initial file read
102        self.signal_file_needs_reading(url, fullpath).await?;
103
104        // Store state
105        self.syncs.write().await.insert(url.to_string(), state);
106
107        Ok(())
108    }
109
110    /// Signal that a file needs reading and potentially uploading.
111    pub async fn signal_file_needs_reading(&self, url: &str, fullpath: &Path) -> Result<()> {
112        // Check if file exists
113        let metadata = match tokio::fs::metadata(fullpath).await {
114            Ok(m) => m,
115            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
116            Err(e) => return Err(crate::core::BraidError::Io(e)),
117        };
118
119        // Get modification time
120        let mtime = metadata
121            .modified()
122            .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
123            .duration_since(std::time::SystemTime::UNIX_EPOCH)
124            .unwrap_or_default()
125            .as_nanos();
126        let mtime_str = mtime.to_string();
127
128        // Check if changed
129        let needs_upload = {
130            let syncs = self.syncs.read().await;
131            if let Some(state) = syncs.get(url) {
132                state.file_mtime_ns_str.as_ref() != Some(&mtime_str)
133            } else {
134                true
135            }
136        };
137
138        if needs_upload {
139            // Read file content
140            let data = tokio::fs::read(fullpath).await?;
141
142            // Upload to blob store (if configured)
143            if let Some(store) = &self.blob_store {
144                store
145                    .put(url, data.into(), vec![], vec![], None)
146                    .await
147                    .map_err(crate::core::BraidError::Client)?;
148            }
149
150            // Update mtime
151            let mut syncs = self.syncs.write().await;
152            if let Some(state) = syncs.get_mut(url) {
153                state.file_mtime_ns_str = Some(mtime_str);
154                drop(syncs); // Release lock before save
155                self.save_meta(url, &self.syncs.read().await.get(url).unwrap())
156                    .await?;
157            }
158        }
159
160        Ok(())
161    }
162
163    /// Save metadata for a sync.
164    async fn save_meta(&self, url: &str, state: &BinarySyncState) -> Result<()> {
165        tokio::fs::create_dir_all(&self.meta_folder).await?;
166
167        let meta = serde_json::json!({
168            "merge_type": state.merge_type,
169            "peer": state.peer,
170            "file_mtime_ns_str": state.file_mtime_ns_str,
171        });
172
173        let meta_path = self.get_meta_path(url);
174        braid_blob::store::atomic_write(
175            &meta_path,
176            serde_json::to_string_pretty(&meta)?.as_bytes(),
177            &self.temp_folder,
178        )
179        .await
180        .map_err(crate::core::BraidError::Client)?;
181
182        Ok(())
183    }
184
185    /// Get the metadata file path for a URL.
186    fn get_meta_path(&self, url: &str) -> PathBuf {
187        let encoded = braid_blob::store::encode_filename(url);
188        self.meta_folder.join(encoded)
189    }
190
191    /// Disconnect a sync.
192    pub async fn disconnect(&self, url: &str) {
193        let mut syncs = self.syncs.write().await;
194        if let Some(state) = syncs.get_mut(url) {
195            state.aborted = true;
196        }
197        self.rate_limiter.on_diss(url).await;
198    }
199
200    /// Reconnect a sync.
201    pub async fn reconnect(&self, url: &str, fullpath: &Path) -> Result<()> {
202        self.rate_limiter.get_turn(url).await;
203        self.rate_limiter.on_conn(url).await;
204        self.signal_file_needs_reading(url, fullpath).await
205    }
206
207    pub fn blob_store(&self) -> Arc<BlobStore> {
208        self.blob_store
209            .clone()
210            .expect("BlobStore must be initialized")
211    }
212}
213
/// Database interface for binary sync (matches JS `db` object).
///
/// Each value is bound to one file on disk. `Debug` is derived so this type
/// is consistent with the other public types in this module.
#[derive(Debug)]
pub struct BinarySyncDb {
    /// Path of the file that reads, writes, and deletes operate on.
    fullpath: PathBuf,
    /// Directory handed to atomic writes for their temporary files.
    temp_folder: PathBuf,
}
219
220impl BinarySyncDb {
221    pub fn new(fullpath: PathBuf, temp_folder: PathBuf) -> Self {
222        Self {
223            fullpath,
224            temp_folder,
225        }
226    }
227
228    /// Read file content.
229    pub async fn read(&self, _key: &str) -> Result<Option<Vec<u8>>> {
230        match tokio::fs::read(&self.fullpath).await {
231            Ok(data) => Ok(Some(data)),
232            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
233            Err(e) => Err(crate::core::BraidError::Io(e)),
234        }
235    }
236
237    /// Write file content atomically.
238    pub async fn write(&self, _key: &str, data: &[u8]) -> Result<std::fs::Metadata> {
239        braid_blob::store::atomic_write(&self.fullpath, data, &self.temp_folder)
240            .await
241            .map_err(crate::core::BraidError::Client)?;
242
243        tokio::fs::metadata(&self.fullpath)
244            .await
245            .map_err(crate::core::BraidError::Io)
246    }
247
248    /// Delete the file.
249    pub async fn delete(&self, _key: &str) -> Result<()> {
250        match tokio::fs::remove_file(&self.fullpath).await {
251            Ok(()) => Ok(()),
252            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
253            Err(e) => Err(crate::core::BraidError::Io(e)),
254        }
255    }
256}
257
258/// Check if a file should use binary sync.
259pub fn should_use_binary_sync(path: &str) -> bool {
260    is_binary(path)
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Image and PDF names select binary sync; plain-text and source names do not.
    #[test]
    fn test_should_use_binary_sync() {
        let cases = [
            ("image.jpg", true),
            ("document.pdf", true),
            ("readme.txt", false),
            ("code.rs", false),
        ];

        for (name, expected) in cases.iter() {
            assert!(
                should_use_binary_sync(name) == *expected,
                "unexpected binary-sync decision for {}",
                name
            );
        }
    }
}