//! `braid_core/fs/debouncer.rs`
//!
//! Debounces per-URL sync requests before local file changes are pushed to the remote.

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5use tokio::sync::RwLock;
6use tokio::time::{self, Duration, Instant};
7use tracing::{error, info};
8
9use crate::fs::state::DaemonState;
10use crate::fs::sync::sync_local_to_remote;
11
/// One queued "please sync this" message sent to the debouncer's processing loop.
///
/// Requests are keyed by `url` once they reach the loop; a newer request for the
/// same URL replaces the stored path and deadline of the older one.
#[derive(Debug, Clone)]
struct DebounceRequest {
    /// URL the content should be synced to; used as the pending-map key.
    url: String,
    /// Local file whose content is read when the sync finally runs.
    path: PathBuf,
}
18
19/// Manages debouncing of sync requests to prevent network flooding
20/// while maintaining high responsiveness for "sync-as-you-type".
21pub struct DebouncedSyncManager {
22    tx: mpsc::Sender<DebounceRequest>,
23}
24
25impl DebouncedSyncManager {
26    /// Create a placeholder manager (used for circular initialization)
27    pub fn new_placeholder() -> Self {
28        let (tx, _) = mpsc::channel(1);
29        Self { tx }
30    }
31
32    /// Create a new manager and spawn its processing loop.
33    pub fn new(state: DaemonState, debounce_ms: u64) -> Arc<Self> {
34        let (tx, rx) = mpsc::channel(100);
35        let manager = Arc::new(Self { tx });
36
37        // Spawn the background processing task
38        let state_clone = state.clone();
39        tokio::spawn(async move {
40            Self::process_loop(rx, state_clone, Duration::from_millis(debounce_ms)).await;
41        });
42
43        manager
44    }
45
46    /// Request a sync for a given URL and path.
47    pub async fn request_sync(&self, url: String, path: PathBuf) {
48        if let Err(e) = self.tx.send(DebounceRequest { url, path }).await {
49            error!("[Debouncer] Failed to send sync request: {}", e);
50        }
51    }
52
53    async fn process_loop(
54        mut rx: mpsc::Receiver<DebounceRequest>,
55        state: DaemonState,
56        debounce_duration: Duration,
57    ) {
58        // Track the latest path and the next scheduled sync time for each URL
59        let pending: Arc<RwLock<HashMap<String, (PathBuf, Instant)>>> =
60            Arc::new(RwLock::new(HashMap::new()));
61
62        let pending_clone = pending.clone();
63        let state_sync = state.clone();
64
65        // 1. Task to handle incoming requests and update deadlines
66        tokio::spawn(async move {
67            while let Some(req) = rx.recv().await {
68                let now = Instant::now();
69                let mut p = pending_clone.write().await;
70
71                // If it's the first request for this URL in a while, trigger it sooner
72                let deadline = if !p.contains_key(&req.url) {
73                    now + Duration::from_millis(50)
74                } else {
75                    now + debounce_duration
76                };
77
78                info!(
79                    "[Debouncer] Received request for {}. Setting deadline in {:?}",
80                    req.url,
81                    deadline.duration_since(now)
82                );
83                p.insert(req.url, (req.path, deadline));
84            }
85        });
86
87        // 2. Monitoring loop to trigger syncs when deadlines expire
88        loop {
89            time::sleep(Duration::from_millis(50)).await;
90
91            let mut to_sync = Vec::new();
92            {
93                let mut p = pending.write().await;
94                let now = Instant::now();
95
96                p.retain(|url, (path, deadline)| {
97                    if now >= *deadline {
98                        to_sync.push((url.clone(), path.clone()));
99                        false
100                    } else {
101                        true
102                    }
103                });
104            }
105
106            for (url, path) in to_sync {
107                let state_inner = state_sync.clone();
108                info!("[Debouncer] Deadline expired for {}. Triggering sync.", url);
109                tokio::spawn(async move {
110                    if let Err(e) = Self::perform_sync(&path, &url, state_inner).await {
111                        error!("[Debouncer] Sync failed for {}: {}", url, e);
112                    }
113                });
114            }
115        }
116    }
117
118    async fn perform_sync(
119        path: &PathBuf,
120        url: &str,
121        state: DaemonState,
122    ) -> crate::core::Result<()> {
123        let mut attempts = 0;
124        let mut content = None;
125        while attempts < 3 {
126            match tokio::fs::read_to_string(path).await {
127                Ok(c) => {
128                    content = Some(c);
129                    break;
130                }
131                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
132                    info!("[Debouncer] File removed, propagating deletion for {}", url);
133                    content = Some(String::new());
134                    break;
135                }
136                Err(e)
137                    if e.kind() == std::io::ErrorKind::PermissionDenied
138                        || e.raw_os_error() == Some(32) =>
139                {
140                    attempts += 1;
141                    tokio::time::sleep(Duration::from_millis(100)).await;
142                }
143                Err(e) => {
144                    return Err(crate::core::BraidError::Io(e));
145                }
146            }
147        }
148
149        let content = content.ok_or_else(|| {
150            crate::core::BraidError::Fs(format!("Failed to read file after retries: {:?}", path))
151        })?;
152
153        let parents = {
154            let store = state.version_store.read().await;
155            store
156                .get(url)
157                .map(|v| v.current_version.clone())
158                .unwrap_or_default()
159        };
160
161        let original_content = {
162            let cache = state.content_cache.read().await;
163            cache.get(url).cloned()
164        };
165
166        sync_local_to_remote(path, url, &parents, original_content, content, None, state).await
167    }
168}