use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time::{self, Duration, Instant};
use tracing::{error, info};
use crate::fs::state::DaemonState;
use crate::fs::sync::sync_local_to_remote;
/// A single sync request travelling over the debouncer's channel.
#[derive(Debug, Clone)]
struct DebounceRequest {
/// Remote URL of the resource; the debouncer keys its pending map by this value.
url: String,
/// Local file whose contents are read when the sync for `url` fires.
path: PathBuf,
}
/// Handle used to submit sync requests that are coalesced per URL before
/// being executed by a background task.
pub struct DebouncedSyncManager {
/// Sending half of the request channel; the receiving half is owned by the
/// background processing loop (or dropped, for a placeholder instance).
tx: mpsc::Sender<DebounceRequest>,
}
impl DebouncedSyncManager {
/// Builds a manager that is not attached to any processing loop.
///
/// The receiving half of the channel is dropped right away, so every call to
/// `request_sync` on the returned value fails to send and only logs an error.
pub fn new_placeholder() -> Self {
    Self {
        // Keep only the sender; the temporary receiver is dropped at the end
        // of this expression.
        tx: mpsc::channel(1).0,
    }
}
/// Creates a manager and spawns its background processing loop.
///
/// * `state` - daemon state handed to the processing loop and, through it,
///   to every sync that is triggered.
/// * `debounce_ms` - debounce window, in milliseconds, applied to requests
///   for a URL that already has a pending sync.
///
/// Must be called from within a Tokio runtime because it uses `tokio::spawn`.
pub fn new(state: DaemonState, debounce_ms: u64) -> Arc<Self> {
    // Bounded queue: senders wait in `request_sync` once 100 requests are buffered.
    let (tx, rx) = mpsc::channel(100);
    // `state` is already owned here, so it is moved into the task directly
    // instead of being cloned first.
    tokio::spawn(Self::process_loop(
        rx,
        state,
        Duration::from_millis(debounce_ms),
    ));
    Arc::new(Self { tx })
}
pub async fn request_sync(&self, url: String, path: PathBuf) {
if let Err(e) = self.tx.send(DebounceRequest { url, path }).await {
error!("[Debouncer] Failed to send sync request: {}", e);
}
}
async fn process_loop(
mut rx: mpsc::Receiver<DebounceRequest>,
state: DaemonState,
debounce_duration: Duration,
) {
let pending: Arc<RwLock<HashMap<String, (PathBuf, Instant)>>> =
Arc::new(RwLock::new(HashMap::new()));
let pending_clone = pending.clone();
let state_sync = state.clone();
tokio::spawn(async move {
while let Some(req) = rx.recv().await {
let now = Instant::now();
let mut p = pending_clone.write().await;
let deadline = if !p.contains_key(&req.url) {
now + Duration::from_millis(50)
} else {
now + debounce_duration
};
info!(
"[Debouncer] Received request for {}. Setting deadline in {:?}",
req.url,
deadline.duration_since(now)
);
p.insert(req.url, (req.path, deadline));
}
});
loop {
time::sleep(Duration::from_millis(50)).await;
let mut to_sync = Vec::new();
{
let mut p = pending.write().await;
let now = Instant::now();
p.retain(|url, (path, deadline)| {
if now >= *deadline {
to_sync.push((url.clone(), path.clone()));
false
} else {
true
}
});
}
for (url, path) in to_sync {
let state_inner = state_sync.clone();
info!("[Debouncer] Deadline expired for {}. Triggering sync.", url);
tokio::spawn(async move {
if let Err(e) = Self::perform_sync(&path, &url, state_inner).await {
error!("[Debouncer] Sync failed for {}: {}", url, e);
}
});
}
}
}
async fn perform_sync(
path: &PathBuf,
url: &str,
state: DaemonState,
) -> crate::core::Result<()> {
let mut attempts = 0;
let mut content = None;
while attempts < 3 {
match tokio::fs::read_to_string(path).await {
Ok(c) => {
content = Some(c);
break;
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
info!("[Debouncer] File removed, propagating deletion for {}", url);
content = Some(String::new());
break;
}
Err(e)
if e.kind() == std::io::ErrorKind::PermissionDenied
|| e.raw_os_error() == Some(32) =>
{
attempts += 1;
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => {
return Err(crate::core::BraidError::Io(e));
}
}
}
let content = content.ok_or_else(|| {
crate::core::BraidError::Fs(format!("Failed to read file after retries: {:?}", path))
})?;
let parents = {
let store = state.version_store.read().await;
store
.get(url)
.map(|v| v.current_version.clone())
.unwrap_or_default()
};
let original_content = {
let cache = state.content_cache.read().await;
cache.get(url).cloned()
};
sync_local_to_remote(path, url, &parents, original_content, content, None, state).await
}
}