1use rusqlite::{Connection, params};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6use crate::engine::Identity;
7use crate::event::RunId;
8use crate::provider::Msg;
9use crate::redaction::RedactionFilter;
10
11#[cfg(feature = "treesitter")]
12pub mod symbol_index;
13
14pub mod cli;
15pub mod fts;
16
/// Maximum length of the `MEMORY.md` document, counted in `char`s (Unicode scalar values).
pub const MEMORY_MD_LIMIT: usize = 2200;
/// Maximum length of the `USER.md` document, counted in `char`s (Unicode scalar values).
pub const USER_MD_LIMIT: usize = 1375;
19
/// Snapshot of a directory tree produced by [`RepoMap::scan`]: the files found under
/// `root` plus item declarations detected line-by-line in `.rs` files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoMap {
    /// Directory the scan started from.
    pub root: PathBuf,
    /// Every file found under `root` that was not skipped by the scanner.
    pub files: Vec<FileEntry>,
    /// Declarations found in `.rs` files.
    pub symbols: Vec<SymbolEntry>,
}
28
/// A single file discovered by the repository scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// Path relative to the scan root (the full path if it cannot be made relative).
    pub path: String,
    /// File size in bytes, or 0 when metadata cannot be read.
    pub size: u64,
    /// Last-modified time formatted as `%Y-%m-%d %H:%M:%S` (whole seconds), or empty
    /// when unavailable.
    pub modified: String,
}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct SymbolEntry {
38 pub file: String,
39 pub name: String,
40 pub kind: String, pub line: u32,
42}
43
44impl RepoMap {
45 pub fn scan(root: &Path) -> Self {
46 let mut files = Vec::new();
47 let mut symbols = Vec::new();
48 scan_dir(root, root, &mut files, &mut symbols);
49 Self {
50 root: root.to_path_buf(),
51 files,
52 symbols,
53 }
54 }
55}
56
57fn scan_dir(base: &Path, dir: &Path, files: &mut Vec<FileEntry>, symbols: &mut Vec<SymbolEntry>) {
58 if let Ok(entries) = std::fs::read_dir(dir) {
59 for entry in entries.flatten() {
60 let path = entry.path();
61 let name = entry.file_name().to_string_lossy().to_string();
62
63 if name.starts_with('.')
65 || name == "node_modules"
66 || name == "target"
67 || name == "build"
68 || name == "dist"
69 {
70 continue;
71 }
72
73 if path.is_dir() {
74 scan_dir(base, &path, files, symbols);
75 } else {
76 let rel = path
77 .strip_prefix(base)
78 .unwrap_or(&path)
79 .to_string_lossy()
80 .to_string();
81
82 let modified = path
83 .metadata()
84 .ok()
85 .and_then(|m| m.modified().ok())
86 .and_then(|t| {
87 chrono::DateTime::from_timestamp(
88 t.duration_since(std::time::UNIX_EPOCH)
89 .unwrap_or_default()
90 .as_secs() as i64,
91 0,
92 )
93 })
94 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
95 .unwrap_or_default();
96
97 files.push(FileEntry {
98 path: rel.clone(),
99 size: path.metadata().map(|m| m.len()).unwrap_or(0),
100 modified,
101 });
102
103 if rel.ends_with(".rs") {
105 if let Ok(content) = std::fs::read_to_string(&path) {
106 for (i, line) in content.lines().enumerate() {
107 let trimmed = line.trim();
108 if trimmed.starts_with("pub fn ") || trimmed.starts_with("fn ") {
109 let name = trimmed
110 .trim_start_matches("pub fn ")
111 .trim_start_matches("fn ")
112 .split('(')
113 .next()
114 .unwrap_or("");
115 symbols.push(SymbolEntry {
116 file: rel.clone(),
117 name: name.to_string(),
118 kind: "fn".into(),
119 line: (i + 1) as u32,
120 });
121 } else if trimmed.starts_with("pub struct ")
122 || trimmed.starts_with("struct ")
123 {
124 let name = trimmed
125 .trim_start_matches("pub struct ")
126 .trim_start_matches("struct ")
127 .split(|c: char| c == '<' || c == '{' || c == '(')
128 .next()
129 .unwrap_or("");
130 symbols.push(SymbolEntry {
131 file: rel.clone(),
132 name: name.to_string(),
133 kind: "struct".into(),
134 line: (i + 1) as u32,
135 });
136 } else if trimmed.starts_with("pub enum ")
137 || trimmed.starts_with("enum ")
138 {
139 let name = trimmed
140 .trim_start_matches("pub enum ")
141 .trim_start_matches("enum ")
142 .split(|c: char| c == '<' || c == '{')
143 .next()
144 .unwrap_or("");
145 symbols.push(SymbolEntry {
146 file: rel.clone(),
147 name: name.to_string(),
148 kind: "enum".into(),
149 line: (i + 1) as u32,
150 });
151 } else if trimmed.starts_with("pub trait ")
152 || trimmed.starts_with("trait ")
153 {
154 let name = trimmed
155 .trim_start_matches("pub trait ")
156 .trim_start_matches("trait ")
157 .split(|c: char| c == '<' || c == '{')
158 .next()
159 .unwrap_or("");
160 symbols.push(SymbolEntry {
161 file: rel.clone(),
162 name: name.to_string(),
163 kind: "trait".into(),
164 line: (i + 1) as u32,
165 });
166 } else if trimmed.starts_with("pub mod ") || trimmed.starts_with("mod ")
167 {
168 let name = trimmed
169 .trim_start_matches("pub mod ")
170 .trim_start_matches("mod ")
171 .split(';')
172 .next()
173 .unwrap_or("");
174 symbols.push(SymbolEntry {
175 file: rel.clone(),
176 name: name.to_string(),
177 kind: "mod".into(),
178 line: (i + 1) as u32,
179 });
180 }
181 }
182 }
183 }
184 }
185 }
186 }
187}
188
/// Persisted message history for one run (stored in the `tasks` table by the SQLite backend).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMem {
    /// Run identifier; the SQLite backend looks tasks up by the string inside `RunId`.
    pub run_id: String,
    /// Message history, serialized to JSON (`messages_json` column) when stored.
    pub messages: Vec<Msg>,
    /// Creation timestamp as stored (free-form text).
    pub created_at: String,
}
197
/// A message posted from one agent to another through the shared `signals` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedSignal {
    /// Unique signal id (primary key; re-posting an existing id fails).
    pub id: String,
    /// Sending agent.
    pub from_agent: String,
    /// Receiving agent.
    pub to_agent: String,
    /// Message body.
    pub content: String,
    /// Timestamp text; listings sort by it as a string, newest first.
    pub timestamp: String,
}
207}
208
/// A titled shared working document, inserted or replaced by `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkingDoc {
    /// Unique document id.
    pub id: String,
    /// Human-readable title.
    pub title: String,
    /// Document body.
    pub content: String,
    /// Last-update timestamp text; listings sort by it, newest first.
    pub updated_at: String,
}
216
/// A key/value memory fact.
///
/// In the SQLite backend, `key` is unique, and key/value are validated and passed through
/// the redaction filter before storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// Unique fact id.
    pub id: String,
    /// Fact key (unique per store).
    pub key: String,
    /// Fact value.
    pub value: String,
    /// Creation timestamp text.
    pub created_at: String,
    /// Last-update timestamp text; `all_facts` sorts by it, newest first.
    pub updated_at: String,
}
227
/// Which curated memory document is meant: `MEMORY.md` or `USER.md`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemoryDocKind {
    /// `MEMORY.md`, limited to [`MEMORY_MD_LIMIT`] chars.
    Memory,
    /// `USER.md`, limited to [`USER_MD_LIMIT`] chars.
    User,
}
233
234impl MemoryDocKind {
235 pub fn as_str(self) -> &'static str {
236 match self {
237 Self::Memory => "MEMORY.md",
238 Self::User => "USER.md",
239 }
240 }
241
242 pub fn limit(self) -> usize {
243 match self {
244 Self::Memory => MEMORY_MD_LIMIT,
245 Self::User => USER_MD_LIMIT,
246 }
247 }
248
249 pub fn parse(value: &str) -> Option<Self> {
250 match value.trim().to_ascii_lowercase().as_str() {
251 "memory" | "memory.md" => Some(Self::Memory),
252 "user" | "user.md" => Some(Self::User),
253 _ => None,
254 }
255 }
256}
257
/// A stored curated memory document together with its kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDoc {
    /// Which document this is.
    pub kind: MemoryDocKind,
    /// Full document text.
    pub content: String,
    /// Last-update timestamp text as stored.
    pub updated_at: String,
}
264
/// Size summary of a memory store: the fact count, plus current and maximum char
/// lengths of the two memory documents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of stored facts.
    pub facts: usize,
    /// Current `MEMORY.md` length in chars (0 if absent).
    pub memory_chars: usize,
    /// Maximum `MEMORY.md` length in chars.
    pub memory_limit: usize,
    /// Current `USER.md` length in chars (0 if absent).
    pub user_chars: usize,
    /// Maximum `USER.md` length in chars.
    pub user_limit: usize,
}
273
/// A knowledge-graph node.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphNode {
    /// Unique node id.
    pub id: String,
    /// Display label (indexed for lookups in the SQLite backend).
    pub label: String,
    /// Free-form node category.
    pub kind: String,
    /// Arbitrary JSON properties; `Null` when missing from serialized input.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp; the SQLite backend fills a blank value with the current
    /// RFC 3339 time.
    pub created_at: String,
    /// Last-update timestamp; the SQLite backend fills a blank value with the current
    /// RFC 3339 time.
    pub updated_at: String,
}
284
/// A directed, weighted knowledge-graph edge from `from_id` to `to_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Unique edge id.
    pub id: String,
    /// Source node id.
    pub from_id: String,
    /// Target node id.
    pub to_id: String,
    /// Relation name.
    pub relation: String,
    /// Edge weight; neighbour queries return higher weights first.
    pub weight: f64,
    /// Arbitrary JSON properties; `Null` when missing from serialized input.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp; a blank value is filled with the current RFC 3339 time on upsert.
    pub created_at: String,
    /// Last-update timestamp; a blank value is filled with the current RFC 3339 time on upsert.
    pub updated_at: String,
}
297
/// Dump of all graph nodes and edges, as returned by `Memory::graph_export`.
/// `Default` is the empty graph.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KnowledgeGraph {
    /// All nodes.
    pub nodes: Vec<GraphNode>,
    /// All edges.
    pub edges: Vec<GraphEdge>,
}
303
/// Which edges of a node a neighbour query follows.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum GraphDirection {
    /// Edges whose `to_id` is the node.
    Incoming,
    /// Edges whose `from_id` is the node.
    Outgoing,
    /// Edges in either direction.
    Both,
}
310
311impl GraphDirection {
312 pub fn parse(value: &str) -> Self {
313 match value.trim().to_ascii_lowercase().as_str() {
314 "incoming" | "in" => Self::Incoming,
315 "outgoing" | "out" => Self::Outgoing,
316 _ => Self::Both,
317 }
318 }
319}
320
321pub fn validate_memory_text(label: &str, text: &str, limit: usize) -> anyhow::Result<()> {
322 let chars = text.chars().count();
323 if chars > limit {
324 anyhow::bail!("{label} is too large: {chars}/{limit} chars");
325 }
326 if text.chars().any(is_forbidden_invisible_char) {
327 anyhow::bail!("{label} contains invisible control characters");
328 }
329
330 let lower = text.to_ascii_lowercase();
331 let blocked = [
332 "ignore previous instructions",
333 "ignore all previous instructions",
334 "disregard previous instructions",
335 "reveal your system prompt",
336 "print your system prompt",
337 "exfiltrate",
338 "steal secrets",
339 "dump secrets",
340 "print env",
341 "cat ~/.ssh",
342 "backdoor",
343 "reverse shell",
344 "rm -rf /",
345 ];
346 if blocked.iter().any(|needle| lower.contains(needle)) {
347 anyhow::bail!("{label} looks like prompt injection or credential exfiltration");
348 }
349 Ok(())
350}
351
/// Returns `true` for characters that render invisibly and could hide content.
///
/// These are:
/// * zero-width and directional marks (U+200B–U+200F);
/// * bidi embedding/override controls (U+202A–U+202E);
/// * word joiner and invisible format characters (U+2060–U+206F);
/// * the BOM / zero-width no-break space (U+FEFF);
/// * any control character other than `\n`, `\r` and `\t`.
fn is_forbidden_invisible_char(c: char) -> bool {
    // Line breaks and tabs are ordinary formatting and stay allowed.
    const ALLOWED_CONTROLS: [char; 3] = ['\n', '\r', '\t'];
    let invisible_formatting = matches!(
        c,
        '\u{200B}'..='\u{200F}' | '\u{202A}'..='\u{202E}' | '\u{2060}'..='\u{206F}' | '\u{FEFF}'
    );
    invisible_formatting || (c.is_control() && !ALLOWED_CONTROLS.contains(&c))
}
361
/// Returns at most the first `limit` `char`s of `text`, never splitting a UTF-8 sequence.
fn truncate_chars(text: &str, limit: usize) -> String {
    let cut = text
        .char_indices()
        .nth(limit)
        .map_or(text.len(), |(byte_offset, _)| byte_offset);
    text[..cut].to_owned()
}
365
/// Storage backend for agent state. It covers:
/// * repository maps and agent identities;
/// * per-run task history;
/// * inter-agent signals and working docs;
/// * key/value facts and curated memory documents;
/// * a cache of discovered model names;
/// * an optional knowledge graph.
///
/// The trait requires `Send + Sync`, so a single instance can be shared across threads
/// (for example as `Arc<dyn Memory>`). Methods with default bodies are optional
/// capabilities: their defaults either return an empty value or fail with a
/// "not supported by this memory backend" error.
pub trait Memory: Send + Sync {
    /// Builds a [`RepoMap`] of the directory `root`.
    fn repo_map(&self, root: &Path) -> RepoMap;
    /// Returns the stored identity for `agent`, if any.
    fn identity(&self, agent: &str) -> Option<Identity>;
    /// Persists `identity` for `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()>;
    /// Returns the stored task for `run`, if any.
    fn task(&self, run: &RunId) -> Option<TaskMem>;
    /// Persists `task`.
    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()>;
    /// Lists posted inter-agent signals.
    fn shared_signals(&self) -> Vec<SharedSignal>;
    /// Lists shared working documents.
    fn shared_docs(&self) -> Vec<WorkingDoc>;
    /// Posts a new signal.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()>;
    /// Inserts or replaces a working document.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()>;
    /// Stores a fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Searches facts matching `q`, returning at most `k`.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact>;
    /// Lists every stored fact.
    fn all_facts(&self) -> Vec<Fact>;
    /// Deletes the fact with the given id.
    fn forget(&self, id: &str) -> anyhow::Result<()>;
    /// Returns the curated document of `kind`. Default: none.
    fn memory_doc(&self, _kind: MemoryDocKind) -> Option<MemoryDoc> {
        None
    }
    /// Creates or replaces the curated document of `kind`. Default: unsupported error.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Removes the curated document of `kind`. Default: unsupported error.
    fn remove_memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Size summary of the store. The default counts facts via [`Memory::all_facts`]
    /// and reports zero document lengths with the standard limits.
    fn memory_stats(&self) -> MemoryStats {
        MemoryStats {
            facts: self.all_facts().len(),
            memory_chars: 0,
            memory_limit: MEMORY_MD_LIMIT,
            user_chars: 0,
            user_limit: USER_MD_LIMIT,
        }
    }
    /// Regenerates the curated documents from stored facts. Default: unsupported error.
    fn consolidate_memory(&self) -> anyhow::Result<()> {
        anyhow::bail!("memory consolidation is not supported by this memory backend")
    }
    /// Replaces the cached list of model names discovered for `provider_id`.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()>;
    /// Returns cached model names for `provider_id`.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String>;
    /// Inserts or updates a graph node. Default: unsupported error.
    fn upsert_graph_node(&self, _node: GraphNode) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Inserts or updates a graph edge. Default: unsupported error.
    fn upsert_graph_edge(&self, _edge: GraphEdge) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Looks up a graph node by id. Default: none.
    fn graph_node(&self, _id: &str) -> Option<GraphNode> {
        None
    }
    /// Returns up to `_limit` `(edge, neighbour node)` pairs of node `_id` in the given
    /// direction. Default: empty.
    fn graph_neighbors(
        &self,
        _id: &str,
        _direction: GraphDirection,
        _limit: usize,
    ) -> Vec<(GraphEdge, GraphNode)> {
        Vec::new()
    }
    /// Searches graph nodes matching `_query`. Default: empty.
    fn search_graph(&self, _query: &str, _limit: usize) -> Vec<GraphNode> {
        Vec::new()
    }
    /// Exports the whole graph. Default: empty graph.
    fn graph_export(&self) -> KnowledgeGraph {
        KnowledgeGraph::default()
    }
    /// Deletes a graph node. Default: unsupported error.
    fn delete_graph_node(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Deletes a graph edge. Default: unsupported error.
    fn delete_graph_edge(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
}
435
/// Pluggable fact/document store whose reads are fallible.
///
/// Implemented here by [`LocalMemoryProvider`], which wraps a [`Memory`], and by
/// [`ExternalMemoryProvider`], which is currently a placeholder that always errors.
pub trait MemoryProvider: Send + Sync {
    /// Short provider identifier (e.g. `"local"`, `"mem0"`).
    fn name(&self) -> &str;
    /// Stores a fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns at most `limit` facts matching `query`.
    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>>;
    /// Returns the curated document of `kind`, if present.
    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>>;
    /// Creates or replaces the curated document of `kind`.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()>;
}
443
/// [`MemoryProvider`] adapter over a shared [`Memory`] backend.
pub struct LocalMemoryProvider {
    /// Backend all calls are delegated to.
    memory: Arc<dyn Memory>,
}
447
impl LocalMemoryProvider {
    /// Wraps `memory`; the `Arc` is shared, not cloned deeply.
    pub fn new(memory: Arc<dyn Memory>) -> Self {
        Self { memory }
    }
}
453
454impl MemoryProvider for LocalMemoryProvider {
455 fn name(&self) -> &str {
456 "local"
457 }
458
459 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
460 self.memory.remember(fact)
461 }
462
463 fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>> {
464 Ok(self.memory.recall(query, limit))
465 }
466
467 fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
468 Ok(self.memory.memory_doc(kind))
469 }
470
471 fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
472 self.memory.upsert_memory_doc(kind, content)
473 }
474}
475
/// Placeholder for third-party memory services. No connector is implemented, so every
/// [`MemoryProvider`] operation returns a "not configured" error.
pub struct ExternalMemoryProvider {
    /// Service name shown by `name()` and in error messages.
    name: String,
}
479
480impl ExternalMemoryProvider {
481 pub fn mem0() -> Self {
482 Self {
483 name: "mem0".into(),
484 }
485 }
486
487 pub fn honcho() -> Self {
488 Self {
489 name: "honcho".into(),
490 }
491 }
492
493 pub fn supermemory() -> Self {
494 Self {
495 name: "supermemory".into(),
496 }
497 }
498
499 fn not_configured(&self) -> anyhow::Error {
500 anyhow::anyhow!(
501 "external memory provider '{}' is not configured; use local SQLite memory or configure a connector first",
502 self.name
503 )
504 }
505}
506
/// Every data operation fails with the provider's "not configured" error.
impl MemoryProvider for ExternalMemoryProvider {
    /// The configured service name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Always fails: no connector is configured.
    fn remember(&self, _fact: Fact) -> anyhow::Result<()> {
        Err(self.not_configured())
    }

    /// Always fails: no connector is configured.
    fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result<Vec<Fact>> {
        Err(self.not_configured())
    }

    /// Always fails: no connector is configured.
    fn memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
        Err(self.not_configured())
    }

    /// Always fails: no connector is configured.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        Err(self.not_configured())
    }
}
528
/// [`Memory`] backend stored in a single SQLite database file.
pub struct SqliteMemory {
    /// The one connection, serialized behind a mutex; every method locks it for its
    /// duration.
    conn: Mutex<Connection>,
}
534
impl SqliteMemory {
    /// Opens (creating if needed) the database at `db_path` and ensures the schema exists.
    ///
    /// Missing parent directories are created first.
    ///
    /// # Errors
    /// Fails if the directories cannot be created, the database cannot be opened, or the
    /// migration fails.
    pub fn open(db_path: &Path) -> anyhow::Result<Self> {
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(db_path)?;
        let memory = Self {
            conn: Mutex::new(conn),
        };
        memory.migrate()?;
        Ok(memory)
    }

    /// Creates every table, index, FTS5 table and trigger that does not exist yet.
    ///
    /// All statements use `IF NOT EXISTS`, so running this on an existing database is a
    /// no-op for objects already present.
    ///
    /// NOTE(review): `kg_edges` declares `ON DELETE CASCADE`, but SQLite enforces foreign
    /// keys only after `PRAGMA foreign_keys = ON` on the connection, which this code does
    /// not issue. Confirm whether cascading deletes are relied upon.
    ///
    /// NOTE(review): `facts_fts` is an external-content index kept in sync only by the
    /// triggers below. Rows already in a pre-existing `facts` table are not indexed
    /// until an FTS5 `'rebuild'` is run.
    fn migrate(&self) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS identities (
                agent TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT NOT NULL,
                personality TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS tasks (
                run_id TEXT PRIMARY KEY,
                messages_json TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS signals (
                id TEXT PRIMARY KEY,
                from_agent TEXT NOT NULL,
                to_agent TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS working_docs (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS facts (
                id TEXT PRIMARY KEY,
                key TEXT NOT NULL UNIQUE,
                value TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS discovered_models (
                provider_id TEXT NOT NULL,
                model_name TEXT NOT NULL,
                fetched_at TEXT NOT NULL,
                PRIMARY KEY (provider_id, model_name)
            );
            CREATE TABLE IF NOT EXISTS memory_docs (
                kind TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS kg_nodes (
                id TEXT PRIMARY KEY,
                label TEXT NOT NULL,
                kind TEXT NOT NULL,
                properties_json TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS kg_edges (
                id TEXT PRIMARY KEY,
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                relation TEXT NOT NULL,
                weight REAL NOT NULL DEFAULT 1.0,
                properties_json TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(from_id) REFERENCES kg_nodes(id) ON DELETE CASCADE,
                FOREIGN KEY(to_id) REFERENCES kg_nodes(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS kg_nodes_label_idx ON kg_nodes(label);
            CREATE INDEX IF NOT EXISTS kg_nodes_kind_idx ON kg_nodes(kind);
            CREATE INDEX IF NOT EXISTS kg_edges_from_idx ON kg_edges(from_id);
            CREATE INDEX IF NOT EXISTS kg_edges_to_idx ON kg_edges(to_id);
            CREATE INDEX IF NOT EXISTS kg_edges_relation_idx ON kg_edges(relation);
            -- FTS5 full-text search for memory recall (M1)
            CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5(
                key, value, content='facts', content_rowid='rowid'
            );
            -- Triggers to keep FTS5 index in sync
            CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
            END;
            CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
            END;
            CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN
                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
            END;
            ",
        )?;
        Ok(())
    }
}
639
640fn graph_node_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphNode> {
641 let properties_json: String = row.get(3)?;
642 Ok(GraphNode {
643 id: row.get(0)?,
644 label: row.get(1)?,
645 kind: row.get(2)?,
646 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
647 created_at: row.get(4)?,
648 updated_at: row.get(5)?,
649 })
650}
651
652fn graph_edge_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphEdge> {
653 let properties_json: String = row.get(5)?;
654 Ok(GraphEdge {
655 id: row.get(0)?,
656 from_id: row.get(1)?,
657 to_id: row.get(2)?,
658 relation: row.get(3)?,
659 weight: row.get(4)?,
660 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
661 created_at: row.get(6)?,
662 updated_at: row.get(7)?,
663 })
664}
665
666impl Memory for SqliteMemory {
    /// Scans `root` with [`RepoMap::scan`]; the database is not touched.
    fn repo_map(&self, root: &Path) -> RepoMap {
        RepoMap::scan(root)
    }
670
    /// Loads the identity stored for `agent`.
    ///
    /// Returns `None` both when no row exists and when the query fails (errors are
    /// discarded by `.ok()`).
    fn identity(&self, agent: &str) -> Option<Identity> {
        let conn = self.conn.lock().unwrap();
        conn.query_row(
            "SELECT name, role, personality FROM identities WHERE agent = ?1",
            params![agent],
            |row| {
                Ok(Identity {
                    name: row.get(0)?,
                    role: row.get(1)?,
                    personality: row.get(2)?,
                })
            },
        )
        .ok()
    }
686
    /// Inserts or replaces the identity row for `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT OR REPLACE INTO identities (agent, name, role, personality) VALUES (?1, ?2, ?3, ?4)",
            params![agent, identity.name, identity.role, identity.personality],
        )?;
        Ok(())
    }
695
    /// Loads the task stored for `run`.
    ///
    /// Returns `None` when there is no row or the query fails. If `messages_json` cannot
    /// be deserialized, the task is still returned, with an empty message list.
    fn task(&self, run: &RunId) -> Option<TaskMem> {
        let conn = self.conn.lock().unwrap();
        conn.query_row(
            "SELECT run_id, messages_json, created_at FROM tasks WHERE run_id = ?1",
            params![run.0],
            |row| {
                let messages_json: String = row.get(1)?;
                // Corrupt JSON degrades to an empty history rather than an error.
                let messages: Vec<Msg> = serde_json::from_str(&messages_json).unwrap_or_default();
                Ok(TaskMem {
                    run_id: row.get(0)?,
                    messages,
                    created_at: row.get(2)?,
                })
            },
        )
        .ok()
    }
713
714 fn save_task(&self, task: &TaskMem) -> anyhow::Result<()> {
715 let conn = self.conn.lock().unwrap();
716 let messages_json = serde_json::to_string(&task.messages)?;
717 conn.execute(
718 "INSERT OR REPLACE INTO tasks (run_id, messages_json, created_at) VALUES (?1, ?2, ?3)",
719 params![task.run_id, messages_json, task.created_at],
720 )?;
721 Ok(())
722 }
723
724 fn shared_signals(&self) -> Vec<SharedSignal> {
725 let conn = self.conn.lock().unwrap();
726 let mut stmt = conn
727 .prepare("SELECT id, from_agent, to_agent, content, timestamp FROM signals ORDER BY timestamp DESC")
728 .unwrap();
729 let signals = stmt
730 .query_map([], |row| {
731 Ok(SharedSignal {
732 id: row.get(0)?,
733 from_agent: row.get(1)?,
734 to_agent: row.get(2)?,
735 content: row.get(3)?,
736 timestamp: row.get(4)?,
737 })
738 })
739 .unwrap()
740 .filter_map(|r| r.ok())
741 .collect();
742 signals
743 }
744
745 fn shared_docs(&self) -> Vec<WorkingDoc> {
746 let conn = self.conn.lock().unwrap();
747 let mut stmt = conn
748 .prepare(
749 "SELECT id, title, content, updated_at FROM working_docs ORDER BY updated_at DESC",
750 )
751 .unwrap();
752 let docs = stmt
753 .query_map([], |row| {
754 Ok(WorkingDoc {
755 id: row.get(0)?,
756 title: row.get(1)?,
757 content: row.get(2)?,
758 updated_at: row.get(3)?,
759 })
760 })
761 .unwrap()
762 .filter_map(|r| r.ok())
763 .collect();
764 docs
765 }
766
    /// Inserts a new signal.
    ///
    /// # Errors
    /// Fails if `signal.id` already exists (plain `INSERT` into the primary key) or the
    /// write fails.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO signals (id, from_agent, to_agent, content, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
            params![signal.id, signal.from_agent, signal.to_agent, signal.content, signal.timestamp],
        )?;
        Ok(())
    }
775
    /// Inserts or replaces the working document with `doc.id`.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT OR REPLACE INTO working_docs (id, title, content, updated_at) VALUES (?1, ?2, ?3, ?4)",
            params![doc.id, doc.title, doc.content, doc.updated_at],
        )?;
        Ok(())
    }
784
785 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
786 let redaction = RedactionFilter::new();
787 validate_memory_text("fact key", &fact.key, 256)?;
788 validate_memory_text("fact value", &fact.value, 1200)?;
789 let safe_value = redaction.redact_str(&fact.value);
790 let safe_key = redaction.redact_str(&fact.key);
791 if redaction.contains_secret(&fact.value) {
792 tracing::warn!("Redacted secret from memory fact: {}", fact.key);
793 }
794 let conn = self.conn.lock().unwrap();
795 let existing: Option<String> = conn
796 .query_row(
797 "SELECT id FROM facts WHERE key = ?1",
798 params![safe_key.clone()],
799 |row| row.get(0),
800 )
801 .ok();
802 if existing.as_deref().is_some_and(|id| id != fact.id) {
803 anyhow::bail!(
804 "memory fact '{}' already exists; use replace/consolidate",
805 safe_key
806 );
807 }
808 conn.execute(
809 "INSERT OR REPLACE INTO facts (id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
810 params![fact.id, safe_key, safe_value, fact.created_at, fact.updated_at],
811 )?;
812 Ok(())
813 }
814
    /// Searches facts for `q`, returning at most `k` results.
    ///
    /// Strategy:
    /// 1. Split `q` on whitespace and keep only alphanumeric and `_` characters of each
    ///    word. Each non-empty word becomes a quoted FTS5 prefix term (`"word"*`). The
    ///    terms are space-joined (implicit AND), matched against `facts_fts`, and ordered
    ///    by FTS5 `rank`.
    /// 2. If no usable terms remain, or the FTS query fails or finds nothing, fall back
    ///    to a substring `LIKE` search over key and value, with `\`, `%` and `_` escaped.
    ///    An empty `q` therefore matches every fact (pattern `%%`).
    ///
    /// Errors are not reported; a failing fallback query yields an empty list.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact> {
        let conn = self.conn.lock().unwrap();

        // Stripping punctuation keeps user input from injecting FTS5 query syntax.
        let tokens: Vec<String> = q
            .split_whitespace()
            .map(|w| {
                w.chars()
                    .filter(|c| c.is_alphanumeric() || *c == '_')
                    .collect::<String>()
            })
            .filter(|w| !w.is_empty())
            .map(|w| format!("\"{}\"*", w))
            .collect();

        let try_fts = !tokens.is_empty();
        if try_fts {
            let pattern = tokens.join(" ");
            if let Ok(mut stmt) = conn.prepare(
                "SELECT f.id, f.key, f.value, f.created_at, f.updated_at FROM facts f
                 INNER JOIN facts_fts ft ON f.rowid = ft.rowid
                 WHERE facts_fts MATCH ?1 ORDER BY rank LIMIT ?2",
            ) {
                if let Ok(rows) = stmt.query_map(params![pattern, k as i64], |row| {
                    Ok(Fact {
                        id: row.get(0)?,
                        key: row.get(1)?,
                        value: row.get(2)?,
                        created_at: row.get(3)?,
                        updated_at: row.get(4)?,
                    })
                }) {
                    let facts: Vec<Fact> = rows.filter_map(|r| r.ok()).collect();
                    if !facts.is_empty() {
                        return facts;
                    }
                }
            }
        }

        // Fallback: escaped substring search; `\` is the declared ESCAPE character.
        let escaped = q
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{}%", escaped);
        let Ok(mut stmt) = conn.prepare(
            "SELECT id, key, value, created_at, updated_at FROM facts \
             WHERE key LIKE ?1 ESCAPE '\\' OR value LIKE ?1 ESCAPE '\\' LIMIT ?2",
        ) else {
            return Vec::new();
        };
        let Ok(rows) = stmt.query_map(params![like_pattern, k as i64], |row| {
            Ok(Fact {
                id: row.get(0)?,
                key: row.get(1)?,
                value: row.get(2)?,
                created_at: row.get(3)?,
                updated_at: row.get(4)?,
            })
        }) else {
            return Vec::new();
        };
        rows.filter_map(|r| r.ok()).collect()
    }
888
889 fn all_facts(&self) -> Vec<Fact> {
890 let conn = self.conn.lock().unwrap();
891 let mut stmt = conn
892 .prepare(
893 "SELECT id, key, value, created_at, updated_at FROM facts ORDER BY updated_at DESC",
894 )
895 .unwrap();
896 stmt.query_map([], |row| {
897 Ok(Fact {
898 id: row.get(0)?,
899 key: row.get(1)?,
900 value: row.get(2)?,
901 created_at: row.get(3)?,
902 updated_at: row.get(4)?,
903 })
904 })
905 .unwrap()
906 .filter_map(|r| r.ok())
907 .collect()
908 }
909
    /// Deletes the fact with `id`. Deleting an unknown id is not an error.
    /// The `facts_ad` trigger removes the row from the FTS index.
    fn forget(&self, id: &str) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute("DELETE FROM facts WHERE id = ?1", params![id])?;
        Ok(())
    }
915
    /// Loads the curated document of `kind`.
    ///
    /// Returns `None` when it does not exist or the query fails.
    fn memory_doc(&self, kind: MemoryDocKind) -> Option<MemoryDoc> {
        let conn = self.conn.lock().unwrap();
        conn.query_row(
            "SELECT content, updated_at FROM memory_docs WHERE kind = ?1",
            params![kind.as_str()],
            |row| {
                Ok(MemoryDoc {
                    kind,
                    content: row.get(0)?,
                    updated_at: row.get(1)?,
                })
            },
        )
        .ok()
    }
931
    /// Validates `content` against `kind.limit()` and stores it, replacing any previous
    /// version. `updated_at` is set to SQLite's `datetime('now')`.
    ///
    /// NOTE(review): unlike `remember`, the content is not passed through
    /// `RedactionFilter`. Confirm this is intended.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
        validate_memory_text(kind.as_str(), content, kind.limit())?;
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT OR REPLACE INTO memory_docs (kind, content, updated_at)
             VALUES (?1, ?2, datetime('now'))",
            params![kind.as_str(), content],
        )?;
        Ok(())
    }
942
    /// Deletes the curated document of `kind`. Removing a missing document is not an error.
    fn remove_memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<()> {
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "DELETE FROM memory_docs WHERE kind = ?1",
            params![kind.as_str()],
        )?;
        Ok(())
    }
951
952 fn memory_stats(&self) -> MemoryStats {
953 let memory_chars = self
954 .memory_doc(MemoryDocKind::Memory)
955 .map(|doc| doc.content.chars().count())
956 .unwrap_or(0);
957 let user_chars = self
958 .memory_doc(MemoryDocKind::User)
959 .map(|doc| doc.content.chars().count())
960 .unwrap_or(0);
961 MemoryStats {
962 facts: self.all_facts().len(),
963 memory_chars,
964 memory_limit: MEMORY_MD_LIMIT,
965 user_chars,
966 user_limit: USER_MD_LIMIT,
967 }
968 }
969
970 fn consolidate_memory(&self) -> anyhow::Result<()> {
971 let facts = self.all_facts();
972 let mut memory_lines = Vec::new();
973 memory_lines.push("# MEMORY.md".to_string());
974 memory_lines.push("Durable Sparrow memory distilled from accepted facts.".to_string());
975 for fact in facts.iter().take(40) {
976 memory_lines.push(format!("- {}: {}", fact.key, fact.value));
977 }
978 let memory = truncate_chars(&memory_lines.join("\n"), MEMORY_MD_LIMIT);
979 self.upsert_memory_doc(MemoryDocKind::Memory, &memory)?;
980
981 let mut user_lines = Vec::new();
982 user_lines.push("# USER.md".to_string());
983 for fact in facts
984 .iter()
985 .filter(|fact| fact.key.starts_with("user"))
986 .take(24)
987 {
988 user_lines.push(format!("- {}: {}", fact.key, fact.value));
989 }
990 if user_lines.len() > 1 {
991 let user = truncate_chars(&user_lines.join("\n"), USER_MD_LIMIT);
992 self.upsert_memory_doc(MemoryDocKind::User, &user)?;
993 }
994 Ok(())
995 }
996
    /// Replaces the cached model list for `provider_id` in a single transaction.
    ///
    /// Model names are trimmed and blank ones skipped. All rows share one RFC 3339
    /// `fetched_at` timestamp. If any statement fails, the transaction is dropped without
    /// commit and the previous list remains.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()> {
        let mut conn = self.conn.lock().unwrap();
        let tx = conn.transaction()?;
        tx.execute(
            "DELETE FROM discovered_models WHERE provider_id = ?1",
            params![provider_id],
        )?;
        let fetched_at = chrono::Utc::now().to_rfc3339();
        for model in models {
            let model = model.trim();
            if model.is_empty() {
                continue;
            }
            tx.execute(
                "INSERT OR REPLACE INTO discovered_models (provider_id, model_name, fetched_at)
                 VALUES (?1, ?2, ?3)",
                params![provider_id, model, fetched_at],
            )?;
        }
        tx.commit()?;
        Ok(())
    }
1019
    /// Returns the cached model names for `provider_id` fetched within the last 30 days
    /// (compared with SQLite's `datetime('now')`), sorted ascending.
    ///
    /// Query errors yield an empty list; undecodable rows are skipped.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String> {
        let conn = self.conn.lock().unwrap();
        let Ok(mut stmt) = conn.prepare(
            "SELECT model_name FROM discovered_models
             WHERE provider_id = ?1
               AND datetime(fetched_at) >= datetime('now', '-30 days')
             ORDER BY model_name ASC",
        ) else {
            return Vec::new();
        };
        let Ok(rows) = stmt.query_map(params![provider_id], |row| row.get::<_, String>(0)) else {
            return Vec::new();
        };
        rows.filter_map(|row| row.ok()).collect()
    }
1039
    /// Inserts a graph node or updates the existing node with the same id.
    ///
    /// Size and content limits (id 160, label 512, kind 80, serialized properties 4000
    /// chars) are checked on the raw values. The stored id, label, kind and properties
    /// JSON are the redacted versions. Blank timestamps are filled with the current
    /// RFC 3339 time. On conflict, `created_at` keeps its stored value; the other
    /// columns are overwritten.
    ///
    /// NOTE(review): if redaction alters `node.id`, the stored id no longer matches the
    /// caller's id. If it makes the properties JSON invalid, reads map it to `Null`.
    fn upsert_graph_node(&self, node: GraphNode) -> anyhow::Result<()> {
        validate_memory_text("graph node id", &node.id, 160)?;
        validate_memory_text("graph node label", &node.label, 512)?;
        validate_memory_text("graph node kind", &node.kind, 80)?;
        validate_memory_text("graph node properties", &node.properties.to_string(), 4000)?;
        let redaction = RedactionFilter::new();
        let safe_id = redaction.redact_str(&node.id);
        let safe_label = redaction.redact_str(&node.label);
        let safe_kind = redaction.redact_str(&node.kind);
        let safe_properties = serde_json::to_string(&node.properties)?;
        let safe_properties = redaction.redact_str(&safe_properties);
        let now = chrono::Utc::now().to_rfc3339();
        let created_at = if node.created_at.trim().is_empty() {
            now.clone()
        } else {
            node.created_at
        };
        let updated_at = if node.updated_at.trim().is_empty() {
            now
        } else {
            node.updated_at
        };
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO kg_nodes (id, label, kind, properties_json, created_at, updated_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             ON CONFLICT(id) DO UPDATE SET
                label = excluded.label,
                kind = excluded.kind,
                properties_json = excluded.properties_json,
                updated_at = excluded.updated_at",
            params![
                safe_id,
                safe_label,
                safe_kind,
                safe_properties,
                created_at,
                updated_at
            ],
        )?;
        Ok(())
    }
1082
    /// Inserts a graph edge or updates the existing edge with the same id.
    ///
    /// Limits (ids 160, relation 120, serialized properties 4000 chars) are checked on
    /// the raw values. The ids, relation and properties JSON are stored redacted. Blank
    /// timestamps are filled with the current RFC 3339 time. On conflict, `created_at`
    /// keeps its stored value.
    ///
    /// NOTE(review): the endpoints are not checked to exist in `kg_nodes` here, and
    /// foreign keys are only enforced if `PRAGMA foreign_keys` is enabled. Confirm
    /// whether dangling edges are acceptable.
    fn upsert_graph_edge(&self, edge: GraphEdge) -> anyhow::Result<()> {
        validate_memory_text("graph edge id", &edge.id, 160)?;
        validate_memory_text("graph edge from_id", &edge.from_id, 160)?;
        validate_memory_text("graph edge to_id", &edge.to_id, 160)?;
        validate_memory_text("graph edge relation", &edge.relation, 120)?;
        validate_memory_text("graph edge properties", &edge.properties.to_string(), 4000)?;
        let redaction = RedactionFilter::new();
        let safe_properties = serde_json::to_string(&edge.properties)?;
        let safe_properties = redaction.redact_str(&safe_properties);
        let now = chrono::Utc::now().to_rfc3339();
        let created_at = if edge.created_at.trim().is_empty() {
            now.clone()
        } else {
            edge.created_at
        };
        let updated_at = if edge.updated_at.trim().is_empty() {
            now
        } else {
            edge.updated_at
        };
        let conn = self.conn.lock().unwrap();
        conn.execute(
            "INSERT INTO kg_edges (id, from_id, to_id, relation, weight, properties_json, created_at, updated_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
             ON CONFLICT(id) DO UPDATE SET
                from_id = excluded.from_id,
                to_id = excluded.to_id,
                relation = excluded.relation,
                weight = excluded.weight,
                properties_json = excluded.properties_json,
                updated_at = excluded.updated_at",
            params![
                redaction.redact_str(&edge.id),
                redaction.redact_str(&edge.from_id),
                redaction.redact_str(&edge.to_id),
                redaction.redact_str(&edge.relation),
                edge.weight,
                safe_properties,
                created_at,
                updated_at
            ],
        )?;
        Ok(())
    }
1127
1128 fn graph_node(&self, id: &str) -> Option<GraphNode> {
1129 let conn = self.conn.lock().unwrap();
1130 conn.query_row(
1131 "SELECT id, label, kind, properties_json, created_at, updated_at
1132 FROM kg_nodes WHERE id = ?1",
1133 params![id],
1134 graph_node_from_row,
1135 )
1136 .ok()
1137 }
1138
1139 fn graph_neighbors(
1140 &self,
1141 id: &str,
1142 direction: GraphDirection,
1143 limit: usize,
1144 ) -> Vec<(GraphEdge, GraphNode)> {
1145 let conn = self.conn.lock().unwrap();
1146 let limit = limit.clamp(1, 100) as i64;
1147 let sql = match direction {
1148 GraphDirection::Outgoing => {
1149 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1150 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1151 FROM kg_edges e
1152 JOIN kg_nodes n ON n.id = e.to_id
1153 WHERE e.from_id = ?1
1154 ORDER BY e.weight DESC, e.updated_at DESC
1155 LIMIT ?2"
1156 }
1157 GraphDirection::Incoming => {
1158 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1159 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1160 FROM kg_edges e
1161 JOIN kg_nodes n ON n.id = e.from_id
1162 WHERE e.to_id = ?1
1163 ORDER BY e.weight DESC, e.updated_at DESC
1164 LIMIT ?2"
1165 }
1166 GraphDirection::Both => {
1167 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1168 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1169 FROM kg_edges e
1170 JOIN kg_nodes n ON n.id = CASE WHEN e.from_id = ?1 THEN e.to_id ELSE e.from_id END
1171 WHERE e.from_id = ?1 OR e.to_id = ?1
1172 ORDER BY e.weight DESC, e.updated_at DESC
1173 LIMIT ?2"
1174 }
1175 };
1176 let Ok(mut stmt) = conn.prepare(sql) else {
1177 return Vec::new();
1178 };
1179 let Ok(rows) = stmt.query_map(params![id, limit], |row| {
1180 let edge = graph_edge_from_row(row)?;
1181 let properties_json: String = row.get(11)?;
1182 let node = GraphNode {
1183 id: row.get(8)?,
1184 label: row.get(9)?,
1185 kind: row.get(10)?,
1186 properties: serde_json::from_str(&properties_json)
1187 .unwrap_or(serde_json::Value::Null),
1188 created_at: row.get(12)?,
1189 updated_at: row.get(13)?,
1190 };
1191 Ok((edge, node))
1192 }) else {
1193 return Vec::new();
1194 };
1195 rows.filter_map(|row| row.ok()).collect()
1196 }
1197
1198 fn search_graph(&self, query: &str, limit: usize) -> Vec<GraphNode> {
1199 let query = query.trim();
1200 if query.is_empty() {
1201 return Vec::new();
1202 }
1203 let escaped = query
1204 .replace('\\', "\\\\")
1205 .replace('%', "\\%")
1206 .replace('_', "\\_");
1207 let pattern = format!("%{}%", escaped);
1208 let conn = self.conn.lock().unwrap();
1209 let Ok(mut stmt) = conn.prepare(
1210 "SELECT id, label, kind, properties_json, created_at, updated_at
1211 FROM kg_nodes
1212 WHERE id LIKE ?1 ESCAPE '\\'
1213 OR label LIKE ?1 ESCAPE '\\'
1214 OR kind LIKE ?1 ESCAPE '\\'
1215 OR properties_json LIKE ?1 ESCAPE '\\'
1216 ORDER BY updated_at DESC
1217 LIMIT ?2",
1218 ) else {
1219 return Vec::new();
1220 };
1221 let Ok(rows) = stmt.query_map(
1222 params![pattern, limit.clamp(1, 100) as i64],
1223 graph_node_from_row,
1224 ) else {
1225 return Vec::new();
1226 };
1227 rows.filter_map(|row| row.ok()).collect()
1228 }
1229
1230 fn graph_export(&self) -> KnowledgeGraph {
1231 let conn = self.conn.lock().unwrap();
1232 let nodes = conn
1233 .prepare(
1234 "SELECT id, label, kind, properties_json, created_at, updated_at
1235 FROM kg_nodes ORDER BY kind, label",
1236 )
1237 .and_then(|mut stmt| {
1238 let rows = stmt.query_map([], graph_node_from_row)?;
1239 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1240 })
1241 .unwrap_or_default();
1242 let edges = conn
1243 .prepare(
1244 "SELECT id, from_id, to_id, relation, weight, properties_json, created_at, updated_at
1245 FROM kg_edges ORDER BY relation, from_id, to_id",
1246 )
1247 .and_then(|mut stmt| {
1248 let rows = stmt.query_map([], graph_edge_from_row)?;
1249 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1250 })
1251 .unwrap_or_default();
1252 KnowledgeGraph { nodes, edges }
1253 }
1254
1255 fn delete_graph_node(&self, id: &str) -> anyhow::Result<()> {
1256 let conn = self.conn.lock().unwrap();
1257 conn.execute(
1258 "DELETE FROM kg_edges WHERE from_id = ?1 OR to_id = ?1",
1259 params![id],
1260 )?;
1261 conn.execute("DELETE FROM kg_nodes WHERE id = ?1", params![id])?;
1262 Ok(())
1263 }
1264
1265 fn delete_graph_edge(&self, id: &str) -> anyhow::Result<()> {
1266 let conn = self.conn.lock().unwrap();
1267 conn.execute("DELETE FROM kg_edges WHERE id = ?1", params![id])?;
1268 Ok(())
1269 }
1270}