1use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use lago_core::LagoResult;
12use lago_core::event::EventPayload;
13use lago_store::BlobStore;
14
15use crate::diff::{self, DiffEntry};
16use crate::manifest::Manifest;
17use crate::snapshot;
18
19pub struct FsTracker {
23 manifest: Mutex<Manifest>,
24 blob_store: Arc<BlobStore>,
25}
26
27impl FsTracker {
28 pub fn new(manifest: Manifest, blob_store: Arc<BlobStore>) -> Self {
30 Self {
31 manifest: Mutex::new(manifest),
32 blob_store,
33 }
34 }
35
36 pub fn track_write(
39 &self,
40 rel_path: &str,
41 content: &[u8],
42 content_type: Option<String>,
43 ) -> LagoResult<EventPayload> {
44 let blob_hash = self.blob_store.put(content)?;
45 let size_bytes = content.len() as u64;
46 let timestamp = now_micros();
47
48 let mut manifest = self.manifest.lock().unwrap();
49 manifest.apply_write(
50 rel_path.to_string(),
51 blob_hash.clone(),
52 size_bytes,
53 content_type.clone(),
54 timestamp,
55 );
56
57 Ok(EventPayload::FileWrite {
58 path: rel_path.to_string(),
59 blob_hash: blob_hash.into(),
60 size_bytes,
61 content_type,
62 })
63 }
64
65 pub fn track_delete(&self, rel_path: &str) -> LagoResult<EventPayload> {
68 let mut manifest = self.manifest.lock().unwrap();
69 manifest.apply_delete(rel_path);
70
71 Ok(EventPayload::FileDelete {
72 path: rel_path.to_string(),
73 })
74 }
75
76 pub fn reconcile(&self, workspace_root: &Path) -> LagoResult<Vec<EventPayload>> {
81 let mut manifest = self.manifest.lock().unwrap();
82 let new_manifest = snapshot::snapshot(workspace_root, &manifest, &self.blob_store)?;
83 let diffs = diff::diff(&manifest, &new_manifest);
84
85 *manifest = new_manifest;
87
88 let payloads = diffs
89 .into_iter()
90 .map(|d| match d {
91 DiffEntry::Added { path, entry }
92 | DiffEntry::Modified {
93 path, new: entry, ..
94 } => EventPayload::FileWrite {
95 path,
96 blob_hash: entry.blob_hash.into(),
97 size_bytes: entry.size_bytes,
98 content_type: entry.content_type,
99 },
100 DiffEntry::Removed { path, .. } => EventPayload::FileDelete { path },
101 })
102 .collect();
103
104 Ok(payloads)
105 }
106
107 pub fn manifest(&self) -> Manifest {
109 self.manifest.lock().unwrap().clone()
110 }
111}
112
/// Returns the current wall-clock time as whole microseconds since the Unix
/// epoch.
///
/// A system clock set before the epoch yields `0`. A value too large for
/// `u64` saturates at `u64::MAX` instead of silently wrapping.
fn now_micros() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `duration_since` fails when the clock is before the epoch; fall
        // back to a zero duration in that case.
        .unwrap_or_default();
    duration_to_micros(since_epoch)
}

/// Converts `elapsed` to whole microseconds, saturating at `u64::MAX`.
fn duration_to_micros(elapsed: std::time::Duration) -> u64 {
    // `as_micros` yields a `u128`; clamp first so the narrowing cast below
    // can never discard high bits.
    elapsed.as_micros().min(u128::from(u64::MAX)) as u64
}
119
#[cfg(test)]
mod tests {
    use super::*;
    use lago_core::BlobHash;
    use std::fs;

    /// Builds a temp dir, a blob store under `<tmp>/blobs`, and a tracker with
    /// an empty manifest. The temp dir is returned so it outlives the test.
    fn setup() -> (tempfile::TempDir, Arc<BlobStore>, FsTracker) {
        let tmp = tempfile::tempdir().unwrap();
        let store = BlobStore::open(tmp.path().join("blobs")).unwrap();
        let blob_store = Arc::new(store);
        let tracker = FsTracker::new(Manifest::new(), blob_store.clone());
        (tmp, blob_store, tracker)
    }

    /// Creates the `ws` directory inside `tmp` and returns its path.
    fn make_workspace(tmp: &tempfile::TempDir) -> std::path::PathBuf {
        let ws = tmp.path().join("ws");
        fs::create_dir_all(&ws).unwrap();
        ws
    }

    /// True when `payloads` contains a `FileWrite` for `expected`.
    fn has_write(payloads: &[EventPayload], expected: &str) -> bool {
        payloads.iter().any(|p| {
            matches!(
                p,
                EventPayload::FileWrite { path, .. } if path == expected
            )
        })
    }

    /// True when `payloads` contains a `FileDelete` for `expected`.
    fn has_delete(payloads: &[EventPayload], expected: &str) -> bool {
        payloads.iter().any(|p| {
            matches!(
                p,
                EventPayload::FileDelete { path } if path == expected
            )
        })
    }

    #[test]
    fn track_write_produces_correct_event() {
        let (_tmp, blob_store, tracker) = setup();
        let payload = tracker
            .track_write("/src/main.rs", b"fn main() {}", Some("text/x-rust".into()))
            .unwrap();

        if let EventPayload::FileWrite {
            path,
            blob_hash,
            size_bytes,
            content_type,
        } = &payload
        {
            assert_eq!(path, "/src/main.rs");
            assert_eq!(*size_bytes, 12);
            assert_eq!(content_type.as_deref(), Some("text/x-rust"));
            // The hash reported in the event must refer to a stored blob.
            assert!(blob_store.exists(&BlobHash::from_hex(blob_hash.as_str())));
        } else {
            panic!("expected FileWrite, got {payload:?}");
        }
    }

    #[test]
    fn track_write_updates_manifest() {
        let (_tmp, _blob, tracker) = setup();
        tracker.track_write("/a.txt", b"hello", None).unwrap();

        let snapshot_of_manifest = tracker.manifest();
        assert!(snapshot_of_manifest.exists("/a.txt"));
        let entry = snapshot_of_manifest.get("/a.txt").unwrap();
        assert_eq!(entry.size_bytes, 5);
    }

    #[test]
    fn track_delete_produces_correct_event() {
        let (_tmp, _blob, tracker) = setup();
        tracker.track_write("/x.txt", b"data", None).unwrap();
        let payload = tracker.track_delete("/x.txt").unwrap();

        if let EventPayload::FileDelete { path } = &payload {
            assert_eq!(path, "/x.txt");
        } else {
            panic!("expected FileDelete, got {payload:?}");
        }

        // The path must be gone from the manifest as well.
        assert!(!tracker.manifest().exists("/x.txt"));
    }

    #[test]
    fn reconcile_detects_additions() {
        let (tmp, blob_store, _) = setup();
        let ws = make_workspace(&tmp);
        fs::write(ws.join("new.txt"), "content").unwrap();

        let tracker = FsTracker::new(Manifest::new(), blob_store);
        let payloads = tracker.reconcile(&ws).unwrap();

        assert!(!payloads.is_empty());
        assert!(has_write(&payloads, "/new.txt"));
    }

    #[test]
    fn reconcile_detects_deletions() {
        let (tmp, blob_store, _) = setup();
        let ws = make_workspace(&tmp);

        // Seed the manifest with a file that does not exist in the workspace;
        // the hash is a placeholder and no blob is stored for it.
        let mut seeded = Manifest::new();
        seeded.apply_write(
            "/gone.txt".into(),
            BlobHash::from_hex("dead"),
            4,
            None,
            1000,
        );

        let tracker = FsTracker::new(seeded, blob_store);
        let payloads = tracker.reconcile(&ws).unwrap();

        assert!(has_delete(&payloads, "/gone.txt"));
    }

    #[test]
    fn reconcile_detects_modifications() {
        let (tmp, blob_store, _) = setup();
        let ws = make_workspace(&tmp);
        let target = ws.join("mod.txt");

        fs::write(&target, "original").unwrap();
        let initial = crate::snapshot::snapshot(&ws, &Manifest::new(), &blob_store).unwrap();

        // The replacement text differs in length from the first version.
        fs::write(&target, "this content is much longer than original").unwrap();
        let tracker = FsTracker::new(initial, blob_store);
        let payloads = tracker.reconcile(&ws).unwrap();

        assert!(has_write(&payloads, "/mod.txt"));
    }

    #[test]
    fn empty_reconcile_returns_empty_vec() {
        let (tmp, blob_store, _) = setup();
        let ws = make_workspace(&tmp);

        let tracker = FsTracker::new(Manifest::new(), blob_store);
        let payloads = tracker.reconcile(&ws).unwrap();
        assert!(payloads.is_empty());
    }

    #[test]
    fn concurrent_writes_do_not_panic() {
        let (_tmp, _blob, tracker) = setup();
        let tracker = Arc::new(tracker);

        // Ten threads each write a distinct path through the shared tracker.
        let mut handles = Vec::new();
        for i in 0..10 {
            let shared = tracker.clone();
            handles.push(std::thread::spawn(move || {
                let path = format!("/file_{i}.txt");
                let content = format!("content {i}");
                shared.track_write(&path, content.as_bytes(), None).unwrap();
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(tracker.manifest().len(), 10);
    }
}