dx_forge/watcher/
mod.rs

1pub mod detector;
2pub mod cache_warmer;
3
4use anyhow::Result;
5use sha2::{Digest, Sha256};
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use crate::storage::{Database, OperationLog};
10use crate::sync::{SyncManager, remote::connect_peer};
11use crate::crdt::Operation;
12
/// First-stage ("rapid") change notification.
///
/// Carries only the minimum needed for immediate feedback. The stated latency
/// target is under 35µs, with 1-2µs in the best case.
#[derive(Clone, Debug)]
pub struct RapidChange {
    /// Path of the file that changed.
    pub path: String,
    /// Time spent detecting the change, in microseconds
    /// (stated as typically 1-2µs, at most 35µs).
    pub time_us: u64,
    /// Sequence number used to order notifications.
    pub sequence: u64,
}
23
24/// Quality change notification - detailed analysis (<60µs typical)
25#[derive(Debug, Clone)]
26pub struct QualityChange {
27    /// File path that changed
28    pub path: String,
29    /// Detected operations
30    pub operations: Vec<Operation>,
31    /// Detection time in microseconds (typically <60µs)
32    pub time_us: u64,
33    /// Total processing time (rapid + quality)
34    pub total_us: u64,
35}
36
37/// Forge event types emitted by the watcher
38#[derive(Debug, Clone)]
39pub enum ForgeEvent {
40    /// Rapid notification - immediate feedback (<35µs)
41    Rapid {
42        path: String,
43        time_us: u64,
44        sequence: u64,
45    },
46    /// Quality notification - full details (<60µs after rapid)
47    Quality {
48        path: String,
49        operations: Vec<Operation>,
50        time_us: u64,
51        total_us: u64,
52    },
53}
54
55/// Forge watcher - monitors file changes and emits rapid + quality events
56pub struct ForgeWatcher {
57    pub repo_root: PathBuf,
58    pub oplog: Arc<OperationLog>,
59    pub actor_id: String,
60    pub repo_id: String,
61    pub sync_mgr: Option<Arc<SyncManager>>,
62}
63
64impl ForgeWatcher {
65    /// Create a new forge watcher
66    pub async fn new<P: Into<PathBuf>>(path: P, enable_sync: bool, peers: Vec<String>) -> Result<Self> {
67        let path_buf = path.into();
68        let repo_root = path_buf.canonicalize().unwrap_or(path_buf);
69        let forge_dir = repo_root.join(".dx/forge");
70
71        let db = Database::new(&forge_dir)?;
72        db.initialize()?;
73        let oplog = Arc::new(OperationLog::new(Arc::new(db)));
74
75        // Load config
76        let config_raw = tokio::fs::read_to_string(forge_dir.join("config.json")).await?;
77        let config: serde_json::Value = serde_json::from_str(&config_raw)?;
78        let actor_id = config["actor_id"].as_str().unwrap().to_string();
79        let repo_id = config["repo_id"]
80            .as_str()
81            .map(|s| s.to_string())
82            .unwrap_or_else(|| {
83                let mut hasher = Sha256::new();
84                let path_string = repo_root.to_string_lossy().into_owned();
85                hasher.update(path_string.as_bytes());
86                format!("local-{:x}", hasher.finalize())
87            });
88
89        let sync_mgr = if enable_sync {
90            Some(Arc::new(SyncManager::new()))
91        } else {
92            None
93        };
94
95        // Connect to remote peers if provided
96        if let (Some(mgr), true) = (&sync_mgr, !peers.is_empty()) {
97            for url in peers {
98                let _ = connect_peer(
99                    &url,
100                    actor_id.clone(),
101                    repo_id.clone(),
102                    mgr.as_ref().clone(),
103                    oplog.clone(),
104                )
105                .await;
106            }
107        }
108
109        // Warm OS page cache
110        let _ = tokio::task::spawn_blocking({
111            let repo_root_clone = repo_root.clone();
112            move || cache_warmer::warm_cache(&repo_root_clone)
113        })
114        .await??;
115
116        Ok(Self {
117            repo_root,
118            oplog,
119            actor_id,
120            repo_id,
121            sync_mgr,
122        })
123    }
124    
125    /// Run the watcher (blocking)
126    pub async fn run(self) -> Result<()> {
127        detector::start_watching(
128            self.repo_root,
129            self.oplog,
130            self.actor_id,
131            self.repo_id,
132            self.sync_mgr,
133        ).await
134    }
135}
136
137// Legacy function for backward compatibility
138pub async fn watch(path: PathBuf, enable_sync: bool, peers: Vec<String>) -> Result<()> {
139    let watcher = ForgeWatcher::new(path, enable_sync, peers).await?;
140    watcher.run().await
141}