Skip to main content

bamboo_tools/tools/
read_tracker.rs

1use dashmap::DashMap;
2use std::collections::HashMap;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5use tokio::sync::Mutex;
6
7const MAX_TRACKED_SESSIONS: usize = 2_000;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum ReadState {
11    Unread,
12    Stale,
13    Fresh,
14}
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17struct FileSnapshot {
18    size_bytes: u64,
19    modified_ns: Option<u128>,
20}
21
22#[derive(Debug, Default)]
23struct SessionReads {
24    files: HashMap<String, FileSnapshot>,
25    last_touched: Option<Instant>,
26}
27
28fn tracker() -> &'static DashMap<String, Arc<Mutex<SessionReads>>> {
29    static TRACKER: OnceLock<DashMap<String, Arc<Mutex<SessionReads>>>> = OnceLock::new();
30    TRACKER.get_or_init(DashMap::new)
31}
32
33fn normalize_path(path: &str) -> String {
34    std::fs::canonicalize(path)
35        .ok()
36        .and_then(|value| value.to_str().map(|s| s.to_string()))
37        .unwrap_or_else(|| path.to_string())
38}
39
40fn snapshot_for_path(path: &str) -> Option<FileSnapshot> {
41    let metadata = std::fs::metadata(path).ok()?;
42    let modified_ns = metadata
43        .modified()
44        .ok()
45        .and_then(|value| value.duration_since(std::time::UNIX_EPOCH).ok())
46        .map(|duration| duration.as_nanos());
47
48    Some(FileSnapshot {
49        size_bytes: metadata.len(),
50        modified_ns,
51    })
52}
53
54async fn cleanup_if_needed() {
55    let map = tracker();
56    if map.len() <= MAX_TRACKED_SESSIONS {
57        return;
58    }
59
60    let mut oldest: Option<(String, Instant)> = None;
61    for entry in map.iter() {
62        let key = entry.key().clone();
63        let session = entry.value().clone();
64        let touched = session.lock().await.last_touched.unwrap_or(Instant::now());
65        match oldest {
66            Some((_, ts)) if touched >= ts => {}
67            _ => oldest = Some((key, touched)),
68        }
69    }
70
71    if let Some((key, _)) = oldest {
72        map.remove(&key);
73    }
74}
75
76pub async fn mark_read(session_id: &str, path: &str) {
77    let normalized = normalize_path(path);
78    let snapshot = snapshot_for_path(path);
79    let entry = tracker()
80        .entry(session_id.to_string())
81        .or_insert_with(|| Arc::new(Mutex::new(SessionReads::default())))
82        .clone();
83
84    {
85        let mut guard = entry.lock().await;
86        guard.last_touched = Some(Instant::now());
87        if let Some(snapshot) = snapshot {
88            guard.files.insert(normalized, snapshot);
89        } else {
90            // Keep a sentinel entry so we still know a read happened.
91            guard.files.insert(
92                normalized,
93                FileSnapshot {
94                    size_bytes: 0,
95                    modified_ns: None,
96                },
97            );
98        }
99    }
100
101    cleanup_if_needed().await;
102}
103
104pub async fn has_read(session_id: &str, path: &str) -> bool {
105    let normalized = normalize_path(path);
106    let Some(entry) = tracker().get(session_id).map(|value| value.clone()) else {
107        return false;
108    };
109
110    let mut guard = entry.lock().await;
111    guard.last_touched = Some(Instant::now());
112    guard.files.contains_key(&normalized)
113}
114
115pub async fn read_state(session_id: &str, path: &str) -> ReadState {
116    let normalized = normalize_path(path);
117    let Some(entry) = tracker().get(session_id).map(|value| value.clone()) else {
118        return ReadState::Unread;
119    };
120
121    let mut guard = entry.lock().await;
122    guard.last_touched = Some(Instant::now());
123    let Some(snapshot) = guard.files.get(&normalized).copied() else {
124        return ReadState::Unread;
125    };
126
127    let Some(current) = snapshot_for_path(path) else {
128        return ReadState::Stale;
129    };
130
131    if snapshot == current {
132        ReadState::Fresh
133    } else {
134        ReadState::Stale
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141
142    fn session_id(label: &str) -> String {
143        format!("read-tracker-{label}-{}", uuid::Uuid::new_v4())
144    }
145
146    #[tokio::test]
147    async fn read_state_transitions_from_fresh_to_stale_after_external_change() {
148        let file = tempfile::NamedTempFile::new().unwrap();
149        tokio::fs::write(file.path(), "v1").await.unwrap();
150        let path = file.path().to_string_lossy().to_string();
151        let session = session_id("fresh-stale");
152
153        mark_read(&session, &path).await;
154        assert!(has_read(&session, &path).await);
155        assert_eq!(read_state(&session, &path).await, ReadState::Fresh);
156
157        tokio::fs::write(file.path(), "v2 changed").await.unwrap();
158        assert_eq!(read_state(&session, &path).await, ReadState::Stale);
159    }
160
161    #[tokio::test]
162    async fn missing_file_marked_read_is_treated_as_stale_for_writes() {
163        let dir = tempfile::tempdir().unwrap();
164        let path = dir.path().join("missing.txt");
165        let path_str = path.to_string_lossy().to_string();
166        let session = session_id("missing");
167
168        mark_read(&session, &path_str).await;
169        assert!(has_read(&session, &path_str).await);
170        assert_eq!(read_state(&session, &path_str).await, ReadState::Stale);
171    }
172}