1use harness_core::{RecallError, RecallMessage, RecallStore, SessionHit, SessionMeta};
13use std::path::PathBuf;
14use std::sync::Mutex;
15
/// Recall store that keeps its data as plain files under one root directory.
///
/// Each owner gets a sub-directory of `root`; inside it every session has a
/// `<session>.jsonl` message log and a `<session>.meta.json` metadata file.
/// Owner and session names are passed through `sanitize` before they are used
/// as path components.
pub struct FileRecall {
    /// Directory under which all owner sub-directories live.
    root: PathBuf,
    /// Lock taken by the mutating operations (`ensure_session`, `append`);
    /// it guards no data of its own.
    write_lock: Mutex<()>,
}
20
21impl FileRecall {
22 pub fn open(root: impl Into<PathBuf>) -> Result<Self, RecallError> {
24 let root = root.into();
25 std::fs::create_dir_all(&root)
26 .map_err(|e| RecallError::Io(format!("create root {}: {e}", root.display())))?;
27 Ok(Self {
28 root,
29 write_lock: Mutex::new(()),
30 })
31 }
32
33 fn owner_dir(&self, owner: &str) -> PathBuf {
34 self.root.join(sanitize(owner))
35 }
36 fn session_path(&self, owner: &str, session_id: &str) -> PathBuf {
37 self.owner_dir(owner)
38 .join(format!("{}.jsonl", sanitize(session_id)))
39 }
40 fn meta_path(&self, owner: &str, session_id: &str) -> PathBuf {
41 self.owner_dir(owner)
42 .join(format!("{}.meta.json", sanitize(session_id)))
43 }
44
45 fn read_messages(&self, owner: &str, session_id: &str) -> Vec<RecallMessage> {
46 let path = self.session_path(owner, session_id);
47 let content = match std::fs::read_to_string(&path) {
48 Ok(c) => c,
49 Err(_) => return Vec::new(),
50 };
51 let mut out = Vec::new();
52 for (i, line) in content.lines().enumerate() {
53 let line = line.trim();
54 if line.is_empty() {
55 continue;
56 }
57 match serde_json::from_str::<RecallMessage>(line) {
58 Ok(mut m) => {
59 m.id = (i + 1) as i64; out.push(m);
61 }
62 Err(err) => {
63 tracing::warn!(line = i + 1, error = %err, "recall line skipped");
64 }
65 }
66 }
67 out
68 }
69
70 fn read_meta(&self, owner: &str, session_id: &str) -> Option<SessionMeta> {
71 let path = self.meta_path(owner, session_id);
72 let s = std::fs::read_to_string(&path).ok()?;
73 serde_json::from_str::<SessionMeta>(&s).ok()
74 }
75
76 fn write_meta(&self, owner: &str, m: &SessionMeta) -> Result<(), RecallError> {
77 let path = self.meta_path(owner, &m.session_id);
78 let s = serde_json::to_string(m).map_err(|e| RecallError::Serde(e.to_string()))?;
79 std::fs::write(&path, s).map_err(|e| RecallError::Io(e.to_string()))
80 }
81}
82
/// Turns an arbitrary string into a single safe path component.
///
/// Every character that is not alphanumeric, `-` or `_` becomes `_`, and an
/// empty result becomes `"_"`. If the cleaned name is longer than 200 bytes
/// it is replaced by its first 40 characters, a `-`, and a 16-digit hex hash
/// of the *original* input.
///
/// NOTE(review): the hash comes from `DefaultHasher`, whose algorithm the
/// standard library does not promise to keep stable across releases — confirm
/// that on-disk names for over-long inputs may change with the toolchain.
fn sanitize(s: &str) -> String {
    use std::hash::{Hash, Hasher};

    const MAX_BYTES: usize = 200;
    const PREFIX_CHARS: usize = 40;

    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    if cleaned.is_empty() {
        cleaned.push('_');
    }

    // Short enough: use the cleaned name as-is.
    if cleaned.len() <= MAX_BYTES {
        return cleaned;
    }

    // Too long: keep a readable head and disambiguate with a hash of the input.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    s.hash(&mut hasher);
    let prefix: String = cleaned.chars().take(PREFIX_CHARS).collect();
    format!("{prefix}-{:016x}", hasher.finish())
}
115
/// Splits `s` into lower-cased search tokens.
///
/// The input is lower-cased first, then cut at every non-alphanumeric
/// character; empty pieces are dropped.
fn tokenise(s: &str) -> Vec<String> {
    let lowered = s.to_lowercase();
    let mut tokens = Vec::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        if !piece.is_empty() {
            tokens.push(piece.to_owned());
        }
    }
    tokens
}
123
/// Translates the byte range `lo..hi` of `content.to_lowercase()` back into
/// the byte range of `content` that covers the same characters.
///
/// Lower-casing can change a character's encoded length (for example `İ`
/// grows from two bytes to three), so offsets found in the lowered string are
/// not valid offsets into the original. The returned range is widened to whole
/// original characters. Returns `None` when `hi` lies past the end of the
/// lowered text. Callers pass `lo < hi`.
fn lowered_span_to_original(content: &str, lo: usize, hi: usize) -> Option<(usize, usize)> {
    let mut lowered_off = 0usize;
    let mut start = None;
    for (orig_off, ch) in content.char_indices() {
        // Bytes this character occupies once lower-cased.
        let lowered_end = lowered_off + ch.to_lowercase().map(char::len_utf8).sum::<usize>();
        if start.is_none() && lo < lowered_end {
            start = Some(orig_off);
        }
        if hi <= lowered_end {
            return start.map(|s| (s, orig_off + ch.len_utf8()));
        }
        lowered_off = lowered_end;
    }
    None
}

/// Builds a short excerpt of `content` around the earliest query-token match.
///
/// The match is searched case-insensitively (tokens are looked up in the
/// lower-cased content) and wrapped in `>>>` / `<<<`. Up to 40 bytes of
/// context are kept on each side, widened outward to character boundaries,
/// and `…` marks a side that was cut. Empty tokens are ignored. When no token
/// occurs, the first 80 characters of `content` are returned instead.
fn make_snippet(content: &str, q_tokens: &[String]) -> String {
    const CONTEXT_BYTES: usize = 40;
    const FALLBACK_CHARS: usize = 80;

    let lower = content.to_lowercase();
    let hit = q_tokens
        .iter()
        .filter(|t| !t.is_empty())
        .filter_map(|t| lower.find(t.as_str()).map(|pos| (pos, t.len())))
        .min_by_key(|&(pos, _)| pos)
        // Offsets above index `lower`; convert them before touching `content`.
        .and_then(|(pos, len)| lowered_span_to_original(content, pos, pos + len));

    let Some((hit_start, hit_end)) = hit else {
        return content.chars().take(FALLBACK_CHARS).collect();
    };

    // Context window, nudged outward so it never splits a character.
    let mut start = hit_start.saturating_sub(CONTEXT_BYTES);
    while !content.is_char_boundary(start) {
        start -= 1;
    }
    let mut end = (hit_end + CONTEXT_BYTES).min(content.len());
    while !content.is_char_boundary(end) {
        end += 1;
    }

    let mut s = String::with_capacity(end - start + 12);
    if start > 0 {
        s.push('…');
    }
    s.push_str(&content[start..hit_start]);
    s.push_str(">>>");
    s.push_str(&content[hit_start..hit_end]);
    s.push_str("<<<");
    s.push_str(&content[hit_end..end]);
    if end < content.len() {
        s.push('…');
    }
    s
}
161
162#[async_trait::async_trait]
163impl RecallStore for FileRecall {
164 async fn ensure_session(
165 &self,
166 owner: &str,
167 session_id: &str,
168 meta: &SessionMeta,
169 ) -> Result<(), RecallError> {
170 let _g = self
171 .write_lock
172 .lock()
173 .map_err(|e| RecallError::Backend(e.to_string()))?;
174 std::fs::create_dir_all(self.owner_dir(owner))
175 .map_err(|e| RecallError::Io(e.to_string()))?;
176 if self.read_meta(owner, session_id).is_none() {
177 let mut m = meta.clone();
178 m.session_id = session_id.to_string();
179 self.write_meta(owner, &m)?;
180 }
181 Ok(())
182 }
183
184 async fn append(
185 &self,
186 owner: &str,
187 session_id: &str,
188 msg: &RecallMessage,
189 ) -> Result<i64, RecallError> {
190 let _g = self
191 .write_lock
192 .lock()
193 .map_err(|e| RecallError::Backend(e.to_string()))?;
194 std::fs::create_dir_all(self.owner_dir(owner))
195 .map_err(|e| RecallError::Io(e.to_string()))?;
196 let line = serde_json::to_string(msg).map_err(|e| RecallError::Serde(e.to_string()))?;
197 let path = self.session_path(owner, session_id);
198 use std::io::Write;
199 let mut f = std::fs::OpenOptions::new()
200 .create(true)
201 .append(true)
202 .open(&path)
203 .map_err(|e| RecallError::Io(e.to_string()))?;
204 writeln!(f, "{line}").map_err(|e| RecallError::Io(e.to_string()))?;
205 let msgs = self.read_messages(owner, session_id);
207 let id = msgs.last().map(|m| m.id).unwrap_or(1);
208 let mut meta = self
209 .read_meta(owner, session_id)
210 .unwrap_or_else(|| SessionMeta::new(session_id, msg.ts_ms));
211 meta.message_count = msgs.len() as i64;
212 let _ = self.write_meta(owner, &meta);
213 Ok(id)
214 }
215
216 async fn search(
217 &self,
218 owner: &str,
219 query: &str,
220 limit: usize,
221 ) -> Result<Vec<SessionHit>, RecallError> {
222 let q = tokenise(query);
223 if q.is_empty() || limit == 0 {
224 return Ok(Vec::new());
225 }
226 let dir = self.owner_dir(owner);
227 let mut hits: Vec<(u32, i64, SessionHit)> = Vec::new(); let entries = match std::fs::read_dir(&dir) {
229 Ok(e) => e,
230 Err(_) => return Ok(Vec::new()),
231 };
232 for entry in entries.flatten() {
233 let p = entry.path();
234 if p.extension().and_then(|e| e.to_str()) != Some("jsonl") {
235 continue;
236 }
237 let session_id = p
238 .file_stem()
239 .and_then(|s| s.to_str())
240 .unwrap_or("")
241 .to_string();
242 let msgs = self.read_messages(owner, &session_id);
243 if msgs.is_empty() {
244 continue;
245 }
246 let mut best: Option<(u32, &RecallMessage)> = None;
248 for m in &msgs {
249 let hay = m.content.to_lowercase();
250 let score: u32 = q
251 .iter()
252 .map(|t| if hay.contains(t.as_str()) { 1 } else { 0 })
253 .sum();
254 if score > 0 && best.map(|(s, _)| score > s).unwrap_or(true) {
255 best = Some((score, m));
256 }
257 }
258 let Some((score, anchor)) = best else {
259 continue;
260 };
261 let meta = self
262 .read_meta(owner, &session_id)
263 .unwrap_or_else(|| SessionMeta::new(&session_id, msgs[0].ts_ms));
264 let started = meta.started_at_ms;
265 let around: Vec<RecallMessage> = msgs
266 .iter()
267 .filter(|m| (m.id - anchor.id).abs() <= 5)
268 .cloned()
269 .collect();
270 let bookend_start: Vec<RecallMessage> = msgs.iter().take(3).cloned().collect();
271 let bookend_end: Vec<RecallMessage> =
272 msgs.iter().rev().take(3).rev().cloned().collect();
273 hits.push((
274 score,
275 started,
276 SessionHit::new(
277 meta,
278 make_snippet(&anchor.content, &q),
279 anchor.id,
280 bookend_start,
281 around,
282 bookend_end,
283 ),
284 ));
285 }
286 hits.sort_by(|a, b| b.0.cmp(&a.0).then(b.1.cmp(&a.1)));
287 Ok(hits.into_iter().take(limit).map(|(_, _, h)| h).collect())
288 }
289
290 async fn scroll(
291 &self,
292 owner: &str,
293 session_id: &str,
294 around: i64,
295 window: usize,
296 ) -> Result<Vec<RecallMessage>, RecallError> {
297 let msgs = self.read_messages(owner, session_id);
298 let w = window as i64;
299 Ok(msgs
300 .into_iter()
301 .filter(|m| (m.id - around).abs() <= w)
302 .collect())
303 }
304
305 async fn recent(&self, owner: &str, limit: usize) -> Result<Vec<SessionMeta>, RecallError> {
306 let dir = self.owner_dir(owner);
307 let mut metas: Vec<SessionMeta> = Vec::new();
308 if let Ok(entries) = std::fs::read_dir(&dir) {
309 for entry in entries.flatten() {
310 let p = entry.path();
311 if p.extension().and_then(|e| e.to_str()) == Some("json")
312 && p.to_string_lossy().ends_with(".meta.json")
313 && let Ok(s) = std::fs::read_to_string(&p)
314 && let Ok(m) = serde_json::from_str::<SessionMeta>(&s)
315 {
316 metas.push(m);
317 }
318 }
319 }
320 metas.sort_by(|a, b| b.started_at_ms.cmp(&a.started_at_ms));
321 metas.truncate(limit);
322 Ok(metas)
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Counter mixed into temp directory names so calls never share a path.
    static N: AtomicU64 = AtomicU64::new(0);

    /// Builds a temp-dir path from the process id, the current time in
    /// nanoseconds and a per-process counter. The directory is not created.
    fn tmp_root() -> PathBuf {
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::SeqCst);
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        let nanos = since_epoch.as_nanos();
        let dir_name = format!("harness-recall-test-{pid}-{nanos}-{n}");
        std::env::temp_dir().join(dir_name)
    }

    /// Round trip: two appended messages are found by search, reachable by
    /// scroll, and the session shows up in `recent`.
    #[tokio::test]
    async fn append_then_search_and_scroll() {
        let dir = tmp_root();
        let store = FileRecall::open(&dir).unwrap();
        let seed = SessionMeta::new("s1", 100);
        store.ensure_session("u1", "s1", &seed).await.unwrap();

        let question = RecallMessage::new("user", "let us refactor the auth module", 100);
        store.append("u1", "s1", &question).await.unwrap();
        let answer = RecallMessage::new("assistant", "sure, starting auth refactor", 101);
        store.append("u1", "s1", &answer).await.unwrap();

        let found = store.search("u1", "auth refactor", 5).await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].session.session_id, "s1");
        assert!(found[0].snippet.contains(">>>"));
        assert!(!found[0].bookend_start.is_empty());

        let window = store.scroll("u1", "s1", 1, 1).await.unwrap();
        assert!(window.iter().any(|m| m.id == 1));

        let latest = store.recent("u1", 10).await.unwrap();
        assert_eq!(latest.len(), 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A log whose first line is not valid JSON still yields the valid
    /// second line as a search hit.
    #[tokio::test]
    async fn malformed_line_skipped() {
        let dir = tmp_root();
        let store = FileRecall::open(&dir).unwrap();
        let seed = SessionMeta::new("s1", 1);
        store.ensure_session("u1", "s1", &seed).await.unwrap();

        // Write the log by hand so the first line can be garbage.
        let log = store.session_path("u1", "s1");
        std::fs::create_dir_all(log.parent().unwrap()).unwrap();
        let raw = "{bad\n{\"id\":0,\"role\":\"user\",\"content\":\"hello world\",\"ts_ms\":1}\n";
        std::fs::write(&log, raw).unwrap();

        let found = store.search("u1", "hello", 5).await.unwrap();
        assert_eq!(found.len(), 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A long CJK owner name and mixed-script content must go through
    /// `ensure_session`, `append` and `search` without panicking.
    #[tokio::test]
    async fn cjk_owner_and_unicode_content_do_not_panic() {
        let dir = tmp_root();
        let store = FileRecall::open(&dir).unwrap();
        let owner = "用户".repeat(50);
        let seed = SessionMeta::new("s1", 1);
        store.ensure_session(&owner, "s1", &seed).await.unwrap();

        let mixed = RecallMessage::new("user", "İstanbul café 支付服务 refactor", 1);
        store.append(&owner, "s1", &mixed).await.unwrap();

        // Only the absence of a panic or error matters; results are discarded.
        let _ = store.search(&owner, "refactor", 5).await.unwrap();
        let _ = store.search(&owner, "İstanbul", 5).await.unwrap();

        let _ = std::fs::remove_dir_all(&dir);
    }
}