Skip to main content

yarli_cli/yarli-memory/src/
memory_cli.rs

//! Memory adapter backed by the memory backend CLI.
2//!
3//! This adapter shells out to the memory backend CLI and uses the local project's
4//! `.backend.yml` / `.backend-data` (resolved relative to `project_dir`).
5//!
6//! It is intentionally conservative:
7//! - Uses `memory-backend query ... --output json` for query results.
8//! - Uses `memory-backend memory insert ...` for inserts.
9//! - Emits explicit errors on unsupported operations.
10
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::process::Stdio;
14
15use chrono::{DateTime, Utc};
16use serde_json::Value;
17use tokio::process::Command;
18use uuid::Uuid;
19
20use crate::yarli_memory::adapter::MemoryAdapter;
21use crate::yarli_memory::error::MemoryError;
22use crate::yarli_memory::types::{
23    content_may_contain_secrets, InsertMemory, LinkMemories, MemoryClass, MemoryQuery,
24    MemoryRecord, RelationshipKind, ScopeId,
25};
26
27#[derive(Debug, Clone)]
28pub struct MemoryCliAdapter {
29    command: String,
30    project_dir: PathBuf,
31}
32
33impl MemoryCliAdapter {
34    pub fn new(command: impl Into<String>, project_dir: impl Into<PathBuf>) -> Self {
35        Self {
36            command: command.into(),
37            project_dir: project_dir.into(),
38        }
39    }
40
41    pub fn project_dir(&self) -> &Path {
42        &self.project_dir
43    }
44
45    async fn run(&self, args: &[String]) -> Result<String, MemoryError> {
46        let child_fut = Command::new(&self.command)
47            .args(args)
48            .current_dir(&self.project_dir)
49            .stdin(Stdio::null())
50            .stdout(Stdio::piped())
51            .stderr(Stdio::piped())
52            .output();
53
54        let output = tokio::time::timeout(std::time::Duration::from_secs(10), child_fut)
55            .await
56            .map_err(|_| {
57                MemoryError::ConnectionFailed("memory backend CLI timed out after 10s".to_string())
58            })?
59            .map_err(|e| MemoryError::ConnectionFailed(e.to_string()))?;
60
61        if !output.status.success() {
62            let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
63            let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
64            let msg = if stderr.is_empty() { stdout } else { stderr };
65            return Err(MemoryError::Backend(msg));
66        }
67
68        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
69    }
70
71    fn class_to_backend(class: MemoryClass) -> &'static str {
72        match class {
73            MemoryClass::Working => "working",
74            MemoryClass::Semantic => "semantic",
75            MemoryClass::Episodic => "episodic",
76        }
77    }
78
79    fn relation_to_backend(kind: RelationshipKind) -> &'static str {
80        match kind {
81            RelationshipKind::RelatesTo => "relates-to",
82            RelationshipKind::CauseEffect => "caused-by",
83            RelationshipKind::Supersedes => "supersedes",
84            RelationshipKind::DependsOn => "references",
85        }
86    }
87
88    fn parse_datetime(value: &Value) -> Option<DateTime<Utc>> {
89        let s = value.as_str()?;
90        DateTime::parse_from_rfc3339(s)
91            .ok()
92            .map(|dt| dt.with_timezone(&Utc))
93    }
94
95    fn map_query_item(scope_fallback: &ScopeId, item: &Value) -> Option<MemoryRecord> {
96        let obj = item.as_object()?;
97
98        let memory_id = obj
99            .get("memory_id")
100            .or_else(|| obj.get("id"))
101            .and_then(|v| v.as_str())
102            .unwrap_or("")
103            .to_string();
104        if memory_id.trim().is_empty() {
105            return None;
106        }
107
108        let content = obj
109            .get("content")
110            .and_then(|v| v.as_str())
111            .unwrap_or("")
112            .to_string();
113
114        let scope_id = obj
115            .get("scope_id")
116            .and_then(|v| v.as_str())
117            .map(|s| ScopeId(s.to_string()))
118            .unwrap_or_else(|| scope_fallback.clone());
119
120        let memory_class = obj
121            .get("memory_type")
122            .or_else(|| obj.get("memory_class"))
123            .and_then(|v| v.as_str())
124            .and_then(|raw| match raw {
125                "working" => Some(MemoryClass::Working),
126                "semantic" => Some(MemoryClass::Semantic),
127                "episodic" => Some(MemoryClass::Episodic),
128                _ => None,
129            })
130            .unwrap_or(MemoryClass::Semantic);
131
132        let relevance_score = obj
133            .get("relevance_score")
134            .or_else(|| obj.get("score"))
135            .and_then(|v| v.as_f64())
136            .unwrap_or(0.0);
137
138        let retrieval_count = obj
139            .get("retrieval_count")
140            .and_then(|v| v.as_u64())
141            .unwrap_or(0);
142
143        let mut metadata = HashMap::new();
144        if let Some(meta) = obj.get("metadata").and_then(|v| v.as_object()) {
145            for (k, v) in meta {
146                if let Some(s) = v.as_str() {
147                    metadata.insert(k.clone(), s.to_string());
148                } else {
149                    metadata.insert(k.clone(), v.to_string());
150                }
151            }
152        }
153
154        let created_at = obj
155            .get("created_at")
156            .and_then(Self::parse_datetime)
157            .unwrap_or_else(Utc::now);
158        let updated_at = obj
159            .get("updated_at")
160            .and_then(Self::parse_datetime)
161            .unwrap_or(created_at);
162
163        Some(MemoryRecord {
164            memory_id,
165            scope_id,
166            memory_class,
167            content,
168            metadata,
169            relevance_score,
170            retrieval_count,
171            created_at,
172            updated_at,
173        })
174    }
175
176    fn parse_insert_memory_id(stdout: &str) -> Option<String> {
177        if stdout.trim().is_empty() {
178            return None;
179        }
180        if let Ok(value) = serde_json::from_str::<Value>(stdout) {
181            if let Some(id) = value
182                .get("memory_id")
183                .or_else(|| value.get("id"))
184                .and_then(|v| v.as_str())
185            {
186                return Some(id.to_string());
187            }
188        }
189
190        for token in stdout.split_whitespace() {
191            if Uuid::parse_str(token).is_ok() {
192                return Some(token.to_string());
193            }
194        }
195        None
196    }
197}
198
199impl MemoryAdapter for MemoryCliAdapter {
200    async fn store(
201        &self,
202        _project: &str,
203        request: InsertMemory,
204    ) -> Result<MemoryRecord, MemoryError> {
205        if content_may_contain_secrets(&request.content) {
206            return Err(MemoryError::RedactionRequired);
207        }
208
209        let mut args = vec![
210            "memory".to_string(),
211            "insert".to_string(),
212            request.scope_id.as_str().to_string(),
213            request.content.clone(),
214            "--memory-type".to_string(),
215            Self::class_to_backend(request.memory_class).to_string(),
216        ];
217
218        for (k, v) in &request.metadata {
219            args.push("--meta".to_string());
220            args.push(format!("{k}={v}"));
221        }
222
223        let stdout = self.run(&args).await?;
224        let memory_id = Self::parse_insert_memory_id(&stdout).unwrap_or_else(|| "unknown".into());
225        let now = Utc::now();
226
227        Ok(MemoryRecord {
228            memory_id,
229            scope_id: request.scope_id,
230            memory_class: request.memory_class,
231            content: request.content,
232            metadata: request.metadata,
233            relevance_score: 0.0,
234            retrieval_count: 0,
235            created_at: now,
236            updated_at: now,
237        })
238    }
239
240    async fn query(
241        &self,
242        _project: &str,
243        query: MemoryQuery,
244    ) -> Result<Vec<MemoryRecord>, MemoryError> {
245        let args = vec![
246            "query".to_string(),
247            query.scope_id.as_str().to_string(),
248            query.query_text.clone(),
249            "--output".to_string(),
250            "json".to_string(),
251            "--limit".to_string(),
252            query.limit.to_string(),
253        ];
254
255        let stdout = self.run(&args).await?;
256        let value: Value =
257            serde_json::from_str(&stdout).map_err(|e| MemoryError::Serialization(e.to_string()))?;
258
259        let items = match value {
260            Value::Array(items) => items,
261            Value::Object(obj) => obj
262                .get("items")
263                .and_then(|v| v.as_array())
264                .cloned()
265                .unwrap_or_default(),
266            _ => Vec::new(),
267        };
268
269        let mut out = Vec::new();
270        for item in &items {
271            if let Some(record) = Self::map_query_item(&query.scope_id, item) {
272                if let Some(class) = query.memory_class {
273                    if record.memory_class != class {
274                        continue;
275                    }
276                }
277                out.push(record);
278            }
279        }
280        Ok(out)
281    }
282
283    async fn get(&self, project: &str, memory_id: &str) -> Result<MemoryRecord, MemoryError> {
284        let scope = ScopeId(format!("project/{project}"));
285        let args = vec![
286            "memory".to_string(),
287            "get".to_string(),
288            scope.as_str().to_string(),
289            memory_id.to_string(),
290        ];
291        let stdout = self.run(&args).await?;
292        let now = Utc::now();
293        Ok(MemoryRecord {
294            memory_id: memory_id.to_string(),
295            scope_id: scope,
296            memory_class: MemoryClass::Semantic,
297            content: stdout,
298            metadata: HashMap::new(),
299            relevance_score: 0.0,
300            retrieval_count: 0,
301            created_at: now,
302            updated_at: now,
303        })
304    }
305
306    async fn delete(&self, _project: &str, memory_id: &str) -> Result<(), MemoryError> {
307        let args = vec![
308            "memory".to_string(),
309            "delete".to_string(),
310            "--yes".to_string(),
311            memory_id.to_string(),
312        ];
313        let _ = self.run(&args).await?;
314        Ok(())
315    }
316
317    async fn link(&self, _project: &str, link: LinkMemories) -> Result<(), MemoryError> {
318        let args = vec![
319            "memory".to_string(),
320            "link".to_string(),
321            "--relation".to_string(),
322            Self::relation_to_backend(link.relationship).to_string(),
323            link.from_memory_id,
324            link.to_memory_id,
325        ];
326        let _ = self.run(&args).await?;
327        Ok(())
328    }
329
330    async fn unlink(
331        &self,
332        _project: &str,
333        _from_memory_id: &str,
334        _to_memory_id: &str,
335    ) -> Result<(), MemoryError> {
336        Err(MemoryError::Backend(
337            "memory backend CLI does not support unlink".to_string(),
338        ))
339    }
340
341    async fn create_scope(
342        &self,
343        _project: &str,
344        _scope_id: &ScopeId,
345        _parent: Option<&ScopeId>,
346    ) -> Result<(), MemoryError> {
347        // Memory backend scopes are implicit; no explicit creation step.
348        Ok(())
349    }
350
351    async fn close_scope(&self, _project: &str, _scope_id: &ScopeId) -> Result<(), MemoryError> {
352        // Memory backend scopes are implicit; no explicit close step.
353        Ok(())
354    }
355
356    async fn health_check(&self) -> Result<bool, MemoryError> {
357        let args = vec!["doctor".to_string()];
358        match self.run(&args).await {
359            Ok(_) => Ok(true),
360            Err(MemoryError::Backend(_)) => Ok(false),
361            Err(e) => Err(e),
362        }
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use std::fs;
369    use std::os::unix::fs::PermissionsExt;
370
371    use tempfile::TempDir;
372
373    use super::*;
374
375    fn write_fake_memory_cli(dir: &Path) -> PathBuf {
376        let path = dir.join("memory-backend");
377        let script = r#"#!/bin/sh
378set -eu
379log="${MEMORY_BACKEND_LOG:-}"
380if [ -n "$log" ]; then
381  printf "%s\n" "$*" >> "$log"
382fi
383
384if [ "$1" = "query" ]; then
385  # memory-backend query <scope> <query> --output json --limit N
386  printf '[{"id":"11111111-1111-1111-1111-111111111111","scope_id":"%s","memory_type":"semantic","content":"remember to run migrations","score":0.9,"metadata":{"fingerprint":"abc"}}]\n' "$2"
387  exit 0
388fi
389
390if [ "$1" = "memory" ] && [ "$2" = "insert" ]; then
391  # memory-backend memory insert <scope> <content> --memory-type semantic ...
392  echo "22222222-2222-2222-2222-222222222222"
393  exit 0
394fi
395
396if [ "$1" = "doctor" ]; then
397  echo "ok"
398  exit 0
399fi
400
401if [ "$1" = "memory" ] && [ "$2" = "delete" ]; then
402  exit 0
403fi
404
405if [ "$1" = "memory" ] && [ "$2" = "link" ]; then
406  exit 0
407fi
408
409if [ "$1" = "memory" ] && [ "$2" = "get" ]; then
410  echo "content"
411  exit 0
412fi
413
414echo "unsupported" 1>&2
415exit 2
416"#;
417        fs::write(&path, script).unwrap();
418        let mut perms = fs::metadata(&path).unwrap().permissions();
419        perms.set_mode(0o755);
420        fs::set_permissions(&path, perms).unwrap();
421        path
422    }
423
424    #[tokio::test]
425    async fn memory_cli_store_and_query_roundtrip() {
426        let tmp = TempDir::new().unwrap();
427        let log_path = tmp.path().join("memory-backend.log");
428        let fake = write_fake_memory_cli(tmp.path());
429
430        std::env::set_var("MEMORY_BACKEND_LOG", &log_path);
431        let adapter = MemoryCliAdapter::new(fake.to_string_lossy().to_string(), tmp.path());
432
433        let mut insert =
434            InsertMemory::new(ScopeId("project/test".into()), MemoryClass::Semantic, "hi");
435        insert
436            .metadata
437            .insert("run_id".to_string(), "r1".to_string());
438
439        let stored = adapter.store("test", insert).await.unwrap();
440        assert_eq!(stored.memory_id, "22222222-2222-2222-2222-222222222222");
441
442        let query = MemoryQuery::new(ScopeId("project/test".into()), "migrations").with_limit(3);
443        let results = adapter.query("test", query).await.unwrap();
444        assert_eq!(results.len(), 1);
445        assert_eq!(results[0].memory_id, "11111111-1111-1111-1111-111111111111");
446
447        // Ensure we called the fake CLI with expected verbs.
448        let log = fs::read_to_string(log_path).unwrap();
449        assert!(log.contains("memory insert project/test hi --memory-type semantic"));
450        assert!(log.contains("query project/test migrations --output json --limit 3"));
451    }
452
453    #[tokio::test]
454    async fn memory_cli_redaction_required() {
455        let tmp = TempDir::new().unwrap();
456        let fake = write_fake_memory_cli(tmp.path());
457        let adapter = MemoryCliAdapter::new(fake.to_string_lossy().to_string(), tmp.path());
458
459        let insert = InsertMemory::new(
460            ScopeId("project/test".into()),
461            MemoryClass::Semantic,
462            "password=hunter2",
463        );
464        let err = adapter.store("test", insert).await.unwrap_err();
465        matches!(err, MemoryError::RedactionRequired);
466    }
467}