Skip to main content

hanzo_engine/files/
store.rs

1//! In-process file store keyed by id, with per-entry TTL.
2
3use std::{
4    collections::HashMap,
5    sync::{Arc, RwLock},
6    time::{Duration, Instant},
7};
8
9use super::File;
10
/// Per-entry TTL (30 minutes) used by `FileStore::new`. Matches the agentic session default.
pub const DEFAULT_FILE_TTL: Duration = Duration::from_secs(30 * 60);

/// Hard entry cap. Once an insert pushes the store past this count, expired
/// entries are dropped first and then the oldest by insertion order.
pub const MAX_FILES: usize = 4096;

/// Pause between sweeps of the background reaper started by
/// `FileStore::spawn_cleanup_task`.
const CLEANUP_INTERVAL: Duration = Duration::from_secs(120);
18
/// One map entry: the file itself plus the bookkeeping the store needs for
/// expiry, session lookup, and ordering.
struct StoredFile {
    /// Shared handle; lookups hand out clones of this `Arc`, not copies of the file.
    file: Arc<File>,
    /// Deadline for this entry. Once `Instant::now()` is past it the entry is
    /// treated as absent by reads and removed by the next sweep.
    expires_at: Instant,
    /// `None` for runs without a session.
    session_id: Option<String>,
    /// Insertion order. `list_for_session` returns oldest first.
    seq: u64,
}
27
/// In-process file store keyed by file id, with a per-entry TTL.
///
/// Cloning is cheap: every clone shares the same underlying map through the
/// inner `Arc`, so an insert through one handle is visible through all others.
#[derive(Clone)]
pub struct FileStore {
    /// Shared, lock-protected state.
    inner: Arc<RwLock<Inner>>,
    /// Lifetime granted to an entry on insert and on each session touch.
    ttl: Duration,
}
33
/// State guarded by the store's `RwLock`.
struct Inner {
    /// Entries keyed by file id.
    by_id: HashMap<String, StoredFile>,
    /// Sequence number the next insert will receive; incremented on every insert.
    next_seq: u64,
}
38
39impl FileStore {
40    pub fn new() -> Self {
41        Self::with_ttl(DEFAULT_FILE_TTL)
42    }
43
44    pub fn with_ttl(ttl: Duration) -> Self {
45        Self {
46            inner: Arc::new(RwLock::new(Inner {
47                by_id: HashMap::new(),
48                next_seq: 0,
49            })),
50            ttl,
51        }
52    }
53
54    /// Evicts oldest entries to stay under `MAX_FILES`.
55    pub fn insert(&self, file: File, session_id: Option<String>) {
56        let id = file.id.clone();
57        let mut guard = self.inner.write().unwrap();
58        let seq = guard.next_seq;
59        guard.next_seq += 1;
60        guard.by_id.insert(
61            id,
62            StoredFile {
63                file: Arc::new(file),
64                expires_at: Instant::now() + self.ttl,
65                session_id,
66                seq,
67            },
68        );
69        if guard.by_id.len() > MAX_FILES {
70            let now = Instant::now();
71            guard.by_id.retain(|_, e| e.expires_at >= now);
72            while guard.by_id.len() > MAX_FILES {
73                let Some(oldest_id) = guard
74                    .by_id
75                    .iter()
76                    .min_by_key(|(_, e)| e.seq)
77                    .map(|(k, _)| k.clone())
78                else {
79                    break;
80                };
81                guard.by_id.remove(&oldest_id);
82            }
83        }
84    }
85
86    /// `None` if missing or expired.
87    pub fn get(&self, id: &str) -> Option<Arc<File>> {
88        let now = Instant::now();
89        let guard = self.inner.read().unwrap();
90        let entry = guard.by_id.get(id)?;
91        if entry.expires_at < now {
92            None
93        } else {
94            Some(Arc::clone(&entry.file))
95        }
96    }
97
98    /// Returns true if an entry existed.
99    pub fn remove(&self, id: &str) -> bool {
100        self.inner.write().unwrap().by_id.remove(id).is_some()
101    }
102
103    /// Refresh the TTL on every file tagged with `session_id`. Call when the session is touched.
104    pub fn touch_session(&self, session_id: &str) {
105        let new_expiry = std::time::Instant::now() + self.ttl;
106        let mut guard = self.inner.write().unwrap();
107        for entry in guard.by_id.values_mut() {
108            if entry.session_id.as_deref() == Some(session_id) {
109                entry.expires_at = new_expiry;
110            }
111        }
112    }
113
114    /// Non-expired files tagged with `session_id`, oldest first.
115    pub fn list_for_session(&self, session_id: &str) -> Vec<Arc<File>> {
116        let now = Instant::now();
117        let guard = self.inner.read().unwrap();
118        let mut hits: Vec<&StoredFile> = guard
119            .by_id
120            .values()
121            .filter(|s| s.expires_at >= now && s.session_id.as_deref() == Some(session_id))
122            .collect();
123        hits.sort_by_key(|s| s.seq);
124        hits.into_iter().map(|s| Arc::clone(&s.file)).collect()
125    }
126
127    /// Every non-expired file regardless of session, oldest first.
128    pub fn list_all(&self) -> Vec<Arc<File>> {
129        let now = Instant::now();
130        let guard = self.inner.read().unwrap();
131        let mut hits: Vec<&StoredFile> = guard
132            .by_id
133            .values()
134            .filter(|s| s.expires_at >= now)
135            .collect();
136        hits.sort_by_key(|s| s.seq);
137        hits.into_iter().map(|s| Arc::clone(&s.file)).collect()
138    }
139
140    pub fn cleanup_expired(&self) -> usize {
141        let now = Instant::now();
142        let mut guard = self.inner.write().unwrap();
143        let before = guard.by_id.len();
144        guard.by_id.retain(|_, entry| entry.expires_at >= now);
145        before - guard.by_id.len()
146    }
147
148    /// Periodic reaper bound to the store's lifetime via `Weak`. Dies with the last `Arc`.
149    pub fn spawn_cleanup_task(&self) {
150        let weak = Arc::downgrade(&self.inner);
151        tokio::spawn(async move {
152            loop {
153                tokio::time::sleep(CLEANUP_INTERVAL).await;
154                let Some(inner) = weak.upgrade() else { break };
155                let now = Instant::now();
156                let reaped = {
157                    let mut guard = inner.write().unwrap();
158                    let before = guard.by_id.len();
159                    guard.by_id.retain(|_, e| e.expires_at >= now);
160                    before - guard.by_id.len()
161                };
162                if reaped > 0 {
163                    tracing::debug!("FileStore reaped {reaped} expired file(s)");
164                }
165            }
166        });
167    }
168
169    pub fn len(&self) -> usize {
170        self.inner.read().unwrap().by_id.len()
171    }
172
173    pub fn is_empty(&self) -> bool {
174        self.inner.read().unwrap().by_id.is_empty()
175    }
176}
177
178impl Default for FileStore {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::files::{FileContent, FileSource};

    /// Builds a two-byte plain-text fixture whose name is derived from `id`.
    fn make(id: &str) -> File {
        let source = FileSource {
            tool: "execute_python".into(),
            round: 0,
            turn: 0,
        };
        let content = FileContent::Text {
            text: Some("hi".into()),
            preview: None,
        };
        File {
            id: id.into(),
            name: format!("{id}.txt"),
            format: Some("txt".into()),
            mime_type: Some("text/plain".into()),
            bytes: 2,
            created_at: 0,
            source,
            content,
        }
    }

    /// Ids of the files listed for `session`, in the order the store returns them.
    fn session_ids(store: &FileStore, session: &str) -> Vec<String> {
        store
            .list_for_session(session)
            .iter()
            .map(|f| f.id.clone())
            .collect()
    }

    /// A stored file can be fetched by id; an unknown id yields nothing.
    #[test]
    fn insert_and_get() {
        let store = FileStore::new();
        store.insert(make("file_a"), None);

        let fetched = store.get("file_a").unwrap();
        assert_eq!(fetched.as_text(), Some("hi"));
        assert!(store.get("missing").is_none());
    }

    /// Listing is scoped to one session and ordered by insertion.
    #[test]
    fn list_by_session_oldest_first() {
        let store = FileStore::new();
        for (id, session) in [
            ("file_a", "sess1"),
            ("file_b", "sess1"),
            ("file_c", "sess2"),
        ] {
            store.insert(make(id), Some(session.into()));
        }

        assert_eq!(
            session_ids(&store, "sess1"),
            vec!["file_a".to_string(), "file_b".to_string()]
        );
        assert_eq!(session_ids(&store, "sess2"), vec!["file_c".to_string()]);
    }

    /// After the TTL passes the file is hidden from `get` and removed by a sweep.
    #[test]
    fn ttl_eviction() {
        let store = FileStore::with_ttl(Duration::from_millis(1));
        store.insert(make("file_a"), None);

        std::thread::sleep(Duration::from_millis(5));

        assert!(store.get("file_a").is_none());
        assert_eq!(store.cleanup_expired(), 1);
        assert!(store.is_empty());
    }
}