use harness_core::{Memory, MemoryEntry, MemoryError};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
/// A memory store persisted as a JSON-lines file: one serialised
/// `MemoryEntry` per line.
///
/// Entries are appended by `write` and the whole file is re-read on every
/// `recall`; nothing is cached in memory.
#[derive(Debug)]
pub struct FileMemory {
    /// Location of the backing JSON-lines file.
    path: PathBuf,
    /// Held while appending so that writers going through this instance
    /// add their lines one at a time. It guards no data of its own.
    write_lock: Mutex<()>,
}
impl FileMemory {
pub fn open(path: impl Into<PathBuf>) -> Result<Self, MemoryError> {
let path = path.into();
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)
.map_err(|e| MemoryError::Io(format!("create parent: {e}")))?;
}
if !path.exists() {
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map_err(|e| MemoryError::Io(format!("create {}: {e}", path.display())))?;
}
Ok(Self {
path,
write_lock: Mutex::new(()),
})
}
pub fn path(&self) -> &Path {
&self.path
}
fn read_all(&self) -> Result<Vec<MemoryEntry>, MemoryError> {
let content = std::fs::read_to_string(&self.path)
.map_err(|e| MemoryError::Io(format!("read {}: {e}", self.path.display())))?;
let mut out = Vec::new();
for (i, line) in content.lines().enumerate() {
let line = line.trim();
if line.is_empty() {
continue;
}
match serde_json::from_str::<MemoryEntry>(line) {
Ok(e) => out.push(e),
Err(err) => {
tracing::warn!(line = i + 1, error = %err, "memory line skipped");
}
}
}
Ok(out)
}
}
#[async_trait::async_trait]
impl Memory for FileMemory {
    /// Returns up to `k` stored entries that match `query`, best match first.
    ///
    /// The backing file is re-read on every call using blocking `std::fs`
    /// I/O. The query is split into lowercase tokens by `tokenise`:
    ///
    /// * If the query yields no tokens, the `k` entries with the largest
    ///   `created_ms` are returned.
    /// * Otherwise an entry's score is the number of distinct query tokens
    ///   that occur as substrings of its lowercased content and tags.
    ///   Entries scoring zero are dropped; the rest are ordered by score,
    ///   then by `created_ms`, both descending.
    ///
    /// An empty store or `k == 0` yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError::Io`] when the backing file cannot be read.
    async fn recall(&self, query: &str, k: usize) -> Result<Vec<MemoryEntry>, MemoryError> {
        let entries = self.read_all()?;
        if entries.is_empty() || k == 0 {
            return Ok(Vec::new());
        }

        let q_tokens = tokenise(query);
        if q_tokens.is_empty() {
            // Nothing to match on: fall back to the most recent entries.
            let mut all = entries;
            all.sort_by_key(|e| std::cmp::Reverse(e.created_ms));
            all.truncate(k);
            return Ok(all);
        }

        // Entries are moved through the pipeline, so the winners are
        // returned without being cloned.
        let mut scored: Vec<(usize, MemoryEntry)> = entries
            .into_iter()
            .filter_map(|entry| {
                let mut hay = entry.content.to_lowercase();
                if !entry.tags.is_empty() {
                    hay.push(' ');
                    hay.push_str(&entry.tags.join(" ").to_lowercase());
                }
                let hits = q_tokens
                    .iter()
                    .filter(|tok| hay.contains(tok.as_str()))
                    .count();
                (hits > 0).then_some((hits, entry))
            })
            .collect();

        // Stable sort: equal (score, created_ms) pairs keep file order.
        scored.sort_by(|a, b| {
            b.0.cmp(&a.0)
                .then_with(|| b.1.created_ms.cmp(&a.1.created_ms))
        });
        Ok(scored.into_iter().take(k).map(|(_, entry)| entry).collect())
    }

    /// Appends `entry` to the backing file as one JSON line.
    ///
    /// An empty `id` is replaced with a value from `short_id`. A
    /// `created_ms` of `0` is replaced with the current time in
    /// milliseconds since the Unix epoch; it stays `0` if the system clock
    /// reads earlier than the epoch.
    ///
    /// # Errors
    ///
    /// * [`MemoryError::Serde`] when the entry cannot be serialised.
    /// * [`MemoryError::Backend`] when the write lock is poisoned.
    /// * [`MemoryError::Io`] when the file cannot be opened or written.
    async fn write(&self, mut entry: MemoryEntry) -> Result<(), MemoryError> {
        use std::io::Write;

        if entry.id.is_empty() {
            entry.id = short_id();
        }
        if entry.created_ms == 0 {
            entry.created_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .ok()
                .and_then(|d| i64::try_from(d.as_millis()).ok())
                .unwrap_or(0);
        }

        // Build the record together with its terminating newline so it
        // reaches the append-mode file through one `write_all` call.
        // `writeln!` on an unbuffered `File` emits the payload and the
        // newline as separate writes, which can leave a record
        // unterminated and cause the next append to land on the same line.
        let mut record =
            serde_json::to_string(&entry).map_err(|e| MemoryError::Serde(e.to_string()))?;
        record.push('\n');

        let _guard = self
            .write_lock
            .lock()
            .map_err(|e| MemoryError::Backend(format!("poisoned mutex: {e}")))?;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .map_err(|e| MemoryError::Io(format!("open {}: {e}", self.path.display())))?;
        file.write_all(record.as_bytes())
            .map_err(|e| MemoryError::Io(format!("write: {e}")))?;
        Ok(())
    }
}
/// Splits `s` into a set of lowercase search tokens.
///
/// The input is lowercased and cut at every character that is not
/// alphanumeric. Pieces shorter than three bytes (`str::len` counts bytes,
/// not characters) are discarded, and duplicates collapse in the set.
fn tokenise(s: &str) -> HashSet<String> {
    let lowered = s.to_lowercase();
    let mut tokens = HashSet::new();
    for piece in lowered.split(|ch: char| !ch.is_alphanumeric()) {
        if piece.len() < 3 {
            continue;
        }
        tokens.insert(piece.to_owned());
    }
    tokens
}
/// Produces an 8-digit lowercase hexadecimal identifier.
///
/// The value is the low 32 bits of the current time in nanoseconds since
/// the Unix epoch plus a process-wide sequence number. The clock alone
/// repeats for calls that land in the same clock tick; adding the counter
/// makes successive calls from one thread differ as long as the clock does
/// not step backwards. Only 32 bits are kept (they wrap roughly every
/// 4.29 seconds), so ids are short handles, not globally unique keys.
fn short_id() -> String {
    static SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    // A clock reading before the epoch degrades to 0 instead of failing.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    // Relaxed is sufficient: only the atomicity of the increment matters.
    let seq = SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // Truncation to u64 is intentional; only the low 32 bits are kept.
    let mixed = (nanos as u64).wrapping_add(seq);
    format!("{:08x}", mixed & 0xFFFF_FFFF)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Per-process sequence number that keeps scratch file names distinct.
    static N: AtomicU64 = AtomicU64::new(0);

    /// Builds a scratch file path under the system temp directory from the
    /// process id, the current time in nanoseconds and a sequence number.
    fn tmp() -> PathBuf {
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::SeqCst);
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        let nanos = since_epoch.as_nanos();
        let file_name = format!("harness-mem-test-{pid}-{nanos}-{n}.jsonl");
        std::env::temp_dir().join(file_name)
    }

    /// A written entry is found again through a query matching its content.
    #[tokio::test]
    async fn write_then_recall_round_trips() {
        let file = tmp();
        let store = FileMemory::open(&file).unwrap();

        let coffee = MemoryEntry::new("user prefers dark roast coffee").with_tags(["coffee"]);
        store.write(coffee).await.unwrap();
        let city = MemoryEntry::new("user lives in Beijing");
        store.write(city).await.unwrap();

        let found = store.recall("coffee preferences", 5).await.unwrap();
        assert_eq!(found.len(), 1);
        assert!(found[0].content.contains("dark roast"));

        // Best-effort cleanup; a leftover temp file is harmless.
        let _ = std::fs::remove_file(&file);
    }

    /// A query without usable tokens returns the stored entries.
    #[tokio::test]
    async fn empty_query_falls_back_to_recent() {
        let file = tmp();
        let store = FileMemory::open(&file).unwrap();

        for fact in ["fact one", "fact two"] {
            store.write(MemoryEntry::new(fact)).await.unwrap();
        }

        let found = store.recall("", 5).await.unwrap();
        assert_eq!(found.len(), 2);

        let _ = std::fs::remove_file(&file);
    }

    /// A line that is not valid JSON is ignored; valid lines still load.
    #[tokio::test]
    async fn malformed_lines_are_skipped() {
        let file = tmp();
        {
            use std::io::Write;
            let mut raw = std::fs::File::create(&file).unwrap();
            writeln!(raw, "{{not valid json").unwrap();
            writeln!(
                raw,
                r#"{{"id":"abc","content":"valid fact","tags":[],"source":null,"created_ms":0}}"#
            )
            .unwrap();
        }

        let store = FileMemory::open(&file).unwrap();
        let found = store.recall("valid", 10).await.unwrap();
        assert_eq!(found.len(), 1);

        let _ = std::fs::remove_file(&file);
    }
}