Skip to main content

sparrow/memory/
mod.rs

1use rusqlite::{Connection, params};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6use crate::engine::Identity;
7use crate::event::RunId;
8use crate::provider::Msg;
9use crate::redaction::RedactionFilter;
10
11#[cfg(feature = "treesitter")]
12pub mod symbol_index;
13
/// Size limit reported by [`MemoryDocKind::limit`] for the `MEMORY.md` document.
/// NOTE(review): presumably counted in characters (it sits next to
/// `MemoryStats::memory_chars`) — confirm at the call sites.
pub const MEMORY_MD_LIMIT: usize = 2200;
/// Size limit reported by [`MemoryDocKind::limit`] for the `USER.md` document;
/// presumably the same unit as `MEMORY_MD_LIMIT`.
pub const USER_MD_LIMIT: usize = 1375;
16
17// ─── Repo map ───────────────────────────────────────────────────────────────────
18
/// Snapshot of a source tree produced by [`RepoMap::scan`]: the files found
/// under `root` plus the declarations extracted from its `.rs` files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoMap {
    /// Directory the scan started from.
    pub root: PathBuf,
    /// One entry per file that was not skipped by the scan.
    pub files: Vec<FileEntry>,
    /// Declarations found in files whose relative path ends with `.rs`.
    pub symbols: Vec<SymbolEntry>,
}
25
/// A single file recorded by the repository scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// Path relative to the scan root (the full path if it cannot be made relative).
    pub path: String,
    /// File length as reported by the file's metadata; `0` when metadata is unavailable.
    pub size: u64,
    /// Modification time formatted as `%Y-%m-%d %H:%M:%S`; empty when unavailable.
    pub modified: String,
}
32
/// A declaration discovered by the line-based scan of a Rust file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SymbolEntry {
    /// Relative path of the file that contains the declaration.
    pub file: String,
    /// Name text extracted from the declaration line.
    pub name: String,
    pub kind: String, // "fn", "struct", "impl", "mod", etc.
    /// 1-based line number of the declaration.
    pub line: u32,
}
40
41impl RepoMap {
42    pub fn scan(root: &Path) -> Self {
43        let mut files = Vec::new();
44        let mut symbols = Vec::new();
45        scan_dir(root, root, &mut files, &mut symbols);
46        Self {
47            root: root.to_path_buf(),
48            files,
49            symbols,
50        }
51    }
52}
53
54fn scan_dir(base: &Path, dir: &Path, files: &mut Vec<FileEntry>, symbols: &mut Vec<SymbolEntry>) {
55    if let Ok(entries) = std::fs::read_dir(dir) {
56        for entry in entries.flatten() {
57            let path = entry.path();
58            let name = entry.file_name().to_string_lossy().to_string();
59
60            // Skip hidden, node_modules, target, .git
61            if name.starts_with('.')
62                || name == "node_modules"
63                || name == "target"
64                || name == "build"
65                || name == "dist"
66            {
67                continue;
68            }
69
70            if path.is_dir() {
71                scan_dir(base, &path, files, symbols);
72            } else {
73                let rel = path
74                    .strip_prefix(base)
75                    .unwrap_or(&path)
76                    .to_string_lossy()
77                    .to_string();
78
79                let modified = path
80                    .metadata()
81                    .ok()
82                    .and_then(|m| m.modified().ok())
83                    .and_then(|t| {
84                        chrono::DateTime::from_timestamp(
85                            t.duration_since(std::time::UNIX_EPOCH)
86                                .unwrap_or_default()
87                                .as_secs() as i64,
88                            0,
89                        )
90                    })
91                    .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
92                    .unwrap_or_default();
93
94                files.push(FileEntry {
95                    path: rel.clone(),
96                    size: path.metadata().map(|m| m.len()).unwrap_or(0),
97                    modified,
98                });
99
100                // Basic symbol extraction for Rust files
101                if rel.ends_with(".rs") {
102                    if let Ok(content) = std::fs::read_to_string(&path) {
103                        for (i, line) in content.lines().enumerate() {
104                            let trimmed = line.trim();
105                            if trimmed.starts_with("pub fn ") || trimmed.starts_with("fn ") {
106                                let name = trimmed
107                                    .trim_start_matches("pub fn ")
108                                    .trim_start_matches("fn ")
109                                    .split('(')
110                                    .next()
111                                    .unwrap_or("");
112                                symbols.push(SymbolEntry {
113                                    file: rel.clone(),
114                                    name: name.to_string(),
115                                    kind: "fn".into(),
116                                    line: (i + 1) as u32,
117                                });
118                            } else if trimmed.starts_with("pub struct ")
119                                || trimmed.starts_with("struct ")
120                            {
121                                let name = trimmed
122                                    .trim_start_matches("pub struct ")
123                                    .trim_start_matches("struct ")
124                                    .split(|c: char| c == '<' || c == '{' || c == '(')
125                                    .next()
126                                    .unwrap_or("");
127                                symbols.push(SymbolEntry {
128                                    file: rel.clone(),
129                                    name: name.to_string(),
130                                    kind: "struct".into(),
131                                    line: (i + 1) as u32,
132                                });
133                            } else if trimmed.starts_with("pub enum ")
134                                || trimmed.starts_with("enum ")
135                            {
136                                let name = trimmed
137                                    .trim_start_matches("pub enum ")
138                                    .trim_start_matches("enum ")
139                                    .split(|c: char| c == '<' || c == '{')
140                                    .next()
141                                    .unwrap_or("");
142                                symbols.push(SymbolEntry {
143                                    file: rel.clone(),
144                                    name: name.to_string(),
145                                    kind: "enum".into(),
146                                    line: (i + 1) as u32,
147                                });
148                            } else if trimmed.starts_with("pub trait ")
149                                || trimmed.starts_with("trait ")
150                            {
151                                let name = trimmed
152                                    .trim_start_matches("pub trait ")
153                                    .trim_start_matches("trait ")
154                                    .split(|c: char| c == '<' || c == '{')
155                                    .next()
156                                    .unwrap_or("");
157                                symbols.push(SymbolEntry {
158                                    file: rel.clone(),
159                                    name: name.to_string(),
160                                    kind: "trait".into(),
161                                    line: (i + 1) as u32,
162                                });
163                            } else if trimmed.starts_with("pub mod ") || trimmed.starts_with("mod ")
164                            {
165                                let name = trimmed
166                                    .trim_start_matches("pub mod ")
167                                    .trim_start_matches("mod ")
168                                    .split(';')
169                                    .next()
170                                    .unwrap_or("");
171                                symbols.push(SymbolEntry {
172                                    file: rel.clone(),
173                                    name: name.to_string(),
174                                    kind: "mod".into(),
175                                    line: (i + 1) as u32,
176                                });
177                            }
178                        }
179                    }
180                }
181            }
182        }
183    }
184}
185
186// ─── Task memory ────────────────────────────────────────────────────────────────
187
/// Message history of one run, as loaded and stored through
/// [`Memory::task`] / [`Memory::save_task`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMem {
    /// Identifier of the run; the SQLite backend uses it as the primary key of `tasks`.
    pub run_id: String,
    /// Conversation messages; the SQLite backend stores them as a JSON string.
    pub messages: Vec<Msg>,
    /// Creation timestamp as text (format is decided by the writer).
    pub created_at: String,
}
194
195// ─── Shared memory ──────────────────────────────────────────────────────────────
196
/// A message posted from one agent to another via [`Memory::post_signal`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedSignal {
    /// Unique identifier; primary key of the `signals` table in the SQLite backend.
    pub id: String,
    /// Name of the sending agent.
    pub from_agent: String,
    /// Name of the receiving agent.
    pub to_agent: String,
    /// Signal payload.
    pub content: String,
    /// Timestamp as text; the SQLite backend lists signals newest first by this column.
    pub timestamp: String,
}
205
/// A shared document created or replaced via [`Memory::upsert_doc`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkingDoc {
    /// Unique identifier; primary key of the `working_docs` table in the SQLite backend.
    pub id: String,
    /// Document title.
    pub title: String,
    /// Document body.
    pub content: String,
    /// Last-update timestamp as text; the SQLite backend lists documents newest first by it.
    pub updated_at: String,
}
213
214// ─── Durable facts ──────────────────────────────────────────────────────────────
215
/// A durable key/value fact stored via [`Memory::remember`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// Unique identifier; primary key of the `facts` table in the SQLite backend.
    pub id: String,
    /// Fact key; declared `UNIQUE` in the SQLite schema.
    pub key: String,
    /// Fact value.
    pub value: String,
    /// Creation timestamp as text.
    pub created_at: String,
    /// Last-update timestamp as text.
    pub updated_at: String,
}
224
/// Selects one of the two memory documents handled by the memory backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemoryDocKind {
    /// The `MEMORY.md` document.
    Memory,
    /// The `USER.md` document.
    User,
}
230
231impl MemoryDocKind {
232    pub fn as_str(self) -> &'static str {
233        match self {
234            Self::Memory => "MEMORY.md",
235            Self::User => "USER.md",
236        }
237    }
238
239    pub fn limit(self) -> usize {
240        match self {
241            Self::Memory => MEMORY_MD_LIMIT,
242            Self::User => USER_MD_LIMIT,
243        }
244    }
245
246    pub fn parse(value: &str) -> Option<Self> {
247        match value.trim().to_ascii_lowercase().as_str() {
248            "memory" | "memory.md" => Some(Self::Memory),
249            "user" | "user.md" => Some(Self::User),
250            _ => None,
251        }
252    }
253}
254
/// Content of one memory document together with its last-update timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDoc {
    /// Which document this is.
    pub kind: MemoryDocKind,
    /// Full text of the document.
    pub content: String,
    /// Last-update timestamp as text.
    pub updated_at: String,
}
261
/// Usage summary returned by [`Memory::memory_stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of stored facts.
    pub facts: usize,
    /// Current size of the `MEMORY.md` document (the trait's default reports `0`).
    pub memory_chars: usize,
    /// Limit for `MEMORY.md` (the trait's default reports `MEMORY_MD_LIMIT`).
    pub memory_limit: usize,
    /// Current size of the `USER.md` document (the trait's default reports `0`).
    pub user_chars: usize,
    /// Limit for `USER.md` (the trait's default reports `USER_MD_LIMIT`).
    pub user_limit: usize,
}
270
/// A node of the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphNode {
    /// Unique identifier; primary key of `kg_nodes` in the SQLite backend.
    pub id: String,
    /// Display label of the node.
    pub label: String,
    /// Free-form category of the node.
    pub kind: String,
    /// Arbitrary JSON attributes; falls back to the default JSON value when
    /// the field is absent during deserialization.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp as text.
    pub created_at: String,
    /// Last-update timestamp as text.
    pub updated_at: String,
}
281
/// A directed, weighted relation between two knowledge-graph nodes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Unique identifier; primary key of `kg_edges` in the SQLite backend.
    pub id: String,
    /// Id of the node the edge starts at.
    pub from_id: String,
    /// Id of the node the edge points to.
    pub to_id: String,
    /// Name of the relation.
    pub relation: String,
    /// Edge weight (the SQLite column defaults to `1.0`).
    pub weight: f64,
    /// Arbitrary JSON attributes; falls back to the default JSON value when
    /// the field is absent during deserialization.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp as text.
    pub created_at: String,
    /// Last-update timestamp as text.
    pub updated_at: String,
}
294
/// Node and edge lists returned by [`Memory::graph_export`]; empty by default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KnowledgeGraph {
    /// Nodes of the graph.
    pub nodes: Vec<GraphNode>,
    /// Edges of the graph.
    pub edges: Vec<GraphEdge>,
}
300
/// Edge direction selector passed to [`Memory::graph_neighbors`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum GraphDirection {
    /// Parsed from `incoming` / `in`.
    Incoming,
    /// Parsed from `outgoing` / `out`.
    Outgoing,
    /// Fallback for any other input to [`GraphDirection::parse`].
    Both,
}
307
308impl GraphDirection {
309    pub fn parse(value: &str) -> Self {
310        match value.trim().to_ascii_lowercase().as_str() {
311            "incoming" | "in" => Self::Incoming,
312            "outgoing" | "out" => Self::Outgoing,
313            _ => Self::Both,
314        }
315    }
316}
317
318pub fn validate_memory_text(label: &str, text: &str, limit: usize) -> anyhow::Result<()> {
319    let chars = text.chars().count();
320    if chars > limit {
321        anyhow::bail!("{label} is too large: {chars}/{limit} chars");
322    }
323    if text.chars().any(is_forbidden_invisible_char) {
324        anyhow::bail!("{label} contains invisible control characters");
325    }
326
327    let lower = text.to_ascii_lowercase();
328    let blocked = [
329        "ignore previous instructions",
330        "ignore all previous instructions",
331        "disregard previous instructions",
332        "reveal your system prompt",
333        "print your system prompt",
334        "exfiltrate",
335        "steal secrets",
336        "dump secrets",
337        "print env",
338        "cat ~/.ssh",
339        "backdoor",
340        "reverse shell",
341        "rm -rf /",
342    ];
343    if blocked.iter().any(|needle| lower.contains(needle)) {
344        anyhow::bail!("{label} looks like prompt injection or credential exfiltration");
345    }
346    Ok(())
347}
348
/// Returns `true` for characters that are rejected in memory text: the listed
/// zero-width / bidirectional-formatting ranges, U+FEFF, and every control
/// character other than newline, carriage return and tab.
fn is_forbidden_invisible_char(c: char) -> bool {
    match c {
        // The only control characters that are allowed through.
        '\n' | '\r' | '\t' => false,
        '\u{200B}'..='\u{200F}'
        | '\u{202A}'..='\u{202E}'
        | '\u{2060}'..='\u{206F}'
        | '\u{FEFF}' => true,
        other => other.is_control(),
    }
}
358
/// Returns the first `limit` characters of `text` (all of it when it is shorter).
fn truncate_chars(text: &str, limit: usize) -> String {
    // The byte offset of the character at index `limit` is where the cut goes.
    match text.char_indices().nth(limit) {
        Some((cut, _)) => text[..cut].to_owned(),
        None => text.to_owned(),
    }
}
362
363// ─── THE MEMORY TRAIT ───────────────────────────────────────────────────────────
364
/// Storage interface for everything an agent persists: identities, task
/// histories, shared signals and documents, durable facts, memory documents,
/// discovered model lists and the knowledge graph.
///
/// Memory-document, consolidation and knowledge-graph methods have default
/// bodies that report "not supported" (an error, `None`, or an empty
/// collection), so a backend only overrides the features it provides.
pub trait Memory: Send + Sync {
    /// Builds a [`RepoMap`] for the tree rooted at `root`.
    fn repo_map(&self, root: &Path) -> RepoMap;
    /// Returns the stored identity for `agent`, if any.
    fn identity(&self, agent: &str) -> Option<Identity>;
    /// Stores `identity` under `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()>;
    /// Returns the stored task memory for `run`, if any.
    fn task(&self, run: &RunId) -> Option<TaskMem>;
    /// Stores `task`.
    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()>;
    /// Returns the shared signals.
    fn shared_signals(&self) -> Vec<SharedSignal>;
    /// Returns the shared working documents.
    fn shared_docs(&self) -> Vec<WorkingDoc>;
    /// Stores a new shared signal.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()>;
    /// Creates or replaces a shared working document.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()>;
    /// Stores a durable fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Searches facts for `q`; the SQLite backend returns at most `k` results.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact>;
    /// Returns every stored fact.
    fn all_facts(&self) -> Vec<Fact>;
    /// Removes the fact identified by `id`.
    fn forget(&self, id: &str) -> anyhow::Result<()>;
    /// Returns the memory document of the given kind; `None` by default.
    fn memory_doc(&self, _kind: MemoryDocKind) -> Option<MemoryDoc> {
        None
    }
    /// Creates or replaces a memory document; fails by default.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Removes a memory document; fails by default.
    fn remove_memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Usage summary. The default counts facts via `all_facts` and reports
    /// zero-sized memory documents with the built-in limits.
    fn memory_stats(&self) -> MemoryStats {
        MemoryStats {
            facts: self.all_facts().len(),
            memory_chars: 0,
            memory_limit: MEMORY_MD_LIMIT,
            user_chars: 0,
            user_limit: USER_MD_LIMIT,
        }
    }
    /// Consolidates stored memory; fails by default.
    fn consolidate_memory(&self) -> anyhow::Result<()> {
        anyhow::bail!("memory consolidation is not supported by this memory backend")
    }
    /// Caches the model names discovered for `provider_id`.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()>;
    /// Returns the cached model names for `provider_id`.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String>;
    /// Creates or replaces a graph node; fails by default.
    fn upsert_graph_node(&self, _node: GraphNode) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Creates or replaces a graph edge; fails by default.
    fn upsert_graph_edge(&self, _edge: GraphEdge) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Looks up a graph node by id; `None` by default.
    fn graph_node(&self, _id: &str) -> Option<GraphNode> {
        None
    }
    /// Returns edges touching the node `_id` in the given direction together
    /// with the node at the other end; empty by default.
    /// NOTE(review): `_limit` is presumably a maximum result count — confirm in the backend.
    fn graph_neighbors(
        &self,
        _id: &str,
        _direction: GraphDirection,
        _limit: usize,
    ) -> Vec<(GraphEdge, GraphNode)> {
        Vec::new()
    }
    /// Searches graph nodes for `_query`; empty by default.
    fn search_graph(&self, _query: &str, _limit: usize) -> Vec<GraphNode> {
        Vec::new()
    }
    /// Exports the whole graph; an empty graph by default.
    fn graph_export(&self) -> KnowledgeGraph {
        KnowledgeGraph::default()
    }
    /// Deletes a graph node; fails by default.
    fn delete_graph_node(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Deletes a graph edge; fails by default.
    fn delete_graph_edge(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
}
432
/// Narrow, fallible facade over a memory store: facts plus memory documents.
///
/// Unlike [`Memory`], every operation returns a `Result`, so a provider can
/// report that it is unavailable.
pub trait MemoryProvider: Send + Sync {
    /// Short identifier of the provider (e.g. `local`).
    fn name(&self) -> &str;
    /// Stores a durable fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Searches facts for `query`.
    /// NOTE(review): `limit` is presumably the maximum number of results — confirm per provider.
    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>>;
    /// Returns the memory document of the given kind, if one exists.
    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>>;
    /// Creates or replaces the memory document of the given kind.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()>;
}
440
/// [`MemoryProvider`] that forwards every call to a shared [`Memory`] backend.
pub struct LocalMemoryProvider {
    /// Backend that actually stores the data.
    memory: Arc<dyn Memory>,
}
444
445impl LocalMemoryProvider {
446    pub fn new(memory: Arc<dyn Memory>) -> Self {
447        Self { memory }
448    }
449}
450
451impl MemoryProvider for LocalMemoryProvider {
452    fn name(&self) -> &str {
453        "local"
454    }
455
456    fn remember(&self, fact: Fact) -> anyhow::Result<()> {
457        self.memory.remember(fact)
458    }
459
460    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>> {
461        Ok(self.memory.recall(query, limit))
462    }
463
464    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
465        Ok(self.memory.memory_doc(kind))
466    }
467
468    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
469        self.memory.upsert_memory_doc(kind, content)
470    }
471}
472
/// Placeholder [`MemoryProvider`] for an external service: it only carries the
/// service name, and every operation fails with a "not configured" error.
pub struct ExternalMemoryProvider {
    /// Name reported by `name()` and quoted in the error message.
    name: String,
}
476
477impl ExternalMemoryProvider {
478    pub fn mem0() -> Self {
479        Self {
480            name: "mem0".into(),
481        }
482    }
483
484    pub fn honcho() -> Self {
485        Self {
486            name: "honcho".into(),
487        }
488    }
489
490    pub fn supermemory() -> Self {
491        Self {
492            name: "supermemory".into(),
493        }
494    }
495
496    fn not_configured(&self) -> anyhow::Error {
497        anyhow::anyhow!(
498            "external memory provider '{}' is not configured; use local SQLite memory or configure a connector first",
499            self.name
500        )
501    }
502}
503
504impl MemoryProvider for ExternalMemoryProvider {
505    fn name(&self) -> &str {
506        &self.name
507    }
508
509    fn remember(&self, _fact: Fact) -> anyhow::Result<()> {
510        Err(self.not_configured())
511    }
512
513    fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result<Vec<Fact>> {
514        Err(self.not_configured())
515    }
516
517    fn memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
518        Err(self.not_configured())
519    }
520
521    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
522        Err(self.not_configured())
523    }
524}
525
526// ─── SQLite-backed memory implementation ────────────────────────────────────────
527
/// [`Memory`] backend that stores everything in a single SQLite database.
pub struct SqliteMemory {
    /// The database connection; every operation locks this mutex before using it.
    conn: Mutex<Connection>,
}
531
532impl SqliteMemory {
533    pub fn open(db_path: &Path) -> anyhow::Result<Self> {
534        if let Some(parent) = db_path.parent() {
535            std::fs::create_dir_all(parent)?;
536        }
537        let conn = Connection::open(db_path)?;
538        let memory = Self {
539            conn: Mutex::new(conn),
540        };
541        memory.migrate()?;
542        Ok(memory)
543    }
544
545    fn migrate(&self) -> anyhow::Result<()> {
546        let conn = self.conn.lock().unwrap();
547        conn.execute_batch(
548            "
549            CREATE TABLE IF NOT EXISTS identities (
550                agent TEXT PRIMARY KEY,
551                name TEXT NOT NULL,
552                role TEXT NOT NULL,
553                personality TEXT NOT NULL
554            );
555            CREATE TABLE IF NOT EXISTS tasks (
556                run_id TEXT PRIMARY KEY,
557                messages_json TEXT NOT NULL,
558                created_at TEXT NOT NULL DEFAULT (datetime('now'))
559            );
560            CREATE TABLE IF NOT EXISTS signals (
561                id TEXT PRIMARY KEY,
562                from_agent TEXT NOT NULL,
563                to_agent TEXT NOT NULL,
564                content TEXT NOT NULL,
565                timestamp TEXT NOT NULL DEFAULT (datetime('now'))
566            );
567            CREATE TABLE IF NOT EXISTS working_docs (
568                id TEXT PRIMARY KEY,
569                title TEXT NOT NULL,
570                content TEXT NOT NULL,
571                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
572            );
573            CREATE TABLE IF NOT EXISTS facts (
574                id TEXT PRIMARY KEY,
575                key TEXT NOT NULL UNIQUE,
576                value TEXT NOT NULL,
577                created_at TEXT NOT NULL DEFAULT (datetime('now')),
578                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
579            );
580            CREATE TABLE IF NOT EXISTS discovered_models (
581                provider_id TEXT NOT NULL,
582                model_name TEXT NOT NULL,
583                fetched_at TEXT NOT NULL,
584                PRIMARY KEY (provider_id, model_name)
585            );
586            CREATE TABLE IF NOT EXISTS memory_docs (
587                kind TEXT PRIMARY KEY,
588                content TEXT NOT NULL,
589                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
590            );
591            CREATE TABLE IF NOT EXISTS kg_nodes (
592                id TEXT PRIMARY KEY,
593                label TEXT NOT NULL,
594                kind TEXT NOT NULL,
595                properties_json TEXT NOT NULL DEFAULT '{}',
596                created_at TEXT NOT NULL DEFAULT (datetime('now')),
597                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
598            );
599            CREATE TABLE IF NOT EXISTS kg_edges (
600                id TEXT PRIMARY KEY,
601                from_id TEXT NOT NULL,
602                to_id TEXT NOT NULL,
603                relation TEXT NOT NULL,
604                weight REAL NOT NULL DEFAULT 1.0,
605                properties_json TEXT NOT NULL DEFAULT '{}',
606                created_at TEXT NOT NULL DEFAULT (datetime('now')),
607                updated_at TEXT NOT NULL DEFAULT (datetime('now')),
608                FOREIGN KEY(from_id) REFERENCES kg_nodes(id) ON DELETE CASCADE,
609                FOREIGN KEY(to_id) REFERENCES kg_nodes(id) ON DELETE CASCADE
610            );
611            CREATE INDEX IF NOT EXISTS kg_nodes_label_idx ON kg_nodes(label);
612            CREATE INDEX IF NOT EXISTS kg_nodes_kind_idx ON kg_nodes(kind);
613            CREATE INDEX IF NOT EXISTS kg_edges_from_idx ON kg_edges(from_id);
614            CREATE INDEX IF NOT EXISTS kg_edges_to_idx ON kg_edges(to_id);
615            CREATE INDEX IF NOT EXISTS kg_edges_relation_idx ON kg_edges(relation);
616            -- FTS5 full-text search for memory recall (M1)
617            CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5(
618                key, value, content='facts', content_rowid='rowid'
619            );
620            -- Triggers to keep FTS5 index in sync
621            CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
622                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
623            END;
624            CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
625                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
626            END;
627            CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN
628                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
629                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
630            END;
631            ",
632        )?;
633        Ok(())
634    }
635}
636
637fn graph_node_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphNode> {
638    let properties_json: String = row.get(3)?;
639    Ok(GraphNode {
640        id: row.get(0)?,
641        label: row.get(1)?,
642        kind: row.get(2)?,
643        properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
644        created_at: row.get(4)?,
645        updated_at: row.get(5)?,
646    })
647}
648
649fn graph_edge_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphEdge> {
650    let properties_json: String = row.get(5)?;
651    Ok(GraphEdge {
652        id: row.get(0)?,
653        from_id: row.get(1)?,
654        to_id: row.get(2)?,
655        relation: row.get(3)?,
656        weight: row.get(4)?,
657        properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
658        created_at: row.get(6)?,
659        updated_at: row.get(7)?,
660    })
661}
662
663impl Memory for SqliteMemory {
    /// Scans the file system under `root`; the database is not involved.
    fn repo_map(&self, root: &Path) -> RepoMap {
        RepoMap::scan(root)
    }
667
668    fn identity(&self, agent: &str) -> Option<Identity> {
669        let conn = self.conn.lock().unwrap();
670        conn.query_row(
671            "SELECT name, role, personality FROM identities WHERE agent = ?1",
672            params![agent],
673            |row| {
674                Ok(Identity {
675                    name: row.get(0)?,
676                    role: row.get(1)?,
677                    personality: row.get(2)?,
678                })
679            },
680        )
681        .ok()
682    }
683
684    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()> {
685        let conn = self.conn.lock().unwrap();
686        conn.execute(
687            "INSERT OR REPLACE INTO identities (agent, name, role, personality) VALUES (?1, ?2, ?3, ?4)",
688            params![agent, identity.name, identity.role, identity.personality],
689        )?;
690        Ok(())
691    }
692
693    fn task(&self, run: &RunId) -> Option<TaskMem> {
694        let conn = self.conn.lock().unwrap();
695        conn.query_row(
696            "SELECT run_id, messages_json, created_at FROM tasks WHERE run_id = ?1",
697            params![run.0],
698            |row| {
699                let messages_json: String = row.get(1)?;
700                let messages: Vec<Msg> = serde_json::from_str(&messages_json).unwrap_or_default();
701                Ok(TaskMem {
702                    run_id: row.get(0)?,
703                    messages,
704                    created_at: row.get(2)?,
705                })
706            },
707        )
708        .ok()
709    }
710
711    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()> {
712        let conn = self.conn.lock().unwrap();
713        let messages_json = serde_json::to_string(&task.messages)?;
714        conn.execute(
715            "INSERT OR REPLACE INTO tasks (run_id, messages_json, created_at) VALUES (?1, ?2, ?3)",
716            params![task.run_id, messages_json, task.created_at],
717        )?;
718        Ok(())
719    }
720
721    fn shared_signals(&self) -> Vec<SharedSignal> {
722        let conn = self.conn.lock().unwrap();
723        let mut stmt = conn
724            .prepare("SELECT id, from_agent, to_agent, content, timestamp FROM signals ORDER BY timestamp DESC")
725            .unwrap();
726        let signals = stmt
727            .query_map([], |row| {
728                Ok(SharedSignal {
729                    id: row.get(0)?,
730                    from_agent: row.get(1)?,
731                    to_agent: row.get(2)?,
732                    content: row.get(3)?,
733                    timestamp: row.get(4)?,
734                })
735            })
736            .unwrap()
737            .filter_map(|r| r.ok())
738            .collect();
739        signals
740    }
741
742    fn shared_docs(&self) -> Vec<WorkingDoc> {
743        let conn = self.conn.lock().unwrap();
744        let mut stmt = conn
745            .prepare(
746                "SELECT id, title, content, updated_at FROM working_docs ORDER BY updated_at DESC",
747            )
748            .unwrap();
749        let docs = stmt
750            .query_map([], |row| {
751                Ok(WorkingDoc {
752                    id: row.get(0)?,
753                    title: row.get(1)?,
754                    content: row.get(2)?,
755                    updated_at: row.get(3)?,
756                })
757            })
758            .unwrap()
759            .filter_map(|r| r.ok())
760            .collect();
761        docs
762    }
763
764    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()> {
765        let conn = self.conn.lock().unwrap();
766        conn.execute(
767            "INSERT INTO signals (id, from_agent, to_agent, content, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
768            params![signal.id, signal.from_agent, signal.to_agent, signal.content, signal.timestamp],
769        )?;
770        Ok(())
771    }
772
773    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()> {
774        let conn = self.conn.lock().unwrap();
775        conn.execute(
776            "INSERT OR REPLACE INTO working_docs (id, title, content, updated_at) VALUES (?1, ?2, ?3, ?4)",
777            params![doc.id, doc.title, doc.content, doc.updated_at],
778        )?;
779        Ok(())
780    }
781
782    fn remember(&self, fact: Fact) -> anyhow::Result<()> {
783        let redaction = RedactionFilter::new();
784        validate_memory_text("fact key", &fact.key, 256)?;
785        validate_memory_text("fact value", &fact.value, 1200)?;
786        let safe_value = redaction.redact_str(&fact.value);
787        let safe_key = redaction.redact_str(&fact.key);
788        if redaction.contains_secret(&fact.value) {
789            tracing::warn!("Redacted secret from memory fact: {}", fact.key);
790        }
791        let conn = self.conn.lock().unwrap();
792        let existing: Option<String> = conn
793            .query_row(
794                "SELECT id FROM facts WHERE key = ?1",
795                params![safe_key.clone()],
796                |row| row.get(0),
797            )
798            .ok();
799        if existing.as_deref().is_some_and(|id| id != fact.id) {
800            anyhow::bail!(
801                "memory fact '{}' already exists; use replace/consolidate",
802                safe_key
803            );
804        }
805        conn.execute(
806            "INSERT OR REPLACE INTO facts (id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
807            params![fact.id, safe_key, safe_value, fact.created_at, fact.updated_at],
808        )?;
809        Ok(())
810    }
811
812    fn recall(&self, q: &str, k: usize) -> Vec<Fact> {
813        let conn = self.conn.lock().unwrap();
814
815        // Build a safe FTS5 MATCH expression. FTS5 chokes on unescaped tokens
816        // containing punctuation, hyphens, quotes, or its reserved keywords
817        // (AND, OR, NOT, NEAR). Strategy: split on whitespace, drop non-word
818        // chars from each token, double-quote it, and append a prefix wildcard.
819        // Example: `users.id rm -rf` -> `"usersid"* "rm"* "rf"*`.
820        let tokens: Vec<String> = q
821            .split_whitespace()
822            .map(|w| {
823                w.chars()
824                    .filter(|c| c.is_alphanumeric() || *c == '_')
825                    .collect::<String>()
826            })
827            .filter(|w| !w.is_empty())
828            .map(|w| format!("\"{}\"*", w))
829            .collect();
830
831        // No usable tokens (e.g. query was only punctuation) — go straight to LIKE.
832        let try_fts = !tokens.is_empty();
833        if try_fts {
834            let pattern = tokens.join(" ");
835            if let Ok(mut stmt) = conn.prepare(
836                "SELECT f.id, f.key, f.value, f.created_at, f.updated_at FROM facts f
837                 INNER JOIN facts_fts ft ON f.rowid = ft.rowid
838                 WHERE facts_fts MATCH ?1 ORDER BY rank LIMIT ?2",
839            ) {
840                if let Ok(rows) = stmt.query_map(params![pattern, k as i64], |row| {
841                    Ok(Fact {
842                        id: row.get(0)?,
843                        key: row.get(1)?,
844                        value: row.get(2)?,
845                        created_at: row.get(3)?,
846                        updated_at: row.get(4)?,
847                    })
848                }) {
849                    let facts: Vec<Fact> = rows.filter_map(|r| r.ok()).collect();
850                    if !facts.is_empty() {
851                        return facts;
852                    }
853                    // Fall through to LIKE if FTS returned nothing — LIKE catches
854                    // substring matches inside larger tokens that FTS won't.
855                }
856            }
857        }
858
859        // LIKE fallback. Escape % and _ in the user pattern so they don't act
860        // as wildcards. Use ESCAPE '\' so the literal characters survive.
861        let escaped = q
862            .replace('\\', "\\\\")
863            .replace('%', "\\%")
864            .replace('_', "\\_");
865        let like_pattern = format!("%{}%", escaped);
866        let Ok(mut stmt) = conn.prepare(
867            "SELECT id, key, value, created_at, updated_at FROM facts \
868             WHERE key LIKE ?1 ESCAPE '\\' OR value LIKE ?1 ESCAPE '\\' LIMIT ?2",
869        ) else {
870            return Vec::new();
871        };
872        let Ok(rows) = stmt.query_map(params![like_pattern, k as i64], |row| {
873            Ok(Fact {
874                id: row.get(0)?,
875                key: row.get(1)?,
876                value: row.get(2)?,
877                created_at: row.get(3)?,
878                updated_at: row.get(4)?,
879            })
880        }) else {
881            return Vec::new();
882        };
883        rows.filter_map(|r| r.ok()).collect()
884    }
885
886    fn all_facts(&self) -> Vec<Fact> {
887        let conn = self.conn.lock().unwrap();
888        let mut stmt = conn
889            .prepare(
890                "SELECT id, key, value, created_at, updated_at FROM facts ORDER BY updated_at DESC",
891            )
892            .unwrap();
893        stmt.query_map([], |row| {
894            Ok(Fact {
895                id: row.get(0)?,
896                key: row.get(1)?,
897                value: row.get(2)?,
898                created_at: row.get(3)?,
899                updated_at: row.get(4)?,
900            })
901        })
902        .unwrap()
903        .filter_map(|r| r.ok())
904        .collect()
905    }
906
907    fn forget(&self, id: &str) -> anyhow::Result<()> {
908        let conn = self.conn.lock().unwrap();
909        conn.execute("DELETE FROM facts WHERE id = ?1", params![id])?;
910        Ok(())
911    }
912
913    fn memory_doc(&self, kind: MemoryDocKind) -> Option<MemoryDoc> {
914        let conn = self.conn.lock().unwrap();
915        conn.query_row(
916            "SELECT content, updated_at FROM memory_docs WHERE kind = ?1",
917            params![kind.as_str()],
918            |row| {
919                Ok(MemoryDoc {
920                    kind,
921                    content: row.get(0)?,
922                    updated_at: row.get(1)?,
923                })
924            },
925        )
926        .ok()
927    }
928
929    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
930        validate_memory_text(kind.as_str(), content, kind.limit())?;
931        let conn = self.conn.lock().unwrap();
932        conn.execute(
933            "INSERT OR REPLACE INTO memory_docs (kind, content, updated_at)
934             VALUES (?1, ?2, datetime('now'))",
935            params![kind.as_str(), content],
936        )?;
937        Ok(())
938    }
939
940    fn remove_memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<()> {
941        let conn = self.conn.lock().unwrap();
942        conn.execute(
943            "DELETE FROM memory_docs WHERE kind = ?1",
944            params![kind.as_str()],
945        )?;
946        Ok(())
947    }
948
949    fn memory_stats(&self) -> MemoryStats {
950        let memory_chars = self
951            .memory_doc(MemoryDocKind::Memory)
952            .map(|doc| doc.content.chars().count())
953            .unwrap_or(0);
954        let user_chars = self
955            .memory_doc(MemoryDocKind::User)
956            .map(|doc| doc.content.chars().count())
957            .unwrap_or(0);
958        MemoryStats {
959            facts: self.all_facts().len(),
960            memory_chars,
961            memory_limit: MEMORY_MD_LIMIT,
962            user_chars,
963            user_limit: USER_MD_LIMIT,
964        }
965    }
966
967    fn consolidate_memory(&self) -> anyhow::Result<()> {
968        let facts = self.all_facts();
969        let mut memory_lines = Vec::new();
970        memory_lines.push("# MEMORY.md".to_string());
971        memory_lines.push("Durable Sparrow memory distilled from accepted facts.".to_string());
972        for fact in facts.iter().take(40) {
973            memory_lines.push(format!("- {}: {}", fact.key, fact.value));
974        }
975        let memory = truncate_chars(&memory_lines.join("\n"), MEMORY_MD_LIMIT);
976        self.upsert_memory_doc(MemoryDocKind::Memory, &memory)?;
977
978        let mut user_lines = Vec::new();
979        user_lines.push("# USER.md".to_string());
980        for fact in facts
981            .iter()
982            .filter(|fact| fact.key.starts_with("user"))
983            .take(24)
984        {
985            user_lines.push(format!("- {}: {}", fact.key, fact.value));
986        }
987        if user_lines.len() > 1 {
988            let user = truncate_chars(&user_lines.join("\n"), USER_MD_LIMIT);
989            self.upsert_memory_doc(MemoryDocKind::User, &user)?;
990        }
991        Ok(())
992    }
993
994    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()> {
995        let mut conn = self.conn.lock().unwrap();
996        let tx = conn.transaction()?;
997        tx.execute(
998            "DELETE FROM discovered_models WHERE provider_id = ?1",
999            params![provider_id],
1000        )?;
1001        let fetched_at = chrono::Utc::now().to_rfc3339();
1002        for model in models {
1003            let model = model.trim();
1004            if model.is_empty() {
1005                continue;
1006            }
1007            tx.execute(
1008                "INSERT OR REPLACE INTO discovered_models (provider_id, model_name, fetched_at)
1009                 VALUES (?1, ?2, ?3)",
1010                params![provider_id, model, fetched_at],
1011            )?;
1012        }
1013        tx.commit()?;
1014        Ok(())
1015    }
1016
1017    fn get_discovered_models(&self, provider_id: &str) -> Vec<String> {
1018        let conn = self.conn.lock().unwrap();
1019        // Discovered model lists are stable for weeks at a time — vendors don't
1020        // rotate the catalogue daily. The previous 24-hour TTL silently dropped
1021        // a working scan after one day, forcing the user to redo it. Keep the
1022        // cache for 30 days; an explicit `sparrow scan` always overwrites it.
1023        let Ok(mut stmt) = conn.prepare(
1024            "SELECT model_name FROM discovered_models
1025             WHERE provider_id = ?1
1026               AND datetime(fetched_at) >= datetime('now', '-30 days')
1027             ORDER BY model_name ASC",
1028        ) else {
1029            return Vec::new();
1030        };
1031        let Ok(rows) = stmt.query_map(params![provider_id], |row| row.get::<_, String>(0)) else {
1032            return Vec::new();
1033        };
1034        rows.filter_map(|row| row.ok()).collect()
1035    }
1036
1037    fn upsert_graph_node(&self, node: GraphNode) -> anyhow::Result<()> {
1038        validate_memory_text("graph node id", &node.id, 160)?;
1039        validate_memory_text("graph node label", &node.label, 512)?;
1040        validate_memory_text("graph node kind", &node.kind, 80)?;
1041        validate_memory_text("graph node properties", &node.properties.to_string(), 4000)?;
1042        let redaction = RedactionFilter::new();
1043        let safe_id = redaction.redact_str(&node.id);
1044        let safe_label = redaction.redact_str(&node.label);
1045        let safe_kind = redaction.redact_str(&node.kind);
1046        let safe_properties = serde_json::to_string(&node.properties)?;
1047        let safe_properties = redaction.redact_str(&safe_properties);
1048        let now = chrono::Utc::now().to_rfc3339();
1049        let created_at = if node.created_at.trim().is_empty() {
1050            now.clone()
1051        } else {
1052            node.created_at
1053        };
1054        let updated_at = if node.updated_at.trim().is_empty() {
1055            now
1056        } else {
1057            node.updated_at
1058        };
1059        let conn = self.conn.lock().unwrap();
1060        conn.execute(
1061            "INSERT INTO kg_nodes (id, label, kind, properties_json, created_at, updated_at)
1062             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1063             ON CONFLICT(id) DO UPDATE SET
1064                label = excluded.label,
1065                kind = excluded.kind,
1066                properties_json = excluded.properties_json,
1067                updated_at = excluded.updated_at",
1068            params![
1069                safe_id,
1070                safe_label,
1071                safe_kind,
1072                safe_properties,
1073                created_at,
1074                updated_at
1075            ],
1076        )?;
1077        Ok(())
1078    }
1079
1080    fn upsert_graph_edge(&self, edge: GraphEdge) -> anyhow::Result<()> {
1081        validate_memory_text("graph edge id", &edge.id, 160)?;
1082        validate_memory_text("graph edge from_id", &edge.from_id, 160)?;
1083        validate_memory_text("graph edge to_id", &edge.to_id, 160)?;
1084        validate_memory_text("graph edge relation", &edge.relation, 120)?;
1085        validate_memory_text("graph edge properties", &edge.properties.to_string(), 4000)?;
1086        let redaction = RedactionFilter::new();
1087        let safe_properties = serde_json::to_string(&edge.properties)?;
1088        let safe_properties = redaction.redact_str(&safe_properties);
1089        let now = chrono::Utc::now().to_rfc3339();
1090        let created_at = if edge.created_at.trim().is_empty() {
1091            now.clone()
1092        } else {
1093            edge.created_at
1094        };
1095        let updated_at = if edge.updated_at.trim().is_empty() {
1096            now
1097        } else {
1098            edge.updated_at
1099        };
1100        let conn = self.conn.lock().unwrap();
1101        conn.execute(
1102            "INSERT INTO kg_edges (id, from_id, to_id, relation, weight, properties_json, created_at, updated_at)
1103             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1104             ON CONFLICT(id) DO UPDATE SET
1105                from_id = excluded.from_id,
1106                to_id = excluded.to_id,
1107                relation = excluded.relation,
1108                weight = excluded.weight,
1109                properties_json = excluded.properties_json,
1110                updated_at = excluded.updated_at",
1111            params![
1112                redaction.redact_str(&edge.id),
1113                redaction.redact_str(&edge.from_id),
1114                redaction.redact_str(&edge.to_id),
1115                redaction.redact_str(&edge.relation),
1116                edge.weight,
1117                safe_properties,
1118                created_at,
1119                updated_at
1120            ],
1121        )?;
1122        Ok(())
1123    }
1124
1125    fn graph_node(&self, id: &str) -> Option<GraphNode> {
1126        let conn = self.conn.lock().unwrap();
1127        conn.query_row(
1128            "SELECT id, label, kind, properties_json, created_at, updated_at
1129             FROM kg_nodes WHERE id = ?1",
1130            params![id],
1131            graph_node_from_row,
1132        )
1133        .ok()
1134    }
1135
1136    fn graph_neighbors(
1137        &self,
1138        id: &str,
1139        direction: GraphDirection,
1140        limit: usize,
1141    ) -> Vec<(GraphEdge, GraphNode)> {
1142        let conn = self.conn.lock().unwrap();
1143        let limit = limit.clamp(1, 100) as i64;
1144        let sql = match direction {
1145            GraphDirection::Outgoing => {
1146                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1147                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1148                 FROM kg_edges e
1149                 JOIN kg_nodes n ON n.id = e.to_id
1150                 WHERE e.from_id = ?1
1151                 ORDER BY e.weight DESC, e.updated_at DESC
1152                 LIMIT ?2"
1153            }
1154            GraphDirection::Incoming => {
1155                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1156                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1157                 FROM kg_edges e
1158                 JOIN kg_nodes n ON n.id = e.from_id
1159                 WHERE e.to_id = ?1
1160                 ORDER BY e.weight DESC, e.updated_at DESC
1161                 LIMIT ?2"
1162            }
1163            GraphDirection::Both => {
1164                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1165                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1166                 FROM kg_edges e
1167                 JOIN kg_nodes n ON n.id = CASE WHEN e.from_id = ?1 THEN e.to_id ELSE e.from_id END
1168                 WHERE e.from_id = ?1 OR e.to_id = ?1
1169                 ORDER BY e.weight DESC, e.updated_at DESC
1170                 LIMIT ?2"
1171            }
1172        };
1173        let Ok(mut stmt) = conn.prepare(sql) else {
1174            return Vec::new();
1175        };
1176        let Ok(rows) = stmt.query_map(params![id, limit], |row| {
1177            let edge = graph_edge_from_row(row)?;
1178            let properties_json: String = row.get(11)?;
1179            let node = GraphNode {
1180                id: row.get(8)?,
1181                label: row.get(9)?,
1182                kind: row.get(10)?,
1183                properties: serde_json::from_str(&properties_json)
1184                    .unwrap_or(serde_json::Value::Null),
1185                created_at: row.get(12)?,
1186                updated_at: row.get(13)?,
1187            };
1188            Ok((edge, node))
1189        }) else {
1190            return Vec::new();
1191        };
1192        rows.filter_map(|row| row.ok()).collect()
1193    }
1194
1195    fn search_graph(&self, query: &str, limit: usize) -> Vec<GraphNode> {
1196        let query = query.trim();
1197        if query.is_empty() {
1198            return Vec::new();
1199        }
1200        let escaped = query
1201            .replace('\\', "\\\\")
1202            .replace('%', "\\%")
1203            .replace('_', "\\_");
1204        let pattern = format!("%{}%", escaped);
1205        let conn = self.conn.lock().unwrap();
1206        let Ok(mut stmt) = conn.prepare(
1207            "SELECT id, label, kind, properties_json, created_at, updated_at
1208             FROM kg_nodes
1209             WHERE id LIKE ?1 ESCAPE '\\'
1210                OR label LIKE ?1 ESCAPE '\\'
1211                OR kind LIKE ?1 ESCAPE '\\'
1212                OR properties_json LIKE ?1 ESCAPE '\\'
1213             ORDER BY updated_at DESC
1214             LIMIT ?2",
1215        ) else {
1216            return Vec::new();
1217        };
1218        let Ok(rows) = stmt.query_map(
1219            params![pattern, limit.clamp(1, 100) as i64],
1220            graph_node_from_row,
1221        ) else {
1222            return Vec::new();
1223        };
1224        rows.filter_map(|row| row.ok()).collect()
1225    }
1226
1227    fn graph_export(&self) -> KnowledgeGraph {
1228        let conn = self.conn.lock().unwrap();
1229        let nodes = conn
1230            .prepare(
1231                "SELECT id, label, kind, properties_json, created_at, updated_at
1232                 FROM kg_nodes ORDER BY kind, label",
1233            )
1234            .and_then(|mut stmt| {
1235                let rows = stmt.query_map([], graph_node_from_row)?;
1236                Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1237            })
1238            .unwrap_or_default();
1239        let edges = conn
1240            .prepare(
1241                "SELECT id, from_id, to_id, relation, weight, properties_json, created_at, updated_at
1242                 FROM kg_edges ORDER BY relation, from_id, to_id",
1243            )
1244            .and_then(|mut stmt| {
1245                let rows = stmt.query_map([], graph_edge_from_row)?;
1246                Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1247            })
1248            .unwrap_or_default();
1249        KnowledgeGraph { nodes, edges }
1250    }
1251
1252    fn delete_graph_node(&self, id: &str) -> anyhow::Result<()> {
1253        let conn = self.conn.lock().unwrap();
1254        conn.execute(
1255            "DELETE FROM kg_edges WHERE from_id = ?1 OR to_id = ?1",
1256            params![id],
1257        )?;
1258        conn.execute("DELETE FROM kg_nodes WHERE id = ?1", params![id])?;
1259        Ok(())
1260    }
1261
1262    fn delete_graph_edge(&self, id: &str) -> anyhow::Result<()> {
1263        let conn = self.conn.lock().unwrap();
1264        conn.execute("DELETE FROM kg_edges WHERE id = ?1", params![id])?;
1265        Ok(())
1266    }
1267}