Skip to main content

lago_fs/
tracker.rs

//! Inline filesystem change tracker for O(1) write notifications (constant with
//! respect to workspace size).
//!
//! [`FsTracker`] wraps a [`Manifest`] and a [`BlobStore`] to produce
//! event payloads on every write or delete — without scanning the workspace.
//! The [`FsTracker::reconcile`] method provides an O(n) safety-net path for catching
//! changes made outside of tracked writes (e.g. shell commands).
7
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use lago_core::LagoResult;
12use lago_core::event::EventPayload;
13use lago_store::BlobStore;
14
15use crate::diff::{self, DiffEntry};
16use crate::manifest::Manifest;
17use crate::snapshot;
18
19/// Inline filesystem tracker producing event payloads on writes/deletes.
20///
21/// Thread-safe: the internal manifest is behind a `Mutex`.
22pub struct FsTracker {
23    manifest: Mutex<Manifest>,
24    blob_store: Arc<BlobStore>,
25}
26
27impl FsTracker {
28    /// Create a new tracker seeded with an existing manifest state.
29    pub fn new(manifest: Manifest, blob_store: Arc<BlobStore>) -> Self {
30        Self {
31            manifest: Mutex::new(manifest),
32            blob_store,
33        }
34    }
35
36    /// O(1) track a file write. Stores the content in the blob store,
37    /// updates the manifest, and returns a `FileWrite` event payload.
38    pub fn track_write(
39        &self,
40        rel_path: &str,
41        content: &[u8],
42        content_type: Option<String>,
43    ) -> LagoResult<EventPayload> {
44        let blob_hash = self.blob_store.put(content)?;
45        let size_bytes = content.len() as u64;
46        let timestamp = now_micros();
47
48        let mut manifest = self.manifest.lock().unwrap();
49        manifest.apply_write(
50            rel_path.to_string(),
51            blob_hash.clone(),
52            size_bytes,
53            content_type.clone(),
54            timestamp,
55        );
56
57        Ok(EventPayload::FileWrite {
58            path: rel_path.to_string(),
59            blob_hash: blob_hash.into(),
60            size_bytes,
61            content_type,
62        })
63    }
64
65    /// O(1) track a file deletion. Updates the manifest and returns
66    /// a `FileDelete` event payload.
67    pub fn track_delete(&self, rel_path: &str) -> LagoResult<EventPayload> {
68        let mut manifest = self.manifest.lock().unwrap();
69        manifest.apply_delete(rel_path);
70
71        Ok(EventPayload::FileDelete {
72            path: rel_path.to_string(),
73        })
74    }
75
76    /// O(n) reconciliation: snapshot the workspace, diff against the
77    /// tracked manifest, update the manifest, and return event payloads
78    /// for every detected change. This is the safety-net path for catching
79    /// changes made outside of tracked writes.
80    pub fn reconcile(&self, workspace_root: &Path) -> LagoResult<Vec<EventPayload>> {
81        let mut manifest = self.manifest.lock().unwrap();
82        let new_manifest = snapshot::snapshot(workspace_root, &manifest, &self.blob_store)?;
83        let diffs = diff::diff(&manifest, &new_manifest);
84
85        // Replace manifest with the fresh snapshot.
86        *manifest = new_manifest;
87
88        let payloads = diffs
89            .into_iter()
90            .map(|d| match d {
91                DiffEntry::Added { path, entry }
92                | DiffEntry::Modified {
93                    path, new: entry, ..
94                } => EventPayload::FileWrite {
95                    path,
96                    blob_hash: entry.blob_hash.into(),
97                    size_bytes: entry.size_bytes,
98                    content_type: entry.content_type,
99                },
100                DiffEntry::Removed { path, .. } => EventPayload::FileDelete { path },
101            })
102            .collect();
103
104        Ok(payloads)
105    }
106
107    /// Clone the current manifest snapshot.
108    pub fn manifest(&self) -> Manifest {
109        self.manifest.lock().unwrap().clone()
110    }
111}
112
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Based on `SystemTime`, so successive values can go backwards if the system
/// clock is adjusted. A clock set before the epoch yields `0`.
fn now_micros() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    micros_saturating(since_epoch)
}

/// Convert a duration to whole microseconds, saturating at `u64::MAX` instead
/// of silently truncating the high bits of the `u128` returned by `as_micros`.
fn micros_saturating(duration: std::time::Duration) -> u64 {
    u64::try_from(duration.as_micros()).unwrap_or(u64::MAX)
}
119
#[cfg(test)]
mod tests {
    use super::*;
    use lago_core::BlobHash;
    use std::fs;

    /// Build a temp dir, a blob store rooted at `<tmp>/blobs`, and a tracker
    /// with an empty manifest sharing that store. Callers must keep the
    /// returned `TempDir` alive for the test's duration (it is deleted on drop).
    fn setup() -> (tempfile::TempDir, Arc<BlobStore>, FsTracker) {
        let dir = tempfile::tempdir().unwrap();
        let store = Arc::new(BlobStore::open(dir.path().join("blobs")).unwrap());
        let tracker = FsTracker::new(Manifest::new(), Arc::clone(&store));
        (dir, store, tracker)
    }

    /// Create the empty `<tmp>/ws` workspace directory and return its path.
    fn empty_workspace(dir: &tempfile::TempDir) -> std::path::PathBuf {
        let ws = dir.path().join("ws");
        fs::create_dir_all(&ws).unwrap();
        ws
    }

    #[test]
    fn track_write_produces_correct_event() {
        let (_dir, store, tracker) = setup();
        let payload = tracker
            .track_write("/src/main.rs", b"fn main() {}", Some("text/x-rust".into()))
            .unwrap();

        if let EventPayload::FileWrite {
            path,
            blob_hash,
            size_bytes,
            content_type,
        } = &payload
        {
            assert_eq!(path, "/src/main.rs");
            assert_eq!(*size_bytes, 12);
            assert_eq!(content_type.as_deref(), Some("text/x-rust"));
            // The hash in the payload must refer to content the store holds.
            let stored = BlobHash::from_hex(blob_hash.as_str());
            assert!(store.exists(&stored));
        } else {
            panic!("expected FileWrite, got {payload:?}");
        }
    }

    #[test]
    fn track_write_updates_manifest() {
        let (_dir, _store, tracker) = setup();
        tracker.track_write("/a.txt", b"hello", None).unwrap();

        let current = tracker.manifest();
        assert!(current.exists("/a.txt"));
        let entry = current.get("/a.txt").unwrap();
        assert_eq!(entry.size_bytes, 5);
    }

    #[test]
    fn track_delete_produces_correct_event() {
        let (_dir, _store, tracker) = setup();
        // Make the path known to the manifest before removing it.
        tracker.track_write("/x.txt", b"data", None).unwrap();
        let payload = tracker.track_delete("/x.txt").unwrap();

        if let EventPayload::FileDelete { path } = &payload {
            assert_eq!(path, "/x.txt");
        } else {
            panic!("expected FileDelete, got {payload:?}");
        }

        // The entry must also be gone from the tracked manifest.
        assert!(!tracker.manifest().exists("/x.txt"));
    }

    #[test]
    fn reconcile_detects_additions() {
        let (dir, store, _) = setup();
        let ws = empty_workspace(&dir);
        fs::write(ws.join("new.txt"), "content").unwrap();

        let tracker = FsTracker::new(Manifest::new(), store);
        let payloads = tracker.reconcile(&ws).unwrap();

        assert!(!payloads.is_empty());
        let saw_write = payloads
            .iter()
            .any(|p| matches!(p, EventPayload::FileWrite { path, .. } if path == "/new.txt"));
        assert!(saw_write);
    }

    #[test]
    fn reconcile_detects_deletions() {
        let (dir, store, _) = setup();
        let ws = empty_workspace(&dir);

        // The manifest claims a file that does not exist on disk.
        let mut seeded = Manifest::new();
        seeded.apply_write("/gone.txt".into(), BlobHash::from_hex("dead"), 4, None, 1000);

        let tracker = FsTracker::new(seeded, store);
        let payloads = tracker.reconcile(&ws).unwrap();

        let saw_delete = payloads
            .iter()
            .any(|p| matches!(p, EventPayload::FileDelete { path } if path == "/gone.txt"));
        assert!(saw_delete);
    }

    #[test]
    fn reconcile_detects_modifications() {
        let (dir, store, _) = setup();
        let ws = empty_workspace(&dir);
        let target = ws.join("mod.txt");

        // Baseline snapshot of the original contents.
        fs::write(&target, "original").unwrap();
        let baseline = crate::snapshot::snapshot(&ws, &Manifest::new(), &store).unwrap();

        // Rewrite with content of a different length so the snapshot's
        // size-based fast path doesn't skip the hash.
        fs::write(&target, "this content is much longer than original").unwrap();

        let tracker = FsTracker::new(baseline, store);
        let payloads = tracker.reconcile(&ws).unwrap();

        let saw_write = payloads
            .iter()
            .any(|p| matches!(p, EventPayload::FileWrite { path, .. } if path == "/mod.txt"));
        assert!(saw_write);
    }

    #[test]
    fn empty_reconcile_returns_empty_vec() {
        let (dir, store, _) = setup();
        let ws = empty_workspace(&dir);

        let tracker = FsTracker::new(Manifest::new(), store);
        assert!(tracker.reconcile(&ws).unwrap().is_empty());
    }

    #[test]
    fn concurrent_writes_do_not_panic() {
        let (_dir, _store, tracker) = setup();
        let shared = Arc::new(tracker);

        let workers: Vec<std::thread::JoinHandle<()>> = (0..10)
            .map(|i| {
                let tracker = Arc::clone(&shared);
                std::thread::spawn(move || {
                    let path = format!("/file_{i}.txt");
                    let body = format!("content {i}");
                    tracker.track_write(&path, body.as_bytes(), None).unwrap();
                })
            })
            .collect();

        workers.into_iter().for_each(|w| w.join().unwrap());

        // Ten distinct paths were written, so ten manifest entries remain.
        assert_eq!(shared.manifest().len(), 10);
    }
}