bamboo_tools/tools/
read_tracker.rs1use dashmap::DashMap;
2use std::collections::HashMap;
3use std::sync::{Arc, OnceLock};
4use std::time::Instant;
5use tokio::sync::Mutex;
6
7const MAX_TRACKED_SESSIONS: usize = 2_000;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum ReadState {
11 Unread,
12 Stale,
13 Fresh,
14}
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17struct FileSnapshot {
18 size_bytes: u64,
19 modified_ns: Option<u128>,
20}
21
22#[derive(Debug, Default)]
23struct SessionReads {
24 files: HashMap<String, FileSnapshot>,
25 last_touched: Option<Instant>,
26}
27
28fn tracker() -> &'static DashMap<String, Arc<Mutex<SessionReads>>> {
29 static TRACKER: OnceLock<DashMap<String, Arc<Mutex<SessionReads>>>> = OnceLock::new();
30 TRACKER.get_or_init(DashMap::new)
31}
32
33fn normalize_path(path: &str) -> String {
34 std::fs::canonicalize(path)
35 .ok()
36 .and_then(|value| value.to_str().map(|s| s.to_string()))
37 .unwrap_or_else(|| path.to_string())
38}
39
40fn snapshot_for_path(path: &str) -> Option<FileSnapshot> {
41 let metadata = std::fs::metadata(path).ok()?;
42 let modified_ns = metadata
43 .modified()
44 .ok()
45 .and_then(|value| value.duration_since(std::time::UNIX_EPOCH).ok())
46 .map(|duration| duration.as_nanos());
47
48 Some(FileSnapshot {
49 size_bytes: metadata.len(),
50 modified_ns,
51 })
52}
53
54async fn cleanup_if_needed() {
55 let map = tracker();
56 if map.len() <= MAX_TRACKED_SESSIONS {
57 return;
58 }
59
60 let mut oldest: Option<(String, Instant)> = None;
61 for entry in map.iter() {
62 let key = entry.key().clone();
63 let session = entry.value().clone();
64 let touched = session.lock().await.last_touched.unwrap_or(Instant::now());
65 match oldest {
66 Some((_, ts)) if touched >= ts => {}
67 _ => oldest = Some((key, touched)),
68 }
69 }
70
71 if let Some((key, _)) = oldest {
72 map.remove(&key);
73 }
74}
75
76pub async fn mark_read(session_id: &str, path: &str) {
77 let normalized = normalize_path(path);
78 let snapshot = snapshot_for_path(path);
79 let entry = tracker()
80 .entry(session_id.to_string())
81 .or_insert_with(|| Arc::new(Mutex::new(SessionReads::default())))
82 .clone();
83
84 {
85 let mut guard = entry.lock().await;
86 guard.last_touched = Some(Instant::now());
87 if let Some(snapshot) = snapshot {
88 guard.files.insert(normalized, snapshot);
89 } else {
90 guard.files.insert(
92 normalized,
93 FileSnapshot {
94 size_bytes: 0,
95 modified_ns: None,
96 },
97 );
98 }
99 }
100
101 cleanup_if_needed().await;
102}
103
104pub async fn has_read(session_id: &str, path: &str) -> bool {
105 let normalized = normalize_path(path);
106 let Some(entry) = tracker().get(session_id).map(|value| value.clone()) else {
107 return false;
108 };
109
110 let mut guard = entry.lock().await;
111 guard.last_touched = Some(Instant::now());
112 guard.files.contains_key(&normalized)
113}
114
115pub async fn read_state(session_id: &str, path: &str) -> ReadState {
116 let normalized = normalize_path(path);
117 let Some(entry) = tracker().get(session_id).map(|value| value.clone()) else {
118 return ReadState::Unread;
119 };
120
121 let mut guard = entry.lock().await;
122 guard.last_touched = Some(Instant::now());
123 let Some(snapshot) = guard.files.get(&normalized).copied() else {
124 return ReadState::Unread;
125 };
126
127 let Some(current) = snapshot_for_path(path) else {
128 return ReadState::Stale;
129 };
130
131 if snapshot == current {
132 ReadState::Fresh
133 } else {
134 ReadState::Stale
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141
142 fn session_id(label: &str) -> String {
143 format!("read-tracker-{label}-{}", uuid::Uuid::new_v4())
144 }
145
146 #[tokio::test]
147 async fn read_state_transitions_from_fresh_to_stale_after_external_change() {
148 let file = tempfile::NamedTempFile::new().unwrap();
149 tokio::fs::write(file.path(), "v1").await.unwrap();
150 let path = file.path().to_string_lossy().to_string();
151 let session = session_id("fresh-stale");
152
153 mark_read(&session, &path).await;
154 assert!(has_read(&session, &path).await);
155 assert_eq!(read_state(&session, &path).await, ReadState::Fresh);
156
157 tokio::fs::write(file.path(), "v2 changed").await.unwrap();
158 assert_eq!(read_state(&session, &path).await, ReadState::Stale);
159 }
160
161 #[tokio::test]
162 async fn missing_file_marked_read_is_treated_as_stale_for_writes() {
163 let dir = tempfile::tempdir().unwrap();
164 let path = dir.path().join("missing.txt");
165 let path_str = path.to_string_lossy().to_string();
166 let session = session_id("missing");
167
168 mark_read(&session, &path_str).await;
169 assert!(has_read(&session, &path_str).await);
170 assert_eq!(read_state(&session, &path_str).await, ReadState::Stale);
171 }
172}