1#![allow(
3 clippy::collapsible_if,
4 clippy::collapsible_match,
5 clippy::derivable_impls,
6 clippy::format_in_format_args,
7 clippy::if_same_then_else,
8 clippy::iter_cloned_collect,
9 clippy::manual_clamp,
10 clippy::manual_div_ceil,
11 clippy::manual_is_multiple_of,
12 clippy::manual_pattern_char_comparison,
13 clippy::needless_borrow,
14 clippy::needless_range_loop,
15 clippy::new_without_default,
16 clippy::ptr_arg,
17 clippy::should_implement_trait,
18 clippy::single_match,
19 clippy::type_complexity,
20 clippy::unnecessary_cast,
21 clippy::let_and_return,
22 clippy::useless_conversion,
23 clippy::useless_format,
24 clippy::while_let_loop
25)]
26
27use rusqlite::{Connection, params};
28use serde::{Deserialize, Serialize};
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, Mutex};
31
32use sparrow_core::Identity;
33use sparrow_core::event::RunId;
34use sparrow_providers::Msg;
35use crate::redaction::RedactionFilter;
36
37pub mod redaction;
38#[cfg(feature = "treesitter")]
39pub mod symbol_index;
40
41pub mod cli;
42pub mod fts;
43
/// Maximum length, counted in `char`s, accepted for the `MEMORY.md` document.
pub const MEMORY_MD_LIMIT: usize = 2200;
/// Maximum length, counted in `char`s, accepted for the `USER.md` document.
pub const USER_MD_LIMIT: usize = 1375;
46
/// Snapshot of a directory tree: the files found under `root` and the
/// declarations detected in its `.rs` files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoMap {
    /// Directory the scan started from.
    pub root: PathBuf,
    /// One entry per file that was visited.
    pub files: Vec<FileEntry>,
    /// Declarations found by the line-based scan of `.rs` files.
    pub symbols: Vec<SymbolEntry>,
}
55
/// A single file recorded by the repository scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// Path relative to the scan root (the full path if it cannot be made relative).
    pub path: String,
    /// File size in bytes; `0` when metadata could not be read.
    pub size: u64,
    /// Modification time formatted as `%Y-%m-%d %H:%M:%S`; empty when unavailable.
    pub modified: String,
}
62
/// A declaration located by the line-based scan of a `.rs` file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SymbolEntry {
    /// Relative path of the file containing the declaration.
    pub file: String,
    /// Name text taken from the declaration line.
    pub name: String,
    /// Declaration keyword: `"fn"`, `"struct"`, `"enum"`, `"trait"` or `"mod"`.
    pub kind: String,
    /// 1-based line number of the declaration.
    pub line: u32,
}
70
71impl RepoMap {
72 pub fn scan(root: &Path) -> Self {
73 let mut files = Vec::new();
74 let mut symbols = Vec::new();
75 scan_dir(root, root, &mut files, &mut symbols);
76 Self {
77 root: root.to_path_buf(),
78 files,
79 symbols,
80 }
81 }
82}
83
84fn scan_dir(base: &Path, dir: &Path, files: &mut Vec<FileEntry>, symbols: &mut Vec<SymbolEntry>) {
85 if let Ok(entries) = std::fs::read_dir(dir) {
86 for entry in entries.flatten() {
87 let path = entry.path();
88 let name = entry.file_name().to_string_lossy().to_string();
89
90 if name.starts_with('.')
92 || name == "node_modules"
93 || name == "target"
94 || name == "build"
95 || name == "dist"
96 {
97 continue;
98 }
99
100 if path.is_dir() {
101 scan_dir(base, &path, files, symbols);
102 } else {
103 let rel = path
104 .strip_prefix(base)
105 .unwrap_or(&path)
106 .to_string_lossy()
107 .to_string();
108
109 let modified = path
110 .metadata()
111 .ok()
112 .and_then(|m| m.modified().ok())
113 .and_then(|t| {
114 chrono::DateTime::from_timestamp(
115 t.duration_since(std::time::UNIX_EPOCH)
116 .unwrap_or_default()
117 .as_secs() as i64,
118 0,
119 )
120 })
121 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
122 .unwrap_or_default();
123
124 files.push(FileEntry {
125 path: rel.clone(),
126 size: path.metadata().map(|m| m.len()).unwrap_or(0),
127 modified,
128 });
129
130 if rel.ends_with(".rs") {
132 if let Ok(content) = std::fs::read_to_string(&path) {
133 for (i, line) in content.lines().enumerate() {
134 let trimmed = line.trim();
135 if trimmed.starts_with("pub fn ") || trimmed.starts_with("fn ") {
136 let name = trimmed
137 .trim_start_matches("pub fn ")
138 .trim_start_matches("fn ")
139 .split('(')
140 .next()
141 .unwrap_or("");
142 symbols.push(SymbolEntry {
143 file: rel.clone(),
144 name: name.to_string(),
145 kind: "fn".into(),
146 line: (i + 1) as u32,
147 });
148 } else if trimmed.starts_with("pub struct ")
149 || trimmed.starts_with("struct ")
150 {
151 let name = trimmed
152 .trim_start_matches("pub struct ")
153 .trim_start_matches("struct ")
154 .split(|c: char| c == '<' || c == '{' || c == '(')
155 .next()
156 .unwrap_or("");
157 symbols.push(SymbolEntry {
158 file: rel.clone(),
159 name: name.to_string(),
160 kind: "struct".into(),
161 line: (i + 1) as u32,
162 });
163 } else if trimmed.starts_with("pub enum ")
164 || trimmed.starts_with("enum ")
165 {
166 let name = trimmed
167 .trim_start_matches("pub enum ")
168 .trim_start_matches("enum ")
169 .split(|c: char| c == '<' || c == '{')
170 .next()
171 .unwrap_or("");
172 symbols.push(SymbolEntry {
173 file: rel.clone(),
174 name: name.to_string(),
175 kind: "enum".into(),
176 line: (i + 1) as u32,
177 });
178 } else if trimmed.starts_with("pub trait ")
179 || trimmed.starts_with("trait ")
180 {
181 let name = trimmed
182 .trim_start_matches("pub trait ")
183 .trim_start_matches("trait ")
184 .split(|c: char| c == '<' || c == '{')
185 .next()
186 .unwrap_or("");
187 symbols.push(SymbolEntry {
188 file: rel.clone(),
189 name: name.to_string(),
190 kind: "trait".into(),
191 line: (i + 1) as u32,
192 });
193 } else if trimmed.starts_with("pub mod ") || trimmed.starts_with("mod ")
194 {
195 let name = trimmed
196 .trim_start_matches("pub mod ")
197 .trim_start_matches("mod ")
198 .split(';')
199 .next()
200 .unwrap_or("");
201 symbols.push(SymbolEntry {
202 file: rel.clone(),
203 name: name.to_string(),
204 kind: "mod".into(),
205 line: (i + 1) as u32,
206 });
207 }
208 }
209 }
210 }
211 }
212 }
213 }
214}
215
/// Conversation state stored for one run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMem {
    /// Identifier of the run this record belongs to.
    pub run_id: String,
    /// Messages recorded for the run.
    pub messages: Vec<Msg>,
    /// Creation timestamp, kept as text.
    pub created_at: String,
}
224
/// A message posted from one agent to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedSignal {
    /// Unique identifier of the signal.
    pub id: String,
    /// Sending agent.
    pub from_agent: String,
    /// Receiving agent.
    pub to_agent: String,
    /// Signal payload.
    pub content: String,
    /// Timestamp, kept as text.
    pub timestamp: String,
}
235
/// A shared working document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkingDoc {
    /// Unique identifier of the document.
    pub id: String,
    /// Document title.
    pub title: String,
    /// Document body.
    pub content: String,
    /// Last-update timestamp, kept as text.
    pub updated_at: String,
}
243
/// A remembered key/value fact.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// Unique identifier of the fact.
    pub id: String,
    /// Fact key (declared `UNIQUE` in the SQLite schema).
    pub key: String,
    /// Fact value.
    pub value: String,
    /// Creation timestamp, kept as text.
    pub created_at: String,
    /// Last-update timestamp, kept as text.
    pub updated_at: String,
}
254
/// Selects one of the two size-limited memory documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemoryDocKind {
    /// The `MEMORY.md` document.
    Memory,
    /// The `USER.md` document.
    User,
}
260
261impl MemoryDocKind {
262 pub fn as_str(self) -> &'static str {
263 match self {
264 Self::Memory => "MEMORY.md",
265 Self::User => "USER.md",
266 }
267 }
268
269 pub fn limit(self) -> usize {
270 match self {
271 Self::Memory => MEMORY_MD_LIMIT,
272 Self::User => USER_MD_LIMIT,
273 }
274 }
275
276 pub fn parse(value: &str) -> Option<Self> {
277 match value.trim().to_ascii_lowercase().as_str() {
278 "memory" | "memory.md" => Some(Self::Memory),
279 "user" | "user.md" => Some(Self::User),
280 _ => None,
281 }
282 }
283}
284
/// A stored memory document together with its kind.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDoc {
    /// Which document this is.
    pub kind: MemoryDocKind,
    /// Document text.
    pub content: String,
    /// Last-update timestamp, kept as text.
    pub updated_at: String,
}
291
/// Usage figures for the memory store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of stored facts.
    pub facts: usize,
    /// Current length of `MEMORY.md`, in `char`s.
    pub memory_chars: usize,
    /// Character limit for `MEMORY.md`.
    pub memory_limit: usize,
    /// Current length of `USER.md`, in `char`s.
    pub user_chars: usize,
    /// Character limit for `USER.md`.
    pub user_limit: usize,
}
300
/// A node of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphNode {
    /// Unique identifier of the node.
    pub id: String,
    /// Display label.
    pub label: String,
    /// Free-form node category.
    pub kind: String,
    /// Arbitrary JSON attributes; defaults to JSON `null` when absent on deserialization.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp, kept as text.
    pub created_at: String,
    /// Last-update timestamp, kept as text.
    pub updated_at: String,
}
311
/// A directed, weighted edge of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Unique identifier of the edge.
    pub id: String,
    /// Identifier of the source node.
    pub from_id: String,
    /// Identifier of the target node.
    pub to_id: String,
    /// Name of the relation this edge expresses.
    pub relation: String,
    /// Edge weight.
    pub weight: f64,
    /// Arbitrary JSON attributes; defaults to JSON `null` when absent on deserialization.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp, kept as text.
    pub created_at: String,
    /// Last-update timestamp, kept as text.
    pub updated_at: String,
}
324
/// A set of graph nodes and edges; `Default` is the empty graph.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KnowledgeGraph {
    /// Nodes in the graph.
    pub nodes: Vec<GraphNode>,
    /// Edges in the graph.
    pub edges: Vec<GraphEdge>,
}
330
/// Which edges to follow, relative to a node, when listing neighbours.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum GraphDirection {
    /// Edges that point at the node.
    Incoming,
    /// Edges that leave the node.
    Outgoing,
    /// Edges in either direction.
    Both,
}
337
338impl GraphDirection {
339 pub fn parse(value: &str) -> Self {
340 match value.trim().to_ascii_lowercase().as_str() {
341 "incoming" | "in" => Self::Incoming,
342 "outgoing" | "out" => Self::Outgoing,
343 _ => Self::Both,
344 }
345 }
346}
347
348pub fn validate_memory_text(label: &str, text: &str, limit: usize) -> anyhow::Result<()> {
349 let chars = text.chars().count();
350 if chars > limit {
351 anyhow::bail!("{label} is too large: {chars}/{limit} chars");
352 }
353 if text.chars().any(is_forbidden_invisible_char) {
354 anyhow::bail!("{label} contains invisible control characters");
355 }
356
357 let lower = text.to_ascii_lowercase();
358 let blocked = [
359 "ignore previous instructions",
360 "ignore all previous instructions",
361 "disregard previous instructions",
362 "reveal your system prompt",
363 "print your system prompt",
364 "exfiltrate",
365 "steal secrets",
366 "dump secrets",
367 "print env",
368 "cat ~/.ssh",
369 "backdoor",
370 "reverse shell",
371 "rm -rf /",
372 ];
373 if blocked.iter().any(|needle| lower.contains(needle)) {
374 anyhow::bail!("{label} looks like prompt injection or credential exfiltration");
375 }
376 Ok(())
377}
378
/// Returns `true` for characters that may not appear in memory text.
///
/// Forbidden are the code points U+200B–U+200F, U+202A–U+202E, U+2060–U+206F
/// and U+FEFF, plus every control character other than `\n`, `\r` and `\t`.
fn is_forbidden_invisible_char(c: char) -> bool {
    match c {
        // Ordinary whitespace controls are allowed.
        '\n' | '\r' | '\t' => false,
        '\u{200B}'..='\u{200F}'
        | '\u{202A}'..='\u{202E}'
        | '\u{2060}'..='\u{206F}'
        | '\u{FEFF}' => true,
        other => other.is_control(),
    }
}
388
/// Returns at most the first `limit` characters of `text` as an owned string.
///
/// The cut is made on a `char` boundary, so multi-byte characters are never split.
fn truncate_chars(text: &str, limit: usize) -> String {
    match text.char_indices().nth(limit) {
        // Byte offset of the first character past the limit.
        Some((cut, _)) => text[..cut].to_owned(),
        None => text.to_owned(),
    }
}
392
/// Storage interface for agent memory.
///
/// Methods without a default body must be provided by every backend. Methods
/// with a default body are optional capabilities: the defaults return
/// `None`/empty values or fail with a "not supported by this memory backend"
/// error.
pub trait Memory: Send + Sync {
    /// Returns a [`RepoMap`] for the tree rooted at `root`.
    fn repo_map(&self, root: &Path) -> RepoMap;
    /// Returns the identity stored for `agent`, if any.
    fn identity(&self, agent: &str) -> Option<Identity>;
    /// Stores `identity` for `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()>;
    /// Returns the task memory stored for `run`, if any.
    fn task(&self, run: &RunId) -> Option<TaskMem>;
    /// Stores `task`.
    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()>;
    /// Returns the stored signals.
    fn shared_signals(&self) -> Vec<SharedSignal>;
    /// Returns the stored working documents.
    fn shared_docs(&self) -> Vec<WorkingDoc>;
    /// Stores `signal`.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()>;
    /// Inserts or updates `doc`.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()>;
    /// Stores `fact`.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `k` facts matching the query `q`.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact>;
    /// Returns every stored fact.
    fn all_facts(&self) -> Vec<Fact>;
    /// Removes the fact with the given `id`.
    fn forget(&self, id: &str) -> anyhow::Result<()>;
    /// Returns the memory document of the given kind. Default: `None`.
    fn memory_doc(&self, _kind: MemoryDocKind) -> Option<MemoryDoc> {
        None
    }
    /// Inserts or replaces a memory document. Default: "not supported" error.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Removes a memory document. Default: "not supported" error.
    fn remove_memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Reports usage figures. Default: counts facts via [`Memory::all_facts`]
    /// and reports zero characters for both memory documents.
    fn memory_stats(&self) -> MemoryStats {
        MemoryStats {
            facts: self.all_facts().len(),
            memory_chars: 0,
            memory_limit: MEMORY_MD_LIMIT,
            user_chars: 0,
            user_limit: USER_MD_LIMIT,
        }
    }
    /// Consolidates stored facts into memory documents. Default: "not supported" error.
    fn consolidate_memory(&self) -> anyhow::Result<()> {
        anyhow::bail!("memory consolidation is not supported by this memory backend")
    }
    /// Caches the model names discovered for `provider_id`.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()>;
    /// Returns the cached model names for `provider_id`.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String>;
    /// Inserts or updates a graph node. Default: "not supported" error.
    fn upsert_graph_node(&self, _node: GraphNode) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Inserts or updates a graph edge. Default: "not supported" error.
    fn upsert_graph_edge(&self, _edge: GraphEdge) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Returns the graph node with the given id. Default: `None`.
    fn graph_node(&self, _id: &str) -> Option<GraphNode> {
        None
    }
    /// Returns edges attached to a node together with the node at the other
    /// end. Default: empty.
    fn graph_neighbors(
        &self,
        _id: &str,
        _direction: GraphDirection,
        _limit: usize,
    ) -> Vec<(GraphEdge, GraphNode)> {
        Vec::new()
    }
    /// Searches graph nodes. Default: empty.
    fn search_graph(&self, _query: &str, _limit: usize) -> Vec<GraphNode> {
        Vec::new()
    }
    /// Exports the whole graph. Default: an empty [`KnowledgeGraph`].
    fn graph_export(&self) -> KnowledgeGraph {
        KnowledgeGraph::default()
    }
    /// Deletes a graph node. Default: "not supported" error.
    fn delete_graph_node(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Deletes a graph edge. Default: "not supported" error.
    fn delete_graph_edge(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
}
462
/// A named source of facts and memory documents in which every operation is fallible.
pub trait MemoryProvider: Send + Sync {
    /// Name identifying the provider.
    fn name(&self) -> &str;
    /// Stores `fact`.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `limit` facts matching `query`.
    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>>;
    /// Returns the memory document of the given kind, if one exists.
    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>>;
    /// Inserts or replaces the memory document of the given kind.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()>;
}
470
/// [`MemoryProvider`] that forwards every call to a shared [`Memory`] backend.
pub struct LocalMemoryProvider {
    // Backend that receives the forwarded calls.
    memory: Arc<dyn Memory>,
}
474
475impl LocalMemoryProvider {
476 pub fn new(memory: Arc<dyn Memory>) -> Self {
477 Self { memory }
478 }
479}
480
481impl MemoryProvider for LocalMemoryProvider {
482 fn name(&self) -> &str {
483 "local"
484 }
485
486 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
487 self.memory.remember(fact)
488 }
489
490 fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>> {
491 Ok(self.memory.recall(query, limit))
492 }
493
494 fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
495 Ok(self.memory.memory_doc(kind))
496 }
497
498 fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
499 self.memory.upsert_memory_doc(kind, content)
500 }
501}
502
/// Placeholder for an external memory service: it carries only a name, and
/// every [`MemoryProvider`] operation on it reports "not configured".
pub struct ExternalMemoryProvider {
    // Provider name reported by `name()` and embedded in error messages.
    name: String,
}
506
507impl ExternalMemoryProvider {
508 pub fn mem0() -> Self {
509 Self {
510 name: "mem0".into(),
511 }
512 }
513
514 pub fn honcho() -> Self {
515 Self {
516 name: "honcho".into(),
517 }
518 }
519
520 pub fn supermemory() -> Self {
521 Self {
522 name: "supermemory".into(),
523 }
524 }
525
526 fn not_configured(&self) -> anyhow::Error {
527 anyhow::anyhow!(
528 "external memory provider '{}' is not configured; use local SQLite memory or configure a connector first",
529 self.name
530 )
531 }
532}
533
534impl MemoryProvider for ExternalMemoryProvider {
535 fn name(&self) -> &str {
536 &self.name
537 }
538
539 fn remember(&self, _fact: Fact) -> anyhow::Result<()> {
540 Err(self.not_configured())
541 }
542
543 fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result<Vec<Fact>> {
544 Err(self.not_configured())
545 }
546
547 fn memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
548 Err(self.not_configured())
549 }
550
551 fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
552 Err(self.not_configured())
553 }
554}
555
/// [`Memory`] backend stored in a SQLite database.
pub struct SqliteMemory {
    // Single connection; the mutex serialises all access to it.
    conn: Mutex<Connection>,
}
561
562impl SqliteMemory {
563 pub fn open(db_path: &Path) -> anyhow::Result<Self> {
564 if let Some(parent) = db_path.parent() {
565 std::fs::create_dir_all(parent)?;
566 }
567 let conn = Connection::open(db_path)?;
568 let memory = Self {
569 conn: Mutex::new(conn),
570 };
571 memory.migrate()?;
572 Ok(memory)
573 }
574
575 fn migrate(&self) -> anyhow::Result<()> {
576 let conn = self.conn.lock().unwrap();
577 conn.execute_batch(
578 "
579 CREATE TABLE IF NOT EXISTS identities (
580 agent TEXT PRIMARY KEY,
581 name TEXT NOT NULL,
582 role TEXT NOT NULL,
583 personality TEXT NOT NULL
584 );
585 CREATE TABLE IF NOT EXISTS tasks (
586 run_id TEXT PRIMARY KEY,
587 messages_json TEXT NOT NULL,
588 created_at TEXT NOT NULL DEFAULT (datetime('now'))
589 );
590 CREATE TABLE IF NOT EXISTS signals (
591 id TEXT PRIMARY KEY,
592 from_agent TEXT NOT NULL,
593 to_agent TEXT NOT NULL,
594 content TEXT NOT NULL,
595 timestamp TEXT NOT NULL DEFAULT (datetime('now'))
596 );
597 CREATE TABLE IF NOT EXISTS working_docs (
598 id TEXT PRIMARY KEY,
599 title TEXT NOT NULL,
600 content TEXT NOT NULL,
601 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
602 );
603 CREATE TABLE IF NOT EXISTS facts (
604 id TEXT PRIMARY KEY,
605 key TEXT NOT NULL UNIQUE,
606 value TEXT NOT NULL,
607 created_at TEXT NOT NULL DEFAULT (datetime('now')),
608 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
609 );
610 CREATE TABLE IF NOT EXISTS discovered_models (
611 provider_id TEXT NOT NULL,
612 model_name TEXT NOT NULL,
613 fetched_at TEXT NOT NULL,
614 PRIMARY KEY (provider_id, model_name)
615 );
616 CREATE TABLE IF NOT EXISTS memory_docs (
617 kind TEXT PRIMARY KEY,
618 content TEXT NOT NULL,
619 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
620 );
621 CREATE TABLE IF NOT EXISTS kg_nodes (
622 id TEXT PRIMARY KEY,
623 label TEXT NOT NULL,
624 kind TEXT NOT NULL,
625 properties_json TEXT NOT NULL DEFAULT '{}',
626 created_at TEXT NOT NULL DEFAULT (datetime('now')),
627 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
628 );
629 CREATE TABLE IF NOT EXISTS kg_edges (
630 id TEXT PRIMARY KEY,
631 from_id TEXT NOT NULL,
632 to_id TEXT NOT NULL,
633 relation TEXT NOT NULL,
634 weight REAL NOT NULL DEFAULT 1.0,
635 properties_json TEXT NOT NULL DEFAULT '{}',
636 created_at TEXT NOT NULL DEFAULT (datetime('now')),
637 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
638 FOREIGN KEY(from_id) REFERENCES kg_nodes(id) ON DELETE CASCADE,
639 FOREIGN KEY(to_id) REFERENCES kg_nodes(id) ON DELETE CASCADE
640 );
641 CREATE INDEX IF NOT EXISTS kg_nodes_label_idx ON kg_nodes(label);
642 CREATE INDEX IF NOT EXISTS kg_nodes_kind_idx ON kg_nodes(kind);
643 CREATE INDEX IF NOT EXISTS kg_edges_from_idx ON kg_edges(from_id);
644 CREATE INDEX IF NOT EXISTS kg_edges_to_idx ON kg_edges(to_id);
645 CREATE INDEX IF NOT EXISTS kg_edges_relation_idx ON kg_edges(relation);
646 -- FTS5 full-text search for memory recall (M1)
647 CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5(
648 key, value, content='facts', content_rowid='rowid'
649 );
650 -- Triggers to keep FTS5 index in sync
651 CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
652 INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
653 END;
654 CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
655 INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
656 END;
657 CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN
658 INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
659 INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
660 END;
661 ",
662 )?;
663 Ok(())
664 }
665}
666
667fn graph_node_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphNode> {
668 let properties_json: String = row.get(3)?;
669 Ok(GraphNode {
670 id: row.get(0)?,
671 label: row.get(1)?,
672 kind: row.get(2)?,
673 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
674 created_at: row.get(4)?,
675 updated_at: row.get(5)?,
676 })
677}
678
679fn graph_edge_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphEdge> {
680 let properties_json: String = row.get(5)?;
681 Ok(GraphEdge {
682 id: row.get(0)?,
683 from_id: row.get(1)?,
684 to_id: row.get(2)?,
685 relation: row.get(3)?,
686 weight: row.get(4)?,
687 properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
688 created_at: row.get(6)?,
689 updated_at: row.get(7)?,
690 })
691}
692
693impl Memory for SqliteMemory {
694 fn repo_map(&self, root: &Path) -> RepoMap {
695 RepoMap::scan(root)
696 }
697
698 fn identity(&self, agent: &str) -> Option<Identity> {
699 let conn = self.conn.lock().unwrap();
700 conn.query_row(
701 "SELECT name, role, personality FROM identities WHERE agent = ?1",
702 params![agent],
703 |row| {
704 Ok(Identity {
705 name: row.get(0)?,
706 role: row.get(1)?,
707 personality: row.get(2)?,
708 })
709 },
710 )
711 .ok()
712 }
713
714 fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()> {
715 let conn = self.conn.lock().unwrap();
716 conn.execute(
717 "INSERT OR REPLACE INTO identities (agent, name, role, personality) VALUES (?1, ?2, ?3, ?4)",
718 params![agent, identity.name, identity.role, identity.personality],
719 )?;
720 Ok(())
721 }
722
723 fn task(&self, run: &RunId) -> Option<TaskMem> {
724 let conn = self.conn.lock().unwrap();
725 conn.query_row(
726 "SELECT run_id, messages_json, created_at FROM tasks WHERE run_id = ?1",
727 params![run.0],
728 |row| {
729 let messages_json: String = row.get(1)?;
730 let messages: Vec<Msg> = serde_json::from_str(&messages_json).unwrap_or_default();
731 Ok(TaskMem {
732 run_id: row.get(0)?,
733 messages,
734 created_at: row.get(2)?,
735 })
736 },
737 )
738 .ok()
739 }
740
741 fn save_task(&self, task: &TaskMem) -> anyhow::Result<()> {
742 let conn = self.conn.lock().unwrap();
743 let messages_json = serde_json::to_string(&task.messages)?;
744 conn.execute(
745 "INSERT OR REPLACE INTO tasks (run_id, messages_json, created_at) VALUES (?1, ?2, ?3)",
746 params![task.run_id, messages_json, task.created_at],
747 )?;
748 Ok(())
749 }
750
751 fn shared_signals(&self) -> Vec<SharedSignal> {
752 let conn = self.conn.lock().unwrap();
753 let mut stmt = conn
754 .prepare("SELECT id, from_agent, to_agent, content, timestamp FROM signals ORDER BY timestamp DESC")
755 .unwrap();
756 let signals = stmt
757 .query_map([], |row| {
758 Ok(SharedSignal {
759 id: row.get(0)?,
760 from_agent: row.get(1)?,
761 to_agent: row.get(2)?,
762 content: row.get(3)?,
763 timestamp: row.get(4)?,
764 })
765 })
766 .unwrap()
767 .filter_map(|r| r.ok())
768 .collect();
769 signals
770 }
771
772 fn shared_docs(&self) -> Vec<WorkingDoc> {
773 let conn = self.conn.lock().unwrap();
774 let mut stmt = conn
775 .prepare(
776 "SELECT id, title, content, updated_at FROM working_docs ORDER BY updated_at DESC",
777 )
778 .unwrap();
779 let docs = stmt
780 .query_map([], |row| {
781 Ok(WorkingDoc {
782 id: row.get(0)?,
783 title: row.get(1)?,
784 content: row.get(2)?,
785 updated_at: row.get(3)?,
786 })
787 })
788 .unwrap()
789 .filter_map(|r| r.ok())
790 .collect();
791 docs
792 }
793
794 fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()> {
795 let conn = self.conn.lock().unwrap();
796 conn.execute(
797 "INSERT INTO signals (id, from_agent, to_agent, content, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
798 params![signal.id, signal.from_agent, signal.to_agent, signal.content, signal.timestamp],
799 )?;
800 Ok(())
801 }
802
803 fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()> {
804 let conn = self.conn.lock().unwrap();
805 conn.execute(
806 "INSERT OR REPLACE INTO working_docs (id, title, content, updated_at) VALUES (?1, ?2, ?3, ?4)",
807 params![doc.id, doc.title, doc.content, doc.updated_at],
808 )?;
809 Ok(())
810 }
811
812 fn remember(&self, fact: Fact) -> anyhow::Result<()> {
813 let redaction = RedactionFilter::new();
814 validate_memory_text("fact key", &fact.key, 256)?;
815 validate_memory_text("fact value", &fact.value, 1200)?;
816 let safe_value = redaction.redact_str(&fact.value);
817 let safe_key = redaction.redact_str(&fact.key);
818 if redaction.contains_secret(&fact.value) {
819 tracing::warn!("Redacted secret from memory fact: {}", fact.key);
820 }
821 let conn = self.conn.lock().unwrap();
822 let existing: Option<String> = conn
823 .query_row(
824 "SELECT id FROM facts WHERE key = ?1",
825 params![safe_key.clone()],
826 |row| row.get(0),
827 )
828 .ok();
829 if existing.as_deref().is_some_and(|id| id != fact.id) {
830 anyhow::bail!(
831 "memory fact '{}' already exists; use replace/consolidate",
832 safe_key
833 );
834 }
835 conn.execute(
836 "INSERT OR REPLACE INTO facts (id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
837 params![fact.id, safe_key, safe_value, fact.created_at, fact.updated_at],
838 )?;
839 Ok(())
840 }
841
842 fn recall(&self, q: &str, k: usize) -> Vec<Fact> {
843 let conn = self.conn.lock().unwrap();
844
845 let tokens: Vec<String> = q
851 .split_whitespace()
852 .map(|w| {
853 w.chars()
854 .filter(|c| c.is_alphanumeric() || *c == '_')
855 .collect::<String>()
856 })
857 .filter(|w| !w.is_empty())
858 .map(|w| format!("\"{}\"*", w))
859 .collect();
860
861 let try_fts = !tokens.is_empty();
863 if try_fts {
864 let pattern = tokens.join(" ");
865 if let Ok(mut stmt) = conn.prepare(
866 "SELECT f.id, f.key, f.value, f.created_at, f.updated_at FROM facts f
867 INNER JOIN facts_fts ft ON f.rowid = ft.rowid
868 WHERE facts_fts MATCH ?1 ORDER BY rank LIMIT ?2",
869 ) {
870 if let Ok(rows) = stmt.query_map(params![pattern, k as i64], |row| {
871 Ok(Fact {
872 id: row.get(0)?,
873 key: row.get(1)?,
874 value: row.get(2)?,
875 created_at: row.get(3)?,
876 updated_at: row.get(4)?,
877 })
878 }) {
879 let facts: Vec<Fact> = rows.filter_map(|r| r.ok()).collect();
880 if !facts.is_empty() {
881 return facts;
882 }
883 }
886 }
887 }
888
889 let escaped = q
892 .replace('\\', "\\\\")
893 .replace('%', "\\%")
894 .replace('_', "\\_");
895 let like_pattern = format!("%{}%", escaped);
896 let Ok(mut stmt) = conn.prepare(
897 "SELECT id, key, value, created_at, updated_at FROM facts \
898 WHERE key LIKE ?1 ESCAPE '\\' OR value LIKE ?1 ESCAPE '\\' LIMIT ?2",
899 ) else {
900 return Vec::new();
901 };
902 let Ok(rows) = stmt.query_map(params![like_pattern, k as i64], |row| {
903 Ok(Fact {
904 id: row.get(0)?,
905 key: row.get(1)?,
906 value: row.get(2)?,
907 created_at: row.get(3)?,
908 updated_at: row.get(4)?,
909 })
910 }) else {
911 return Vec::new();
912 };
913 rows.filter_map(|r| r.ok()).collect()
914 }
915
916 fn all_facts(&self) -> Vec<Fact> {
917 let conn = self.conn.lock().unwrap();
918 let mut stmt = conn
919 .prepare(
920 "SELECT id, key, value, created_at, updated_at FROM facts ORDER BY updated_at DESC",
921 )
922 .unwrap();
923 stmt.query_map([], |row| {
924 Ok(Fact {
925 id: row.get(0)?,
926 key: row.get(1)?,
927 value: row.get(2)?,
928 created_at: row.get(3)?,
929 updated_at: row.get(4)?,
930 })
931 })
932 .unwrap()
933 .filter_map(|r| r.ok())
934 .collect()
935 }
936
937 fn forget(&self, id: &str) -> anyhow::Result<()> {
938 let conn = self.conn.lock().unwrap();
939 conn.execute("DELETE FROM facts WHERE id = ?1", params![id])?;
940 Ok(())
941 }
942
943 fn memory_doc(&self, kind: MemoryDocKind) -> Option<MemoryDoc> {
944 let conn = self.conn.lock().unwrap();
945 conn.query_row(
946 "SELECT content, updated_at FROM memory_docs WHERE kind = ?1",
947 params![kind.as_str()],
948 |row| {
949 Ok(MemoryDoc {
950 kind,
951 content: row.get(0)?,
952 updated_at: row.get(1)?,
953 })
954 },
955 )
956 .ok()
957 }
958
959 fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
960 validate_memory_text(kind.as_str(), content, kind.limit())?;
961 let conn = self.conn.lock().unwrap();
962 conn.execute(
963 "INSERT OR REPLACE INTO memory_docs (kind, content, updated_at)
964 VALUES (?1, ?2, datetime('now'))",
965 params![kind.as_str(), content],
966 )?;
967 Ok(())
968 }
969
970 fn remove_memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<()> {
971 let conn = self.conn.lock().unwrap();
972 conn.execute(
973 "DELETE FROM memory_docs WHERE kind = ?1",
974 params![kind.as_str()],
975 )?;
976 Ok(())
977 }
978
979 fn memory_stats(&self) -> MemoryStats {
980 let memory_chars = self
981 .memory_doc(MemoryDocKind::Memory)
982 .map(|doc| doc.content.chars().count())
983 .unwrap_or(0);
984 let user_chars = self
985 .memory_doc(MemoryDocKind::User)
986 .map(|doc| doc.content.chars().count())
987 .unwrap_or(0);
988 MemoryStats {
989 facts: self.all_facts().len(),
990 memory_chars,
991 memory_limit: MEMORY_MD_LIMIT,
992 user_chars,
993 user_limit: USER_MD_LIMIT,
994 }
995 }
996
997 fn consolidate_memory(&self) -> anyhow::Result<()> {
998 let facts = self.all_facts();
999 let mut memory_lines = Vec::new();
1000 memory_lines.push("# MEMORY.md".to_string());
1001 memory_lines.push("Durable Sparrow memory distilled from accepted facts.".to_string());
1002 for fact in facts.iter().take(40) {
1003 memory_lines.push(format!("- {}: {}", fact.key, fact.value));
1004 }
1005 let memory = truncate_chars(&memory_lines.join("\n"), MEMORY_MD_LIMIT);
1006 self.upsert_memory_doc(MemoryDocKind::Memory, &memory)?;
1007
1008 let mut user_lines = Vec::new();
1009 user_lines.push("# USER.md".to_string());
1010 for fact in facts
1011 .iter()
1012 .filter(|fact| fact.key.starts_with("user"))
1013 .take(24)
1014 {
1015 user_lines.push(format!("- {}: {}", fact.key, fact.value));
1016 }
1017 if user_lines.len() > 1 {
1018 let user = truncate_chars(&user_lines.join("\n"), USER_MD_LIMIT);
1019 self.upsert_memory_doc(MemoryDocKind::User, &user)?;
1020 }
1021 Ok(())
1022 }
1023
1024 fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()> {
1025 let mut conn = self.conn.lock().unwrap();
1026 let tx = conn.transaction()?;
1027 tx.execute(
1028 "DELETE FROM discovered_models WHERE provider_id = ?1",
1029 params![provider_id],
1030 )?;
1031 let fetched_at = chrono::Utc::now().to_rfc3339();
1032 for model in models {
1033 let model = model.trim();
1034 if model.is_empty() {
1035 continue;
1036 }
1037 tx.execute(
1038 "INSERT OR REPLACE INTO discovered_models (provider_id, model_name, fetched_at)
1039 VALUES (?1, ?2, ?3)",
1040 params![provider_id, model, fetched_at],
1041 )?;
1042 }
1043 tx.commit()?;
1044 Ok(())
1045 }
1046
1047 fn get_discovered_models(&self, provider_id: &str) -> Vec<String> {
1048 let conn = self.conn.lock().unwrap();
1049 let Ok(mut stmt) = conn.prepare(
1054 "SELECT model_name FROM discovered_models
1055 WHERE provider_id = ?1
1056 AND datetime(fetched_at) >= datetime('now', '-30 days')
1057 ORDER BY model_name ASC",
1058 ) else {
1059 return Vec::new();
1060 };
1061 let Ok(rows) = stmt.query_map(params![provider_id], |row| row.get::<_, String>(0)) else {
1062 return Vec::new();
1063 };
1064 rows.filter_map(|row| row.ok()).collect()
1065 }
1066
1067 fn upsert_graph_node(&self, node: GraphNode) -> anyhow::Result<()> {
1068 validate_memory_text("graph node id", &node.id, 160)?;
1069 validate_memory_text("graph node label", &node.label, 512)?;
1070 validate_memory_text("graph node kind", &node.kind, 80)?;
1071 validate_memory_text("graph node properties", &node.properties.to_string(), 4000)?;
1072 let redaction = RedactionFilter::new();
1073 let safe_id = redaction.redact_str(&node.id);
1074 let safe_label = redaction.redact_str(&node.label);
1075 let safe_kind = redaction.redact_str(&node.kind);
1076 let safe_properties = serde_json::to_string(&node.properties)?;
1077 let safe_properties = redaction.redact_str(&safe_properties);
1078 let now = chrono::Utc::now().to_rfc3339();
1079 let created_at = if node.created_at.trim().is_empty() {
1080 now.clone()
1081 } else {
1082 node.created_at
1083 };
1084 let updated_at = if node.updated_at.trim().is_empty() {
1085 now
1086 } else {
1087 node.updated_at
1088 };
1089 let conn = self.conn.lock().unwrap();
1090 conn.execute(
1091 "INSERT INTO kg_nodes (id, label, kind, properties_json, created_at, updated_at)
1092 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1093 ON CONFLICT(id) DO UPDATE SET
1094 label = excluded.label,
1095 kind = excluded.kind,
1096 properties_json = excluded.properties_json,
1097 updated_at = excluded.updated_at",
1098 params![
1099 safe_id,
1100 safe_label,
1101 safe_kind,
1102 safe_properties,
1103 created_at,
1104 updated_at
1105 ],
1106 )?;
1107 Ok(())
1108 }
1109
1110 fn upsert_graph_edge(&self, edge: GraphEdge) -> anyhow::Result<()> {
1111 validate_memory_text("graph edge id", &edge.id, 160)?;
1112 validate_memory_text("graph edge from_id", &edge.from_id, 160)?;
1113 validate_memory_text("graph edge to_id", &edge.to_id, 160)?;
1114 validate_memory_text("graph edge relation", &edge.relation, 120)?;
1115 validate_memory_text("graph edge properties", &edge.properties.to_string(), 4000)?;
1116 let redaction = RedactionFilter::new();
1117 let safe_properties = serde_json::to_string(&edge.properties)?;
1118 let safe_properties = redaction.redact_str(&safe_properties);
1119 let now = chrono::Utc::now().to_rfc3339();
1120 let created_at = if edge.created_at.trim().is_empty() {
1121 now.clone()
1122 } else {
1123 edge.created_at
1124 };
1125 let updated_at = if edge.updated_at.trim().is_empty() {
1126 now
1127 } else {
1128 edge.updated_at
1129 };
1130 let conn = self.conn.lock().unwrap();
1131 conn.execute(
1132 "INSERT INTO kg_edges (id, from_id, to_id, relation, weight, properties_json, created_at, updated_at)
1133 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1134 ON CONFLICT(id) DO UPDATE SET
1135 from_id = excluded.from_id,
1136 to_id = excluded.to_id,
1137 relation = excluded.relation,
1138 weight = excluded.weight,
1139 properties_json = excluded.properties_json,
1140 updated_at = excluded.updated_at",
1141 params![
1142 redaction.redact_str(&edge.id),
1143 redaction.redact_str(&edge.from_id),
1144 redaction.redact_str(&edge.to_id),
1145 redaction.redact_str(&edge.relation),
1146 edge.weight,
1147 safe_properties,
1148 created_at,
1149 updated_at
1150 ],
1151 )?;
1152 Ok(())
1153 }
1154
1155 fn graph_node(&self, id: &str) -> Option<GraphNode> {
1156 let conn = self.conn.lock().unwrap();
1157 conn.query_row(
1158 "SELECT id, label, kind, properties_json, created_at, updated_at
1159 FROM kg_nodes WHERE id = ?1",
1160 params![id],
1161 graph_node_from_row,
1162 )
1163 .ok()
1164 }
1165
1166 fn graph_neighbors(
1167 &self,
1168 id: &str,
1169 direction: GraphDirection,
1170 limit: usize,
1171 ) -> Vec<(GraphEdge, GraphNode)> {
1172 let conn = self.conn.lock().unwrap();
1173 let limit = limit.clamp(1, 100) as i64;
1174 let sql = match direction {
1175 GraphDirection::Outgoing => {
1176 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1177 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1178 FROM kg_edges e
1179 JOIN kg_nodes n ON n.id = e.to_id
1180 WHERE e.from_id = ?1
1181 ORDER BY e.weight DESC, e.updated_at DESC
1182 LIMIT ?2"
1183 }
1184 GraphDirection::Incoming => {
1185 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1186 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1187 FROM kg_edges e
1188 JOIN kg_nodes n ON n.id = e.from_id
1189 WHERE e.to_id = ?1
1190 ORDER BY e.weight DESC, e.updated_at DESC
1191 LIMIT ?2"
1192 }
1193 GraphDirection::Both => {
1194 "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1195 n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1196 FROM kg_edges e
1197 JOIN kg_nodes n ON n.id = CASE WHEN e.from_id = ?1 THEN e.to_id ELSE e.from_id END
1198 WHERE e.from_id = ?1 OR e.to_id = ?1
1199 ORDER BY e.weight DESC, e.updated_at DESC
1200 LIMIT ?2"
1201 }
1202 };
1203 let Ok(mut stmt) = conn.prepare(sql) else {
1204 return Vec::new();
1205 };
1206 let Ok(rows) = stmt.query_map(params![id, limit], |row| {
1207 let edge = graph_edge_from_row(row)?;
1208 let properties_json: String = row.get(11)?;
1209 let node = GraphNode {
1210 id: row.get(8)?,
1211 label: row.get(9)?,
1212 kind: row.get(10)?,
1213 properties: serde_json::from_str(&properties_json)
1214 .unwrap_or(serde_json::Value::Null),
1215 created_at: row.get(12)?,
1216 updated_at: row.get(13)?,
1217 };
1218 Ok((edge, node))
1219 }) else {
1220 return Vec::new();
1221 };
1222 rows.filter_map(|row| row.ok()).collect()
1223 }
1224
1225 fn search_graph(&self, query: &str, limit: usize) -> Vec<GraphNode> {
1226 let query = query.trim();
1227 if query.is_empty() {
1228 return Vec::new();
1229 }
1230 let escaped = query
1231 .replace('\\', "\\\\")
1232 .replace('%', "\\%")
1233 .replace('_', "\\_");
1234 let pattern = format!("%{}%", escaped);
1235 let conn = self.conn.lock().unwrap();
1236 let Ok(mut stmt) = conn.prepare(
1237 "SELECT id, label, kind, properties_json, created_at, updated_at
1238 FROM kg_nodes
1239 WHERE id LIKE ?1 ESCAPE '\\'
1240 OR label LIKE ?1 ESCAPE '\\'
1241 OR kind LIKE ?1 ESCAPE '\\'
1242 OR properties_json LIKE ?1 ESCAPE '\\'
1243 ORDER BY updated_at DESC
1244 LIMIT ?2",
1245 ) else {
1246 return Vec::new();
1247 };
1248 let Ok(rows) = stmt.query_map(
1249 params![pattern, limit.clamp(1, 100) as i64],
1250 graph_node_from_row,
1251 ) else {
1252 return Vec::new();
1253 };
1254 rows.filter_map(|row| row.ok()).collect()
1255 }
1256
1257 fn graph_export(&self) -> KnowledgeGraph {
1258 let conn = self.conn.lock().unwrap();
1259 let nodes = conn
1260 .prepare(
1261 "SELECT id, label, kind, properties_json, created_at, updated_at
1262 FROM kg_nodes ORDER BY kind, label",
1263 )
1264 .and_then(|mut stmt| {
1265 let rows = stmt.query_map([], graph_node_from_row)?;
1266 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1267 })
1268 .unwrap_or_default();
1269 let edges = conn
1270 .prepare(
1271 "SELECT id, from_id, to_id, relation, weight, properties_json, created_at, updated_at
1272 FROM kg_edges ORDER BY relation, from_id, to_id",
1273 )
1274 .and_then(|mut stmt| {
1275 let rows = stmt.query_map([], graph_edge_from_row)?;
1276 Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1277 })
1278 .unwrap_or_default();
1279 KnowledgeGraph { nodes, edges }
1280 }
1281
1282 fn delete_graph_node(&self, id: &str) -> anyhow::Result<()> {
1283 let conn = self.conn.lock().unwrap();
1284 conn.execute(
1285 "DELETE FROM kg_edges WHERE from_id = ?1 OR to_id = ?1",
1286 params![id],
1287 )?;
1288 conn.execute("DELETE FROM kg_nodes WHERE id = ?1", params![id])?;
1289 Ok(())
1290 }
1291
1292 fn delete_graph_edge(&self, id: &str) -> anyhow::Result<()> {
1293 let conn = self.conn.lock().unwrap();
1294 conn.execute("DELETE FROM kg_edges WHERE id = ?1", params![id])?;
1295 Ok(())
1296 }
1297}