hanzo_engine/files/
store.rs1use std::{
4 collections::HashMap,
5 sync::{Arc, RwLock},
6 time::{Duration, Instant},
7};
8
9use super::File;
10
/// Lifetime given to entries by [`FileStore::new`]: half an hour after they
/// are inserted, or after their session was last touched.
pub const DEFAULT_FILE_TTL: Duration = Duration::from_secs(60 * 30);

/// Upper bound on the number of entries kept in one [`FileStore`]. Once an
/// insert exceeds it, expired entries are reaped and then the oldest inserts
/// are evicted.
pub const MAX_FILES: usize = 4096;

/// Pause between sweeps of the background task started by
/// [`FileStore::spawn_cleanup_task`].
const CLEANUP_INTERVAL: Duration = Duration::from_secs(2 * 60);

19struct StoredFile {
20 file: Arc<File>,
21 expires_at: Instant,
22 session_id: Option<String>,
24 seq: u64,
26}
27
28#[derive(Clone)]
29pub struct FileStore {
30 inner: Arc<RwLock<Inner>>,
31 ttl: Duration,
32}
33
34struct Inner {
35 by_id: HashMap<String, StoredFile>,
36 next_seq: u64,
37}
38
39impl FileStore {
40 pub fn new() -> Self {
41 Self::with_ttl(DEFAULT_FILE_TTL)
42 }
43
44 pub fn with_ttl(ttl: Duration) -> Self {
45 Self {
46 inner: Arc::new(RwLock::new(Inner {
47 by_id: HashMap::new(),
48 next_seq: 0,
49 })),
50 ttl,
51 }
52 }
53
54 pub fn insert(&self, file: File, session_id: Option<String>) {
56 let id = file.id.clone();
57 let mut guard = self.inner.write().unwrap();
58 let seq = guard.next_seq;
59 guard.next_seq += 1;
60 guard.by_id.insert(
61 id,
62 StoredFile {
63 file: Arc::new(file),
64 expires_at: Instant::now() + self.ttl,
65 session_id,
66 seq,
67 },
68 );
69 if guard.by_id.len() > MAX_FILES {
70 let now = Instant::now();
71 guard.by_id.retain(|_, e| e.expires_at >= now);
72 while guard.by_id.len() > MAX_FILES {
73 let Some(oldest_id) = guard
74 .by_id
75 .iter()
76 .min_by_key(|(_, e)| e.seq)
77 .map(|(k, _)| k.clone())
78 else {
79 break;
80 };
81 guard.by_id.remove(&oldest_id);
82 }
83 }
84 }
85
86 pub fn get(&self, id: &str) -> Option<Arc<File>> {
88 let now = Instant::now();
89 let guard = self.inner.read().unwrap();
90 let entry = guard.by_id.get(id)?;
91 if entry.expires_at < now {
92 None
93 } else {
94 Some(Arc::clone(&entry.file))
95 }
96 }
97
98 pub fn remove(&self, id: &str) -> bool {
100 self.inner.write().unwrap().by_id.remove(id).is_some()
101 }
102
103 pub fn touch_session(&self, session_id: &str) {
105 let new_expiry = std::time::Instant::now() + self.ttl;
106 let mut guard = self.inner.write().unwrap();
107 for entry in guard.by_id.values_mut() {
108 if entry.session_id.as_deref() == Some(session_id) {
109 entry.expires_at = new_expiry;
110 }
111 }
112 }
113
114 pub fn list_for_session(&self, session_id: &str) -> Vec<Arc<File>> {
116 let now = Instant::now();
117 let guard = self.inner.read().unwrap();
118 let mut hits: Vec<&StoredFile> = guard
119 .by_id
120 .values()
121 .filter(|s| s.expires_at >= now && s.session_id.as_deref() == Some(session_id))
122 .collect();
123 hits.sort_by_key(|s| s.seq);
124 hits.into_iter().map(|s| Arc::clone(&s.file)).collect()
125 }
126
127 pub fn list_all(&self) -> Vec<Arc<File>> {
129 let now = Instant::now();
130 let guard = self.inner.read().unwrap();
131 let mut hits: Vec<&StoredFile> = guard
132 .by_id
133 .values()
134 .filter(|s| s.expires_at >= now)
135 .collect();
136 hits.sort_by_key(|s| s.seq);
137 hits.into_iter().map(|s| Arc::clone(&s.file)).collect()
138 }
139
140 pub fn cleanup_expired(&self) -> usize {
141 let now = Instant::now();
142 let mut guard = self.inner.write().unwrap();
143 let before = guard.by_id.len();
144 guard.by_id.retain(|_, entry| entry.expires_at >= now);
145 before - guard.by_id.len()
146 }
147
148 pub fn spawn_cleanup_task(&self) {
150 let weak = Arc::downgrade(&self.inner);
151 tokio::spawn(async move {
152 loop {
153 tokio::time::sleep(CLEANUP_INTERVAL).await;
154 let Some(inner) = weak.upgrade() else { break };
155 let now = Instant::now();
156 let reaped = {
157 let mut guard = inner.write().unwrap();
158 let before = guard.by_id.len();
159 guard.by_id.retain(|_, e| e.expires_at >= now);
160 before - guard.by_id.len()
161 };
162 if reaped > 0 {
163 tracing::debug!("FileStore reaped {reaped} expired file(s)");
164 }
165 }
166 });
167 }
168
169 pub fn len(&self) -> usize {
170 self.inner.read().unwrap().by_id.len()
171 }
172
173 pub fn is_empty(&self) -> bool {
174 self.inner.read().unwrap().by_id.is_empty()
175 }
176}
177
178impl Default for FileStore {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::files::{FileContent, FileSource};

    /// Builds a small text file whose id is `id`.
    fn make(id: &str) -> File {
        File {
            id: id.into(),
            name: format!("{id}.txt"),
            format: Some("txt".into()),
            mime_type: Some("text/plain".into()),
            bytes: 2,
            created_at: 0,
            source: FileSource {
                tool: "execute_python".into(),
                round: 0,
                turn: 0,
            },
            content: FileContent::Text {
                text: Some("hi".into()),
                preview: None,
            },
        }
    }

    /// Ids of `files`, in the order they were returned.
    fn ids(files: &[Arc<File>]) -> Vec<String> {
        files.iter().map(|f| f.id.clone()).collect()
    }

    #[test]
    fn insert_and_get() {
        let s = FileStore::new();
        s.insert(make("file_a"), None);
        assert_eq!(s.get("file_a").unwrap().as_text(), Some("hi"));
        assert!(s.get("missing").is_none());
    }

    #[test]
    fn list_by_session_oldest_first() {
        let s = FileStore::new();
        s.insert(make("file_a"), Some("sess1".into()));
        s.insert(make("file_b"), Some("sess1".into()));
        s.insert(make("file_c"), Some("sess2".into()));
        assert_eq!(ids(&s.list_for_session("sess1")), ["file_a", "file_b"]);
        assert_eq!(ids(&s.list_for_session("sess2")), ["file_c"]);
        assert!(s.list_for_session("unknown").is_empty());
    }

    #[test]
    fn list_all_in_insertion_order() {
        let s = FileStore::new();
        s.insert(make("file_c"), None);
        s.insert(make("file_a"), Some("sess1".into()));
        s.insert(make("file_b"), None);
        assert_eq!(ids(&s.list_all()), ["file_c", "file_a", "file_b"]);
    }

    #[test]
    fn remove_reports_presence() {
        let s = FileStore::new();
        s.insert(make("file_a"), None);
        assert!(s.remove("file_a"));
        assert!(!s.remove("file_a"));
        assert!(s.get("file_a").is_none());
        assert!(!s.remove("missing"));
    }

    #[test]
    fn capacity_evicts_oldest_first() {
        let s = FileStore::new();
        for i in 0..=MAX_FILES {
            s.insert(make(&format!("file_{i}")), None);
        }
        assert_eq!(s.len(), MAX_FILES);
        assert!(s.get("file_0").is_none());
        assert!(s.get("file_1").is_some());
        assert!(s.get(&format!("file_{MAX_FILES}")).is_some());
    }

    #[test]
    fn ttl_eviction() {
        let s = FileStore::with_ttl(Duration::from_millis(1));
        s.insert(make("file_a"), None);
        std::thread::sleep(Duration::from_millis(5));
        assert!(s.get("file_a").is_none());
        let swept = s.cleanup_expired();
        assert_eq!(swept, 1);
        assert!(s.is_empty());
    }
}