anamnesis_adapter_memos/
lib.rs1#![forbid(unsafe_code)]
40#![warn(missing_docs)]
41
42pub mod detector;
43pub mod normalizer;
44pub mod scanner;
45
46use std::path::PathBuf;
47use std::sync::Arc;
48
49use anamnesis_core::adapter::{HealthStatus, MemoryAdapter, RawRecord, ScanOpts};
50use anamnesis_core::error::Result;
51use anamnesis_core::model::{AnamnesisRecord, SourceDescriptor};
52use async_trait::async_trait;
53use futures::stream::{self, BoxStream, StreamExt};
54
55pub use detector::MemosDetector;
56pub use scanner::{MemosScan, MemosTextItem};
57
58pub const ADAPTER_ID: &str = "memos";
60
61#[derive(Debug, Clone)]
63pub struct MemosConfig {
64 pub root_dir: PathBuf,
66 pub instance: Option<String>,
68}
69
70pub struct MemosAdapter {
72 config: Arc<MemosConfig>,
73}
74
75impl MemosAdapter {
76 pub fn new(config: MemosConfig) -> Self {
78 Self {
79 config: Arc::new(config),
80 }
81 }
82}
83
84#[async_trait]
85impl MemoryAdapter for MemosAdapter {
86 fn descriptor(&self) -> SourceDescriptor {
87 SourceDescriptor {
88 adapter: ADAPTER_ID.into(),
89 instance: self.config.instance.clone(),
90 version: env!("CARGO_PKG_VERSION").into(),
91 }
92 }
93
94 fn scan<'a>(&'a self, opts: ScanOpts) -> BoxStream<'a, Result<RawRecord>> {
95 let cfg = (*self.config).clone();
96 let raws = collect_raws(&cfg, &opts);
97 Box::pin(stream::iter(raws).map(Ok))
98 }
99
100 fn normalize(&self, raw: RawRecord) -> Result<Vec<AnamnesisRecord>> {
101 normalizer::normalize(raw, self.config.instance.as_deref())
102 }
103
104 async fn health(&self) -> HealthStatus {
105 if !self.config.root_dir.is_dir() {
106 return HealthStatus {
107 ok: false,
108 detail: format!("memos root not found: {}", self.config.root_dir.display()),
109 };
110 }
111 let s = scanner::scan_memos(&self.config.root_dir);
112 let mut detail = format!(
113 "memos root: {} (cubes={}, items={})",
114 self.config.root_dir.display(),
115 s.cube_dirs.len(),
116 s.items.len(),
117 );
118 if !s.parse_errors.is_empty() {
119 detail.push_str(&format!(" — {} parse error(s)", s.parse_errors.len()));
120 }
121 HealthStatus { ok: true, detail }
122 }
123}
124
125fn collect_raws(cfg: &MemosConfig, opts: &ScanOpts) -> Vec<RawRecord> {
126 let scan = scanner::scan_memos(&cfg.root_dir);
127 let mut out = Vec::with_capacity(scan.total());
128 for i in &scan.items {
129 let ts = i
130 .updated_at
131 .as_deref()
132 .and_then(scanner::parse_memos_time)
133 .or_else(|| i.created_at.as_deref().and_then(scanner::parse_memos_time));
134 if passes_since(ts, opts) {
135 out.push(normalizer::raw_from_item(i, cfg.instance.as_deref()));
136 }
137 }
138 out
139}
140
141fn passes_since(ts_unix: Option<i64>, opts: &ScanOpts) -> bool {
142 if opts.full {
143 return true;
144 }
145 let Some(threshold) = opts.since else {
146 return true;
147 };
148 match ts_unix {
149 Some(t) => t > threshold.timestamp(),
150 None => true,
151 }
152}
153
154pub fn memos_adapter(root_dir: impl Into<PathBuf>, instance: Option<&str>) -> MemosAdapter {
156 MemosAdapter::new(MemosConfig {
157 root_dir: root_dir.into(),
158 instance: instance.map(str::to_owned),
159 })
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use anamnesis_core::Kind;
166 use std::fs;
167 use std::sync::atomic::{AtomicU64, Ordering};
168
169 static MEMOS_LIB_TMP_NONCE: AtomicU64 = AtomicU64::new(0);
170
171 fn tmp_dir() -> PathBuf {
172 let n = std::time::SystemTime::now()
173 .duration_since(std::time::UNIX_EPOCH)
174 .unwrap()
175 .as_nanos();
176 let seq = MEMOS_LIB_TMP_NONCE.fetch_add(1, Ordering::Relaxed);
177 let p = std::env::temp_dir().join(format!(
178 "anamnesis-memos-{n}-{pid}-{seq}",
179 pid = std::process::id()
180 ));
181 fs::create_dir_all(&p).unwrap();
182 p
183 }
184
185 fn seed_root(root: &std::path::Path) {
186 let cube = root.join("cube-1");
187 fs::create_dir_all(&cube).unwrap();
188 let payload = serde_json::json!([
189 {
190 "id": "i-1",
191 "memory": "user prefers Rust",
192 "metadata": {
193 "memory_type": "UserMemory",
194 "user_id": "u-1",
195 "session_id": "s-1",
196 "source": "conversation",
197 "status": "activated",
198 "updated_at": "2026-05-01T10:00:00"
199 }
200 },
201 {
202 "id": "i-2",
203 "memory": "Paris is the capital",
204 "metadata": {
205 "memory_type": "LongTermMemory",
206 "status": "activated",
207 "updated_at": "2026-05-02T10:00:00"
208 }
209 }
210 ]);
211 fs::write(cube.join("textual_memory.json"), payload.to_string()).unwrap();
212 }
213
214 #[tokio::test]
215 async fn descriptor_carries_instance() {
216 let a = memos_adapter("/tmp/x", Some("laptop"));
217 let d = a.descriptor();
218 assert_eq!(d.adapter, "memos");
219 assert_eq!(d.instance.as_deref(), Some("laptop"));
220 }
221
222 #[tokio::test]
223 async fn health_false_when_root_missing() {
224 let a = memos_adapter("/tmp/never-here-memos", None);
225 let h = a.health().await;
226 assert!(!h.ok);
227 }
228
229 #[tokio::test]
230 async fn scan_yields_expected_kinds() {
231 let dir = tmp_dir();
232 seed_root(&dir);
233 let a = memos_adapter(&dir, Some("local"));
234 let raws: Vec<_> = a
235 .scan(ScanOpts::default())
236 .collect::<Vec<_>>()
237 .await
238 .into_iter()
239 .filter_map(|r| r.ok())
240 .collect();
241 assert_eq!(raws.len(), 2);
242 let mut kinds = std::collections::HashSet::new();
243 for raw in raws {
244 for r in a.normalize(raw).unwrap() {
245 kinds.insert(r.kind);
246 }
247 }
248 assert!(kinds.contains(&Kind::Preference));
249 assert!(kinds.contains(&Kind::Fact));
250 }
251
252 #[tokio::test]
253 async fn scan_full_overrides_since() {
254 let dir = tmp_dir();
255 seed_root(&dir);
256 let a = memos_adapter(&dir, None);
257 let cutoff = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
258 .unwrap()
259 .with_timezone(&chrono::Utc);
260 let dropped: Vec<_> = a
261 .scan(ScanOpts {
262 since: Some(cutoff),
263 full: false,
264 })
265 .collect::<Vec<_>>()
266 .await
267 .into_iter()
268 .filter_map(|r| r.ok())
269 .collect();
270 assert_eq!(dropped.len(), 0);
271 let full: Vec<_> = a
272 .scan(ScanOpts {
273 since: Some(cutoff),
274 full: true,
275 })
276 .collect::<Vec<_>>()
277 .await
278 .into_iter()
279 .filter_map(|r| r.ok())
280 .collect();
281 assert_eq!(full.len(), 2);
282 }
283
284 #[tokio::test]
285 async fn idempotent_across_scans() {
286 let dir = tmp_dir();
287 seed_root(&dir);
288 let a = memos_adapter(&dir, Some("laptop"));
289 let run = || async {
290 let mut ids: Vec<_> = a
291 .scan(ScanOpts::default())
292 .collect::<Vec<_>>()
293 .await
294 .into_iter()
295 .filter_map(|r| r.ok())
296 .flat_map(|raw| a.normalize(raw).unwrap())
297 .map(|r| r.id.0)
298 .collect();
299 ids.sort();
300 ids
301 };
302 assert_eq!(run().await, run().await);
303 }
304}