braid_core/fs/
debouncer.rs1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5use tokio::sync::RwLock;
6use tokio::time::{self, Duration, Instant};
7use tracing::{error, info};
8
9use crate::fs::state::DaemonState;
10use crate::fs::sync::sync_local_to_remote;
11
/// A single sync request queued for the debouncer's background task.
///
/// Requests are keyed by `url`: a newer request for the same URL replaces the
/// stored path and reschedules the pending sync instead of adding a second one.
#[derive(Debug, Clone)]
struct DebounceRequest {
    /// Remote URL of the resource; used as the debounce key.
    url: String,
    /// Local file whose contents are read when the sync finally fires.
    path: PathBuf,
}
18
19pub struct DebouncedSyncManager {
22 tx: mpsc::Sender<DebounceRequest>,
23}
24
25impl DebouncedSyncManager {
26 pub fn new_placeholder() -> Self {
28 let (tx, _) = mpsc::channel(1);
29 Self { tx }
30 }
31
32 pub fn new(state: DaemonState, debounce_ms: u64) -> Arc<Self> {
34 let (tx, rx) = mpsc::channel(100);
35 let manager = Arc::new(Self { tx });
36
37 let state_clone = state.clone();
39 tokio::spawn(async move {
40 Self::process_loop(rx, state_clone, Duration::from_millis(debounce_ms)).await;
41 });
42
43 manager
44 }
45
46 pub async fn request_sync(&self, url: String, path: PathBuf) {
48 if let Err(e) = self.tx.send(DebounceRequest { url, path }).await {
49 error!("[Debouncer] Failed to send sync request: {}", e);
50 }
51 }
52
53 async fn process_loop(
54 mut rx: mpsc::Receiver<DebounceRequest>,
55 state: DaemonState,
56 debounce_duration: Duration,
57 ) {
58 let pending: Arc<RwLock<HashMap<String, (PathBuf, Instant)>>> =
60 Arc::new(RwLock::new(HashMap::new()));
61
62 let pending_clone = pending.clone();
63 let state_sync = state.clone();
64
65 tokio::spawn(async move {
67 while let Some(req) = rx.recv().await {
68 let now = Instant::now();
69 let mut p = pending_clone.write().await;
70
71 let deadline = if !p.contains_key(&req.url) {
73 now + Duration::from_millis(50)
74 } else {
75 now + debounce_duration
76 };
77
78 info!(
79 "[Debouncer] Received request for {}. Setting deadline in {:?}",
80 req.url,
81 deadline.duration_since(now)
82 );
83 p.insert(req.url, (req.path, deadline));
84 }
85 });
86
87 loop {
89 time::sleep(Duration::from_millis(50)).await;
90
91 let mut to_sync = Vec::new();
92 {
93 let mut p = pending.write().await;
94 let now = Instant::now();
95
96 p.retain(|url, (path, deadline)| {
97 if now >= *deadline {
98 to_sync.push((url.clone(), path.clone()));
99 false
100 } else {
101 true
102 }
103 });
104 }
105
106 for (url, path) in to_sync {
107 let state_inner = state_sync.clone();
108 info!("[Debouncer] Deadline expired for {}. Triggering sync.", url);
109 tokio::spawn(async move {
110 if let Err(e) = Self::perform_sync(&path, &url, state_inner).await {
111 error!("[Debouncer] Sync failed for {}: {}", url, e);
112 }
113 });
114 }
115 }
116 }
117
118 async fn perform_sync(
119 path: &PathBuf,
120 url: &str,
121 state: DaemonState,
122 ) -> crate::core::Result<()> {
123 let mut attempts = 0;
124 let mut content = None;
125 while attempts < 3 {
126 match tokio::fs::read_to_string(path).await {
127 Ok(c) => {
128 content = Some(c);
129 break;
130 }
131 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
132 info!("[Debouncer] File removed, propagating deletion for {}", url);
133 content = Some(String::new());
134 break;
135 }
136 Err(e)
137 if e.kind() == std::io::ErrorKind::PermissionDenied
138 || e.raw_os_error() == Some(32) =>
139 {
140 attempts += 1;
141 tokio::time::sleep(Duration::from_millis(100)).await;
142 }
143 Err(e) => {
144 return Err(crate::core::BraidError::Io(e));
145 }
146 }
147 }
148
149 let content = content.ok_or_else(|| {
150 crate::core::BraidError::Fs(format!("Failed to read file after retries: {:?}", path))
151 })?;
152
153 let parents = {
154 let store = state.version_store.read().await;
155 store
156 .get(url)
157 .map(|v| v.current_version.clone())
158 .unwrap_or_default()
159 };
160
161 let original_content = {
162 let cache = state.content_cache.read().await;
163 cache.get(url).cloned()
164 };
165
166 sync_local_to_remote(path, url, &parents, original_content, content, None, state).await
167 }
168}