yarli_cli/yarli-memory/src/
memory_cli.rs1use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::process::Stdio;
14
15use chrono::{DateTime, Utc};
16use serde_json::Value;
17use tokio::process::Command;
18use uuid::Uuid;
19
20use crate::yarli_memory::adapter::MemoryAdapter;
21use crate::yarli_memory::error::MemoryError;
22use crate::yarli_memory::types::{
23 content_may_contain_secrets, InsertMemory, LinkMemories, MemoryClass, MemoryQuery,
24 MemoryRecord, RelationshipKind, ScopeId,
25};
26
27#[derive(Debug, Clone)]
28pub struct MemoryCliAdapter {
29 command: String,
30 project_dir: PathBuf,
31}
32
33impl MemoryCliAdapter {
34 pub fn new(command: impl Into<String>, project_dir: impl Into<PathBuf>) -> Self {
35 Self {
36 command: command.into(),
37 project_dir: project_dir.into(),
38 }
39 }
40
41 pub fn project_dir(&self) -> &Path {
42 &self.project_dir
43 }
44
45 async fn run(&self, args: &[String]) -> Result<String, MemoryError> {
46 let child_fut = Command::new(&self.command)
47 .args(args)
48 .current_dir(&self.project_dir)
49 .stdin(Stdio::null())
50 .stdout(Stdio::piped())
51 .stderr(Stdio::piped())
52 .output();
53
54 let output = tokio::time::timeout(std::time::Duration::from_secs(10), child_fut)
55 .await
56 .map_err(|_| {
57 MemoryError::ConnectionFailed("memory backend CLI timed out after 10s".to_string())
58 })?
59 .map_err(|e| MemoryError::ConnectionFailed(e.to_string()))?;
60
61 if !output.status.success() {
62 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
63 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
64 let msg = if stderr.is_empty() { stdout } else { stderr };
65 return Err(MemoryError::Backend(msg));
66 }
67
68 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
69 }
70
71 fn class_to_backend(class: MemoryClass) -> &'static str {
72 match class {
73 MemoryClass::Working => "working",
74 MemoryClass::Semantic => "semantic",
75 MemoryClass::Episodic => "episodic",
76 }
77 }
78
79 fn relation_to_backend(kind: RelationshipKind) -> &'static str {
80 match kind {
81 RelationshipKind::RelatesTo => "relates-to",
82 RelationshipKind::CauseEffect => "caused-by",
83 RelationshipKind::Supersedes => "supersedes",
84 RelationshipKind::DependsOn => "references",
85 }
86 }
87
88 fn parse_datetime(value: &Value) -> Option<DateTime<Utc>> {
89 let s = value.as_str()?;
90 DateTime::parse_from_rfc3339(s)
91 .ok()
92 .map(|dt| dt.with_timezone(&Utc))
93 }
94
95 fn map_query_item(scope_fallback: &ScopeId, item: &Value) -> Option<MemoryRecord> {
96 let obj = item.as_object()?;
97
98 let memory_id = obj
99 .get("memory_id")
100 .or_else(|| obj.get("id"))
101 .and_then(|v| v.as_str())
102 .unwrap_or("")
103 .to_string();
104 if memory_id.trim().is_empty() {
105 return None;
106 }
107
108 let content = obj
109 .get("content")
110 .and_then(|v| v.as_str())
111 .unwrap_or("")
112 .to_string();
113
114 let scope_id = obj
115 .get("scope_id")
116 .and_then(|v| v.as_str())
117 .map(|s| ScopeId(s.to_string()))
118 .unwrap_or_else(|| scope_fallback.clone());
119
120 let memory_class = obj
121 .get("memory_type")
122 .or_else(|| obj.get("memory_class"))
123 .and_then(|v| v.as_str())
124 .and_then(|raw| match raw {
125 "working" => Some(MemoryClass::Working),
126 "semantic" => Some(MemoryClass::Semantic),
127 "episodic" => Some(MemoryClass::Episodic),
128 _ => None,
129 })
130 .unwrap_or(MemoryClass::Semantic);
131
132 let relevance_score = obj
133 .get("relevance_score")
134 .or_else(|| obj.get("score"))
135 .and_then(|v| v.as_f64())
136 .unwrap_or(0.0);
137
138 let retrieval_count = obj
139 .get("retrieval_count")
140 .and_then(|v| v.as_u64())
141 .unwrap_or(0);
142
143 let mut metadata = HashMap::new();
144 if let Some(meta) = obj.get("metadata").and_then(|v| v.as_object()) {
145 for (k, v) in meta {
146 if let Some(s) = v.as_str() {
147 metadata.insert(k.clone(), s.to_string());
148 } else {
149 metadata.insert(k.clone(), v.to_string());
150 }
151 }
152 }
153
154 let created_at = obj
155 .get("created_at")
156 .and_then(Self::parse_datetime)
157 .unwrap_or_else(Utc::now);
158 let updated_at = obj
159 .get("updated_at")
160 .and_then(Self::parse_datetime)
161 .unwrap_or(created_at);
162
163 Some(MemoryRecord {
164 memory_id,
165 scope_id,
166 memory_class,
167 content,
168 metadata,
169 relevance_score,
170 retrieval_count,
171 created_at,
172 updated_at,
173 })
174 }
175
176 fn parse_insert_memory_id(stdout: &str) -> Option<String> {
177 if stdout.trim().is_empty() {
178 return None;
179 }
180 if let Ok(value) = serde_json::from_str::<Value>(stdout) {
181 if let Some(id) = value
182 .get("memory_id")
183 .or_else(|| value.get("id"))
184 .and_then(|v| v.as_str())
185 {
186 return Some(id.to_string());
187 }
188 }
189
190 for token in stdout.split_whitespace() {
191 if Uuid::parse_str(token).is_ok() {
192 return Some(token.to_string());
193 }
194 }
195 None
196 }
197}
198
199impl MemoryAdapter for MemoryCliAdapter {
200 async fn store(
201 &self,
202 _project: &str,
203 request: InsertMemory,
204 ) -> Result<MemoryRecord, MemoryError> {
205 if content_may_contain_secrets(&request.content) {
206 return Err(MemoryError::RedactionRequired);
207 }
208
209 let mut args = vec![
210 "memory".to_string(),
211 "insert".to_string(),
212 request.scope_id.as_str().to_string(),
213 request.content.clone(),
214 "--memory-type".to_string(),
215 Self::class_to_backend(request.memory_class).to_string(),
216 ];
217
218 for (k, v) in &request.metadata {
219 args.push("--meta".to_string());
220 args.push(format!("{k}={v}"));
221 }
222
223 let stdout = self.run(&args).await?;
224 let memory_id = Self::parse_insert_memory_id(&stdout).unwrap_or_else(|| "unknown".into());
225 let now = Utc::now();
226
227 Ok(MemoryRecord {
228 memory_id,
229 scope_id: request.scope_id,
230 memory_class: request.memory_class,
231 content: request.content,
232 metadata: request.metadata,
233 relevance_score: 0.0,
234 retrieval_count: 0,
235 created_at: now,
236 updated_at: now,
237 })
238 }
239
240 async fn query(
241 &self,
242 _project: &str,
243 query: MemoryQuery,
244 ) -> Result<Vec<MemoryRecord>, MemoryError> {
245 let args = vec![
246 "query".to_string(),
247 query.scope_id.as_str().to_string(),
248 query.query_text.clone(),
249 "--output".to_string(),
250 "json".to_string(),
251 "--limit".to_string(),
252 query.limit.to_string(),
253 ];
254
255 let stdout = self.run(&args).await?;
256 let value: Value =
257 serde_json::from_str(&stdout).map_err(|e| MemoryError::Serialization(e.to_string()))?;
258
259 let items = match value {
260 Value::Array(items) => items,
261 Value::Object(obj) => obj
262 .get("items")
263 .and_then(|v| v.as_array())
264 .cloned()
265 .unwrap_or_default(),
266 _ => Vec::new(),
267 };
268
269 let mut out = Vec::new();
270 for item in &items {
271 if let Some(record) = Self::map_query_item(&query.scope_id, item) {
272 if let Some(class) = query.memory_class {
273 if record.memory_class != class {
274 continue;
275 }
276 }
277 out.push(record);
278 }
279 }
280 Ok(out)
281 }
282
283 async fn get(&self, project: &str, memory_id: &str) -> Result<MemoryRecord, MemoryError> {
284 let scope = ScopeId(format!("project/{project}"));
285 let args = vec![
286 "memory".to_string(),
287 "get".to_string(),
288 scope.as_str().to_string(),
289 memory_id.to_string(),
290 ];
291 let stdout = self.run(&args).await?;
292 let now = Utc::now();
293 Ok(MemoryRecord {
294 memory_id: memory_id.to_string(),
295 scope_id: scope,
296 memory_class: MemoryClass::Semantic,
297 content: stdout,
298 metadata: HashMap::new(),
299 relevance_score: 0.0,
300 retrieval_count: 0,
301 created_at: now,
302 updated_at: now,
303 })
304 }
305
306 async fn delete(&self, _project: &str, memory_id: &str) -> Result<(), MemoryError> {
307 let args = vec![
308 "memory".to_string(),
309 "delete".to_string(),
310 "--yes".to_string(),
311 memory_id.to_string(),
312 ];
313 let _ = self.run(&args).await?;
314 Ok(())
315 }
316
317 async fn link(&self, _project: &str, link: LinkMemories) -> Result<(), MemoryError> {
318 let args = vec![
319 "memory".to_string(),
320 "link".to_string(),
321 "--relation".to_string(),
322 Self::relation_to_backend(link.relationship).to_string(),
323 link.from_memory_id,
324 link.to_memory_id,
325 ];
326 let _ = self.run(&args).await?;
327 Ok(())
328 }
329
330 async fn unlink(
331 &self,
332 _project: &str,
333 _from_memory_id: &str,
334 _to_memory_id: &str,
335 ) -> Result<(), MemoryError> {
336 Err(MemoryError::Backend(
337 "memory backend CLI does not support unlink".to_string(),
338 ))
339 }
340
341 async fn create_scope(
342 &self,
343 _project: &str,
344 _scope_id: &ScopeId,
345 _parent: Option<&ScopeId>,
346 ) -> Result<(), MemoryError> {
347 Ok(())
349 }
350
351 async fn close_scope(&self, _project: &str, _scope_id: &ScopeId) -> Result<(), MemoryError> {
352 Ok(())
354 }
355
356 async fn health_check(&self) -> Result<bool, MemoryError> {
357 let args = vec!["doctor".to_string()];
358 match self.run(&args).await {
359 Ok(_) => Ok(true),
360 Err(MemoryError::Backend(_)) => Ok(false),
361 Err(e) => Err(e),
362 }
363 }
364}
365
366#[cfg(test)]
367mod tests {
368 use std::fs;
369 use std::os::unix::fs::PermissionsExt;
370
371 use tempfile::TempDir;
372
373 use super::*;
374
375 fn write_fake_memory_cli(dir: &Path) -> PathBuf {
376 let path = dir.join("memory-backend");
377 let script = r#"#!/bin/sh
378set -eu
379log="${MEMORY_BACKEND_LOG:-}"
380if [ -n "$log" ]; then
381 printf "%s\n" "$*" >> "$log"
382fi
383
384if [ "$1" = "query" ]; then
385 # memory-backend query <scope> <query> --output json --limit N
386 printf '[{"id":"11111111-1111-1111-1111-111111111111","scope_id":"%s","memory_type":"semantic","content":"remember to run migrations","score":0.9,"metadata":{"fingerprint":"abc"}}]\n' "$2"
387 exit 0
388fi
389
390if [ "$1" = "memory" ] && [ "$2" = "insert" ]; then
391 # memory-backend memory insert <scope> <content> --memory-type semantic ...
392 echo "22222222-2222-2222-2222-222222222222"
393 exit 0
394fi
395
396if [ "$1" = "doctor" ]; then
397 echo "ok"
398 exit 0
399fi
400
401if [ "$1" = "memory" ] && [ "$2" = "delete" ]; then
402 exit 0
403fi
404
405if [ "$1" = "memory" ] && [ "$2" = "link" ]; then
406 exit 0
407fi
408
409if [ "$1" = "memory" ] && [ "$2" = "get" ]; then
410 echo "content"
411 exit 0
412fi
413
414echo "unsupported" 1>&2
415exit 2
416"#;
417 fs::write(&path, script).unwrap();
418 let mut perms = fs::metadata(&path).unwrap().permissions();
419 perms.set_mode(0o755);
420 fs::set_permissions(&path, perms).unwrap();
421 path
422 }
423
424 #[tokio::test]
425 async fn memory_cli_store_and_query_roundtrip() {
426 let tmp = TempDir::new().unwrap();
427 let log_path = tmp.path().join("memory-backend.log");
428 let fake = write_fake_memory_cli(tmp.path());
429
430 std::env::set_var("MEMORY_BACKEND_LOG", &log_path);
431 let adapter = MemoryCliAdapter::new(fake.to_string_lossy().to_string(), tmp.path());
432
433 let mut insert =
434 InsertMemory::new(ScopeId("project/test".into()), MemoryClass::Semantic, "hi");
435 insert
436 .metadata
437 .insert("run_id".to_string(), "r1".to_string());
438
439 let stored = adapter.store("test", insert).await.unwrap();
440 assert_eq!(stored.memory_id, "22222222-2222-2222-2222-222222222222");
441
442 let query = MemoryQuery::new(ScopeId("project/test".into()), "migrations").with_limit(3);
443 let results = adapter.query("test", query).await.unwrap();
444 assert_eq!(results.len(), 1);
445 assert_eq!(results[0].memory_id, "11111111-1111-1111-1111-111111111111");
446
447 let log = fs::read_to_string(log_path).unwrap();
449 assert!(log.contains("memory insert project/test hi --memory-type semantic"));
450 assert!(log.contains("query project/test migrations --output json --limit 3"));
451 }
452
453 #[tokio::test]
454 async fn memory_cli_redaction_required() {
455 let tmp = TempDir::new().unwrap();
456 let fake = write_fake_memory_cli(tmp.path());
457 let adapter = MemoryCliAdapter::new(fake.to_string_lossy().to_string(), tmp.path());
458
459 let insert = InsertMemory::new(
460 ScopeId("project/test".into()),
461 MemoryClass::Semantic,
462 "password=hunter2",
463 );
464 let err = adapter.store("test", insert).await.unwrap_err();
465 matches!(err, MemoryError::RedactionRequired);
466 }
467}