anamnesis_adapter_memori/
lib.rs1#![forbid(unsafe_code)]
22#![warn(missing_docs)]
23
24pub mod detector;
25pub mod normalizer;
26pub mod scanner;
27
28use std::path::PathBuf;
29use std::sync::Arc;
30
31use anamnesis_core::adapter::{HealthStatus, MemoryAdapter, RawRecord, ScanOpts};
32use anamnesis_core::error::Result;
33use anamnesis_core::model::{AnamnesisRecord, SourceDescriptor};
34use async_trait::async_trait;
35use futures::stream::{self, BoxStream, StreamExt};
36
37pub use detector::MemoriDetector;
38pub use scanner::{
39 MemoriConversationMessage, MemoriConversationSummary, MemoriEntityFact, MemoriKgTriple,
40 MemoriProcessAttribute, MemoriScan,
41};
42
/// Stable identifier of this adapter; reported as the `adapter` field of the
/// adapter's `SourceDescriptor`.
pub const ADAPTER_ID: &str = "memori";

/// Configuration for a Memori adapter: where the database lives and how the
/// source should be labelled.
#[derive(Debug, Clone)]
pub struct MemoriConfig {
    /// Path to the Memori database file that is scanned and health-checked.
    pub db_path: PathBuf,
    /// Optional instance label (e.g. a machine name). It is reported in the
    /// source descriptor and passed to the normalizer; `None` means unlabelled.
    pub instance: Option<String>,
}

55pub struct MemoriAdapter {
57 config: Arc<MemoriConfig>,
58}
59
60impl MemoriAdapter {
61 pub fn new(config: MemoriConfig) -> Self {
63 Self {
64 config: Arc::new(config),
65 }
66 }
67}
68
69#[async_trait]
70impl MemoryAdapter for MemoriAdapter {
71 fn descriptor(&self) -> SourceDescriptor {
72 SourceDescriptor {
73 adapter: ADAPTER_ID.into(),
74 instance: self.config.instance.clone(),
75 version: env!("CARGO_PKG_VERSION").into(),
76 }
77 }
78
79 fn scan<'a>(&'a self, opts: ScanOpts) -> BoxStream<'a, Result<RawRecord>> {
80 let cfg = (*self.config).clone();
81 let raws = collect_raws(&cfg, &opts);
82 Box::pin(stream::iter(raws).map(Ok))
83 }
84
85 fn normalize(&self, raw: RawRecord) -> Result<Vec<AnamnesisRecord>> {
86 normalizer::normalize(raw, self.config.instance.as_deref())
87 }
88
89 async fn health(&self) -> HealthStatus {
90 if !self.config.db_path.is_file() {
91 return HealthStatus {
92 ok: false,
93 detail: format!("memori db not found: {}", self.config.db_path.display()),
94 };
95 }
96 let s = scanner::scan_memori(&self.config.db_path);
97 let mut detail = format!(
98 "memori db: {} (entity_facts={}, process_attrs={}, messages={}, summaries={}, kg={})",
99 self.config.db_path.display(),
100 s.entity_facts.len(),
101 s.process_attrs.len(),
102 s.messages.len(),
103 s.summaries.len(),
104 s.kg_triples.len(),
105 );
106 if let Some(err) = s.schema_error {
107 detail.push_str(&format!(" โ schema note: {err}"));
108 }
109 HealthStatus { ok: true, detail }
110 }
111}
112
113fn collect_raws(cfg: &MemoriConfig, opts: &ScanOpts) -> Vec<RawRecord> {
114 let scan = scanner::scan_memori(&cfg.db_path);
115 let mut out = Vec::with_capacity(scan.total());
116 for f in &scan.entity_facts {
117 let ts = f
118 .date_last_time
119 .as_deref()
120 .and_then(scanner::parse_memori_time)
121 .or_else(|| {
122 f.date_created
123 .as_deref()
124 .and_then(scanner::parse_memori_time)
125 });
126 if passes_since(ts, opts) {
127 out.push(normalizer::raw_from_entity_fact(f, cfg.instance.as_deref()));
128 }
129 }
130 for a in &scan.process_attrs {
131 let ts = a
132 .date_last_time
133 .as_deref()
134 .and_then(scanner::parse_memori_time)
135 .or_else(|| {
136 a.date_created
137 .as_deref()
138 .and_then(scanner::parse_memori_time)
139 });
140 if passes_since(ts, opts) {
141 out.push(normalizer::raw_from_process_attr(
142 a,
143 cfg.instance.as_deref(),
144 ));
145 }
146 }
147 for m in &scan.messages {
148 let ts = m
149 .date_created
150 .as_deref()
151 .and_then(scanner::parse_memori_time);
152 if passes_since(ts, opts) {
153 out.push(normalizer::raw_from_message(m, cfg.instance.as_deref()));
154 }
155 }
156 for s in &scan.summaries {
157 let ts = s
158 .date_created
159 .as_deref()
160 .and_then(scanner::parse_memori_time);
161 if passes_since(ts, opts) {
162 out.push(normalizer::raw_from_summary(s, cfg.instance.as_deref()));
163 }
164 }
165 for t in &scan.kg_triples {
166 let ts = t
167 .date_last_time
168 .as_deref()
169 .and_then(scanner::parse_memori_time)
170 .or_else(|| {
171 t.date_created
172 .as_deref()
173 .and_then(scanner::parse_memori_time)
174 });
175 if passes_since(ts, opts) {
176 out.push(normalizer::raw_from_kg_triple(t, cfg.instance.as_deref()));
177 }
178 }
179 out
180}
181
182fn passes_since(ts_unix: Option<i64>, opts: &ScanOpts) -> bool {
183 if opts.full {
184 return true;
185 }
186 let Some(threshold) = opts.since else {
187 return true;
188 };
189 match ts_unix {
190 Some(t) => t > threshold.timestamp(),
191 None => true,
192 }
193}
194
195pub fn memori_adapter(db_path: impl Into<PathBuf>, instance: Option<&str>) -> MemoriAdapter {
197 MemoriAdapter::new(MemoriConfig {
198 db_path: db_path.into(),
199 instance: instance.map(str::to_owned),
200 })
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use anamnesis_core::Kind;
    use rusqlite::{params, Connection};
    use std::fs;
    use std::sync::atomic::{AtomicU64, Ordering};

    static MEMORI_LIB_TMP_NONCE: AtomicU64 = AtomicU64::new(0);

    /// A database path inside a fresh, uniquely named temp directory.
    ///
    /// The directory and everything in it are removed when the guard is
    /// dropped, so test runs do not accumulate temp files.
    struct TmpDb {
        dir: PathBuf,
        path: PathBuf,
    }

    impl Drop for TmpDb {
        fn drop(&mut self) {
            // Best-effort cleanup: a failure here must not mask the test result.
            let _ = fs::remove_dir_all(&self.dir);
        }
    }

    /// Creates a unique temp directory and returns a guard that owns it.
    ///
    /// The database file inside is NOT created.
    fn tmp_db() -> TmpDb {
        let n = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        // Time + pid + per-process counter keeps concurrently running tests apart.
        let seq = MEMORI_LIB_TMP_NONCE.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!(
            "anamnesis-memori-{n}-{pid}-{seq}",
            pid = std::process::id()
        ));
        fs::create_dir_all(&dir).unwrap();
        let path = dir.join("memori.db");
        TmpDb { dir, path }
    }

    /// Collects a whole scan, failing the test on any per-record error rather
    /// than silently dropping it.
    async fn scan_all(a: &MemoriAdapter, opts: ScanOpts) -> Vec<RawRecord> {
        a.scan(opts)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| r.expect("memori scan yielded an error"))
            .collect()
    }

    /// Creates a minimal Memori schema with one entity fact, one conversation
    /// message and one conversation carrying a summary.
    fn seed(db: &std::path::Path) {
        let conn = Connection::open(db).unwrap();
        conn.execute_batch(
            "CREATE TABLE memori_entity (id INTEGER PRIMARY KEY, uuid TEXT, external_id TEXT);
             CREATE TABLE memori_process (id INTEGER PRIMARY KEY, uuid TEXT, external_id TEXT);
             CREATE TABLE memori_session (id INTEGER PRIMARY KEY, uuid TEXT, entity_id INTEGER, process_id INTEGER);
             CREATE TABLE memori_conversation (id INTEGER PRIMARY KEY, uuid TEXT, session_id INTEGER, summary TEXT, date_created TEXT);
             CREATE TABLE memori_conversation_message (
                 id INTEGER PRIMARY KEY, uuid TEXT, conversation_id INTEGER,
                 role TEXT, type TEXT, content TEXT, date_created TEXT
             );
             CREATE TABLE memori_entity_fact (
                 id INTEGER PRIMARY KEY, uuid TEXT, entity_id INTEGER,
                 content TEXT, num_times INTEGER, date_last_time TEXT, date_created TEXT
             );",
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memori_entity (id, uuid, external_id) VALUES (?, ?, ?)",
            params![1, "ent-uuid", "user-123"],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memori_session (id, uuid, entity_id) VALUES (?, ?, ?)",
            params![100, "sess-uuid", 1],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memori_conversation (id, uuid, session_id, summary, date_created) \
             VALUES (?, ?, ?, ?, ?)",
            params![
                1000,
                "conv-uuid",
                100,
                "User asked about Paris.",
                "2026-05-01 10:00:00"
            ],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memori_conversation_message \
             (uuid, conversation_id, role, content, date_created) \
             VALUES (?, ?, ?, ?, ?)",
            params![
                "msg-1",
                1000,
                "user",
                "I live in Paris",
                "2026-05-01 10:00:00"
            ],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO memori_entity_fact \
             (uuid, entity_id, content, num_times, date_last_time, date_created) \
             VALUES (?, ?, ?, ?, ?, ?)",
            params![
                "fact-1",
                1,
                "user lives in Paris",
                1,
                "2026-05-01 10:00:00",
                "2026-04-01 10:00:00",
            ],
        )
        .unwrap();
    }

    #[tokio::test]
    async fn descriptor_carries_instance() {
        // The path is never touched: `descriptor` only reads the config.
        let a = memori_adapter("/tmp/x", Some("laptop"));
        let d = a.descriptor();
        assert_eq!(d.adapter, "memori");
        assert_eq!(d.instance.as_deref(), Some("laptop"));
    }

    #[tokio::test]
    async fn health_false_when_db_missing() {
        // The temp dir exists, but the database file inside it was never created.
        let db = tmp_db();
        let a = memori_adapter(&db.path, None);
        let h = a.health().await;
        assert!(!h.ok);
    }

    #[tokio::test]
    async fn health_ok_when_db_present() {
        let db = tmp_db();
        seed(&db.path);
        let a = memori_adapter(&db.path, None);
        let h = a.health().await;
        assert!(h.ok, "{}", h.detail);
    }

    #[tokio::test]
    async fn scan_yields_expected_kinds() {
        let db = tmp_db();
        seed(&db.path);
        let a = memori_adapter(&db.path, Some("local"));
        let raws = scan_all(&a, ScanOpts::default()).await;
        // The seed has one fact, one message and one summarised conversation.
        assert_eq!(raws.len(), 3);
        let mut kinds = std::collections::HashSet::new();
        for raw in raws {
            for r in a.normalize(raw).unwrap() {
                kinds.insert(r.kind);
            }
        }
        assert!(kinds.contains(&Kind::Fact));
        assert!(kinds.contains(&Kind::Episode));
    }

    #[tokio::test]
    async fn scan_full_overrides_since() {
        let db = tmp_db();
        seed(&db.path);
        let a = memori_adapter(&db.path, None);
        let cutoff = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&chrono::Utc);
        let dropped = scan_all(
            &a,
            ScanOpts {
                since: Some(cutoff),
                full: false,
            },
        )
        .await;
        assert!(dropped.is_empty());
        let full = scan_all(
            &a,
            ScanOpts {
                since: Some(cutoff),
                full: true,
            },
        )
        .await;
        assert_eq!(full.len(), 3);
    }

    #[test]
    fn passes_since_is_strict_and_keeps_undated_rows() {
        let cutoff = chrono::DateTime::parse_from_rfc3339("2026-05-01T00:00:00Z")
            .unwrap()
            .with_timezone(&chrono::Utc);
        let t = cutoff.timestamp();
        let incremental = ScanOpts {
            since: Some(cutoff),
            full: false,
        };
        assert!(!passes_since(Some(t), &incremental));
        assert!(passes_since(Some(t + 1), &incremental));
        assert!(passes_since(None, &incremental));
        let full = ScanOpts {
            since: Some(cutoff),
            full: true,
        };
        assert!(passes_since(Some(t - 1), &full));
        let unbounded = ScanOpts {
            since: None,
            full: false,
        };
        assert!(passes_since(Some(t - 1), &unbounded));
    }

    #[tokio::test]
    async fn idempotent_across_scans() {
        let db = tmp_db();
        seed(&db.path);
        let a = memori_adapter(&db.path, Some("laptop"));
        let run = || async {
            let mut ids: Vec<_> = scan_all(&a, ScanOpts::default())
                .await
                .into_iter()
                .flat_map(|raw| a.normalize(raw).unwrap())
                .map(|r| r.id.0)
                .collect();
            ids.sort();
            ids
        };
        assert_eq!(run().await, run().await);
    }
}