braid_core/fs/
binary_sync.rs1use crate::core::Result;
7use crate::fs::config::{get_root_dir, is_binary};
8use crate::fs::rate_limiter::ReconnectRateLimiter;
9use braid_blob::BlobStore;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15#[derive(Debug)]
17pub struct BinarySyncState {
18 pub url: String,
20 pub peer: String,
22 pub merge_type: String,
24 pub file_mtime_ns_str: Option<String>,
26 pub file_read_only: Option<bool>,
28 pub aborted: bool,
30}
31
32impl BinarySyncState {
33 pub fn new(url: String) -> Self {
34 Self {
35 url,
36 peer: uuid::Uuid::new_v4().to_string()[..12].to_string(),
37 merge_type: "aww".to_string(),
38 file_mtime_ns_str: None,
39 file_read_only: None,
40 aborted: false,
41 }
42 }
43}
44
45#[derive(Debug)]
47pub struct BinarySyncManager {
48 syncs: Arc<RwLock<HashMap<String, BinarySyncState>>>,
50 rate_limiter: Arc<ReconnectRateLimiter>,
52 blob_store: Option<Arc<BlobStore>>,
54 temp_folder: PathBuf,
56 meta_folder: PathBuf,
58}
59
60impl BinarySyncManager {
61 pub fn new(
63 rate_limiter: Arc<ReconnectRateLimiter>,
64 blob_store: Arc<BlobStore>,
65 ) -> Result<Self> {
66 let root = get_root_dir().map_err(|e| crate::core::BraidError::Config(e.to_string()))?;
67 let braidfs_dir = root.join(".braidfs");
68
69 Ok(Self {
70 syncs: Arc::new(RwLock::new(HashMap::new())),
71 rate_limiter,
72 blob_store: Some(blob_store),
73 temp_folder: braidfs_dir.join("temp"),
74 meta_folder: braidfs_dir.join("braid-blob-meta"),
75 })
76 }
77
78 pub async fn init_binary_sync(&self, url: &str, fullpath: &Path) -> Result<()> {
80 tracing::info!("init_binary_sync: {}", url);
81
82 let mut state = BinarySyncState::new(url.to_string());
84
85 let meta_path = self.get_meta_path(url);
87 if let Ok(content) = tokio::fs::read_to_string(&meta_path).await {
88 if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&content) {
89 if let Some(peer) = meta.get("peer").and_then(|v| v.as_str()) {
90 state.peer = peer.to_string();
91 }
92 if let Some(mtime) = meta.get("file_mtime_ns_str").and_then(|v| v.as_str()) {
93 state.file_mtime_ns_str = Some(mtime.to_string());
94 }
95 }
96 }
97
98 self.save_meta(url, &state).await?;
100
101 self.signal_file_needs_reading(url, fullpath).await?;
103
104 self.syncs.write().await.insert(url.to_string(), state);
106
107 Ok(())
108 }
109
110 pub async fn signal_file_needs_reading(&self, url: &str, fullpath: &Path) -> Result<()> {
112 let metadata = match tokio::fs::metadata(fullpath).await {
114 Ok(m) => m,
115 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
116 Err(e) => return Err(crate::core::BraidError::Io(e)),
117 };
118
119 let mtime = metadata
121 .modified()
122 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
123 .duration_since(std::time::SystemTime::UNIX_EPOCH)
124 .unwrap_or_default()
125 .as_nanos();
126 let mtime_str = mtime.to_string();
127
128 let needs_upload = {
130 let syncs = self.syncs.read().await;
131 if let Some(state) = syncs.get(url) {
132 state.file_mtime_ns_str.as_ref() != Some(&mtime_str)
133 } else {
134 true
135 }
136 };
137
138 if needs_upload {
139 let data = tokio::fs::read(fullpath).await?;
141
142 if let Some(store) = &self.blob_store {
144 store
145 .put(url, data.into(), vec![], vec![], None)
146 .await
147 .map_err(crate::core::BraidError::Client)?;
148 }
149
150 let mut syncs = self.syncs.write().await;
152 if let Some(state) = syncs.get_mut(url) {
153 state.file_mtime_ns_str = Some(mtime_str);
154 drop(syncs); self.save_meta(url, &self.syncs.read().await.get(url).unwrap())
156 .await?;
157 }
158 }
159
160 Ok(())
161 }
162
163 async fn save_meta(&self, url: &str, state: &BinarySyncState) -> Result<()> {
165 tokio::fs::create_dir_all(&self.meta_folder).await?;
166
167 let meta = serde_json::json!({
168 "merge_type": state.merge_type,
169 "peer": state.peer,
170 "file_mtime_ns_str": state.file_mtime_ns_str,
171 });
172
173 let meta_path = self.get_meta_path(url);
174 braid_blob::store::atomic_write(
175 &meta_path,
176 serde_json::to_string_pretty(&meta)?.as_bytes(),
177 &self.temp_folder,
178 )
179 .await
180 .map_err(crate::core::BraidError::Client)?;
181
182 Ok(())
183 }
184
185 fn get_meta_path(&self, url: &str) -> PathBuf {
187 let encoded = braid_blob::store::encode_filename(url);
188 self.meta_folder.join(encoded)
189 }
190
191 pub async fn disconnect(&self, url: &str) {
193 let mut syncs = self.syncs.write().await;
194 if let Some(state) = syncs.get_mut(url) {
195 state.aborted = true;
196 }
197 self.rate_limiter.on_diss(url).await;
198 }
199
200 pub async fn reconnect(&self, url: &str, fullpath: &Path) -> Result<()> {
202 self.rate_limiter.get_turn(url).await;
203 self.rate_limiter.on_conn(url).await;
204 self.signal_file_needs_reading(url, fullpath).await
205 }
206
207 pub fn blob_store(&self) -> Arc<BlobStore> {
208 self.blob_store
209 .clone()
210 .expect("BlobStore must be initialized")
211 }
212}
213
214pub struct BinarySyncDb {
216 fullpath: PathBuf,
217 temp_folder: PathBuf,
218}
219
220impl BinarySyncDb {
221 pub fn new(fullpath: PathBuf, temp_folder: PathBuf) -> Self {
222 Self {
223 fullpath,
224 temp_folder,
225 }
226 }
227
228 pub async fn read(&self, _key: &str) -> Result<Option<Vec<u8>>> {
230 match tokio::fs::read(&self.fullpath).await {
231 Ok(data) => Ok(Some(data)),
232 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
233 Err(e) => Err(crate::core::BraidError::Io(e)),
234 }
235 }
236
237 pub async fn write(&self, _key: &str, data: &[u8]) -> Result<std::fs::Metadata> {
239 braid_blob::store::atomic_write(&self.fullpath, data, &self.temp_folder)
240 .await
241 .map_err(crate::core::BraidError::Client)?;
242
243 tokio::fs::metadata(&self.fullpath)
244 .await
245 .map_err(crate::core::BraidError::Io)
246 }
247
248 pub async fn delete(&self, _key: &str) -> Result<()> {
250 match tokio::fs::remove_file(&self.fullpath).await {
251 Ok(()) => Ok(()),
252 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
253 Err(e) => Err(crate::core::BraidError::Io(e)),
254 }
255 }
256}
257
258pub fn should_use_binary_sync(path: &str) -> bool {
260 is_binary(path)
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266
267 #[test]
268 fn test_should_use_binary_sync() {
269 assert!(should_use_binary_sync("image.jpg"));
270 assert!(should_use_binary_sync("document.pdf"));
271 assert!(!should_use_binary_sync("readme.txt"));
272 assert!(!should_use_binary_sync("code.rs"));
273 }
274}