use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
use chrono::Utc;
use inotify::{EventMask, Inotify, WatchMask};
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};
use crate::client;
pub async fn run(watch_dir: &str, debounce_ms: u64) -> anyhow::Result<()> {
let dir = watch_dir.to_string();
let root = Path::new(&dir).canonicalize()?;
tracing::info!("Starting daemon, watching: {}", dir);
tracing::info!("Debounce: {}ms", debounce_ms);
if let Err(e) = sync_dir(watch_dir).await {
tracing::warn!("Initial sync failed: {}", e);
}
let wdir = dir.clone();
let wdeb = debounce_ms;
let wroot = root.clone();
let watcher = tokio::spawn(async move {
if let Err(e) = watcher_loop(&wdir, wdeb, &wroot).await {
tracing::error!("Watcher error: {}", e);
}
});
let sdir = dir.clone();
let sse = tokio::spawn(async move {
if let Err(e) = sync_history(&sdir).await {
tracing::warn!("History sync failed: {}", e);
}
loop {
if let Err(e) = sse_loop(&sdir).await {
tracing::warn!("SSE error: {}, reconnecting in 5s", e);
sleep(Duration::from_secs(5)).await;
}
}
});
tokio::select! {
_ = watcher => {},
_ = sse => {},
}
Ok(())
}
fn add_watches_recursive(
inotify: &mut Inotify,
watch_dirs: &mut HashMap<inotify::WatchDescriptor, std::path::PathBuf>,
dir: &Path,
) -> anyhow::Result<()> {
let wd = inotify
.watches()
.add(dir, WatchMask::CREATE | WatchMask::MODIFY | WatchMask::DELETE)?;
watch_dirs.insert(wd, dir.to_path_buf());
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
if let Ok(ftype) = entry.file_type()
&& ftype.is_dir()
{
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with('.') {
continue;
}
if let Err(e) = add_watches_recursive(inotify, watch_dirs, &entry.path()) {
tracing::warn!("Failed to watch {}: {}", entry.path().display(), e);
}
}
}
}
Ok(())
}
async fn watcher_loop(_dir: &str, debounce_ms: u64, root: &Path) -> anyhow::Result<()> {
let mut inotify = Inotify::init()?;
let mut watch_dirs: HashMap<inotify::WatchDescriptor, std::path::PathBuf> = HashMap::new();
add_watches_recursive(&mut inotify, &mut watch_dirs, root)?;
tracing::info!("Watching {} directories with inotify", watch_dirs.len());
let mut buffer = [0u8; 65536];
let debounce = Duration::from_millis(debounce_ms);
loop {
let events = inotify.read_events_blocking(&mut buffer)?;
for event in events {
let ev_dir = match watch_dirs.get(&event.wd) {
Some(d) => d.clone(),
None => continue,
};
let is_create = event.mask.contains(EventMask::CREATE);
let is_modify = event.mask.contains(EventMask::MODIFY);
let is_delete = event.mask.contains(EventMask::DELETE);
if (is_create || is_modify)
&& let Some(name) = event.name.and_then(|n| n.to_str())
{
if name.starts_with('.') {
continue;
}
let full_path = ev_dir.join(name);
let path_str = full_path.to_string_lossy().to_string();
if is_create
&& let Ok(meta) = tokio::fs::metadata(&full_path).await
&& meta.is_dir()
{
if let Err(e) =
add_watches_recursive(&mut inotify, &mut watch_dirs, &full_path)
{
tracing::warn!(
"Failed to watch new dir {}: {}",
path_str,
e
);
}
continue;
}
tracing::info!("Change detected: {}", path_str);
sleep(debounce).await;
let rel_path = full_path.strip_prefix(root).ok()
.and_then(|p| p.parent())
.and_then(|p| {
let s = p.to_string_lossy().to_string();
if s.is_empty() { None } else { Some(s) }
});
let max_retries = 10;
let mut retry = 0;
loop {
match client::upload(&path_str, rel_path.as_deref()).await {
Ok(()) => break,
Err(e) => {
let retryable = e.to_string().contains("File is empty");
if retry >= max_retries || !retryable {
tracing::warn!("Upload failed for {}: {}", path_str, e);
break;
}
let backoff =
Duration::from_millis(debounce_ms * (1 << retry));
tracing::info!(
"File empty, retrying {} in {:?}...",
path_str,
backoff
);
retry += 1;
sleep(backoff).await;
}
}
}
}
if is_delete
&& let Some(name) = event.name.and_then(|n| n.to_str())
{
if name.starts_with('.') {
continue;
}
tracing::info!("Delete detected: {}/{}", ev_dir.display(), name);
}
}
}
}
async fn sync_history(dir: &str) -> anyhow::Result<()> {
client::ensure_valid_token().await?;
let cfg = crate::config::load()?;
let now = Utc::now().to_rfc3339();
let url = format!("{}/api/events/history?since={}", cfg.server_url, now);
let client = reqwest::Client::new();
let resp = client.get(&url).bearer_auth(&cfg.token).send().await?;
if !resp.status().is_success() {
return Ok(());
}
let events: Vec<serde_json::Value> = resp.json().await?;
for ev in &events {
handle_event(ev, dir).await;
}
Ok(())
}
async fn sse_loop(dir: &str) -> anyhow::Result<()> {
use futures_util::StreamExt;
client::ensure_valid_token().await?;
let cfg = crate::config::load()?;
let client = reqwest::Client::new();
let url = format!("{}/api/events", cfg.server_url);
let resp = client.get(&url).bearer_auth(&cfg.token).send().await?;
let status = resp.status();
if status == reqwest::StatusCode::UNAUTHORIZED {
return Err(anyhow::anyhow!("401 Unauthorized"));
}
let mut buf = String::new();
let mut stream = resp.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buf.find("\n\n") {
let event_block = buf[..pos].to_string();
buf = buf[pos + 2..].to_string();
for line in event_block.lines() {
if let Some(data) = line.strip_prefix("data:")
&& let Ok(event) = serde_json::from_str::<serde_json::Value>(data.trim())
{
handle_event(&event, dir).await;
}
}
}
}
Ok(())
}
async fn handle_event(event: &serde_json::Value, dir: &str) {
let event_type = event["type"].as_str().unwrap_or("");
let payload = &event["payload"];
match event_type {
"file_created" | "file_updated" => {
let file_name = payload["name"].as_str().unwrap_or("unknown");
let path = payload["path"].as_str().unwrap_or("");
let dest = if path.is_empty() {
format!("{}/{}", dir, file_name)
} else {
format!("{}/{}/{}", dir, path, file_name)
};
if let Some(id) = event["resource_id"].as_str() {
if !Path::new(&dest).exists() {
tracing::info!("Remote {}: downloading {} as {}", event_type, id, dest);
if let Err(e) = client::download(id, Some(&dest)).await {
tracing::warn!("Download failed: {}", e);
}
} else {
tracing::info!(
"Remote {}: {} already exists locally, skipping",
event_type,
file_name
);
}
}
}
"file_deleted" => {
let file_name = payload["name"].as_str().unwrap_or("unknown");
let path = payload["path"].as_str().unwrap_or("");
let dest = if path.is_empty() {
format!("{}/{}", dir, file_name)
} else {
format!("{}/{}/{}", dir, path, file_name)
};
if Path::new(&dest).exists() {
tracing::info!("Remote delete: {}", dest);
let _ = tokio::fs::remove_file(&dest).await;
}
}
_ => {}
}
}
pub async fn sync_dir(dir: &str) -> anyhow::Result<()> {
client::ensure_valid_token().await?;
let dir_path = Path::new(dir);
if !dir_path.exists() {
anyhow::bail!("Directory does not exist: {}", dir);
}
tracing::info!("Bidirectional sync of {}", dir);
let cfg = crate::config::load()?;
let semaphore = Arc::new(Semaphore::new(cfg.concurrency));
let remote_files = client::fetch_remote_files().await?;
let mut remote_map: HashMap<String, &serde_json::Value> = HashMap::new();
for f in &remote_files {
let name = f["name"].as_str().unwrap_or("");
let path = f["path"].as_str().unwrap_or("");
let key = if path.is_empty() {
name.to_string()
} else {
format!("{}/{}", path, name)
};
remote_map.insert(key, f);
}
let mut local_keys: HashSet<String> = HashSet::new();
let files_to_upload = sync_local(dir_path, dir_path, &remote_map, &mut local_keys).await?;
let mut handles = Vec::new();
for (file_path, remote_path) in files_to_upload {
let sem = semaphore.clone();
handles.push(tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
if let Err(e) = client::upload(&file_path, remote_path.as_deref()).await {
tracing::warn!("Failed to upload {}: {}", file_path, e);
}
}));
}
for handle in handles {
let _ = handle.await;
}
let mut handles = Vec::new();
for (key, f) in &remote_map {
if local_keys.contains(key) {
continue;
}
let dest = format!("{}/{}", dir, key);
if let Some(id) = f["id"].as_str() {
let id = id.to_string();
if !Path::new(&dest).exists() {
if let Some(parent) = Path::new(&dest).parent() {
fs::create_dir_all(parent).await?;
}
let sem = semaphore.clone();
handles.push(tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
tracing::info!("Downloading remote file: {}", dest);
if let Err(e) = client::download(&id, Some(&dest)).await {
tracing::warn!("Failed to download {}: {}", dest, e);
}
}));
}
}
}
for handle in handles {
let _ = handle.await;
}
Ok(())
}
async fn sync_local(
base: &Path,
dir: &Path,
remote_map: &HashMap<String, &serde_json::Value>,
local_keys: &mut HashSet<String>,
) -> anyhow::Result<Vec<(String, Option<String>)>> {
let mut files_to_upload = Vec::new();
let mut entries = fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with('.') {
continue;
}
let file_type = entry.file_type().await?;
if file_type.is_dir() {
let sub = Box::pin(sync_local(base, &path, remote_map, local_keys)).await?;
files_to_upload.extend(sub);
} else if file_type.is_file() {
let rel_path = path.strip_prefix(base).unwrap();
let key = rel_path.to_string_lossy().to_string();
local_keys.insert(key.clone());
let remote_path = rel_path.parent().and_then(|p| {
let s = p.to_string_lossy();
if s.is_empty() { None } else { Some(s.to_string()) }
});
let needs_upload = match remote_map.get(&key) {
Some(remote) => {
let meta = fs::metadata(&path).await?;
let local_size = meta.len() as i64;
let local_mtime = meta.modified().ok();
let remote_size = (*remote)["size"].as_i64().unwrap_or(-1);
let remote_mtime = (*remote)["mtime"].as_str();
match (local_mtime, remote_mtime) {
(Some(lm), Some(rm)) if !rm.is_empty() && mtime_equalish(lm, rm) => {
false
}
_ => {
if local_size != remote_size {
true
} else {
match (*remote)["checksum_sha256"].as_str() {
Some(remote_hash) if !remote_hash.is_empty() => {
let local_hash = sha256_of_file(&path).await?;
local_hash != remote_hash
}
_ => true,
}
}
}
}
}
None => true,
};
if needs_upload {
files_to_upload.push((path.to_string_lossy().to_string(), remote_path));
}
}
}
Ok(files_to_upload)
}
async fn sha256_of_file(path: &Path) -> anyhow::Result<String> {
use sha2::Digest;
let mut file = tokio::fs::File::open(path).await?;
let mut hasher = sha2::Sha256::new();
let mut buf = [0u8; 65536];
loop {
let n = tokio::io::AsyncReadExt::read(&mut file, &mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
}
Ok(hex::encode(hasher.finalize()))
}
fn mtime_equalish(local: SystemTime, remote_mtime: &str) -> bool {
if let Ok(rm) = chrono::DateTime::parse_from_rfc3339(remote_mtime) {
let rm_sys: SystemTime = rm.with_timezone(&Utc).into();
let diff = if local > rm_sys {
local.duration_since(rm_sys)
} else {
rm_sys.duration_since(local)
};
diff.is_ok_and(|d| d.as_secs() < 2)
} else {
false
}
}