// dx_forge/watcher_legacy/mod.rs

pub mod cache_warmer;
pub mod detector;
pub mod lsp_detector;

use anyhow::{Context, Result};
use colored::Colorize;
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use std::sync::Arc;

use crate::crdt::Operation;
use crate::storage::{Database, OperationLog};
use crate::sync::{remote::connect_peer, SyncManager};
14
/// Rapid change notification: the first, lightweight signal that a file changed.
///
/// Stated latency target: under 35µs typical, 1-2µs in the best case.
#[derive(Debug, Clone)]
pub struct RapidChange {
    /// Path of the file that changed.
    pub path: String,
    /// Time spent detecting the change, in microseconds
    /// (typically 1-2µs, at most about 35µs).
    pub time_us: u64,
    /// Sequence number used to order notifications.
    pub sequence: u64,
}
25
26/// Quality change notification - detailed analysis (<60µs typical)
27#[derive(Debug, Clone)]
28pub struct QualityChange {
29    /// File path that changed
30    pub path: String,
31    /// Detected operations
32    pub operations: Vec<Operation>,
33    /// Detection time in microseconds (typically <60µs)
34    pub time_us: u64,
35    /// Total processing time (rapid + quality)
36    pub total_us: u64,
37}
38
39/// Forge event types emitted by the watcher
40#[derive(Debug, Clone)]
41pub enum ForgeEvent {
42    /// Rapid notification - immediate feedback (<35µs)
43    Rapid {
44        path: String,
45        time_us: u64,
46        sequence: u64,
47    },
48    /// Quality notification - full details (<60µs after rapid)
49    Quality {
50        path: String,
51        operations: Vec<Operation>,
52        time_us: u64,
53        total_us: u64,
54    },
55}
56
57/// Forge watcher - monitors file changes and emits rapid + quality events
58pub struct ForgeWatcher {
59    pub repo_root: PathBuf,
60    pub oplog: Arc<OperationLog>,
61    pub actor_id: String,
62    pub repo_id: String,
63    pub sync_mgr: Option<Arc<SyncManager>>,
64}
65
66impl ForgeWatcher {
67    /// Create a new forge watcher
68    pub async fn new<P: Into<PathBuf>>(
69        path: P,
70        enable_sync: bool,
71        peers: Vec<String>,
72    ) -> Result<Self> {
73        let path_buf = path.into();
74        let repo_root = path_buf.canonicalize().unwrap_or(path_buf);
75        let forge_dir = repo_root.join(".dx/forge");
76
77        let db = Database::new(&forge_dir)?;
78        db.initialize()?;
79        let oplog = Arc::new(OperationLog::new(Arc::new(db)));
80
81        // Load config
82        let config_raw = tokio::fs::read_to_string(forge_dir.join("config.json")).await?;
83        let config: serde_json::Value = serde_json::from_str(&config_raw)?;
84        let actor_id = config["actor_id"].as_str().unwrap().to_string();
85        let repo_id = config["repo_id"]
86            .as_str()
87            .map(|s| s.to_string())
88            .unwrap_or_else(|| {
89                let mut hasher = Sha256::new();
90                let path_string = repo_root.to_string_lossy().into_owned();
91                hasher.update(path_string.as_bytes());
92                format!("local-{:x}", hasher.finalize())
93            });
94
95        let sync_mgr = if enable_sync {
96            Some(Arc::new(SyncManager::new()))
97        } else {
98            None
99        };
100
101        // Connect to remote peers if provided
102        if let (Some(mgr), true) = (&sync_mgr, !peers.is_empty()) {
103            for url in peers {
104                let _ = connect_peer(
105                    &url,
106                    actor_id.clone(),
107                    repo_id.clone(),
108                    mgr.as_ref().clone(),
109                    oplog.clone(),
110                )
111                .await;
112            }
113        }
114
115        // Warm OS page cache
116        let _ = tokio::task::spawn_blocking({
117            let repo_root_clone = repo_root.clone();
118            move || cache_warmer::warm_cache(&repo_root_clone)
119        })
120        .await??;
121
122        Ok(Self {
123            repo_root,
124            oplog,
125            actor_id,
126            repo_id,
127            sync_mgr,
128        })
129    }
130
131    /// Run the watcher (blocking)
132    pub async fn run(self) -> Result<()> {
133        // Check if LSP support is available
134        let lsp_available = lsp_detector::detect_lsp_support().await?;
135
136        if lsp_available {
137            // Use LSP-based detection
138            lsp_detector::start_lsp_monitoring(
139                self.repo_root,
140                self.oplog,
141                self.actor_id,
142                self.sync_mgr,
143            )
144            .await
145        } else {
146                // Fall back to file system watching
147                tracing::info!(
148                    "{} {} mode (no LSP extension detected)",
149                    "👁️".bright_yellow(),
150                    "File watching".bright_cyan().bold()
151                );
152            detector::start_watching(
153                self.repo_root,
154                self.oplog,
155                self.actor_id,
156                self.repo_id,
157                self.sync_mgr,
158            )
159            .await
160        }
161    }
162}
163
164// Legacy function for backward compatibility
165pub async fn watch(path: PathBuf, enable_sync: bool, peers: Vec<String>) -> Result<()> {
166    let watcher = ForgeWatcher::new(path, enable_sync, peers).await?;
167    watcher.run().await
168}