Skip to main content

anamnesis_adapter_memori/
lib.rs

1//! Anamnesis adapter for **Memori** (MemoriLabs/Memori, Apache-2.0).
2//!
3//! Memori is "agent-native memory infrastructure" that turns LLM
4//! conversations into structured durable state. It supports multiple
5//! storage backends; this adapter targets the SQLite backend (most
6//! common for local installs).
7//!
8//! Memori writes data into a fixed set of `memori_*` tables. Coverage:
9//!
10//! | Table                             | Anamnesis Kind | Scope    |
11//! |-----------------------------------|----------------|----------|
12//! | `memori_entity_fact`              | `Fact`         | User     |
13//! | `memori_process_attribute`        | `Reference`    | Project  |
14//! | `memori_conversation_message`     | `Episode`      | Session  |
15//! | `memori_conversation` (`summary`) | `Episode`      | Session  |
16//! | `memori_knowledge_graph` (triple) | `Fact`         | User     |
17//!
18//! Per §-1.2.2 the adapter is read-only — we open with
19//! `SQLITE_OPEN_READ_ONLY`.
20
21#![forbid(unsafe_code)]
22#![warn(missing_docs)]
23
24pub mod detector;
25pub mod normalizer;
26pub mod scanner;
27
28use std::path::PathBuf;
29use std::sync::Arc;
30
31use anamnesis_core::adapter::{HealthStatus, MemoryAdapter, RawRecord, ScanOpts};
32use anamnesis_core::error::Result;
33use anamnesis_core::model::{AnamnesisRecord, SourceDescriptor};
34use async_trait::async_trait;
35use futures::stream::{self, BoxStream, StreamExt};
36
37pub use detector::MemoriDetector;
38pub use scanner::{
39    MemoriConversationMessage, MemoriConversationSummary, MemoriEntityFact, MemoriKgTriple,
40    MemoriProcessAttribute, MemoriScan,
41};
42
43/// Stable adapter identifier.
44pub const ADAPTER_ID: &str = "memori";
45
46/// Adapter configuration.
47#[derive(Debug, Clone)]
48pub struct MemoriConfig {
49    /// Path to the Memori SQLite file.
50    pub db_path: PathBuf,
51    /// Instance discriminator (defaults to `"local"` in id synthesis).
52    pub instance: Option<String>,
53}
54
55/// The adapter.
56pub struct MemoriAdapter {
57    config: Arc<MemoriConfig>,
58}
59
60impl MemoriAdapter {
61    /// Build from explicit config.
62    pub fn new(config: MemoriConfig) -> Self {
63        Self {
64            config: Arc::new(config),
65        }
66    }
67}
68
69#[async_trait]
70impl MemoryAdapter for MemoriAdapter {
71    fn descriptor(&self) -> SourceDescriptor {
72        SourceDescriptor {
73            adapter: ADAPTER_ID.into(),
74            instance: self.config.instance.clone(),
75            version: env!("CARGO_PKG_VERSION").into(),
76        }
77    }
78
79    fn scan<'a>(&'a self, opts: ScanOpts) -> BoxStream<'a, Result<RawRecord>> {
80        let cfg = (*self.config).clone();
81        let raws = collect_raws(&cfg, &opts);
82        Box::pin(stream::iter(raws).map(Ok))
83    }
84
85    fn normalize(&self, raw: RawRecord) -> Result<Vec<AnamnesisRecord>> {
86        normalizer::normalize(raw, self.config.instance.as_deref())
87    }
88
89    async fn health(&self) -> HealthStatus {
90        if !self.config.db_path.is_file() {
91            return HealthStatus {
92                ok: false,
93                detail: format!("memori db not found: {}", self.config.db_path.display()),
94            };
95        }
96        let s = scanner::scan_memori(&self.config.db_path);
97        let mut detail = format!(
98            "memori db: {} (entity_facts={}, process_attrs={}, messages={}, summaries={}, kg={})",
99            self.config.db_path.display(),
100            s.entity_facts.len(),
101            s.process_attrs.len(),
102            s.messages.len(),
103            s.summaries.len(),
104            s.kg_triples.len(),
105        );
106        if let Some(err) = s.schema_error {
107            detail.push_str(&format!(" โ€” schema note: {err}"));
108        }
109        HealthStatus { ok: true, detail }
110    }
111}
112
113fn collect_raws(cfg: &MemoriConfig, opts: &ScanOpts) -> Vec<RawRecord> {
114    let scan = scanner::scan_memori(&cfg.db_path);
115    let mut out = Vec::with_capacity(scan.total());
116    for f in &scan.entity_facts {
117        let ts = f
118            .date_last_time
119            .as_deref()
120            .and_then(scanner::parse_memori_time)
121            .or_else(|| {
122                f.date_created
123                    .as_deref()
124                    .and_then(scanner::parse_memori_time)
125            });
126        if passes_since(ts, opts) {
127            out.push(normalizer::raw_from_entity_fact(f, cfg.instance.as_deref()));
128        }
129    }
130    for a in &scan.process_attrs {
131        let ts = a
132            .date_last_time
133            .as_deref()
134            .and_then(scanner::parse_memori_time)
135            .or_else(|| {
136                a.date_created
137                    .as_deref()
138                    .and_then(scanner::parse_memori_time)
139            });
140        if passes_since(ts, opts) {
141            out.push(normalizer::raw_from_process_attr(
142                a,
143                cfg.instance.as_deref(),
144            ));
145        }
146    }
147    for m in &scan.messages {
148        let ts = m
149            .date_created
150            .as_deref()
151            .and_then(scanner::parse_memori_time);
152        if passes_since(ts, opts) {
153            out.push(normalizer::raw_from_message(m, cfg.instance.as_deref()));
154        }
155    }
156    for s in &scan.summaries {
157        let ts = s
158            .date_created
159            .as_deref()
160            .and_then(scanner::parse_memori_time);
161        if passes_since(ts, opts) {
162            out.push(normalizer::raw_from_summary(s, cfg.instance.as_deref()));
163        }
164    }
165    for t in &scan.kg_triples {
166        let ts = t
167            .date_last_time
168            .as_deref()
169            .and_then(scanner::parse_memori_time)
170            .or_else(|| {
171                t.date_created
172                    .as_deref()
173                    .and_then(scanner::parse_memori_time)
174            });
175        if passes_since(ts, opts) {
176            out.push(normalizer::raw_from_kg_triple(t, cfg.instance.as_deref()));
177        }
178    }
179    out
180}
181
182fn passes_since(ts_unix: Option<i64>, opts: &ScanOpts) -> bool {
183    if opts.full {
184        return true;
185    }
186    let Some(threshold) = opts.since else {
187        return true;
188    };
189    match ts_unix {
190        Some(t) => t > threshold.timestamp(),
191        None => true,
192    }
193}
194
195/// Convenience constructor.
196pub fn memori_adapter(db_path: impl Into<PathBuf>, instance: Option<&str>) -> MemoriAdapter {
197    MemoriAdapter::new(MemoriConfig {
198        db_path: db_path.into(),
199        instance: instance.map(str::to_owned),
200    })
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206    use anamnesis_core::Kind;
207    use rusqlite::{params, Connection};
208    use std::fs;
209    use std::sync::atomic::{AtomicU64, Ordering};
210
211    static MEMORI_LIB_TMP_NONCE: AtomicU64 = AtomicU64::new(0);
212
213    fn tmp_db() -> PathBuf {
214        let n = std::time::SystemTime::now()
215            .duration_since(std::time::UNIX_EPOCH)
216            .unwrap()
217            .as_nanos();
218        let seq = MEMORI_LIB_TMP_NONCE.fetch_add(1, Ordering::Relaxed);
219        let dir = std::env::temp_dir().join(format!(
220            "anamnesis-memori-{n}-{pid}-{seq}",
221            pid = std::process::id()
222        ));
223        fs::create_dir_all(&dir).unwrap();
224        dir.join("memori.db")
225    }
226
227    fn seed(db: &std::path::Path) {
228        let conn = Connection::open(db).unwrap();
229        conn.execute_batch(
230            "CREATE TABLE memori_entity (id INTEGER PRIMARY KEY, uuid TEXT, external_id TEXT);
231             CREATE TABLE memori_process (id INTEGER PRIMARY KEY, uuid TEXT, external_id TEXT);
232             CREATE TABLE memori_session (id INTEGER PRIMARY KEY, uuid TEXT, entity_id INTEGER, process_id INTEGER);
233             CREATE TABLE memori_conversation (id INTEGER PRIMARY KEY, uuid TEXT, session_id INTEGER, summary TEXT, date_created TEXT);
234             CREATE TABLE memori_conversation_message (
235                 id INTEGER PRIMARY KEY, uuid TEXT, conversation_id INTEGER,
236                 role TEXT, type TEXT, content TEXT, date_created TEXT
237             );
238             CREATE TABLE memori_entity_fact (
239                 id INTEGER PRIMARY KEY, uuid TEXT, entity_id INTEGER,
240                 content TEXT, num_times INTEGER, date_last_time TEXT, date_created TEXT
241             );",
242        )
243        .unwrap();
244        conn.execute(
245            "INSERT INTO memori_entity (id, uuid, external_id) VALUES (?, ?, ?)",
246            params![1, "ent-uuid", "user-123"],
247        )
248        .unwrap();
249        conn.execute(
250            "INSERT INTO memori_session (id, uuid, entity_id) VALUES (?, ?, ?)",
251            params![100, "sess-uuid", 1],
252        )
253        .unwrap();
254        conn.execute(
255            "INSERT INTO memori_conversation (id, uuid, session_id, summary, date_created) \
256             VALUES (?, ?, ?, ?, ?)",
257            params![
258                1000,
259                "conv-uuid",
260                100,
261                "User asked about Paris.",
262                "2026-05-01 10:00:00"
263            ],
264        )
265        .unwrap();
266        conn.execute(
267            "INSERT INTO memori_conversation_message \
268             (uuid, conversation_id, role, content, date_created) \
269             VALUES (?, ?, ?, ?, ?)",
270            params![
271                "msg-1",
272                1000,
273                "user",
274                "I live in Paris",
275                "2026-05-01 10:00:00"
276            ],
277        )
278        .unwrap();
279        conn.execute(
280            "INSERT INTO memori_entity_fact \
281             (uuid, entity_id, content, num_times, date_last_time, date_created) \
282             VALUES (?, ?, ?, ?, ?, ?)",
283            params![
284                "fact-1",
285                1,
286                "user lives in Paris",
287                1,
288                "2026-05-01 10:00:00",
289                "2026-04-01 10:00:00",
290            ],
291        )
292        .unwrap();
293    }
294
295    #[tokio::test]
296    async fn descriptor_carries_instance() {
297        let a = memori_adapter("/tmp/x", Some("laptop"));
298        let d = a.descriptor();
299        assert_eq!(d.adapter, "memori");
300        assert_eq!(d.instance.as_deref(), Some("laptop"));
301    }
302
303    #[tokio::test]
304    async fn health_false_when_db_missing() {
305        let a = memori_adapter("/tmp/never-here-memori.db", None);
306        let h = a.health().await;
307        assert!(!h.ok);
308    }
309
310    #[tokio::test]
311    async fn scan_yields_expected_kinds() {
312        let db = tmp_db();
313        seed(&db);
314        let a = memori_adapter(&db, Some("local"));
315        let raws: Vec<_> = a
316            .scan(ScanOpts::default())
317            .collect::<Vec<_>>()
318            .await
319            .into_iter()
320            .filter_map(|r| r.ok())
321            .collect();
322        // 1 fact + 1 message + 1 summary = 3.
323        assert_eq!(raws.len(), 3);
324        let mut kinds = std::collections::HashSet::new();
325        for raw in raws {
326            for r in a.normalize(raw).unwrap() {
327                kinds.insert(r.kind);
328            }
329        }
330        assert!(kinds.contains(&Kind::Fact));
331        assert!(kinds.contains(&Kind::Episode));
332    }
333
334    #[tokio::test]
335    async fn scan_full_overrides_since() {
336        let db = tmp_db();
337        seed(&db);
338        let a = memori_adapter(&db, None);
339        let cutoff = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
340            .unwrap()
341            .with_timezone(&chrono::Utc);
342        let dropped: Vec<_> = a
343            .scan(ScanOpts {
344                since: Some(cutoff),
345                full: false,
346            })
347            .collect::<Vec<_>>()
348            .await
349            .into_iter()
350            .filter_map(|r| r.ok())
351            .collect();
352        assert_eq!(dropped.len(), 0);
353        let full: Vec<_> = a
354            .scan(ScanOpts {
355                since: Some(cutoff),
356                full: true,
357            })
358            .collect::<Vec<_>>()
359            .await
360            .into_iter()
361            .filter_map(|r| r.ok())
362            .collect();
363        assert_eq!(full.len(), 3);
364    }
365
366    #[tokio::test]
367    async fn idempotent_across_scans() {
368        let db = tmp_db();
369        seed(&db);
370        let a = memori_adapter(&db, Some("laptop"));
371        let run = || async {
372            let mut ids: Vec<_> = a
373                .scan(ScanOpts::default())
374                .collect::<Vec<_>>()
375                .await
376                .into_iter()
377                .filter_map(|r| r.ok())
378                .flat_map(|raw| a.normalize(raw).unwrap())
379                .map(|r| r.id.0)
380                .collect();
381            ids.sort();
382            ids
383        };
384        assert_eq!(run().await, run().await);
385    }
386}