// synaps_cli/memory/store.rs

1use std::fs::{self, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7
/// One memory entry. Each record is serialized as a single JSON line in the
/// `<namespace>.jsonl` file of its namespace.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MemoryRecord {
    /// Caller-supplied namespace, e.g. "session-notes" or "<plugin-id>".
    /// Also used as the stem of the backing file name.
    pub namespace: String,
    /// Unix epoch milliseconds.
    pub timestamp_ms: u64,
    /// Free-form text content.
    pub content: String,
    /// Optional tag list (e.g. ["@user", "preference"]).
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Optional structured metadata. Validated as JSON on read.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub meta: Option<serde_json::Value>,
}
23
/// Filter set for a namespace query. Every filter that is `Some` must match
/// for a record to be returned; the `Default` value matches every record.
#[derive(Debug, Clone, Default)]
pub struct MemoryQuery {
    /// Optional substring match against `content` (case-insensitive).
    pub content_contains: Option<String>,
    /// Optional tag prefix; record matches if ANY of its tags has this prefix.
    /// The comparison is case-sensitive.
    pub tag_prefix: Option<String>,
    /// Inclusive lower bound on `timestamp_ms`.
    pub since_ms: Option<u64>,
    /// Inclusive upper bound on `timestamp_ms`.
    pub until_ms: Option<u64>,
    /// Maximum number of records to return (most recent first). Default: 50.
    pub limit: Option<usize>,
}
37
/// Default per-query record cap, used when `MemoryQuery::limit` is `None`.
pub const DEFAULT_LIMIT: usize = 50;

/// Maximum content length per record (UTF-8 byte length): 16 KiB.
pub const MAX_CONTENT_BYTES: usize = 16 * 1024;
43
/// Errors produced by the memory store.
#[derive(Debug)]
pub enum MemoryError {
    /// The namespace failed validation; carries the rejected string.
    InvalidNamespace(String),
    /// The record's `content` exceeded the allowed byte length.
    ContentTooLarge { len: usize, max: usize },
    /// A filesystem operation failed; carries the error's display text.
    Io(String),
    /// JSON (de)serialization failed; carries the error's display text.
    Serde(String),
}
51
52impl std::fmt::Display for MemoryError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            MemoryError::InvalidNamespace(s) => write!(f, "invalid namespace: {s:?}"),
56            MemoryError::ContentTooLarge { len, max } => {
57                write!(f, "content too large: {len} bytes (max {max})")
58            }
59            MemoryError::Io(s) => write!(f, "memory io error: {s}"),
60            MemoryError::Serde(s) => write!(f, "memory serde error: {s}"),
61        }
62    }
63}
64
// Empty impl: relies on the `Debug` derive and the `Display` impl; no `source()` override.
impl std::error::Error for MemoryError {}
66
67impl From<std::io::Error> for MemoryError {
68    fn from(e: std::io::Error) -> Self {
69        MemoryError::Io(e.to_string())
70    }
71}
72
73impl From<serde_json::Error> for MemoryError {
74    fn from(e: serde_json::Error) -> Self {
75        MemoryError::Serde(e.to_string())
76    }
77}
78
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
86
87fn validate_namespace(ns: &str) -> Result<(), MemoryError> {
88    if ns.is_empty()
89        || ns.len() > 64
90        || ns.contains('/')
91        || ns.contains('\\')
92        || ns.contains("..")
93        || ns.chars().any(|c| c.is_whitespace())
94    {
95        return Err(MemoryError::InvalidNamespace(ns.to_string()));
96    }
97    Ok(())
98}
99
100/// Path to the memory directory under base. Caller creates dirs lazily.
101pub fn memory_dir() -> PathBuf {
102    crate::config::base_dir().join("memory")
103}
104
/// Location of the `memory` directory beneath an explicit `base` directory.
pub(crate) fn memory_dir_in(base: &Path) -> PathBuf {
    let mut dir = base.to_path_buf();
    dir.push("memory");
    dir
}
108
/// Path of the JSON-lines file that backs namespace `ns` inside `dir`.
fn namespace_path(dir: &Path, ns: &str) -> PathBuf {
    let file_name = format!("{ns}.jsonl");
    let mut path = dir.to_path_buf();
    path.push(file_name);
    path
}
112
113/// Append one record. Validates namespace, content size. Atomic per-line via O_APPEND.
114pub fn append(record: &MemoryRecord) -> Result<(), MemoryError> {
115    append_to(&crate::config::base_dir(), record)
116}
117
118pub(crate) fn append_to(base: &Path, record: &MemoryRecord) -> Result<(), MemoryError> {
119    validate_namespace(&record.namespace)?;
120    if record.content.len() > MAX_CONTENT_BYTES {
121        return Err(MemoryError::ContentTooLarge {
122            len: record.content.len(),
123            max: MAX_CONTENT_BYTES,
124        });
125    }
126    let dir = memory_dir_in(base);
127    fs::create_dir_all(&dir)?;
128    let path = namespace_path(&dir, &record.namespace);
129    let mut f = OpenOptions::new().append(true).create(true).open(&path)?;
130    let mut line = serde_json::to_string(record)?;
131    line.push('\n');
132    f.write_all(line.as_bytes())?;
133    Ok(())
134}
135
136/// Query records in a namespace, applying filters, returning most-recent-first up to limit.
137pub fn query(namespace: &str, q: &MemoryQuery) -> Result<Vec<MemoryRecord>, MemoryError> {
138    query_in(&crate::config::base_dir(), namespace, q)
139}
140
141pub(crate) fn query_in(
142    base: &Path,
143    namespace: &str,
144    q: &MemoryQuery,
145) -> Result<Vec<MemoryRecord>, MemoryError> {
146    validate_namespace(namespace)?;
147    let path = namespace_path(&memory_dir_in(base), namespace);
148    if !path.exists() {
149        return Ok(Vec::new());
150    }
151    let f = fs::File::open(&path)?;
152    let reader = BufReader::new(f);
153    let needle = q.content_contains.as_ref().map(|s| s.to_lowercase());
154    let mut out: Vec<MemoryRecord> = Vec::new();
155    for line in reader.lines() {
156        let line = match line {
157            Ok(l) => l,
158            Err(_) => continue,
159        };
160        if line.trim().is_empty() {
161            continue;
162        }
163        let rec: MemoryRecord = match serde_json::from_str(&line) {
164            Ok(r) => r,
165            Err(_) => continue,
166        };
167        if let Some(since) = q.since_ms {
168            if rec.timestamp_ms < since {
169                continue;
170            }
171        }
172        if let Some(until) = q.until_ms {
173            if rec.timestamp_ms > until {
174                continue;
175            }
176        }
177        if let Some(needle) = &needle {
178            if !rec.content.to_lowercase().contains(needle) {
179                continue;
180            }
181        }
182        if let Some(prefix) = &q.tag_prefix {
183            if !rec.tags.iter().any(|t| t.starts_with(prefix)) {
184                continue;
185            }
186        }
187        out.push(rec);
188    }
189    out.sort_by(|a, b| b.timestamp_ms.cmp(&a.timestamp_ms));
190    let limit = q.limit.unwrap_or(DEFAULT_LIMIT);
191    out.truncate(limit);
192    Ok(out)
193}
194
195/// List existing namespaces under the memory dir.
196pub fn list_namespaces() -> Result<Vec<String>, MemoryError> {
197    list_namespaces_in(&crate::config::base_dir())
198}
199
200pub(crate) fn list_namespaces_in(base: &Path) -> Result<Vec<String>, MemoryError> {
201    let dir = memory_dir_in(base);
202    if !dir.exists() {
203        return Ok(Vec::new());
204    }
205    let mut out = Vec::new();
206    for entry in fs::read_dir(&dir)? {
207        let entry = entry?;
208        let path = entry.path();
209        if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
210            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
211                out.push(stem.to_string());
212            }
213        }
214    }
215    out.sort();
216    Ok(out)
217}
218
219/// Build a record with `now_ms()` timestamp.
220pub fn new_record(
221    namespace: impl Into<String>,
222    content: impl Into<String>,
223    tags: Vec<String>,
224    meta: Option<serde_json::Value>,
225) -> MemoryRecord {
226    MemoryRecord {
227        namespace: namespace.into(),
228        timestamp_ms: now_ms(),
229        content: content.into(),
230        tags,
231        meta,
232    }
233}
234
// Unit tests. Each test works in its own temporary base directory and calls
// the `*_in` / `*_to` variants, so the configured base directory is never touched.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a record with a fixed timestamp and no metadata.
    fn rec(ns: &str, ts: u64, content: &str, tags: Vec<&str>) -> MemoryRecord {
        MemoryRecord {
            namespace: ns.to_string(),
            timestamp_ms: ts,
            content: content.to_string(),
            tags: tags.into_iter().map(String::from).collect(),
            meta: None,
        }
    }

    // Round trip: an appended record comes back unchanged from a default query.
    #[test]
    fn append_then_query_returns_record() {
        let tmp = TempDir::new().unwrap();
        let r = rec("ns", 100, "hello world", vec!["@user"]);
        append_to(tmp.path(), &r).unwrap();
        let got = query_in(tmp.path(), "ns", &MemoryQuery::default()).unwrap();
        assert_eq!(got, vec![r]);
    }

    // The content filter matches case-insensitively ("hello" finds "Hello World").
    #[test]
    fn query_filters_by_content_contains() {
        let tmp = TempDir::new().unwrap();
        append_to(tmp.path(), &rec("ns", 100, "Hello World", vec![])).unwrap();
        append_to(tmp.path(), &rec("ns", 200, "goodbye", vec![])).unwrap();
        let q = MemoryQuery {
            content_contains: Some("hello".to_string()),
            ..Default::default()
        };
        let got = query_in(tmp.path(), "ns", &q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].content, "Hello World");
    }

    // A tag prefix matches when any tag starts with it, and excludes otherwise.
    #[test]
    fn query_filters_by_tag_prefix() {
        let tmp = TempDir::new().unwrap();
        append_to(
            tmp.path(),
            &rec("ns", 100, "x", vec!["@user", "preference"]),
        )
        .unwrap();
        let got = query_in(
            tmp.path(),
            "ns",
            &MemoryQuery {
                tag_prefix: Some("@u".into()),
                ..Default::default()
            },
        )
        .unwrap();
        assert_eq!(got.len(), 1);
        let got = query_in(
            tmp.path(),
            "ns",
            &MemoryQuery {
                tag_prefix: Some("@x".into()),
                ..Default::default()
            },
        )
        .unwrap();
        assert!(got.is_empty());
    }

    // Only the record inside the [since_ms, until_ms] window is returned.
    #[test]
    fn query_filters_by_time_range() {
        let tmp = TempDir::new().unwrap();
        append_to(tmp.path(), &rec("ns", 100, "a", vec![])).unwrap();
        append_to(tmp.path(), &rec("ns", 200, "b", vec![])).unwrap();
        append_to(tmp.path(), &rec("ns", 300, "c", vec![])).unwrap();
        let got = query_in(
            tmp.path(),
            "ns",
            &MemoryQuery {
                since_ms: Some(150),
                until_ms: Some(250),
                ..Default::default()
            },
        )
        .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].timestamp_ms, 200);
    }

    // Records appended out of order are still returned newest first.
    #[test]
    fn query_returns_most_recent_first() {
        let tmp = TempDir::new().unwrap();
        append_to(tmp.path(), &rec("ns", 100, "a", vec![])).unwrap();
        append_to(tmp.path(), &rec("ns", 300, "c", vec![])).unwrap();
        append_to(tmp.path(), &rec("ns", 200, "b", vec![])).unwrap();
        let got = query_in(tmp.path(), "ns", &MemoryQuery::default()).unwrap();
        let ts: Vec<u64> = got.iter().map(|r| r.timestamp_ms).collect();
        assert_eq!(ts, vec![300, 200, 100]);
    }

    // The limit keeps the newest records and drops the rest.
    #[test]
    fn query_respects_limit() {
        let tmp = TempDir::new().unwrap();
        for i in 1..=5 {
            append_to(tmp.path(), &rec("ns", i * 100, "x", vec![])).unwrap();
        }
        let got = query_in(
            tmp.path(),
            "ns",
            &MemoryQuery {
                limit: Some(2),
                ..Default::default()
            },
        )
        .unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].timestamp_ms, 500);
        assert_eq!(got[1].timestamp_ms, 400);
    }

    // Lines that are not valid JSON are skipped rather than failing the query.
    #[test]
    fn query_skips_malformed_lines() {
        let tmp = TempDir::new().unwrap();
        let dir = memory_dir_in(tmp.path());
        fs::create_dir_all(&dir).unwrap();
        let path = namespace_path(&dir, "ns");
        let v1 = serde_json::to_string(&rec("ns", 100, "a", vec![])).unwrap();
        let v2 = serde_json::to_string(&rec("ns", 200, "b", vec![])).unwrap();
        // Written directly to the file so the bad lines bypass `append_to`.
        let body = format!("invalid json\n{v1}\nnot json either\n{v2}\n");
        fs::write(&path, body).unwrap();
        let got = query_in(tmp.path(), "ns", &MemoryQuery::default()).unwrap();
        assert_eq!(got.len(), 2);
    }

    // Content one byte over MAX_CONTENT_BYTES is refused.
    #[test]
    fn append_rejects_oversized_content() {
        let tmp = TempDir::new().unwrap();
        let big = "x".repeat(MAX_CONTENT_BYTES + 1);
        let r = rec("ns", 1, &big, vec![]);
        match append_to(tmp.path(), &r) {
            Err(MemoryError::ContentTooLarge { .. }) => {}
            other => panic!("expected ContentTooLarge, got {other:?}"),
        }
    }

    // Empty, separator-bearing, "..", whitespace and over-long namespaces are refused.
    #[test]
    fn append_rejects_invalid_namespace() {
        let tmp = TempDir::new().unwrap();
        let cases = [
            "",
            "a/b",
            "a\\b",
            "..",
            "a..b",
            "has space",
            "tab\there",
            &"x".repeat(65),
        ];
        for ns in cases {
            let r = rec(ns, 1, "x", vec![]);
            match append_to(tmp.path(), &r) {
                Err(MemoryError::InvalidNamespace(_)) => {}
                other => panic!("expected InvalidNamespace for {ns:?}, got {other:?}"),
            }
        }
    }

    // Namespaces with appended records are listed in sorted order.
    #[test]
    fn list_namespaces_returns_existing_files() {
        let tmp = TempDir::new().unwrap();
        append_to(tmp.path(), &rec("alpha", 1, "x", vec![])).unwrap();
        append_to(tmp.path(), &rec("beta", 1, "x", vec![])).unwrap();
        let got = list_namespaces_in(tmp.path()).unwrap();
        assert_eq!(got, vec!["alpha".to_string(), "beta".to_string()]);
    }

    // No memory directory yet: listing succeeds with an empty result.
    #[test]
    fn list_namespaces_on_missing_dir_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let got = list_namespaces_in(tmp.path()).unwrap();
        assert!(got.is_empty());
    }

    // Querying a namespace that was never written succeeds with an empty result.
    #[test]
    fn query_on_missing_namespace_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let got = query_in(tmp.path(), "nope", &MemoryQuery::default()).unwrap();
        assert!(got.is_empty());
    }
}