use crate::core::{BraidClient, Result};
use crate::fs::api::run_server;
use crate::fs::binary_sync::BinarySyncManager;
use crate::fs::config::Config;
use crate::fs::rate_limiter::ReconnectRateLimiter;
use crate::fs::scanner::{start_scan_loop, ScanState};
use crate::fs::versions::VersionStore;
use notify::{Event, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::RwLock;
pub mod api;
pub mod binary_sync;
pub mod config;
pub mod diff;
pub mod mapping;
pub mod nfs;
pub mod rate_limiter;
pub mod scanner;
pub mod server_handlers;
pub mod state;
pub mod subscription;
pub mod sync;
pub mod versions;
pub mod watcher;
use state::{Command, DaemonState};
use subscription::spawn_subscription;
use watcher::handle_fs_event;
lazy_static::lazy_static! {
pub static ref PEER_ID: Arc<RwLock<String>> = Arc::new(RwLock::new(String::new()));
}
#[derive(Clone)]
pub struct PendingWrites {
paths: Arc<Mutex<HashMap<String, std::time::Instant>>>,
}
impl PendingWrites {
pub fn new() -> Self {
Self {
paths: Arc::new(Mutex::new(HashMap::new())),
}
}
fn normalize(path: &std::path::Path) -> String {
path.to_string_lossy().to_lowercase().replace('\\', "/")
}
pub fn add(&self, path: PathBuf) {
let expiry = std::time::Instant::now() + Duration::from_millis(100);
self.paths
.lock()
.unwrap()
.insert(Self::normalize(&path), expiry);
}
pub fn remove(&self, path: &PathBuf) {
self.paths.lock().unwrap().remove(&Self::normalize(path));
}
pub fn should_ignore(&self, path: &PathBuf) -> bool {
let mut paths = self.paths.lock().unwrap();
let key = Self::normalize(path);
if let Some(&expiry) = paths.get(&key) {
if std::time::Instant::now() < expiry {
return true; } else {
paths.remove(&key); return false;
}
}
false
}
}
#[derive(Clone)]
pub struct ActivityTracker {
activity: Arc<Mutex<HashMap<String, std::time::Instant>>>,
}
impl ActivityTracker {
pub fn new() -> Self {
Self {
activity: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn mark(&self, url: &str) {
let mut activity = self.activity.lock().unwrap();
activity.insert(url.to_string(), std::time::Instant::now());
}
pub fn is_active(&self, url: &str) -> bool {
let activity = self.activity.lock().unwrap();
if let Some(&last_time) = activity.get(url) {
std::time::Instant::now().duration_since(last_time) < Duration::from_secs(600)
} else {
false
}
}
}
pub async fn run_daemon(port: u16) -> Result<()> {
let mut config = Config::load().await?;
config.port = port;
if !config.sync.contains_key("https://braid.org/tino") {
config
.sync
.insert("https://braid.org/tino".to_string(), true);
}
config.save().await?;
let config = Arc::new(RwLock::new(config));
{
let cfg = config.read().await;
let mut id = PEER_ID.write().await;
*id = cfg.peer_id.clone();
}
let content_cache = Arc::new(RwLock::new(std::collections::HashMap::new()));
let mut merge_registry = crate::core::merge::MergeTypeRegistry::new();
merge_registry.register("antimatter", |id| {
Box::new(crate::core::merge::AntimatterMergeType::new_native(id))
});
let merge_registry = Arc::new(merge_registry);
let active_merges = Arc::new(RwLock::new(HashMap::new()));
{
let cfg = config.read().await;
let mut cache = content_cache.write().await;
for (url, enabled) in &cfg.sync {
if *enabled {
if let Ok(path) = mapping::url_to_path(url) {
if path.exists() {
if let Ok(content) = tokio::fs::read_to_string(&path).await {
tracing::info!("[BraidFS] Priming cache for {} from {:?}", url, path);
cache.insert(url.clone(), content);
}
}
}
}
}
}
let version_store = VersionStore::load().await?;
let version_store = Arc::new(RwLock::new(version_store));
let root_dir =
config::get_root_dir().map_err(|e| crate::core::BraidError::Fs(e.to_string()))?;
tokio::fs::create_dir_all(&root_dir)
.await
.map_err(|e| crate::core::BraidError::Io(e))?;
tracing::info!("BraidFS root: {:?}", root_dir);
let pending_writes = PendingWrites::new();
let activity_tracker = ActivityTracker::new();
let (tx_fs, mut rx_fs) = tokio::sync::mpsc::channel(100);
let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| match res {
Ok(event) => {
let _ = tx_fs.blocking_send(event);
}
Err(e) => tracing::error!("Watch error: {:?}", e),
})?;
watcher.watch(&root_dir, RecursiveMode::Recursive)?;
let (tx_cmd, rx_cmd) = async_channel::unbounded();
let rate_limiter = Arc::new(ReconnectRateLimiter::new(100));
let scan_state = Arc::new(RwLock::new(ScanState::new()));
let braidfs_dir = root_dir.join(".braidfs");
let blob_store = Arc::new(
crate::blob::BlobStore::new(braidfs_dir.join("blobs"), braidfs_dir.join("meta.sqlite"))
.await
.map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?,
);
let binary_sync_manager = BinarySyncManager::new(rate_limiter.clone(), blob_store.clone())
.map_err(|e| crate::core::BraidError::Anyhow(e.to_string()))?;
let binary_sync_manager = Arc::new(binary_sync_manager);
let failed_syncs = Arc::new(RwLock::new(HashMap::new()));
let sync_urls_map = Arc::new(RwLock::new({
let cfg = config.read().await;
cfg.sync
.iter()
.map(|(u, e)| (u.clone(), *e))
.collect::<HashMap<String, bool>>()
}));
let scan_state_clone = scan_state.clone();
let sync_urls_clone = sync_urls_map.clone();
tokio::spawn(async move {
start_scan_loop(
scan_state_clone,
sync_urls_clone,
Duration::from_secs(60),
|path| {
tracing::info!("Scanner detected change in {:?}", path);
},
)
.await;
});
let braid_client = BraidClient::new()?;
let state = DaemonState {
config,
content_cache,
version_store,
tracker: activity_tracker,
merge_registry,
active_merges,
pending: pending_writes,
client: braid_client,
failed_syncs,
binary_sync: binary_sync_manager,
tx_cmd,
};
let state_server = state.clone();
tokio::spawn(async move {
if let Err(e) = run_server(port, state_server).await {
tracing::error!("API Server crashed: {}", e);
}
});
let mut nfs_handle: Option<tokio::task::JoinHandle<()>> = None;
let mut subscriptions: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
{
let cfg = state.config.read().await;
for (url, enabled) in &cfg.sync {
if *enabled {
spawn_subscription(url.clone(), &mut subscriptions, state.clone()).await;
}
}
}
loop {
tokio::select! {
Some(event) = rx_fs.recv() => {
handle_fs_event(event, state.clone()).await;
}
Ok(cmd) = rx_cmd.recv() => {
match cmd {
Command::Sync { url } => {
tracing::info!("Enable Sync: {}", url);
{
let mut cfg = state.config.write().await;
cfg.sync.insert(url.clone(), true);
let _ = cfg.save().await;
}
spawn_subscription(url.clone(), &mut subscriptions, state.clone()).await;
if binary_sync::should_use_binary_sync(&url) {
let bsm = state.binary_sync.clone();
let url_clone = url.clone();
let root = config::get_root_dir()?;
let fullpath = root.join(url.trim_start_matches('/'));
tokio::spawn(async move {
let _ = bsm.init_binary_sync(&url_clone, &fullpath).await;
});
}
sync_urls_map.write().await.insert(url, true);
}
Command::Unsync { url } => {
tracing::info!("Disable Sync: {}", url);
{
let mut cfg = state.config.write().await;
cfg.sync.remove(&url);
let _ = cfg.save().await;
}
if let Some(handle) = subscriptions.remove(&url) {
handle.abort();
}
sync_urls_map.write().await.remove(&url);
}
Command::SetCookie { domain, value } => {
tracing::info!("Set Cookie: {} for {}", value, domain);
let mut cfg = state.config.write().await;
cfg.cookies.insert(domain, value);
let _ = cfg.save().await;
}
Command::SetIdentity { domain, email } => {
tracing::info!("Set Identity: {} for {}", email, domain);
let mut cfg = state.config.write().await;
cfg.identities.insert(domain, email);
let _ = cfg.save().await;
}
Command::Mount { port } => {
if nfs_handle.is_some() {
tracing::warn!("NFS Server already running");
} else {
let state_nfs = state.clone();
let handle = tokio::spawn(async move {
let backend = nfs::BraidNfsBackend::new(state_nfs.clone(), state_nfs.binary_sync.blob_store());
tracing::info!("Starting NFS server on port {}", port);
if let Ok(listener) = nfsserve::tcp::NFSTcpListener::bind(&format!("127.0.0.1:{}", port), backend).await {
use nfsserve::tcp::NFSTcp;
if let Err(e) = listener.handle_forever().await {
tracing::error!("NFS Server error: {}", e);
}
}
});
nfs_handle = Some(handle);
}
}
Command::Unmount => {
if let Some(handle) = nfs_handle.take() {
tracing::info!("Stopping NFS server");
handle.abort();
}
}
}
}
}
}
}