// braid_core/fs/mod.rs
1//! # BraidFS Core Logic
2//!
3//! This module implements the synchronization daemon.
4
5use crate::core::{BraidClient, Result};
6use crate::fs::api::run_server;
7use crate::fs::binary_sync::BinarySyncManager;
8use crate::fs::config::Config;
9use crate::fs::rate_limiter::ReconnectRateLimiter;
10use crate::fs::scanner::{start_scan_loop, ScanState};
11use crate::fs::versions::VersionStore;
12use notify::{Event, RecursiveMode, Watcher};
13use rusqlite::Connection;
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use tokio::sync::RwLock;
19
20pub mod api;
21pub mod binary_sync;
22pub mod blob_handlers;
23pub mod config;
24pub mod debouncer;
25pub mod diff;
26pub mod mapping;
27#[cfg(feature = "nfs")]
28pub mod mount;
29#[cfg(feature = "nfs")]
30pub mod nfs;
31pub mod rate_limiter;
32pub mod scanner;
33pub mod server_handlers;
34pub mod state;
35pub mod subscription;
36pub mod sync;
37pub mod versions;
38pub mod watcher;
39
40use state::{Command, DaemonState};
41use subscription::spawn_subscription;
42use watcher::handle_fs_event;
43
lazy_static::lazy_static! {
    /// Globally shared peer identifier for this daemon instance.
    ///
    /// Starts empty and is populated once from `Config::peer_id` during
    /// `run_daemon` startup; wrapped in a tokio `RwLock` so async tasks can
    /// read it without blocking the runtime.
    pub static ref PEER_ID: Arc<RwLock<String>> = Arc::new(RwLock::new(String::new()));
}
47
/// Tracks paths the daemon itself has just written, so the file watcher can
/// ignore the echo events those writes generate.
///
/// Cloning is cheap: clones share the same underlying map via `Arc`.
#[derive(Clone)]
pub struct PendingWrites {
    // Map path -> Expiration Time (when we stop ignoring it)
    paths: Arc<Mutex<HashMap<String, std::time::Instant>>>,
}

impl PendingWrites {
    /// Creates an empty tracker with no pending writes.
    pub fn new() -> Self {
        Self {
            paths: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Canonicalizes a path into a lowercase, forward-slash key so Windows
    /// (`C:\Foo`) and Unix (`c:/foo`) spellings of the same path collide.
    fn normalize(path: &std::path::Path) -> String {
        path.to_string_lossy().to_lowercase().replace('\\', "/")
    }

    /// Marks `path` as self-written: watcher events for it are ignored for
    /// the next 100ms. Accepts any path-like value (generalized from the
    /// previous `PathBuf`-by-value signature; existing callers still work).
    pub fn add(&self, path: impl AsRef<std::path::Path>) {
        // Ignore events for this path for 100ms
        let expiry = std::time::Instant::now() + Duration::from_millis(100);
        self.paths
            .lock()
            .unwrap()
            .insert(Self::normalize(path.as_ref()), expiry);
    }

    /// Stops ignoring `path` immediately, regardless of its expiry time.
    pub fn remove(&self, path: impl AsRef<std::path::Path>) {
        self.paths
            .lock()
            .unwrap()
            .remove(&Self::normalize(path.as_ref()));
    }

    /// Returns `true` while `path` is still inside its ignore window.
    /// Expired entries are removed eagerly as a side effect, so the map
    /// does not accumulate stale paths.
    pub fn should_ignore(&self, path: impl AsRef<std::path::Path>) -> bool {
        let mut paths = self.paths.lock().unwrap();
        let key = Self::normalize(path.as_ref());
        match paths.get(&key) {
            // Still within ignore window
            Some(&expiry) if std::time::Instant::now() < expiry => true,
            // Expired, cleanup
            Some(_) => {
                paths.remove(&key);
                false
            }
            None => false,
        }
    }
}

impl Default for PendingWrites {
    fn default() -> Self {
        Self::new()
    }
}
93
/// Records the last time each synced URL saw traffic, so other components
/// can distinguish "hot" documents from idle ones.
///
/// Clones share the same underlying map via `Arc`.
#[derive(Clone)]
pub struct ActivityTracker {
    // Map URL -> Last Activity Time
    activity: Arc<Mutex<HashMap<String, std::time::Instant>>>,
}

impl ActivityTracker {
    /// Creates a tracker with no recorded activity.
    pub fn new() -> Self {
        Self {
            activity: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Stamps `url` with the current instant, replacing any earlier stamp.
    pub fn mark(&self, url: &str) {
        self.activity
            .lock()
            .unwrap()
            .insert(url.to_string(), std::time::Instant::now());
    }

    /// Reports whether `url` saw activity within the last 10 minutes.
    /// URLs that were never marked are considered inactive.
    pub fn is_active(&self, url: &str) -> bool {
        self.activity
            .lock()
            .unwrap()
            .get(url)
            .map_or(false, |last| last.elapsed() < Duration::from_secs(600))
    }
}
122
/// Runs the BraidFS synchronization daemon until a Ctrl-C signal arrives.
///
/// Startup sequence: load and prune config, publish `PEER_ID`, build the
/// merge registry, warm the content cache (and create stub files for
/// missing synced paths), open the version store / blob store / inode DB,
/// start the filesystem watcher, the periodic scanner, the HTTP API
/// server, and an interactive stdin console. Then it services filesystem
/// events and `Command`s in a `tokio::select!` loop.
///
/// `port` is written into the config and used for the API server.
///
/// # Errors
/// Returns an error if config or version-store loading, watcher setup,
/// blob-store or inode-DB initialization, or the Braid client fail.
pub async fn run_daemon(port: u16) -> Result<()> {
    let mut config = Config::load().await?;
    config.port = port;

    // Filter out dead domains so we don't keep retrying URLs that are
    // known to be gone. NOTE(review): this hard-coded blocklist is baked
    // into the binary and then persisted via `config.save()` below.
    config.sync.retain(|url, _| {
        !url.contains("mail.braid.org")
//             && !url.contains("braid.org/tino")
            && !url.contains("braid.org/tino_test")
            && !url.contains("braid.org/main")
            && !url.contains("braid.org/about")
            && !url.contains("braid.org/wiki")
            && !url.contains("braid.org/xfmail")
            && !url.contains("braid.org/editing")
            && !url.contains("braid.org/127_xfmail")
            && url != "https://braid.org/"
            && url != "https://braid.org"
    });

    // Persist the pruned sync list and the updated port.
    config.save().await?;
    let config = Arc::new(RwLock::new(config));

    // Set global PEER_ID from config (scoped so both guards drop promptly).
    {
        let cfg = config.read().await;
        let mut id = PEER_ID.write().await;
        *id = cfg.peer_id.clone();
    }

    // URL -> last-known text content, shared with the sync machinery.
    let content_cache = Arc::new(RwLock::new(std::collections::HashMap::new()));

    // Initialize Merge Registry: maps merge-type names to factories.
    let mut merge_registry = crate::core::merge::MergeTypeRegistry::new();
    merge_registry.register("antimatter", |id| {
        Box::new(crate::core::merge::AntimatterMergeType::new_native(id))
    });
    merge_registry.register("diamond", |id| {
        Box::new(crate::core::merge::DiamondMergeType::new(id))
    });
    // "dt" is registered as an alias for the diamond merge type.
    merge_registry.register("dt", |id| {
        Box::new(crate::core::merge::DiamondMergeType::new(id))
    });
    let merge_registry = Arc::new(merge_registry);
    let active_merges = Arc::new(RwLock::new(HashMap::new()));

    // Cache Warming AND Metadata Stubbing:
    // - existing, enabled files are read into the content cache;
    // - missing files get an empty stub so they appear on disk.
    {
        let cfg = config.read().await;
        let mut cache = content_cache.write().await;
        for (url, enabled) in &cfg.sync {
            if let Ok(path) = mapping::url_to_path(url) {
                if path.exists() {
                    // Cache warming for existing files
                    if *enabled {
                        if let Ok(content) = tokio::fs::read_to_string(&path).await {
                            tracing::info!("[BraidFS] Cache warming for {} from {:?}", url, path);
                            cache.insert(url.clone(), content);
                        }
                    }
                } else {
                    // Metadata Stubbing: Create empty file if it doesn't exist
                    tracing::info!("[Discovery] Creating stub for {}", url);
                    if let Some(parent) = path.parent() {
                        let _ = tokio::fs::create_dir_all(parent).await;
                    }
                    if let Err(e) = tokio::fs::write(&path, "").await {
                        tracing::error!("[Discovery] Failed to create stub for {}: {}", url, e);
                    }
                }
            }
        }
    }

    let version_store = VersionStore::load().await?;
    let version_store = Arc::new(RwLock::new(version_store));

    // Resolve and ensure the BraidFS root directory exists.
    let root_dir =
        config::get_root_dir().map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
    tokio::fs::create_dir_all(&root_dir)
        .await
        .map_err(|e| crate::core::BraidError::Io(e))?;

    tracing::info!("BraidFS root: {:?}", root_dir);

    let pending_writes = PendingWrites::new();
    let activity_tracker = ActivityTracker::new();

    // Setup file watcher: notify runs its callback on a dedicated watcher
    // thread, so `blocking_send` into the tokio channel is safe here.
    let (tx_fs, mut rx_fs) = tokio::sync::mpsc::channel(100);
    let tx_fs_watcher = tx_fs.clone();
    let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| match res {
        Ok(event) => {
            let _ = tx_fs_watcher.blocking_send(event);
        }
        Err(e) => tracing::error!("Watch error: {:?}", e),
    })?;

    watcher.watch(&root_dir, RecursiveMode::Recursive)?;

    // Control-command channel (fed by the API server and the console task).
    let (tx_cmd, rx_cmd) = async_channel::unbounded::<Command>();
    let rate_limiter = Arc::new(ReconnectRateLimiter::new(100));
    let scan_state = Arc::new(RwLock::new(ScanState::new()));

    // Initialize BlobStore under <root>/.braidfs for binary content.
    let braidfs_dir = root_dir.join(".braidfs");
    let blob_store = Arc::new(
        crate::blob::BlobStore::new(braidfs_dir.join("blobs"), braidfs_dir.join("meta.sqlite"))
            .await
            .map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?,
    );

    // Initialize Inode DB: persistent path <-> inode-id mapping (used by
    // the NFS backend when the "nfs" feature is enabled).
    let inode_db_path = braidfs_dir.join("inodes.sqlite");
    let inode_conn = Connection::open(&inode_db_path)
        .map_err(|e| crate::core::BraidError::Fs(format!("Failed to open inode DB: {}", e)))?;
    inode_conn
        .execute(
            "CREATE TABLE IF NOT EXISTS inodes (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE NOT NULL
        )",
            [],
        )
        .map_err(|e| {
            crate::core::BraidError::Fs(format!("Failed to create inodes table: {}", e))
        })?;
    let inode_db = Arc::new(parking_lot::Mutex::new(inode_conn));

    let binary_sync_manager = BinarySyncManager::new(rate_limiter.clone(), blob_store.clone())
        .map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?;
    let binary_sync_manager = Arc::new(binary_sync_manager);

    // Track recently failed syncs to avoid log spam
    let failed_syncs = Arc::new(RwLock::new(HashMap::new()));

    // Track synced URLs for scanner (snapshot of config.sync, kept in sync
    // by the Sync/Unsync command handlers below).
    let sync_urls_map = Arc::new(RwLock::new({
        let cfg = config.read().await;
        cfg.sync
            .iter()
            .map(|(u, e)| (u.clone(), *e))
            .collect::<HashMap<String, bool>>()
    }));

    // Start scan loop: a periodic poll that backstops the FS watcher.
    let scan_state_clone = scan_state.clone();
    let sync_urls_clone = sync_urls_map.clone();
    let tx_fs_clone = tx_fs.clone();
    tokio::spawn(async move {
        start_scan_loop(
            scan_state_clone,
            sync_urls_clone,
            Duration::from_secs(10), // Scan every 10s for more responsiveness during testing
            move |path| {
                tracing::info!("Scanner detected change in {:?}, triggering sync", path);
                // Send a fake event to the FS watcher channel to trigger standard sync logic
                // NOTE(review): `blocking_send` panics if invoked on a tokio
                // runtime worker thread — confirm `start_scan_loop` calls this
                // callback from a blocking/dedicated thread.
                let mut event =
                    notify::Event::new(notify::EventKind::Modify(notify::event::ModifyKind::Any));
                event.paths.push(path);
                let _ = tx_fs_clone.blocking_send(event);
            },
        )
        .await;
    });

    let braid_client = BraidClient::new()?;

    // Assemble the shared daemon state handed to every subsystem.
    let state = DaemonState {
        config,
        content_cache: content_cache.clone(), // Fix for later access
        version_store: version_store.clone(),
        tracker: activity_tracker,
        merge_registry,
        active_merges,
        pending: pending_writes,
        client: braid_client,
        failed_syncs,
        binary_sync: binary_sync_manager,
        inode_db,
        tx_cmd: tx_cmd.clone(),
        // The debouncer needs `state` to exist, and `state` needs a
        // debouncer field: break the cycle with a placeholder first.
        debouncer: Arc::new(debouncer::DebouncedSyncManager::new_placeholder()), // Placeholder to fix circularity
    };

    // Initialize the real debouncer with the state
    let debouncer = debouncer::DebouncedSyncManager::new(state.clone(), 100);

    // Update state with the real debouncer
    let mut state = state;
    state.debouncer = debouncer;

    // HTTP API server runs on its own task; a crash is logged, not fatal.
    let state_server = state.clone();
    tokio::spawn(async move {
        if let Err(e) = run_server(port, state_server).await {
            tracing::error!("API Server crashed: {}", e);
        }
    });

    // ---------------------------------------------------------
    // Interactive Console (for Token/Cookie Entry)
    // ---------------------------------------------------------
    // Reads lines from stdin and translates them into Commands on tx_cmd.
    let tx_console = state.tx_cmd.clone();
    tokio::spawn(async move {
        use tokio::io::{self, AsyncBufReadExt, BufReader};
        let mut reader = BufReader::new(io::stdin()).lines();

        println!("\n[BraidFS CONSOLE] Ready for commands.");
        println!("Available: token <domain> <value>  (e.g. token braid.org ud8zp...)");
        println!("           sync <url>               (e.g. sync https://braid.org/tino)");

        while let Ok(Some(line)) = reader.next_line().await {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.is_empty() {
                continue;
            }

            match parts[0] {
                // "token" and "cookie" are synonyms.
                "token" | "cookie" if parts.len() >= 3 => {
                    let domain = parts[1].to_string();
                    let value = parts[2].to_string();
                    let _ = tx_console.send(Command::SetCookie { domain, value }).await;
                    println!("[BraidFS] Cookie updated for {}", parts[1]);
                }
                "sync" if parts.len() >= 2 => {
                    let url = parts[1].to_string();
                    let _ = tx_console.send(Command::Sync { url }).await;
                    println!("[BraidFS] Sync triggered for {}", parts[1]);
                }
                "help" => {
                    println!("Commands: token <domain> <value>, sync <url>");
                }
                _ => {
                    println!(
                        "[BraidFS] Unknown command: {}. Try 'token' or 'sync'.",
                        parts[0]
                    );
                }
            }
        }
    });

    // Handle to the NFS server task, if one has been started via Mount.
    let mut nfs_handle: Option<tokio::task::JoinHandle<()>> = None;

    // URL -> subscription task, managed by the Sync/Unsync commands.
    let mut subscriptions: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();

    /* Initial subscriptions - Disabled for On-Demand Architecture
    {
        let cfg = state.config.read().await;
        for (url, enabled) in &cfg.sync {
            if *enabled {
                spawn_subscription(url.clone(), &mut subscriptions, state.clone()).await;
            }
        }
    }
    */

    #[cfg(feature = "nfs")]
    let mut active_mount_point: Option<String> = None;

    // Main Event Loop: multiplex shutdown, FS events, and commands.
    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("Shutdown signal received");
                // Best-effort teardown: unmount first, then kill the server.
                #[cfg(feature = "nfs")]
                if let Some(mp) = active_mount_point.take() {
                    tracing::info!("Unmounting {}...", mp);
                    let _ = mount::unmount(std::path::Path::new(&mp));
                }
                if let Some(handle) = nfs_handle.take() {
                    handle.abort();
                }
                break;
            }

            Some(event) = rx_fs.recv() => {
                handle_fs_event(event, state.clone()).await;
            }

            Ok(cmd) = rx_cmd.recv() => {
                match cmd {
                    Command::Sync { url } => {
                        tracing::info!("Enable Sync: {}", url);
                        // Persist the new sync entry before subscribing.
                        {
                            let mut cfg = state.config.write().await;
                            cfg.sync.insert(url.clone(), true);
                            let _ = cfg.save().await;
                        }
                        spawn_subscription(url.clone(), &mut subscriptions, state.clone()).await;

                        // Binary URLs additionally get blob-based sync.
                        if binary_sync::should_use_binary_sync(&url) {
                            let bsm = state.binary_sync.clone();
                            let url_clone = url.clone();
                            let root = config::get_root_dir()?;
                            let fullpath = root.join(url.trim_start_matches('/'));
                            tokio::spawn(async move {
                                let _ = bsm.init_binary_sync(&url_clone, &fullpath).await;
                            });
                        }
                        // Keep the scanner's URL snapshot consistent.
                        sync_urls_map.write().await.insert(url, true);
                    }
                    Command::Unsync { url } => {
                        tracing::info!("Disable Sync: {}", url);
                        {
                            let mut cfg = state.config.write().await;
                            cfg.sync.remove(&url);
                            let _ = cfg.save().await;
                        }
                        // Abort the live subscription task, if any.
                        if let Some(handle) = subscriptions.remove(&url) {
                            handle.abort();
                        }
                        sync_urls_map.write().await.remove(&url);
                    }
                    Command::SetCookie { domain, value } => {
                        // NOTE(review): this logs the cookie value itself —
                        // consider redacting if logs are shared.
                        tracing::info!("Set Cookie: {} for {}", value, domain);
                        let mut cfg = state.config.write().await;
                        cfg.cookies.insert(domain, value);
                        let _ = cfg.save().await;
                    }
                    Command::SetIdentity { domain, email } => {
                        tracing::info!("Set Identity: {} for {}", email, domain);
                        let mut cfg = state.config.write().await;
                        cfg.identities.insert(domain, email);
                        let _ = cfg.save().await;
                    }
                    #[cfg(feature = "nfs")]
                    Command::Mount { port, mount_point } => {
                        if nfs_handle.is_some() {
                            tracing::warn!("NFS Server already running");
                        } else {
                            // Serve NFS on localhost only.
                            let state_nfs = state.clone();
                            let handle = tokio::spawn(async move {
                                let backend = nfs::BraidNfsBackend::new(state_nfs.clone(), state_nfs.binary_sync.blob_store());
                                tracing::info!("Starting NFS server on port {}", port);
                                 match nfsserve::tcp::NFSTcpListener::bind(&format!("127.0.0.1:{}", port), backend).await {
                                     Ok(listener) => {
                                         use nfsserve::tcp::NFSTcp;
                                         if let Err(e) = listener.handle_forever().await {
                                             tracing::error!("NFS Server error: {}", e);
                                         }
                                     }
                                     Err(e) => {
                                         tracing::error!("Failed to bind NFS server to port {}: {}", port, e);
                                     }
                                 }
                            });
                            nfs_handle = Some(handle);

                            // Trigger OS-level mount if requested
                            if let Some(mp) = mount_point {
                                tracing::info!("Triggering OS mount to {}...", mp);
                                if let Err(e) = mount::mount(port, std::path::Path::new(&mp)) {
                                    tracing::error!("Failed to mount: {}", e);
                                } else {
                                    active_mount_point = Some(mp);
                                }
                            }
                        }
                    }
                    #[cfg(feature = "nfs")]
                    Command::Unmount => {
                        if let Some(mp) = active_mount_point.take() {
                             tracing::info!("Unmounting {}...", mp);
                             let _ = mount::unmount(std::path::Path::new(&mp));
                        }
                        if let Some(handle) = nfs_handle.take() {
                            tracing::info!("Stopping NFS server");
                            handle.abort();
                        }
                    }
                }
            }
        }
    }
    Ok(())
}