1use crate::core::{BraidClient, Result};
6use crate::fs::api::run_server;
7use crate::fs::binary_sync::BinarySyncManager;
8use crate::fs::config::Config;
9use crate::fs::rate_limiter::ReconnectRateLimiter;
10use crate::fs::scanner::{start_scan_loop, ScanState};
11use crate::fs::versions::VersionStore;
12use notify::{Event, RecursiveMode, Watcher};
13use rusqlite::Connection;
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use tokio::sync::RwLock;
19
20pub mod api;
21pub mod binary_sync;
22pub mod blob_handlers;
23pub mod config;
24pub mod debouncer;
25pub mod diff;
26pub mod mapping;
27#[cfg(feature = "nfs")]
28pub mod mount;
29#[cfg(feature = "nfs")]
30pub mod nfs;
31pub mod rate_limiter;
32pub mod scanner;
33pub mod server_handlers;
34pub mod state;
35pub mod subscription;
36pub mod sync;
37pub mod versions;
38pub mod watcher;
39
40use state::{Command, DaemonState};
41use subscription::spawn_subscription;
42use watcher::handle_fs_event;
43
44lazy_static::lazy_static! {
45 pub static ref PEER_ID: Arc<RwLock<String>> = Arc::new(RwLock::new(String::new()));
46}
47
48#[derive(Clone)]
49pub struct PendingWrites {
50 paths: Arc<Mutex<HashMap<String, std::time::Instant>>>,
52}
53
54impl PendingWrites {
55 pub fn new() -> Self {
56 Self {
57 paths: Arc::new(Mutex::new(HashMap::new())),
58 }
59 }
60
61 fn normalize(path: &std::path::Path) -> String {
62 path.to_string_lossy().to_lowercase().replace('\\', "/")
63 }
64
65 pub fn add(&self, path: PathBuf) {
66 let expiry = std::time::Instant::now() + Duration::from_millis(100);
68 self.paths
69 .lock()
70 .unwrap()
71 .insert(Self::normalize(&path), expiry);
72 }
73
74 pub fn remove(&self, path: &PathBuf) {
75 self.paths.lock().unwrap().remove(&Self::normalize(path));
76 }
77
78 pub fn should_ignore(&self, path: &PathBuf) -> bool {
79 let mut paths = self.paths.lock().unwrap();
80 let key = Self::normalize(path);
81
82 if let Some(&expiry) = paths.get(&key) {
83 if std::time::Instant::now() < expiry {
84 return true; } else {
86 paths.remove(&key); return false;
88 }
89 }
90 false
91 }
92}
93
94#[derive(Clone)]
95pub struct ActivityTracker {
96 activity: Arc<Mutex<HashMap<String, std::time::Instant>>>,
98}
99
100impl ActivityTracker {
101 pub fn new() -> Self {
102 Self {
103 activity: Arc::new(Mutex::new(HashMap::new())),
104 }
105 }
106
107 pub fn mark(&self, url: &str) {
108 let mut activity = self.activity.lock().unwrap();
109 activity.insert(url.to_string(), std::time::Instant::now());
110 }
111
112 pub fn is_active(&self, url: &str) -> bool {
113 let activity = self.activity.lock().unwrap();
115 if let Some(&last_time) = activity.get(url) {
116 std::time::Instant::now().duration_since(last_time) < Duration::from_secs(600)
117 } else {
118 false
119 }
120 }
121}
122
123pub async fn run_daemon(port: u16) -> Result<()> {
124 let mut config = Config::load().await?;
125 config.port = port;
126
127 config.sync.retain(|url, _| {
129 !url.contains("mail.braid.org")
130&& !url.contains("braid.org/tino_test")
132 && !url.contains("braid.org/main")
133 && !url.contains("braid.org/about")
134 && !url.contains("braid.org/wiki")
135 && !url.contains("braid.org/xfmail")
136 && !url.contains("braid.org/editing")
137 && !url.contains("braid.org/127_xfmail")
138 && url != "https://braid.org/"
139 && url != "https://braid.org"
140 });
141
142 config.save().await?;
143 let config = Arc::new(RwLock::new(config));
144
145 {
147 let cfg = config.read().await;
148 let mut id = PEER_ID.write().await;
149 *id = cfg.peer_id.clone();
150 }
151
152 let content_cache = Arc::new(RwLock::new(std::collections::HashMap::new()));
153
154 let mut merge_registry = crate::core::merge::MergeTypeRegistry::new();
156 merge_registry.register("antimatter", |id| {
157 Box::new(crate::core::merge::AntimatterMergeType::new_native(id))
158 });
159 merge_registry.register("diamond", |id| {
160 Box::new(crate::core::merge::DiamondMergeType::new(id))
161 });
162 merge_registry.register("dt", |id| {
163 Box::new(crate::core::merge::DiamondMergeType::new(id))
164 });
165 let merge_registry = Arc::new(merge_registry);
166 let active_merges = Arc::new(RwLock::new(HashMap::new()));
167
168 {
170 let cfg = config.read().await;
171 let mut cache = content_cache.write().await;
172 for (url, enabled) in &cfg.sync {
173 if let Ok(path) = mapping::url_to_path(url) {
174 if path.exists() {
175 if *enabled {
177 if let Ok(content) = tokio::fs::read_to_string(&path).await {
178 tracing::info!("[BraidFS] Cache warming for {} from {:?}", url, path);
179 cache.insert(url.clone(), content);
180 }
181 }
182 } else {
183 tracing::info!("[Discovery] Creating stub for {}", url);
185 if let Some(parent) = path.parent() {
186 let _ = tokio::fs::create_dir_all(parent).await;
187 }
188 if let Err(e) = tokio::fs::write(&path, "").await {
189 tracing::error!("[Discovery] Failed to create stub for {}: {}", url, e);
190 }
191 }
192 }
193 }
194 }
195
196 let version_store = VersionStore::load().await?;
197 let version_store = Arc::new(RwLock::new(version_store));
198
199 let root_dir =
200 config::get_root_dir().map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
201 tokio::fs::create_dir_all(&root_dir)
202 .await
203 .map_err(|e| crate::core::BraidError::Io(e))?;
204
205 tracing::info!("BraidFS root: {:?}", root_dir);
206
207 let pending_writes = PendingWrites::new();
208 let activity_tracker = ActivityTracker::new();
209
210 let (tx_fs, mut rx_fs) = tokio::sync::mpsc::channel(100);
212 let tx_fs_watcher = tx_fs.clone();
213 let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| match res {
214 Ok(event) => {
215 let _ = tx_fs_watcher.blocking_send(event);
216 }
217 Err(e) => tracing::error!("Watch error: {:?}", e),
218 })?;
219
220 watcher.watch(&root_dir, RecursiveMode::Recursive)?;
221
222 let (tx_cmd, rx_cmd) = async_channel::unbounded::<Command>();
223 let rate_limiter = Arc::new(ReconnectRateLimiter::new(100));
224 let scan_state = Arc::new(RwLock::new(ScanState::new()));
225
226 let braidfs_dir = root_dir.join(".braidfs");
228 let blob_store = Arc::new(
229 crate::blob::BlobStore::new(braidfs_dir.join("blobs"), braidfs_dir.join("meta.sqlite"))
230 .await
231 .map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?,
232 );
233
234 let inode_db_path = braidfs_dir.join("inodes.sqlite");
236 let inode_conn = Connection::open(&inode_db_path)
237 .map_err(|e| crate::core::BraidError::Fs(format!("Failed to open inode DB: {}", e)))?;
238 inode_conn
239 .execute(
240 "CREATE TABLE IF NOT EXISTS inodes (
241 id INTEGER PRIMARY KEY,
242 path TEXT UNIQUE NOT NULL
243 )",
244 [],
245 )
246 .map_err(|e| {
247 crate::core::BraidError::Fs(format!("Failed to create inodes table: {}", e))
248 })?;
249 let inode_db = Arc::new(parking_lot::Mutex::new(inode_conn));
250
251 let binary_sync_manager = BinarySyncManager::new(rate_limiter.clone(), blob_store.clone())
252 .map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?;
253 let binary_sync_manager = Arc::new(binary_sync_manager);
254
255 let failed_syncs = Arc::new(RwLock::new(HashMap::new()));
257
258 let sync_urls_map = Arc::new(RwLock::new({
260 let cfg = config.read().await;
261 cfg.sync
262 .iter()
263 .map(|(u, e)| (u.clone(), *e))
264 .collect::<HashMap<String, bool>>()
265 }));
266
267 let scan_state_clone = scan_state.clone();
269 let sync_urls_clone = sync_urls_map.clone();
270 let tx_fs_clone = tx_fs.clone();
271 tokio::spawn(async move {
272 start_scan_loop(
273 scan_state_clone,
274 sync_urls_clone,
275 Duration::from_secs(10), move |path| {
277 tracing::info!("Scanner detected change in {:?}, triggering sync", path);
278 let mut event =
280 notify::Event::new(notify::EventKind::Modify(notify::event::ModifyKind::Any));
281 event.paths.push(path);
282 let _ = tx_fs_clone.blocking_send(event);
283 },
284 )
285 .await;
286 });
287
288 let braid_client = BraidClient::new()?;
289
290 let state = DaemonState {
291 config,
292 content_cache: content_cache.clone(), version_store: version_store.clone(),
294 tracker: activity_tracker,
295 merge_registry,
296 active_merges,
297 pending: pending_writes,
298 client: braid_client,
299 failed_syncs,
300 binary_sync: binary_sync_manager,
301 inode_db,
302 tx_cmd: tx_cmd.clone(),
303 debouncer: Arc::new(debouncer::DebouncedSyncManager::new_placeholder()), };
305
306 let debouncer = debouncer::DebouncedSyncManager::new(state.clone(), 100);
308
309 let mut state = state;
311 state.debouncer = debouncer;
312
313 let state_server = state.clone();
314 tokio::spawn(async move {
315 if let Err(e) = run_server(port, state_server).await {
316 tracing::error!("API Server crashed: {}", e);
317 }
318 });
319
320 let tx_console = state.tx_cmd.clone();
324 tokio::spawn(async move {
325 use tokio::io::{self, AsyncBufReadExt, BufReader};
326 let mut reader = BufReader::new(io::stdin()).lines();
327
328 println!("\n[BraidFS CONSOLE] Ready for commands.");
329 println!("Available: token <domain> <value> (e.g. token braid.org ud8zp...)");
330 println!(" sync <url> (e.g. sync https://braid.org/tino)");
331
332 while let Ok(Some(line)) = reader.next_line().await {
333 let parts: Vec<&str> = line.split_whitespace().collect();
334 if parts.is_empty() {
335 continue;
336 }
337
338 match parts[0] {
339 "token" | "cookie" if parts.len() >= 3 => {
340 let domain = parts[1].to_string();
341 let value = parts[2].to_string();
342 let _ = tx_console.send(Command::SetCookie { domain, value }).await;
343 println!("[BraidFS] Cookie updated for {}", parts[1]);
344 }
345 "sync" if parts.len() >= 2 => {
346 let url = parts[1].to_string();
347 let _ = tx_console.send(Command::Sync { url }).await;
348 println!("[BraidFS] Sync triggered for {}", parts[1]);
349 }
350 "help" => {
351 println!("Commands: token <domain> <value>, sync <url>");
352 }
353 _ => {
354 println!(
355 "[BraidFS] Unknown command: {}. Try 'token' or 'sync'.",
356 parts[0]
357 );
358 }
359 }
360 }
361 });
362
363 let mut nfs_handle: Option<tokio::task::JoinHandle<()>> = None;
364
365 let mut subscriptions: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
366
367 #[cfg(feature = "nfs")]
379 let mut active_mount_point: Option<String> = None;
380
381 loop {
383 tokio::select! {
384 _ = tokio::signal::ctrl_c() => {
385 tracing::info!("Shutdown signal received");
386 #[cfg(feature = "nfs")]
387 if let Some(mp) = active_mount_point.take() {
388 tracing::info!("Unmounting {}...", mp);
389 let _ = mount::unmount(std::path::Path::new(&mp));
390 }
391 if let Some(handle) = nfs_handle.take() {
392 handle.abort();
393 }
394 break;
395 }
396
397 Some(event) = rx_fs.recv() => {
398 handle_fs_event(event, state.clone()).await;
399 }
400
401 Ok(cmd) = rx_cmd.recv() => {
402 match cmd {
403 Command::Sync { url } => {
404 tracing::info!("Enable Sync: {}", url);
405 {
406 let mut cfg = state.config.write().await;
407 cfg.sync.insert(url.clone(), true);
408 let _ = cfg.save().await;
409 }
410 spawn_subscription(url.clone(), &mut subscriptions, state.clone()).await;
411
412 if binary_sync::should_use_binary_sync(&url) {
413 let bsm = state.binary_sync.clone();
414 let url_clone = url.clone();
415 let root = config::get_root_dir()?;
416 let fullpath = root.join(url.trim_start_matches('/'));
417 tokio::spawn(async move {
418 let _ = bsm.init_binary_sync(&url_clone, &fullpath).await;
419 });
420 }
421 sync_urls_map.write().await.insert(url, true);
422 }
423 Command::Unsync { url } => {
424 tracing::info!("Disable Sync: {}", url);
425 {
426 let mut cfg = state.config.write().await;
427 cfg.sync.remove(&url);
428 let _ = cfg.save().await;
429 }
430 if let Some(handle) = subscriptions.remove(&url) {
431 handle.abort();
432 }
433 sync_urls_map.write().await.remove(&url);
434 }
435 Command::SetCookie { domain, value } => {
436 tracing::info!("Set Cookie: {} for {}", value, domain);
437 let mut cfg = state.config.write().await;
438 cfg.cookies.insert(domain, value);
439 let _ = cfg.save().await;
440 }
441 Command::SetIdentity { domain, email } => {
442 tracing::info!("Set Identity: {} for {}", email, domain);
443 let mut cfg = state.config.write().await;
444 cfg.identities.insert(domain, email);
445 let _ = cfg.save().await;
446 }
447 #[cfg(feature = "nfs")]
448 Command::Mount { port, mount_point } => {
449 if nfs_handle.is_some() {
450 tracing::warn!("NFS Server already running");
451 } else {
452 let state_nfs = state.clone();
453 let handle = tokio::spawn(async move {
454 let backend = nfs::BraidNfsBackend::new(state_nfs.clone(), state_nfs.binary_sync.blob_store());
455 tracing::info!("Starting NFS server on port {}", port);
456 match nfsserve::tcp::NFSTcpListener::bind(&format!("127.0.0.1:{}", port), backend).await {
457 Ok(listener) => {
458 use nfsserve::tcp::NFSTcp;
459 if let Err(e) = listener.handle_forever().await {
460 tracing::error!("NFS Server error: {}", e);
461 }
462 }
463 Err(e) => {
464 tracing::error!("Failed to bind NFS server to port {}: {}", port, e);
465 }
466 }
467 });
468 nfs_handle = Some(handle);
469
470 if let Some(mp) = mount_point {
472 tracing::info!("Triggering OS mount to {}...", mp);
473 if let Err(e) = mount::mount(port, std::path::Path::new(&mp)) {
474 tracing::error!("Failed to mount: {}", e);
475 } else {
476 active_mount_point = Some(mp);
477 }
478 }
479 }
480 }
481 #[cfg(feature = "nfs")]
482 Command::Unmount => {
483 if let Some(mp) = active_mount_point.take() {
484 tracing::info!("Unmounting {}...", mp);
485 let _ = mount::unmount(std::path::Path::new(&mp));
486 }
487 if let Some(handle) = nfs_handle.take() {
488 tracing::info!("Stopping NFS server");
489 handle.abort();
490 }
491 }
492 }
493 }
494 }
495 }
496 Ok(())
497}