1use rusqlite::{Connection, params};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6use crate::engine::Identity;
7use crate::event::RunId;
8use crate::provider::Msg;
9use crate::redaction::RedactionFilter;
10
11#[cfg(feature = "treesitter")]
12pub mod symbol_index;
13
/// Maximum length of the `MEMORY.md` document, counted in `char`s.
pub const MEMORY_MD_LIMIT: usize = 2200;
/// Maximum length of the `USER.md` document, counted in `char`s.
pub const USER_MD_LIMIT: usize = 1375;
16
/// Snapshot of a directory tree: the files found under `root` and the
/// symbols extracted from them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoMap {
    /// Directory the scan started from.
    pub root: PathBuf,
    /// One entry per file that was visited.
    pub files: Vec<FileEntry>,
    /// Item declarations found in the scanned `.rs` files.
    pub symbols: Vec<SymbolEntry>,
}
25
/// A single file recorded by the repository scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// Path relative to the scan root (the full path if it cannot be made relative).
    pub path: String,
    /// File size in bytes; `0` when the metadata could not be read.
    pub size: u64,
    /// Last-modified time formatted as `%Y-%m-%d %H:%M:%S`; empty when unavailable.
    pub modified: String,
}
32
/// An item declaration found in a scanned source file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SymbolEntry {
    /// Path of the containing file, in the same form as [`FileEntry::path`].
    pub file: String,
    /// Name extracted from the declaration line.
    pub name: String,
    /// Item kind: `"fn"`, `"struct"`, `"enum"`, `"trait"` or `"mod"`.
    pub kind: String,
    /// 1-based line number of the declaration.
    pub line: u32,
}
40
41impl RepoMap {
42 pub fn scan(root: &Path) -> Self {
43 let mut files = Vec::new();
44 let mut symbols = Vec::new();
45 scan_dir(root, root, &mut files, &mut symbols);
46 Self {
47 root: root.to_path_buf(),
48 files,
49 symbols,
50 }
51 }
52}
53
54fn scan_dir(base: &Path, dir: &Path, files: &mut Vec<FileEntry>, symbols: &mut Vec<SymbolEntry>) {
55 if let Ok(entries) = std::fs::read_dir(dir) {
56 for entry in entries.flatten() {
57 let path = entry.path();
58 let name = entry.file_name().to_string_lossy().to_string();
59
60 if name.starts_with('.')
62 || name == "node_modules"
63 || name == "target"
64 || name == "build"
65 || name == "dist"
66 {
67 continue;
68 }
69
70 if path.is_dir() {
71 scan_dir(base, &path, files, symbols);
72 } else {
73 let rel = path
74 .strip_prefix(base)
75 .unwrap_or(&path)
76 .to_string_lossy()
77 .to_string();
78
79 let modified = path
80 .metadata()
81 .ok()
82 .and_then(|m| m.modified().ok())
83 .and_then(|t| {
84 chrono::DateTime::from_timestamp(
85 t.duration_since(std::time::UNIX_EPOCH)
86 .unwrap_or_default()
87 .as_secs() as i64,
88 0,
89 )
90 })
91 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
92 .unwrap_or_default();
93
94 files.push(FileEntry {
95 path: rel.clone(),
96 size: path.metadata().map(|m| m.len()).unwrap_or(0),
97 modified,
98 });
99
100 if rel.ends_with(".rs") {
102 if let Ok(content) = std::fs::read_to_string(&path) {
103 for (i, line) in content.lines().enumerate() {
104 let trimmed = line.trim();
105 if trimmed.starts_with("pub fn ") || trimmed.starts_with("fn ") {
106 let name = trimmed
107 .trim_start_matches("pub fn ")
108 .trim_start_matches("fn ")
109 .split('(')
110 .next()
111 .unwrap_or("");
112 symbols.push(SymbolEntry {
113 file: rel.clone(),
114 name: name.to_string(),
115 kind: "fn".into(),
116 line: (i + 1) as u32,
117 });
118 } else if trimmed.starts_with("pub struct ")
119 || trimmed.starts_with("struct ")
120 {
121 let name = trimmed
122 .trim_start_matches("pub struct ")
123 .trim_start_matches("struct ")
124 .split(|c: char| c == '<' || c == '{' || c == '(')
125 .next()
126 .unwrap_or("");
127 symbols.push(SymbolEntry {
128 file: rel.clone(),
129 name: name.to_string(),
130 kind: "struct".into(),
131 line: (i + 1) as u32,
132 });
133 } else if trimmed.starts_with("pub enum ")
134 || trimmed.starts_with("enum ")
135 {
136 let name = trimmed
137 .trim_start_matches("pub enum ")
138 .trim_start_matches("enum ")
139 .split(|c: char| c == '<' || c == '{')
140 .next()
141 .unwrap_or("");
142 symbols.push(SymbolEntry {
143 file: rel.clone(),
144 name: name.to_string(),
145 kind: "enum".into(),
146 line: (i + 1) as u32,
147 });
148 } else if trimmed.starts_with("pub trait ")
149 || trimmed.starts_with("trait ")
150 {
151 let name = trimmed
152 .trim_start_matches("pub trait ")
153 .trim_start_matches("trait ")
154 .split(|c: char| c == '<' || c == '{')
155 .next()
156 .unwrap_or("");
157 symbols.push(SymbolEntry {
158 file: rel.clone(),
159 name: name.to_string(),
160 kind: "trait".into(),
161 line: (i + 1) as u32,
162 });
163 } else if trimmed.starts_with("pub mod ") || trimmed.starts_with("mod ")
164 {
165 let name = trimmed
166 .trim_start_matches("pub mod ")
167 .trim_start_matches("mod ")
168 .split(';')
169 .next()
170 .unwrap_or("");
171 symbols.push(SymbolEntry {
172 file: rel.clone(),
173 name: name.to_string(),
174 kind: "mod".into(),
175 line: (i + 1) as u32,
176 });
177 }
178 }
179 }
180 }
181 }
182 }
183 }
184}
185
/// Message transcript stored for a single run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMem {
    /// Identifier of the run the transcript belongs to.
    pub run_id: String,
    /// Messages recorded for the run.
    pub messages: Vec<Msg>,
    /// Creation timestamp, stored as text.
    pub created_at: String,
}
194
/// A message posted from one agent to another through shared memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedSignal {
    /// Unique identifier of the signal.
    pub id: String,
    /// Sending agent.
    pub from_agent: String,
    /// Receiving agent.
    pub to_agent: String,
    /// Signal payload.
    pub content: String,
    /// Timestamp of the signal, stored as text.
    pub timestamp: String,
}
205
/// A shared working document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkingDoc {
    /// Unique identifier of the document.
    pub id: String,
    /// Document title.
    pub title: String,
    /// Document body.
    pub content: String,
    /// Last-update timestamp, stored as text.
    pub updated_at: String,
}
213
/// A remembered key/value fact.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// Unique identifier of the fact.
    pub id: String,
    /// Fact key.
    pub key: String,
    /// Fact value.
    pub value: String,
    /// Creation timestamp, stored as text.
    pub created_at: String,
    /// Last-update timestamp, stored as text.
    pub updated_at: String,
}
224
/// The memory documents a backend can store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemoryDocKind {
    /// The `MEMORY.md` document.
    Memory,
    /// The `USER.md` document.
    User,
}
230
231impl MemoryDocKind {
232 pub fn as_str(self) -> &'static str {
233 match self {
234 Self::Memory => "MEMORY.md",
235 Self::User => "USER.md",
236 }
237 }
238
239 pub fn limit(self) -> usize {
240 match self {
241 Self::Memory => MEMORY_MD_LIMIT,
242 Self::User => USER_MD_LIMIT,
243 }
244 }
245
246 pub fn parse(value: &str) -> Option<Self> {
247 match value.trim().to_ascii_lowercase().as_str() {
248 "memory" | "memory.md" => Some(Self::Memory),
249 "user" | "user.md" => Some(Self::User),
250 _ => None,
251 }
252 }
253}
254
/// A stored memory document together with its kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDoc {
    /// Which document this is.
    pub kind: MemoryDocKind,
    /// Document text.
    pub content: String,
    /// Last-update timestamp, stored as text.
    pub updated_at: String,
}
261
/// Usage counters for a memory backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of stored facts.
    pub facts: usize,
    /// Current length of the `MEMORY.md` document in characters.
    pub memory_chars: usize,
    /// Character limit for the `MEMORY.md` document.
    pub memory_limit: usize,
    /// Current length of the `USER.md` document in characters.
    pub user_chars: usize,
    /// Character limit for the `USER.md` document.
    pub user_limit: usize,
}
270
/// A node of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphNode {
    /// Unique identifier of the node.
    pub id: String,
    /// Node label.
    pub label: String,
    /// Node kind.
    pub kind: String,
    /// Arbitrary JSON properties; defaults to the `serde_json::Value` default
    /// when missing from the serialized form.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp, stored as text.
    pub created_at: String,
    /// Last-update timestamp, stored as text.
    pub updated_at: String,
}
281
/// A directed, weighted edge of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Unique identifier of the edge.
    pub id: String,
    /// Identifier of the source node.
    pub from_id: String,
    /// Identifier of the target node.
    pub to_id: String,
    /// Relation name carried by the edge.
    pub relation: String,
    /// Edge weight.
    pub weight: f64,
    /// Arbitrary JSON properties; defaults to the `serde_json::Value` default
    /// when missing from the serialized form.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp, stored as text.
    pub created_at: String,
    /// Last-update timestamp, stored as text.
    pub updated_at: String,
}
294
/// A set of knowledge-graph nodes and edges; empty by default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KnowledgeGraph {
    /// Graph nodes.
    pub nodes: Vec<GraphNode>,
    /// Graph edges.
    pub edges: Vec<GraphEdge>,
}
300
/// Which edges to follow when looking up the neighbours of a node.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum GraphDirection {
    /// Edges pointing at the node.
    Incoming,
    /// Edges leaving the node.
    Outgoing,
    /// Edges in either direction.
    Both,
}
307
308impl GraphDirection {
309 pub fn parse(value: &str) -> Self {
310 match value.trim().to_ascii_lowercase().as_str() {
311 "incoming" | "in" => Self::Incoming,
312 "outgoing" | "out" => Self::Outgoing,
313 _ => Self::Both,
314 }
315 }
316}
317
318pub fn validate_memory_text(label: &str, text: &str, limit: usize) -> anyhow::Result<()> {
319 let chars = text.chars().count();
320 if chars > limit {
321 anyhow::bail!("{label} is too large: {chars}/{limit} chars");
322 }
323 if text.chars().any(is_forbidden_invisible_char) {
324 anyhow::bail!("{label} contains invisible control characters");
325 }
326
327 let lower = text.to_ascii_lowercase();
328 let blocked = [
329 "ignore previous instructions",
330 "ignore all previous instructions",
331 "disregard previous instructions",
332 "reveal your system prompt",
333 "print your system prompt",
334 "exfiltrate",
335 "steal secrets",
336 "dump secrets",
337 "print env",
338 "cat ~/.ssh",
339 "backdoor",
340 "reverse shell",
341 "rm -rf /",
342 ];
343 if blocked.iter().any(|needle| lower.contains(needle)) {
344 anyhow::bail!("{label} looks like prompt injection or credential exfiltration");
345 }
346 Ok(())
347}
348
/// Returns `true` for characters that must not appear in stored memory text:
/// control characters other than `\n`, `\r` and `\t`, plus the ranges
/// U+200B..=U+200F, U+202A..=U+202E, U+2060..=U+206F and U+FEFF.
fn is_forbidden_invisible_char(c: char) -> bool {
    let permitted_whitespace = matches!(c, '\n' | '\r' | '\t');
    if c.is_control() && !permitted_whitespace {
        return true;
    }
    matches!(
        c,
        '\u{FEFF}'
            | '\u{200B}'..='\u{200F}'
            | '\u{202A}'..='\u{202E}'
            | '\u{2060}'..='\u{206F}'
    )
}
358
/// Returns at most the first `limit` characters of `text`, never splitting a
/// multi-byte character.
fn truncate_chars(text: &str, limit: usize) -> String {
    match text.char_indices().nth(limit) {
        // Byte offset of the first character that falls outside the limit.
        Some((cut, _)) => text[..cut].to_string(),
        None => text.to_string(),
    }
}
362
/// Storage interface for agent memory: identities, task transcripts, shared
/// signals and documents, facts, memory documents, a discovered-model cache
/// and a knowledge graph.
///
/// Methods with a default body are optional capabilities. Their defaults
/// either return an empty value or fail with a "not supported" error.
pub trait Memory: Send + Sync {
    /// Builds a [`RepoMap`] for the tree rooted at `root`.
    fn repo_map(&self, root: &Path) -> RepoMap;
    /// Returns the identity stored for `agent`, if any.
    fn identity(&self, agent: &str) -> Option<Identity>;
    /// Stores `identity` for `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()>;
    /// Returns the task memory stored for `run`, if any.
    fn task(&self, run: &RunId) -> Option<TaskMem>;
    /// Stores the task memory for its run.
    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()>;
    /// Returns the posted shared signals.
    fn shared_signals(&self) -> Vec<SharedSignal>;
    /// Returns the shared working documents.
    fn shared_docs(&self) -> Vec<WorkingDoc>;
    /// Posts a new shared signal.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()>;
    /// Inserts or updates a shared working document.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()>;
    /// Stores a fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `k` facts matching the query `q`.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact>;
    /// Returns every stored fact.
    fn all_facts(&self) -> Vec<Fact>;
    /// Removes the fact with the given `id`.
    fn forget(&self, id: &str) -> anyhow::Result<()>;
    /// Returns the memory document of the given kind. Default: `None`.
    fn memory_doc(&self, _kind: MemoryDocKind) -> Option<MemoryDoc> {
        None
    }
    /// Inserts or replaces a memory document. Default: "not supported" error.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Removes a memory document. Default: "not supported" error.
    fn remove_memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Reports usage counters. The default counts facts via [`Memory::all_facts`]
    /// and reports zero characters for both memory documents.
    fn memory_stats(&self) -> MemoryStats {
        MemoryStats {
            facts: self.all_facts().len(),
            memory_chars: 0,
            memory_limit: MEMORY_MD_LIMIT,
            user_chars: 0,
            user_limit: USER_MD_LIMIT,
        }
    }
    /// Consolidates memory. Default: "not supported" error.
    fn consolidate_memory(&self) -> anyhow::Result<()> {
        anyhow::bail!("memory consolidation is not supported by this memory backend")
    }
    /// Caches the model names discovered for `provider_id`.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()>;
    /// Returns the cached model names for `provider_id`.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String>;
    /// Inserts or updates a graph node. Default: "not supported" error.
    fn upsert_graph_node(&self, _node: GraphNode) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Inserts or updates a graph edge. Default: "not supported" error.
    fn upsert_graph_edge(&self, _edge: GraphEdge) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Returns the graph node with the given id. Default: `None`.
    fn graph_node(&self, _id: &str) -> Option<GraphNode> {
        None
    }
    /// Returns edges touching the node `_id` in the requested direction,
    /// paired with a node. Default: empty list.
    fn graph_neighbors(
        &self,
        _id: &str,
        _direction: GraphDirection,
        _limit: usize,
    ) -> Vec<(GraphEdge, GraphNode)> {
        Vec::new()
    }
    /// Searches graph nodes. Default: empty list.
    fn search_graph(&self, _query: &str, _limit: usize) -> Vec<GraphNode> {
        Vec::new()
    }
    /// Exports the graph. Default: an empty [`KnowledgeGraph`].
    fn graph_export(&self) -> KnowledgeGraph {
        KnowledgeGraph::default()
    }
    /// Deletes a graph node. Default: "not supported" error.
    fn delete_graph_node(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Deletes a graph edge. Default: "not supported" error.
    fn delete_graph_edge(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
}
432
/// A named source of facts and memory documents whose operations are all
/// fallible.
pub trait MemoryProvider: Send + Sync {
    /// Name of the provider.
    fn name(&self) -> &str;
    /// Stores a fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `limit` facts matching `query`.
    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>>;
    /// Returns the memory document of the given kind, if one exists.
    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>>;
    /// Inserts or replaces the memory document of the given kind.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()>;
}
440
/// [`MemoryProvider`] that forwards every call to a shared [`Memory`] backend.
pub struct LocalMemoryProvider {
    /// Backend that actually stores the data.
    memory: Arc<dyn Memory>,
}
444
445impl LocalMemoryProvider {
446 pub fn new(memory: Arc<dyn Memory>) -> Self {
447 Self { memory }
448 }
449}
450
451impl MemoryProvider for LocalMemoryProvider {
452 fn name(&self) -> &str {
453 "local"
454 }
455
456 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
457 self.memory.remember(fact)
458 }
459
460 fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>> {
461 Ok(self.memory.recall(query, limit))
462 }
463
464 fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
465 Ok(self.memory.memory_doc(kind))
466 }
467
468 fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
469 self.memory.upsert_memory_doc(kind, content)
470 }
471}
472
/// Placeholder [`MemoryProvider`] for an external service; every operation
/// fails with a "not configured" error.
pub struct ExternalMemoryProvider {
    /// Name of the external provider.
    name: String,
}
476
477impl ExternalMemoryProvider {
478 pub fn mem0() -> Self {
479 Self {
480 name: "mem0".into(),
481 }
482 }
483
484 pub fn honcho() -> Self {
485 Self {
486 name: "honcho".into(),
487 }
488 }
489
490 pub fn supermemory() -> Self {
491 Self {
492 name: "supermemory".into(),
493 }
494 }
495
496 fn not_configured(&self) -> anyhow::Error {
497 anyhow::anyhow!(
498 "external memory provider '{}' is not configured; use local SQLite memory or configure a connector first",
499 self.name
500 )
501 }
502}
503
504impl MemoryProvider for ExternalMemoryProvider {
505 fn name(&self) -> &str {
506 &self.name
507 }
508
509 fn remember(&self, _fact: Fact) -> anyhow::Result<()> {
510 Err(self.not_configured())
511 }
512
513 fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result<Vec<Fact>> {
514 Err(self.not_configured())
515 }
516
517 fn memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
518 Err(self.not_configured())
519 }
520
521 fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
522 Err(self.not_configured())
523 }
524}
525
/// [`Memory`] backend stored in a SQLite database.
pub struct SqliteMemory {
    /// The single connection; the mutex serialises all access to it.
    conn: Mutex<Connection>,
}
531
impl SqliteMemory {
    /// Opens (creating if necessary) the database at `db_path` and makes sure
    /// the schema exists.
    ///
    /// # Errors
    ///
    /// Fails when the parent directory cannot be created, the database cannot
    /// be opened, or the schema statements fail.
    pub fn open(db_path: &Path) -> anyhow::Result<Self> {
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(db_path)?;
        let memory = Self {
            conn: Mutex::new(conn),
        };
        memory.migrate()?;
        Ok(memory)
    }

    /// Creates every table, index, FTS table and trigger if it does not exist
    /// yet. Safe to run on every open because all statements use
    /// `IF NOT EXISTS`.
    ///
    /// NOTE(review): the `kg_edges` foreign keys (`ON DELETE CASCADE`) only
    /// take effect when foreign-key enforcement is enabled on the connection;
    /// this function does not set `PRAGMA foreign_keys` — confirm the build
    /// default.
    ///
    /// NOTE(review): `facts_fts` is an external-content FTS5 table kept in
    /// sync only by the triggers below. Rows that existed in `facts` before
    /// the FTS table was created are not indexed by this migration, and rows
    /// deleted implicitly by `INSERT OR REPLACE` do not fire `facts_ad` unless
    /// recursive triggers are enabled.
    fn migrate(&self) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS identities (
                agent TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT NOT NULL,
                personality TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS tasks (
                run_id TEXT PRIMARY KEY,
                messages_json TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS signals (
                id TEXT PRIMARY KEY,
                from_agent TEXT NOT NULL,
                to_agent TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS working_docs (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS facts (
                id TEXT PRIMARY KEY,
                key TEXT NOT NULL UNIQUE,
                value TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS discovered_models (
                provider_id TEXT NOT NULL,
                model_name TEXT NOT NULL,
                fetched_at TEXT NOT NULL,
                PRIMARY KEY (provider_id, model_name)
            );
            CREATE TABLE IF NOT EXISTS memory_docs (
                kind TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS kg_nodes (
                id TEXT PRIMARY KEY,
                label TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS kg_edges (
                id TEXT PRIMARY KEY,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 1.0,
                properties_json TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(from_id) REFERENCES kg_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY(to_id) REFERENCES kg_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS kg_nodes_label_idx ON kg_nodes(label);
            CREATE INDEX IF NOT EXISTS kg_nodes_kind_idx ON kg_nodes(kind);
            CREATE INDEX IF NOT EXISTS kg_edges_from_idx ON kg_edges(from_id);
            CREATE INDEX IF NOT EXISTS kg_edges_to_idx ON kg_edges(to_id);
            CREATE INDEX IF NOT EXISTS kg_edges_relation_idx ON kg_edges(relation);
            -- FTS5 full-text search for memory recall (M1)
            CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5(
                key, value, content='facts', content_rowid='rowid'
            );
            -- Triggers to keep FTS5 index in sync
            CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
            END;
            CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
            END;
            CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN
                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
            END;
            ",
        )?;
        Ok(())
    }
}
636
637fn graph_node_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphNode> {
638 let properties_json: String = row.get(3)?;
639 Ok(GraphNode {
640 id: row.get(0)?,
641 label: row.get(1)?,
642 kind: row.get(2)?,
643 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
644 created_at: row.get(4)?,
645 updated_at: row.get(5)?,
646 })
647}
648
649fn graph_edge_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphEdge> {
650 let properties_json: String = row.get(5)?;
651 Ok(GraphEdge {
652 id: row.get(0)?,
653 from_id: row.get(1)?,
654 to_id: row.get(2)?,
655 relation: row.get(3)?,
656 weight: row.get(4)?,
657 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
658 created_at: row.get(6)?,
659 updated_at: row.get(7)?,
660 })
661}
662
663impl Memory for SqliteMemory {
    /// Scans the file system under `root`; the database is not involved.
    fn repo_map(&self, root: &Path) -> RepoMap {
        RepoMap::scan(root)
    }
667
668 fn identity(&self, agent: &str) -> Option<Identity> {
669 let conn = self.conn.lock().unwrap();
670 conn.query_row(
671 "SELECT name, role, personality FROM identities WHERE agent = ?1",
672 params![agent],
673 |row| {
674 Ok(Identity {
675 name: row.get(0)?,
676 role: row.get(1)?,
677 personality: row.get(2)?,
678 })
679 },
680 )
681 .ok()
682 }
683
684 fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()> {
685 let conn = self.conn.lock().unwrap();
686 conn.execute(
687 "INSERT OR REPLACE INTO identities (agent, name, role, personality) VALUES (?1, ?2, ?3, ?4)",
688 params![agent, identity.name, identity.role, identity.personality],
689 )?;
690 Ok(())
691 }
692
693 fn task(&self, run: &RunId) -> Option<TaskMem> {
694 let conn = self.conn.lock().unwrap();
695 conn.query_row(
696 "SELECT run_id, messages_json, created_at FROM tasks WHERE run_id = ?1",
697 params![run.0],
698 |row| {
699 let messages_json: String = row.get(1)?;
700 let messages: Vec<Msg> = serde_json::from_str(&messages_json).unwrap_or_default();
701 Ok(TaskMem {
702 run_id: row.get(0)?,
703 messages,
704 created_at: row.get(2)?,
705 })
706 },
707 )
708 .ok()
709 }
710
711 fn save_task(&self, task: &TaskMem) -> anyhow::Result<()> {
712 let conn = self.conn.lock().unwrap();
713 let messages_json = serde_json::to_string(&task.messages)?;
714 conn.execute(
715 "INSERT OR REPLACE INTO tasks (run_id, messages_json, created_at) VALUES (?1, ?2, ?3)",
716 params![task.run_id, messages_json, task.created_at],
717 )?;
718 Ok(())
719 }
720
721 fn shared_signals(&self) -> Vec<SharedSignal> {
722 let conn = self.conn.lock().unwrap();
723 let mut stmt = conn
724 .prepare("SELECT id, from_agent, to_agent, content, timestamp FROM signals ORDER BY timestamp DESC")
725 .unwrap();
726 let signals = stmt
727 .query_map([], |row| {
728 Ok(SharedSignal {
729 id: row.get(0)?,
730 from_agent: row.get(1)?,
731 to_agent: row.get(2)?,
732 content: row.get(3)?,
733 timestamp: row.get(4)?,
734 })
735 })
736 .unwrap()
737 .filter_map(|r| r.ok())
738 .collect();
739 signals
740 }
741
742 fn shared_docs(&self) -> Vec<WorkingDoc> {
743 let conn = self.conn.lock().unwrap();
744 let mut stmt = conn
745 .prepare(
746 "SELECT id, title, content, updated_at FROM working_docs ORDER BY updated_at DESC",
747 )
748 .unwrap();
749 let docs = stmt
750 .query_map([], |row| {
751 Ok(WorkingDoc {
752 id: row.get(0)?,
753 title: row.get(1)?,
754 content: row.get(2)?,
755 updated_at: row.get(3)?,
756 })
757 })
758 .unwrap()
759 .filter_map(|r| r.ok())
760 .collect();
761 docs
762 }
763
764 fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()> {
765 let conn = self.conn.lock().unwrap();
766 conn.execute(
767 "INSERT INTO signals (id, from_agent, to_agent, content, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
768 params![signal.id, signal.from_agent, signal.to_agent, signal.content, signal.timestamp],
769 )?;
770 Ok(())
771 }
772
773 fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()> {
774 let conn = self.conn.lock().unwrap();
775 conn.execute(
776 "INSERT OR REPLACE INTO working_docs (id, title, content, updated_at) VALUES (?1, ?2, ?3, ?4)",
777 params![doc.id, doc.title, doc.content, doc.updated_at],
778 )?;
779 Ok(())
780 }
781
782 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
783 let redaction = RedactionFilter::new();
784 validate_memory_text("fact key", &fact.key, 256)?;
785 validate_memory_text("fact value", &fact.value, 1200)?;
786 let safe_value = redaction.redact_str(&fact.value);
787 let safe_key = redaction.redact_str(&fact.key);
788 if redaction.contains_secret(&fact.value) {
789 tracing::warn!("Redacted secret from memory fact: {}", fact.key);
790 }
791 let conn = self.conn.lock().unwrap();
792 let existing: Option<String> = conn
793 .query_row(
794 "SELECT id FROM facts WHERE key = ?1",
795 params![safe_key.clone()],
796 |row| row.get(0),
797 )
798 .ok();
799 if existing.as_deref().is_some_and(|id| id != fact.id) {
800 anyhow::bail!(
801 "memory fact '{}' already exists; use replace/consolidate",
802 safe_key
803 );
804 }
805 conn.execute(
806 "INSERT OR REPLACE INTO facts (id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
807 params![fact.id, safe_key, safe_value, fact.created_at, fact.updated_at],
808 )?;
809 Ok(())
810 }
811
812 fn recall(&self, q: &str, k: usize) -> Vec<Fact> {
813 let conn = self.conn.lock().unwrap();
814
815 let tokens: Vec<String> = q
821 .split_whitespace()
822 .map(|w| {
823 w.chars()
824 .filter(|c| c.is_alphanumeric() || *c == '_')
825 .collect::<String>()
826 })
827 .filter(|w| !w.is_empty())
828 .map(|w| format!("\"{}\"*", w))
829 .collect();
830
831 let try_fts = !tokens.is_empty();
833 if try_fts {
834 let pattern = tokens.join(" ");
835 if let Ok(mut stmt) = conn.prepare(
836 "SELECT f.id, f.key, f.value, f.created_at, f.updated_at FROM facts f
837 INNER JOIN facts_fts ft ON f.rowid = ft.rowid
838 WHERE facts_fts MATCH ?1 ORDER BY rank LIMIT ?2",
839 ) {
840 if let Ok(rows) = stmt.query_map(params![pattern, k as i64], |row| {
841 Ok(Fact {
842 id: row.get(0)?,
843 key: row.get(1)?,
844 value: row.get(2)?,
845 created_at: row.get(3)?,
846 updated_at: row.get(4)?,
847 })
848 }) {
849 let facts: Vec<Fact> = rows.filter_map(|r| r.ok()).collect();
850 if !facts.is_empty() {
851 return facts;
852 }
853 }
856 }
857 }
858
859 let escaped = q
862 .replace('\\', "\\\\")
863 .replace('%', "\\%")
864 .replace('_', "\\_");
865 let like_pattern = format!("%{}%", escaped);
866 let Ok(mut stmt) = conn.prepare(
867 "SELECT id, key, value, created_at, updated_at FROM facts \
868 WHERE key LIKE ?1 ESCAPE '\\' OR value LIKE ?1 ESCAPE '\\' LIMIT ?2",
869 ) else {
870 return Vec::new();
871 };
872 let Ok(rows) = stmt.query_map(params![like_pattern, k as i64], |row| {
873 Ok(Fact {
874 id: row.get(0)?,
875 key: row.get(1)?,
876 value: row.get(2)?,
877 created_at: row.get(3)?,
878 updated_at: row.get(4)?,
879 })
880 }) else {
881 return Vec::new();
882 };
883 rows.filter_map(|r| r.ok()).collect()
884 }
885
886 fn all_facts(&self) -> Vec<Fact> {
887 let conn = self.conn.lock().unwrap();
888 let mut stmt = conn
889 .prepare(
890 "SELECT id, key, value, created_at, updated_at FROM facts ORDER BY updated_at DESC",
891 )
892 .unwrap();
893 stmt.query_map([], |row| {
894 Ok(Fact {
895 id: row.get(0)?,
896 key: row.get(1)?,
897 value: row.get(2)?,
898 created_at: row.get(3)?,
899 updated_at: row.get(4)?,
900 })
901 })
902 .unwrap()
903 .filter_map(|r| r.ok())
904 .collect()
905 }
906
907 fn forget(&self, id: &str) -> anyhow::Result<()> {
908 let conn = self.conn.lock().unwrap();
909 conn.execute("DELETE FROM facts WHERE id = ?1", params![id])?;
910 Ok(())
911 }
912
913 fn memory_doc(&self, kind: MemoryDocKind) -> Option<MemoryDoc> {
914 let conn = self.conn.lock().unwrap();
915 conn.query_row(
916 "SELECT content, updated_at FROM memory_docs WHERE kind = ?1",
917 params![kind.as_str()],
918 |row| {
919 Ok(MemoryDoc {
920 kind,
921 content: row.get(0)?,
922 updated_at: row.get(1)?,
923 })
924 },
925 )
926 .ok()
927 }
928
929 fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
930 validate_memory_text(kind.as_str(), content, kind.limit())?;
931 let conn = self.conn.lock().unwrap();
932 conn.execute(
933 "INSERT OR REPLACE INTO memory_docs (kind, content, updated_at)
934 VALUES (?1, ?2, datetime('now'))",
935 params![kind.as_str(), content],
936 )?;
937 Ok(())
938 }
939
940 fn remove_memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<()> {
941 let conn = self.conn.lock().unwrap();
942 conn.execute(
943 "DELETE FROM memory_docs WHERE kind = ?1",
944 params![kind.as_str()],
945 )?;
946 Ok(())
947 }
948
949 fn memory_stats(&self) -> MemoryStats {
950 let memory_chars = self
951 .memory_doc(MemoryDocKind::Memory)
952 .map(|doc| doc.content.chars().count())
953 .unwrap_or(0);
954 let user_chars = self
955 .memory_doc(MemoryDocKind::User)
956 .map(|doc| doc.content.chars().count())
957 .unwrap_or(0);
958 MemoryStats {
959 facts: self.all_facts().len(),
960 memory_chars,
961 memory_limit: MEMORY_MD_LIMIT,
962 user_chars,
963 user_limit: USER_MD_LIMIT,
964 }
965 }
966
967 fn consolidate_memory(&self) -> anyhow::Result<()> {
968 let facts = self.all_facts();
969 let mut memory_lines = Vec::new();
970 memory_lines.push("# MEMORY.md".to_string());
971 memory_lines.push("Durable Sparrow memory distilled from accepted facts.".to_string());
972 for fact in facts.iter().take(40) {
973 memory_lines.push(format!("- {}: {}", fact.key, fact.value));
974 }
975 let memory = truncate_chars(&memory_lines.join("\n"), MEMORY_MD_LIMIT);
976 self.upsert_memory_doc(MemoryDocKind::Memory, &memory)?;
977
978 let mut user_lines = Vec::new();
979 user_lines.push("# USER.md".to_string());
980 for fact in facts
981 .iter()
982 .filter(|fact| fact.key.starts_with("user"))
983 .take(24)
984 {
985 user_lines.push(format!("- {}: {}", fact.key, fact.value));
986 }
987 if user_lines.len() > 1 {
988 let user = truncate_chars(&user_lines.join("\n"), USER_MD_LIMIT);
989 self.upsert_memory_doc(MemoryDocKind::User, &user)?;
990 }
991 Ok(())
992 }
993
994 fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()> {
995 let mut conn = self.conn.lock().unwrap();
996 let tx = conn.transaction()?;
997 tx.execute(
998 "DELETE FROM discovered_models WHERE provider_id = ?1",
999 params![provider_id],
1000 )?;
1001 let fetched_at = chrono::Utc::now().to_rfc3339();
1002 for model in models {
1003 let model = model.trim();
1004 if model.is_empty() {
1005 continue;
1006 }
1007 tx.execute(
1008 "INSERT OR REPLACE INTO discovered_models (provider_id, model_name, fetched_at)
1009 VALUES (?1, ?2, ?3)",
1010 params![provider_id, model, fetched_at],
1011 )?;
1012 }
1013 tx.commit()?;
1014 Ok(())
1015 }
1016
1017 fn get_discovered_models(&self, provider_id: &str) -> Vec<String> {
1018 let conn = self.conn.lock().unwrap();
1019 let Ok(mut stmt) = conn.prepare(
1024 "SELECT model_name FROM discovered_models
1025 WHERE provider_id = ?1
1026 AND datetime(fetched_at) >= datetime('now', '-30 days')
1027 ORDER BY model_name ASC",
1028 ) else {
1029 return Vec::new();
1030 };
1031 let Ok(rows) = stmt.query_map(params![provider_id], |row| row.get::<_, String>(0)) else {
1032 return Vec::new();
1033 };
1034 rows.filter_map(|row| row.ok()).collect()
1035 }
1036
1037 fn upsert_graph_node(&self, node: GraphNode) -> anyhow::Result<()> {
1038 validate_memory_text("graph node id", &node.id, 160)?;
1039 validate_memory_text("graph node label", &node.label, 512)?;
1040 validate_memory_text("graph node kind", &node.kind, 80)?;
1041 validate_memory_text("graph node properties", &node.properties.to_string(), 4000)?;
1042 let redaction = RedactionFilter::new();
1043 let safe_id = redaction.redact_str(&node.id);
1044 let safe_label = redaction.redact_str(&node.label);
1045 let safe_kind = redaction.redact_str(&node.kind);
1046 let safe_properties = serde_json::to_string(&node.properties)?;
1047 let safe_properties = redaction.redact_str(&safe_properties);
1048 let now = chrono::Utc::now().to_rfc3339();
1049 let created_at = if node.created_at.trim().is_empty() {
1050 now.clone()
1051 } else {
1052 node.created_at
1053 };
1054 let updated_at = if node.updated_at.trim().is_empty() {
1055 now
1056 } else {
1057 node.updated_at
1058 };
1059 let conn = self.conn.lock().unwrap();
1060 conn.execute(
1061 "INSERT INTO kg_nodes (id, label, kind, properties_json, created_at, updated_at)
1062 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1063 ON CONFLICT(id) DO UPDATE SET
1064 label = excluded.label,
1065 kind = excluded.kind,
1066 properties_json = excluded.properties_json,
1067 updated_at = excluded.updated_at",
1068 params![
1069 safe_id,
1070 safe_label,
1071 safe_kind,
1072 safe_properties,
1073 created_at,
1074 updated_at
1075 ],
1076 )?;
1077 Ok(())
1078 }
1079
1080 fn upsert_graph_edge(&self, edge: GraphEdge) -> anyhow::Result<()> {
1081 validate_memory_text("graph edge id", &edge.id, 160)?;
1082 validate_memory_text("graph edge from_id", &edge.from_id, 160)?;
1083 validate_memory_text("graph edge to_id", &edge.to_id, 160)?;
1084 validate_memory_text("graph edge relation", &edge.relation, 120)?;
1085 validate_memory_text("graph edge properties", &edge.properties.to_string(), 4000)?;
1086 let redaction = RedactionFilter::new();
1087 let safe_properties = serde_json::to_string(&edge.properties)?;
1088 let safe_properties = redaction.redact_str(&safe_properties);
1089 let now = chrono::Utc::now().to_rfc3339();
1090 let created_at = if edge.created_at.trim().is_empty() {
1091 now.clone()
1092 } else {
1093 edge.created_at
1094 };
1095 let updated_at = if edge.updated_at.trim().is_empty() {
1096 now
1097 } else {
1098 edge.updated_at
1099 };
1100 let conn = self.conn.lock().unwrap();
1101 conn.execute(
1102 "INSERT INTO kg_edges (id, from_id, to_id, relation, weight, properties_json, created_at, updated_at)
1103 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1104 ON CONFLICT(id) DO UPDATE SET
1105 from_id = excluded.from_id,
1106 to_id = excluded.to_id,
1107 relation = excluded.relation,
1108 weight = excluded.weight,
1109 properties_json = excluded.properties_json,
1110 updated_at = excluded.updated_at",
1111 params![
1112 redaction.redact_str(&edge.id),
1113 redaction.redact_str(&edge.from_id),
1114 redaction.redact_str(&edge.to_id),
1115 redaction.redact_str(&edge.relation),
1116 edge.weight,
1117 safe_properties,
1118 created_at,
1119 updated_at
1120 ],
1121 )?;
1122 Ok(())
1123 }
1124
1125 fn graph_node(&self, id: &str) -> Option<GraphNode> {
1126 let conn = self.conn.lock().unwrap();
1127 conn.query_row(
1128 "SELECT id, label, kind, properties_json, created_at, updated_at
1129 FROM kg_nodes WHERE id = ?1",
1130 params![id],
1131 graph_node_from_row,
1132 )
1133 .ok()
1134 }
1135
1136 fn graph_neighbors(
1137 &self,
1138 id: &str,
1139 direction: GraphDirection,
1140 limit: usize,
1141 ) -> Vec<(GraphEdge, GraphNode)> {
1142 let conn = self.conn.lock().unwrap();
1143 let limit = limit.clamp(1, 100) as i64;
1144 let sql = match direction {
1145 GraphDirection::Outgoing => {
1146 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1147 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1148 FROM kg_edges e
1149 JOIN kg_nodes n ON n.id = e.to_id
1150 WHERE e.from_id = ?1
1151 ORDER BY e.weight DESC, e.updated_at DESC
1152 LIMIT ?2"
1153 }
1154 GraphDirection::Incoming => {
1155 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1156 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1157 FROM kg_edges e
1158 JOIN kg_nodes n ON n.id = e.from_id
1159 WHERE e.to_id = ?1
1160 ORDER BY e.weight DESC, e.updated_at DESC
1161 LIMIT ?2"
1162 }
1163 GraphDirection::Both => {
1164 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1165 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1166 FROM kg_edges e
1167 JOIN kg_nodes n ON n.id = CASE WHEN e.from_id = ?1 THEN e.to_id ELSE e.from_id END
1168 WHERE e.from_id = ?1 OR e.to_id = ?1
1169 ORDER BY e.weight DESC, e.updated_at DESC
1170 LIMIT ?2"
1171 }
1172 };
1173 let Ok(mut stmt) = conn.prepare(sql) else {
1174 return Vec::new();
1175 };
1176 let Ok(rows) = stmt.query_map(params![id, limit], |row| {
1177 let edge = graph_edge_from_row(row)?;
1178 let properties_json: String = row.get(11)?;
1179 let node = GraphNode {
1180 id: row.get(8)?,
1181 label: row.get(9)?,
1182 kind: row.get(10)?,
1183 properties: serde_json::from_str(&properties_json)
1184 .unwrap_or(serde_json::Value::Null),
1185 created_at: row.get(12)?,
1186 updated_at: row.get(13)?,
1187 };
1188 Ok((edge, node))
1189 }) else {
1190 return Vec::new();
1191 };
1192 rows.filter_map(|row| row.ok()).collect()
1193 }
1194
1195 fn search_graph(&self, query: &str, limit: usize) -> Vec<GraphNode> {
1196 let query = query.trim();
1197 if query.is_empty() {
1198 return Vec::new();
1199 }
1200 let escaped = query
1201 .replace('\\', "\\\\")
1202 .replace('%', "\\%")
1203 .replace('_', "\\_");
1204 let pattern = format!("%{}%", escaped);
1205 let conn = self.conn.lock().unwrap();
1206 let Ok(mut stmt) = conn.prepare(
1207 "SELECT id, label, kind, properties_json, created_at, updated_at
1208 FROM kg_nodes
1209 WHERE id LIKE ?1 ESCAPE '\\'
1210 OR label LIKE ?1 ESCAPE '\\'
1211 OR kind LIKE ?1 ESCAPE '\\'
1212 OR properties_json LIKE ?1 ESCAPE '\\'
1213 ORDER BY updated_at DESC
1214 LIMIT ?2",
1215 ) else {
1216 return Vec::new();
1217 };
1218 let Ok(rows) = stmt.query_map(
1219 params![pattern, limit.clamp(1, 100) as i64],
1220 graph_node_from_row,
1221 ) else {
1222 return Vec::new();
1223 };
1224 rows.filter_map(|row| row.ok()).collect()
1225 }
1226
1227 fn graph_export(&self) -> KnowledgeGraph {
1228 let conn = self.conn.lock().unwrap();
1229 let nodes = conn
1230 .prepare(
1231 "SELECT id, label, kind, properties_json, created_at, updated_at
1232 FROM kg_nodes ORDER BY kind, label",
1233 )
1234 .and_then(|mut stmt| {
1235 let rows = stmt.query_map([], graph_node_from_row)?;
1236 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1237 })
1238 .unwrap_or_default();
1239 let edges = conn
1240 .prepare(
1241 "SELECT id, from_id, to_id, relation, weight, properties_json, created_at, updated_at
1242 FROM kg_edges ORDER BY relation, from_id, to_id",
1243 )
1244 .and_then(|mut stmt| {
1245 let rows = stmt.query_map([], graph_edge_from_row)?;
1246 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1247 })
1248 .unwrap_or_default();
1249 KnowledgeGraph { nodes, edges }
1250 }
1251
1252 fn delete_graph_node(&self, id: &str) -> anyhow::Result<()> {
1253 let conn = self.conn.lock().unwrap();
1254 conn.execute(
1255 "DELETE FROM kg_edges WHERE from_id = ?1 OR to_id = ?1",
1256 params![id],
1257 )?;
1258 conn.execute("DELETE FROM kg_nodes WHERE id = ?1", params![id])?;
1259 Ok(())
1260 }
1261
1262 fn delete_graph_edge(&self, id: &str) -> anyhow::Result<()> {
1263 let conn = self.conn.lock().unwrap();
1264 conn.execute("DELETE FROM kg_edges WHERE id = ?1", params![id])?;
1265 Ok(())
1266 }
1267}