1pub mod detector;
2pub mod cache_warmer;
3
4use anyhow::Result;
5use sha2::{Digest, Sha256};
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use crate::storage::{Database, OperationLog};
10use crate::sync::{SyncManager, remote::connect_peer};
11use crate::crdt::Operation;
12
13#[derive(Debug, Clone)]
15pub struct RapidChange {
16 pub path: String,
18 pub time_us: u64,
20 pub sequence: u64,
22}
23
24#[derive(Debug, Clone)]
26pub struct QualityChange {
27 pub path: String,
29 pub operations: Vec<Operation>,
31 pub time_us: u64,
33 pub total_us: u64,
35}
36
37#[derive(Debug, Clone)]
39pub enum ForgeEvent {
40 Rapid {
42 path: String,
43 time_us: u64,
44 sequence: u64,
45 },
46 Quality {
48 path: String,
49 operations: Vec<Operation>,
50 time_us: u64,
51 total_us: u64,
52 },
53}
54
55pub struct ForgeWatcher {
57 pub repo_root: PathBuf,
58 pub oplog: Arc<OperationLog>,
59 pub actor_id: String,
60 pub repo_id: String,
61 pub sync_mgr: Option<Arc<SyncManager>>,
62}
63
64impl ForgeWatcher {
65 pub async fn new<P: Into<PathBuf>>(path: P, enable_sync: bool, peers: Vec<String>) -> Result<Self> {
67 let path_buf = path.into();
68 let repo_root = path_buf.canonicalize().unwrap_or(path_buf);
69 let forge_dir = repo_root.join(".dx/forge");
70
71 let db = Database::new(&forge_dir)?;
72 db.initialize()?;
73 let oplog = Arc::new(OperationLog::new(Arc::new(db)));
74
75 let config_raw = tokio::fs::read_to_string(forge_dir.join("config.json")).await?;
77 let config: serde_json::Value = serde_json::from_str(&config_raw)?;
78 let actor_id = config["actor_id"].as_str().unwrap().to_string();
79 let repo_id = config["repo_id"]
80 .as_str()
81 .map(|s| s.to_string())
82 .unwrap_or_else(|| {
83 let mut hasher = Sha256::new();
84 let path_string = repo_root.to_string_lossy().into_owned();
85 hasher.update(path_string.as_bytes());
86 format!("local-{:x}", hasher.finalize())
87 });
88
89 let sync_mgr = if enable_sync {
90 Some(Arc::new(SyncManager::new()))
91 } else {
92 None
93 };
94
95 if let (Some(mgr), true) = (&sync_mgr, !peers.is_empty()) {
97 for url in peers {
98 let _ = connect_peer(
99 &url,
100 actor_id.clone(),
101 repo_id.clone(),
102 mgr.as_ref().clone(),
103 oplog.clone(),
104 )
105 .await;
106 }
107 }
108
109 let _ = tokio::task::spawn_blocking({
111 let repo_root_clone = repo_root.clone();
112 move || cache_warmer::warm_cache(&repo_root_clone)
113 })
114 .await??;
115
116 Ok(Self {
117 repo_root,
118 oplog,
119 actor_id,
120 repo_id,
121 sync_mgr,
122 })
123 }
124
125 pub async fn run(self) -> Result<()> {
127 detector::start_watching(
128 self.repo_root,
129 self.oplog,
130 self.actor_id,
131 self.repo_id,
132 self.sync_mgr,
133 ).await
134 }
135}
136
137pub async fn watch(path: PathBuf, enable_sync: bool, peers: Vec<String>) -> Result<()> {
139 let watcher = ForgeWatcher::new(path, enable_sync, peers).await?;
140 watcher.run().await
141}