Skip to main content

harness_context/
file_recall.rs

1//! File-backed [`RecallStore`]: append-only JSONL transcripts, one directory
2//! per owner. Open-format, greppable, operator-owned — same posture as
3//! [`crate::FileMemory`]. Search is a linear token-overlap scan (no FTS), fine
4//! at kilobyte–MB scale; apps at scale use `harness-recall-sqlite` instead.
5//!
6//! Layout under `root`:
7//! ```text
8//! <root>/<owner>/<session_id>.jsonl       one RecallMessage per line (id = line no.)
9//! <root>/<owner>/<session_id>.meta.json   SessionMeta sidecar
10//! ```
11
12use harness_core::{RecallError, RecallMessage, RecallStore, SessionHit, SessionMeta};
13use std::path::PathBuf;
14use std::sync::Mutex;
15
/// File-backed recall store rooted at a single directory.
///
/// Transcripts and metadata sidecars live under `root`, one sub-directory per
/// owner. `write_lock` is taken by the mutating operations (`ensure_session`,
/// `append`) so they do not interleave within this process; the read paths do
/// not take it.
#[derive(Debug)]
pub struct FileRecall {
    /// Directory that contains one sub-directory per owner.
    root: PathBuf,
    /// Guards no data of its own; it only serialises writers.
    write_lock: Mutex<()>,
}
20
21impl FileRecall {
22    /// Open (or create) a recall root directory.
23    pub fn open(root: impl Into<PathBuf>) -> Result<Self, RecallError> {
24        let root = root.into();
25        std::fs::create_dir_all(&root)
26            .map_err(|e| RecallError::Io(format!("create root {}: {e}", root.display())))?;
27        Ok(Self {
28            root,
29            write_lock: Mutex::new(()),
30        })
31    }
32
33    fn owner_dir(&self, owner: &str) -> PathBuf {
34        self.root.join(sanitize(owner))
35    }
36    fn session_path(&self, owner: &str, session_id: &str) -> PathBuf {
37        self.owner_dir(owner)
38            .join(format!("{}.jsonl", sanitize(session_id)))
39    }
40    fn meta_path(&self, owner: &str, session_id: &str) -> PathBuf {
41        self.owner_dir(owner)
42            .join(format!("{}.meta.json", sanitize(session_id)))
43    }
44
45    fn read_messages(&self, owner: &str, session_id: &str) -> Vec<RecallMessage> {
46        let path = self.session_path(owner, session_id);
47        let content = match std::fs::read_to_string(&path) {
48            Ok(c) => c,
49            Err(_) => return Vec::new(),
50        };
51        let mut out = Vec::new();
52        for (i, line) in content.lines().enumerate() {
53            let line = line.trim();
54            if line.is_empty() {
55                continue;
56            }
57            match serde_json::from_str::<RecallMessage>(line) {
58                Ok(mut m) => {
59                    m.id = (i + 1) as i64; // id = 1-based line number
60                    out.push(m);
61                }
62                Err(err) => {
63                    tracing::warn!(line = i + 1, error = %err, "recall line skipped");
64                }
65            }
66        }
67        out
68    }
69
70    fn read_meta(&self, owner: &str, session_id: &str) -> Option<SessionMeta> {
71        let path = self.meta_path(owner, session_id);
72        let s = std::fs::read_to_string(&path).ok()?;
73        serde_json::from_str::<SessionMeta>(&s).ok()
74    }
75
76    fn write_meta(&self, owner: &str, m: &SessionMeta) -> Result<(), RecallError> {
77        let path = self.meta_path(owner, &m.session_id);
78        let s = serde_json::to_string(m).map_err(|e| RecallError::Serde(e.to_string()))?;
79        std::fs::write(&path, s).map_err(|e| RecallError::Io(e.to_string()))
80    }
81}
82
/// Turn an arbitrary owner / session name into a single safe path component.
///
/// * Every character that is not alphanumeric, `-` or `_` becomes `_`
///   (so separators and dots cannot escape the directory).
/// * An empty result becomes `"_"`.
/// * Path components are limited by BYTES (about 255 on Linux), not characters,
///   so a result longer than 200 bytes is replaced by its first 40 characters
///   plus `-` and a 16-hex-digit hash of the ORIGINAL string.
///
/// The output is a fixed point: sanitising an already sanitised name returns it
/// unchanged.
///
/// NOTE(review): the mapping is lossy, so two distinct inputs can share a
/// component (`"a/b"` and `"a_b"` both give `"a_b"`); the hash suffix only
/// separates over-long names that share a prefix. Callers that need tenant
/// isolation should restrict owner names before they get here. The hash comes
/// from `DefaultHasher::new()`, which is deterministic within one toolchain;
/// the standard library does not promise the same algorithm across releases.
fn sanitize(s: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    const MAX_COMPONENT_BYTES: usize = 200;
    const PREFIX_CHARS: usize = 40;

    let mut safe = String::with_capacity(s.len());
    for ch in s.chars() {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_');
        safe.push(if keep { ch } else { '_' });
    }
    if safe.is_empty() {
        safe.push('_');
    }

    // Short names pass through untouched.
    if safe.len() <= MAX_COMPONENT_BYTES {
        return safe;
    }

    // Over-long: readable prefix + hash of the unsanitised input.
    let mut hasher = DefaultHasher::new();
    s.hash(&mut hasher);
    let digest = hasher.finish();
    let head: String = safe.chars().take(PREFIX_CHARS).collect();
    format!("{head}-{digest:016x}")
}
115
/// Lower-case `s` and split it into its maximal alphanumeric runs.
///
/// Every non-alphanumeric character acts as a separator; empty pieces between
/// consecutive separators are dropped. Order and duplicates are preserved.
fn tokenise(s: &str) -> Vec<String> {
    let lowered = s.to_lowercase();
    let mut tokens = Vec::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        if !piece.is_empty() {
            tokens.push(piece.to_owned());
        }
    }
    tokens
}
123
/// Cut a snippet centred on the earliest matched token, marked `>>>…<<<`.
///
/// The match is searched case-insensitively in `content.to_lowercase()`, then
/// mapped back to byte offsets of `content` itself. Lower-casing can change
/// byte lengths (`İ` is 2 bytes, its lower-case form is 3), so the lowered
/// offsets must not be used to slice the original text directly: doing so
/// shifts or loses the highlight.
///
/// The snippet keeps up to 40 bytes of context on each side of the match
/// (widened outwards to the nearest character boundary) and adds `…` on a side
/// that was cut. If no non-empty token occurs in `content`, the first 80
/// characters are returned without any marker.
fn make_snippet(content: &str, q_tokens: &[String]) -> String {
    const CONTEXT_BYTES: usize = 40;

    let lower = content.to_lowercase();
    // Earliest occurrence of any token, as a byte range of `lower`.
    let hit = q_tokens
        .iter()
        .filter(|t| !t.is_empty())
        .filter_map(|t| lower.find(t.as_str()).map(|pos| (pos, pos + t.len())))
        .min_by_key(|(pos, _)| *pos);
    let span = hit.and_then(|(lo_start, lo_end)| map_lowercase_span(content, lo_start, lo_end));
    let Some((pos, hit_end)) = span else {
        return content.chars().take(80).collect();
    };

    // Offsets 0 and `content.len()` are always boundaries, so both loops stop.
    let mut start = pos.saturating_sub(CONTEXT_BYTES);
    while !content.is_char_boundary(start) {
        start -= 1;
    }
    let mut end = (hit_end + CONTEXT_BYTES).min(content.len());
    while !content.is_char_boundary(end) {
        end += 1;
    }

    let mut s = String::with_capacity(end - start + 12);
    if start > 0 {
        s.push('…');
    }
    s.push_str(&content[start..pos]);
    s.push_str(">>>");
    s.push_str(&content[pos..hit_end]);
    s.push_str("<<<");
    s.push_str(&content[hit_end..end]);
    if end < content.len() {
        s.push('…');
    }
    s
}

/// Map the non-empty byte range `lo_start..lo_end` of `content.to_lowercase()`
/// onto the byte range of the original characters that produced it.
///
/// Walks `content` one character at a time while tracking how many bytes each
/// character contributes to the lowered text. `str::to_lowercase` differs from
/// per-character lowering only for a word-final sigma, and both sigma forms
/// have the same byte length, so the running offsets stay in step. The returned
/// range always lies on character boundaries of `content`; a match that begins
/// or ends inside a multi-character expansion is widened to the whole character.
/// Returns `None` if the range lies beyond the lowered text.
fn map_lowercase_span(content: &str, lo_start: usize, lo_end: usize) -> Option<(usize, usize)> {
    let mut lo = 0usize; // offset in the lowered text where the current char starts
    let mut start = None;
    for (idx, ch) in content.char_indices() {
        let lo_next = lo + ch.to_lowercase().map(|c| c.len_utf8()).sum::<usize>();
        if start.is_none() && lo_next > lo_start {
            start = Some(idx);
        }
        if lo_next >= lo_end {
            return start.map(|s| (s, idx + ch.len_utf8()));
        }
        lo = lo_next;
    }
    None
}
161
162#[async_trait::async_trait]
163impl RecallStore for FileRecall {
164    async fn ensure_session(
165        &self,
166        owner: &str,
167        session_id: &str,
168        meta: &SessionMeta,
169    ) -> Result<(), RecallError> {
170        let _g = self
171            .write_lock
172            .lock()
173            .map_err(|e| RecallError::Backend(e.to_string()))?;
174        std::fs::create_dir_all(self.owner_dir(owner))
175            .map_err(|e| RecallError::Io(e.to_string()))?;
176        if self.read_meta(owner, session_id).is_none() {
177            let mut m = meta.clone();
178            m.session_id = session_id.to_string();
179            self.write_meta(owner, &m)?;
180        }
181        Ok(())
182    }
183
184    async fn append(
185        &self,
186        owner: &str,
187        session_id: &str,
188        msg: &RecallMessage,
189    ) -> Result<i64, RecallError> {
190        let _g = self
191            .write_lock
192            .lock()
193            .map_err(|e| RecallError::Backend(e.to_string()))?;
194        std::fs::create_dir_all(self.owner_dir(owner))
195            .map_err(|e| RecallError::Io(e.to_string()))?;
196        let line = serde_json::to_string(msg).map_err(|e| RecallError::Serde(e.to_string()))?;
197        let path = self.session_path(owner, session_id);
198        use std::io::Write;
199        let mut f = std::fs::OpenOptions::new()
200            .create(true)
201            .append(true)
202            .open(&path)
203            .map_err(|e| RecallError::Io(e.to_string()))?;
204        writeln!(f, "{line}").map_err(|e| RecallError::Io(e.to_string()))?;
205        // Derive the new message's id from its actual assigned line number.
206        let msgs = self.read_messages(owner, session_id);
207        let id = msgs.last().map(|m| m.id).unwrap_or(1);
208        let mut meta = self
209            .read_meta(owner, session_id)
210            .unwrap_or_else(|| SessionMeta::new(session_id, msg.ts_ms));
211        meta.message_count = msgs.len() as i64;
212        let _ = self.write_meta(owner, &meta);
213        Ok(id)
214    }
215
216    async fn search(
217        &self,
218        owner: &str,
219        query: &str,
220        limit: usize,
221    ) -> Result<Vec<SessionHit>, RecallError> {
222        let q = tokenise(query);
223        if q.is_empty() || limit == 0 {
224            return Ok(Vec::new());
225        }
226        let dir = self.owner_dir(owner);
227        let mut hits: Vec<(u32, i64, SessionHit)> = Vec::new(); // (score, started_at, hit)
228        let entries = match std::fs::read_dir(&dir) {
229            Ok(e) => e,
230            Err(_) => return Ok(Vec::new()),
231        };
232        for entry in entries.flatten() {
233            let p = entry.path();
234            if p.extension().and_then(|e| e.to_str()) != Some("jsonl") {
235                continue;
236            }
237            let session_id = p
238                .file_stem()
239                .and_then(|s| s.to_str())
240                .unwrap_or("")
241                .to_string();
242            let msgs = self.read_messages(owner, &session_id);
243            if msgs.is_empty() {
244                continue;
245            }
246            // Best-matching message in this session.
247            let mut best: Option<(u32, &RecallMessage)> = None;
248            for m in &msgs {
249                let hay = m.content.to_lowercase();
250                let score: u32 = q
251                    .iter()
252                    .map(|t| if hay.contains(t.as_str()) { 1 } else { 0 })
253                    .sum();
254                if score > 0 && best.map(|(s, _)| score > s).unwrap_or(true) {
255                    best = Some((score, m));
256                }
257            }
258            let Some((score, anchor)) = best else {
259                continue;
260            };
261            let meta = self
262                .read_meta(owner, &session_id)
263                .unwrap_or_else(|| SessionMeta::new(&session_id, msgs[0].ts_ms));
264            let started = meta.started_at_ms;
265            let around: Vec<RecallMessage> = msgs
266                .iter()
267                .filter(|m| (m.id - anchor.id).abs() <= 5)
268                .cloned()
269                .collect();
270            let bookend_start: Vec<RecallMessage> = msgs.iter().take(3).cloned().collect();
271            let bookend_end: Vec<RecallMessage> =
272                msgs.iter().rev().take(3).rev().cloned().collect();
273            hits.push((
274                score,
275                started,
276                SessionHit::new(
277                    meta,
278                    make_snippet(&anchor.content, &q),
279                    anchor.id,
280                    bookend_start,
281                    around,
282                    bookend_end,
283                ),
284            ));
285        }
286        hits.sort_by(|a, b| b.0.cmp(&a.0).then(b.1.cmp(&a.1)));
287        Ok(hits.into_iter().take(limit).map(|(_, _, h)| h).collect())
288    }
289
290    async fn scroll(
291        &self,
292        owner: &str,
293        session_id: &str,
294        around: i64,
295        window: usize,
296    ) -> Result<Vec<RecallMessage>, RecallError> {
297        let msgs = self.read_messages(owner, session_id);
298        let w = window as i64;
299        Ok(msgs
300            .into_iter()
301            .filter(|m| (m.id - around).abs() <= w)
302            .collect())
303    }
304
305    async fn recent(&self, owner: &str, limit: usize) -> Result<Vec<SessionMeta>, RecallError> {
306        let dir = self.owner_dir(owner);
307        let mut metas: Vec<SessionMeta> = Vec::new();
308        if let Ok(entries) = std::fs::read_dir(&dir) {
309            for entry in entries.flatten() {
310                let p = entry.path();
311                if p.extension().and_then(|e| e.to_str()) == Some("json")
312                    && p.to_string_lossy().ends_with(".meta.json")
313                    && let Ok(s) = std::fs::read_to_string(&p)
314                    && let Ok(m) = serde_json::from_str::<SessionMeta>(&s)
315                {
316                    metas.push(m);
317                }
318            }
319        }
320        metas.sort_by(|a, b| b.started_at_ms.cmp(&a.started_at_ms));
321        metas.truncate(limit);
322        Ok(metas)
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    static N: AtomicU64 = AtomicU64::new(0);

    /// Unique scratch directory per test: pid + wall-clock nanos + counter, so
    /// parallel tests and repeated runs never share a root.
    fn tmp_root() -> PathBuf {
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::SeqCst);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("harness-recall-test-{pid}-{nanos}-{n}"))
    }

    #[tokio::test]
    async fn append_then_search_and_scroll() {
        let root = tmp_root();
        let r = FileRecall::open(&root).unwrap();
        r.ensure_session("u1", "s1", &SessionMeta::new("s1", 100))
            .await
            .unwrap();
        // Ids are 1-based line numbers of the transcript.
        let first = r
            .append(
                "u1",
                "s1",
                &RecallMessage::new("user", "let us refactor the auth module", 100),
            )
            .await
            .unwrap();
        assert_eq!(first, 1);
        let second = r
            .append(
                "u1",
                "s1",
                &RecallMessage::new("assistant", "sure, starting auth refactor", 101),
            )
            .await
            .unwrap();
        assert_eq!(second, 2);

        let hits = r.search("u1", "auth refactor", 5).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].session.session_id, "s1");
        assert!(hits[0].snippet.contains(">>>"));
        assert!(!hits[0].bookend_start.is_empty());

        // limit 0 and an unknown owner both give an empty result.
        assert!(r.search("u1", "auth", 0).await.unwrap().is_empty());
        assert!(r.search("nobody", "auth", 5).await.unwrap().is_empty());

        let scrolled = r.scroll("u1", "s1", 1, 1).await.unwrap();
        assert_eq!(scrolled.len(), 2);
        assert!(scrolled.iter().any(|m| m.id == 1));
        let only_second = r.scroll("u1", "s1", 2, 0).await.unwrap();
        assert_eq!(only_second.len(), 1);
        assert_eq!(only_second[0].id, 2);

        let recent = r.recent("u1", 10).await.unwrap();
        assert_eq!(recent.len(), 1);

        let _ = std::fs::remove_dir_all(&root);
    }

    #[tokio::test]
    async fn malformed_line_skipped() {
        let root = tmp_root();
        let r = FileRecall::open(&root).unwrap();
        r.ensure_session("u1", "s1", &SessionMeta::new("s1", 1))
            .await
            .unwrap();
        // hand-write a bad line then a good one
        let path = r.session_path("u1", "s1");
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(
            &path,
            "{bad\n{\"id\":0,\"role\":\"user\",\"content\":\"hello world\",\"ts_ms\":1}\n",
        )
        .unwrap();
        let hits = r.search("u1", "hello", 5).await.unwrap();
        assert_eq!(hits.len(), 1);
        // The surviving message keeps its line number (2); the bad line leaves a gap.
        let all = r.scroll("u1", "s1", 1, 10).await.unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].id, 2);
        let _ = std::fs::remove_dir_all(&root);
    }

    #[tokio::test]
    async fn cjk_owner_and_unicode_content_do_not_panic() {
        let root = tmp_root();
        let r = FileRecall::open(&root).unwrap();
        // 100 three-byte characters = 300 bytes: over the 200-byte limit, so the
        // owner directory takes the prefix-plus-hash form.
        let owner = "用户".repeat(50);
        r.ensure_session(&owner, "s1", &SessionMeta::new("s1", 1))
            .await
            .unwrap();
        r.append(
            &owner,
            "s1",
            &RecallMessage::new("user", "İstanbul café 支付服务 refactor", 1),
        )
        .await
        .unwrap();
        // search must not panic on the mixed-case/Unicode snippet path, and must
        // still find the session
        let hits = r.search(&owner, "refactor", 5).await.unwrap();
        assert_eq!(hits.len(), 1);
        let hits = r.search(&owner, "İstanbul", 5).await.unwrap();
        assert_eq!(hits.len(), 1);
        let _ = std::fs::remove_dir_all(&root);
    }
}
419}