Skip to main content

sparrow/memory/
mod.rs

1use rusqlite::{Connection, params};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex};
5
6use crate::engine::Identity;
7use crate::event::RunId;
8use crate::provider::Msg;
9use crate::redaction::RedactionFilter;
10
11#[cfg(feature = "treesitter")]
12pub mod symbol_index;
13
14pub mod cli;
15pub mod fts;
16
/// Size limit for the `MEMORY.md` document (reported next to its character count in
/// [`MemoryStats`]).
pub const MEMORY_MD_LIMIT: usize = 2_200;
/// Size limit for the `USER.md` document (reported next to its character count in
/// [`MemoryStats`]).
pub const USER_MD_LIMIT: usize = 1_375;
19
20// ─── Repo map ───────────────────────────────────────────────────────────────────
21
/// Snapshot of a directory tree: every scanned file plus the item declarations found in its
/// Rust sources. Produced by [`RepoMap::scan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoMap {
    /// Directory the scan started from.
    pub root: PathBuf,
    /// Files found under `root`, in the order the directory walk visited them.
    pub files: Vec<FileEntry>,
    /// Declarations extracted from `.rs` files.
    pub symbols: Vec<SymbolEntry>,
}

/// One file discovered by the repository scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileEntry {
    /// Path relative to the scan root, lossily converted to UTF-8.
    pub path: String,
    /// File size in bytes, or 0 when metadata could not be read.
    pub size: u64,
    /// Last-modified time as `%Y-%m-%d %H:%M:%S` (UTC, whole seconds), or empty when unknown.
    pub modified: String,
}

/// A declaration found by the line-based Rust symbol scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SymbolEntry {
    /// Path of the containing file, relative to the scan root.
    pub file: String,
    /// Name of the declared item, as extracted by the scanner.
    pub name: String,
    /// Declaration kind. The built-in scanner emits "fn", "struct", "enum", "trait" or "mod";
    /// NOTE(review): other producers (e.g. the tree-sitter index) may use further kinds.
    pub kind: String,
    /// 1-based line number of the declaration.
    pub line: u32,
}
43
44impl RepoMap {
45    pub fn scan(root: &Path) -> Self {
46        let mut files = Vec::new();
47        let mut symbols = Vec::new();
48        scan_dir(root, root, &mut files, &mut symbols);
49        Self {
50            root: root.to_path_buf(),
51            files,
52            symbols,
53        }
54    }
55}
56
57fn scan_dir(base: &Path, dir: &Path, files: &mut Vec<FileEntry>, symbols: &mut Vec<SymbolEntry>) {
58    if let Ok(entries) = std::fs::read_dir(dir) {
59        for entry in entries.flatten() {
60            let path = entry.path();
61            let name = entry.file_name().to_string_lossy().to_string();
62
63            // Skip hidden, node_modules, target, .git
64            if name.starts_with('.')
65                || name == "node_modules"
66                || name == "target"
67                || name == "build"
68                || name == "dist"
69            {
70                continue;
71            }
72
73            if path.is_dir() {
74                scan_dir(base, &path, files, symbols);
75            } else {
76                let rel = path
77                    .strip_prefix(base)
78                    .unwrap_or(&path)
79                    .to_string_lossy()
80                    .to_string();
81
82                let modified = path
83                    .metadata()
84                    .ok()
85                    .and_then(|m| m.modified().ok())
86                    .and_then(|t| {
87                        chrono::DateTime::from_timestamp(
88                            t.duration_since(std::time::UNIX_EPOCH)
89                                .unwrap_or_default()
90                                .as_secs() as i64,
91                            0,
92                        )
93                    })
94                    .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
95                    .unwrap_or_default();
96
97                files.push(FileEntry {
98                    path: rel.clone(),
99                    size: path.metadata().map(|m| m.len()).unwrap_or(0),
100                    modified,
101                });
102
103                // Basic symbol extraction for Rust files
104                if rel.ends_with(".rs") {
105                    if let Ok(content) = std::fs::read_to_string(&path) {
106                        for (i, line) in content.lines().enumerate() {
107                            let trimmed = line.trim();
108                            if trimmed.starts_with("pub fn ") || trimmed.starts_with("fn ") {
109                                let name = trimmed
110                                    .trim_start_matches("pub fn ")
111                                    .trim_start_matches("fn ")
112                                    .split('(')
113                                    .next()
114                                    .unwrap_or("");
115                                symbols.push(SymbolEntry {
116                                    file: rel.clone(),
117                                    name: name.to_string(),
118                                    kind: "fn".into(),
119                                    line: (i + 1) as u32,
120                                });
121                            } else if trimmed.starts_with("pub struct ")
122                                || trimmed.starts_with("struct ")
123                            {
124                                let name = trimmed
125                                    .trim_start_matches("pub struct ")
126                                    .trim_start_matches("struct ")
127                                    .split(|c: char| c == '<' || c == '{' || c == '(')
128                                    .next()
129                                    .unwrap_or("");
130                                symbols.push(SymbolEntry {
131                                    file: rel.clone(),
132                                    name: name.to_string(),
133                                    kind: "struct".into(),
134                                    line: (i + 1) as u32,
135                                });
136                            } else if trimmed.starts_with("pub enum ")
137                                || trimmed.starts_with("enum ")
138                            {
139                                let name = trimmed
140                                    .trim_start_matches("pub enum ")
141                                    .trim_start_matches("enum ")
142                                    .split(|c: char| c == '<' || c == '{')
143                                    .next()
144                                    .unwrap_or("");
145                                symbols.push(SymbolEntry {
146                                    file: rel.clone(),
147                                    name: name.to_string(),
148                                    kind: "enum".into(),
149                                    line: (i + 1) as u32,
150                                });
151                            } else if trimmed.starts_with("pub trait ")
152                                || trimmed.starts_with("trait ")
153                            {
154                                let name = trimmed
155                                    .trim_start_matches("pub trait ")
156                                    .trim_start_matches("trait ")
157                                    .split(|c: char| c == '<' || c == '{')
158                                    .next()
159                                    .unwrap_or("");
160                                symbols.push(SymbolEntry {
161                                    file: rel.clone(),
162                                    name: name.to_string(),
163                                    kind: "trait".into(),
164                                    line: (i + 1) as u32,
165                                });
166                            } else if trimmed.starts_with("pub mod ") || trimmed.starts_with("mod ")
167                            {
168                                let name = trimmed
169                                    .trim_start_matches("pub mod ")
170                                    .trim_start_matches("mod ")
171                                    .split(';')
172                                    .next()
173                                    .unwrap_or("");
174                                symbols.push(SymbolEntry {
175                                    file: rel.clone(),
176                                    name: name.to_string(),
177                                    kind: "mod".into(),
178                                    line: (i + 1) as u32,
179                                });
180                            }
181                        }
182                    }
183                }
184            }
185        }
186    }
187}
188
189// ─── Task memory ────────────────────────────────────────────────────────────────
190
/// Message transcript persisted for one run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMem {
    /// Identifier of the run this transcript belongs to (the SQLite backend looks it up with
    /// the string inside a `RunId`).
    pub run_id: String,
    /// Messages recorded for the run.
    pub messages: Vec<Msg>,
    /// Creation timestamp as stored by the backend; format not enforced here.
    pub created_at: String,
}
197
198// ─── Shared memory ──────────────────────────────────────────────────────────────
199
/// A message posted from one agent to another through shared memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedSignal {
    /// Unique identifier of the signal.
    pub id: String,
    /// Sending agent.
    pub from_agent: String,
    /// Receiving agent.
    pub to_agent: String,
    /// Signal body.
    pub content: String,
    /// When the signal was posted; the SQLite backend orders signals by this string.
    pub timestamp: String,
}

/// A titled document shared between agents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkingDoc {
    /// Unique identifier; upserting with an existing id replaces the document.
    pub id: String,
    /// Document title.
    pub title: String,
    /// Document body.
    pub content: String,
    /// Last update time; the SQLite backend orders documents by this string.
    pub updated_at: String,
}
216
217// ─── Durable facts ──────────────────────────────────────────────────────────────
218
/// A durable key/value fact. Keys are unique in the SQLite store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// Stable identifier of the fact.
    pub id: String,
    /// Lookup key (the SQLite backend rejects keys longer than 256 characters).
    pub key: String,
    /// Fact content (the SQLite backend rejects values longer than 1200 characters).
    pub value: String,
    /// Creation timestamp as supplied by the caller.
    pub created_at: String,
    /// Last update timestamp as supplied by the caller.
    pub updated_at: String,
}
227
/// Identifies one of the two markdown memory documents, `MEMORY.md` and `USER.md`, each of
/// which has its own size limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemoryDocKind {
    /// `MEMORY.md`, limited to [`MEMORY_MD_LIMIT`].
    Memory,
    /// `USER.md`, limited to [`USER_MD_LIMIT`].
    User,
}
233
234impl MemoryDocKind {
235    pub fn as_str(self) -> &'static str {
236        match self {
237            Self::Memory => "MEMORY.md",
238            Self::User => "USER.md",
239        }
240    }
241
242    pub fn limit(self) -> usize {
243        match self {
244            Self::Memory => MEMORY_MD_LIMIT,
245            Self::User => USER_MD_LIMIT,
246        }
247    }
248
249    pub fn parse(value: &str) -> Option<Self> {
250        match value.trim().to_ascii_lowercase().as_str() {
251            "memory" | "memory.md" => Some(Self::Memory),
252            "user" | "user.md" => Some(Self::User),
253            _ => None,
254        }
255    }
256}
257
/// Stored content of one memory document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryDoc {
    /// Which document this is.
    pub kind: MemoryDocKind,
    /// Markdown body.
    pub content: String,
    /// Last update timestamp as stored by the backend.
    pub updated_at: String,
}

/// Usage summary returned by `Memory::memory_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of stored facts.
    pub facts: usize,
    /// Current length of `MEMORY.md` in characters.
    pub memory_chars: usize,
    /// Size limit for `MEMORY.md`.
    pub memory_limit: usize,
    /// Current length of `USER.md` in characters.
    pub user_chars: usize,
    /// Size limit for `USER.md`.
    pub user_limit: usize,
}
273
/// A node in the knowledge graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphNode {
    /// Unique node identifier.
    pub id: String,
    /// Display name of the node.
    pub label: String,
    /// Free-form node category.
    pub kind: String,
    /// Arbitrary JSON attributes; `null` when missing from serialized input (the SQLite
    /// backend also yields `null` for stored JSON it cannot parse).
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp as stored by the backend.
    pub created_at: String,
    /// Last update timestamp as stored by the backend.
    pub updated_at: String,
}

/// A weighted, labelled edge from `from_id` to `to_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Unique edge identifier.
    pub id: String,
    /// Id of the source node.
    pub from_id: String,
    /// Id of the target node.
    pub to_id: String,
    /// Relationship label.
    pub relation: String,
    /// Edge weight (the SQLite schema defaults it to 1.0).
    pub weight: f64,
    /// Arbitrary JSON attributes; `null` when missing from serialized input.
    #[serde(default)]
    pub properties: serde_json::Value,
    /// Creation timestamp as stored by the backend.
    pub created_at: String,
    /// Last update timestamp as stored by the backend.
    pub updated_at: String,
}

/// Complete dump of the knowledge graph's nodes and edges.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KnowledgeGraph {
    pub nodes: Vec<GraphNode>,
    pub edges: Vec<GraphEdge>,
}
303
/// Which edges to follow from a node when listing its neighbours.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum GraphDirection {
    /// Edges whose `to_id` is the node (presumably; confirm against `graph_neighbors` impls).
    Incoming,
    /// Edges whose `from_id` is the node (presumably; confirm against `graph_neighbors` impls).
    Outgoing,
    /// Edges in either direction.
    Both,
}
310
311impl GraphDirection {
312    pub fn parse(value: &str) -> Self {
313        match value.trim().to_ascii_lowercase().as_str() {
314            "incoming" | "in" => Self::Incoming,
315            "outgoing" | "out" => Self::Outgoing,
316            _ => Self::Both,
317        }
318    }
319}
320
321pub fn validate_memory_text(label: &str, text: &str, limit: usize) -> anyhow::Result<()> {
322    let chars = text.chars().count();
323    if chars > limit {
324        anyhow::bail!("{label} is too large: {chars}/{limit} chars");
325    }
326    if text.chars().any(is_forbidden_invisible_char) {
327        anyhow::bail!("{label} contains invisible control characters");
328    }
329
330    let lower = text.to_ascii_lowercase();
331    let blocked = [
332        "ignore previous instructions",
333        "ignore all previous instructions",
334        "disregard previous instructions",
335        "reveal your system prompt",
336        "print your system prompt",
337        "exfiltrate",
338        "steal secrets",
339        "dump secrets",
340        "print env",
341        "cat ~/.ssh",
342        "backdoor",
343        "reverse shell",
344        "rm -rf /",
345    ];
346    if blocked.iter().any(|needle| lower.contains(needle)) {
347        anyhow::bail!("{label} looks like prompt injection or credential exfiltration");
348    }
349    Ok(())
350}
351
/// Returns `true` for characters rejected in memory text because they render invisibly or
/// alter text direction. These are:
/// - zero-width spaces/joiners and directional marks (U+200B–U+200F);
/// - bidi embedding/override controls (U+202A–U+202E);
/// - word-joiner, invisible-operator and bidi-isolate format characters (U+2060–U+206F);
/// - the BOM / zero-width no-break space (U+FEFF);
/// - Unicode tag characters (U+E0000–U+E007F);
/// - every control character except `\n`, `\r` and `\t`.
fn is_forbidden_invisible_char(c: char) -> bool {
    matches!(
        c,
        '\u{200B}'..='\u{200F}'
            | '\u{202A}'..='\u{202E}'
            | '\u{2060}'..='\u{206F}'
            | '\u{FEFF}'
            // Tag characters are invisible yet map 1:1 onto ASCII, which makes them a known
            // channel for smuggling hidden instructions into model context.
            | '\u{E0000}'..='\u{E007F}'
    ) || (c.is_control() && !matches!(c, '\n' | '\r' | '\t'))
}
361
/// Returns the first `limit` characters (Unicode scalar values) of `text` as an owned string,
/// or all of `text` when it is shorter. Never splits a multi-byte character.
fn truncate_chars(text: &str, limit: usize) -> String {
    let cut = text
        .char_indices()
        .nth(limit)
        .map_or(text.len(), |(byte_index, _)| byte_index);
    text[..cut].to_owned()
}
365
366// ─── THE MEMORY TRAIT ───────────────────────────────────────────────────────────
367
/// Storage backend for agent memory. It covers:
/// - repository maps and agent identities;
/// - per-run task transcripts;
/// - shared signals and working documents;
/// - durable facts and the `MEMORY.md` / `USER.md` documents;
/// - cached provider model lists;
/// - a knowledge graph.
///
/// Methods with a default body are optional capabilities. The default mutators fail with a
/// "not supported by this memory backend" error. The default queries return `None` or an empty
/// collection, so callers must tolerate backends that do not implement them.
pub trait Memory: Send + Sync {
    /// Builds a [`RepoMap`] of the files and symbols under `root`.
    fn repo_map(&self, root: &Path) -> RepoMap;
    /// Returns the stored identity for `agent`, or `None` when it is not available.
    fn identity(&self, agent: &str) -> Option<Identity>;
    /// Stores `identity` for `agent`.
    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()>;
    /// Returns the saved transcript for `run`, or `None` when it is not available.
    fn task(&self, run: &RunId) -> Option<TaskMem>;
    /// Persists a run transcript.
    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()>;
    /// Returns the signals agents have posted to each other.
    fn shared_signals(&self) -> Vec<SharedSignal>;
    /// Returns the shared working documents.
    fn shared_docs(&self) -> Vec<WorkingDoc>;
    /// Records a signal from one agent to another.
    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()>;
    /// Inserts or replaces a working document.
    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()>;
    /// Stores a durable fact; backends may validate and reject it.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `k` facts relevant to the query `q`.
    fn recall(&self, q: &str, k: usize) -> Vec<Fact>;
    /// Returns every stored fact.
    fn all_facts(&self) -> Vec<Fact>;
    /// Removes a fact; `id` is presumably a `Fact::id` (confirm against implementations).
    fn forget(&self, id: &str) -> anyhow::Result<()>;
    /// Returns the requested memory document. Default: `None`.
    fn memory_doc(&self, _kind: MemoryDocKind) -> Option<MemoryDoc> {
        None
    }
    /// Replaces the content of a memory document. Default: "not supported" error.
    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Deletes a memory document. Default: "not supported" error.
    fn remove_memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<()> {
        anyhow::bail!("memory documents are not supported by this memory backend")
    }
    /// Reports the fact count and document sizes against their limits.
    ///
    /// Default: counts facts by loading them all through `all_facts`, reports zero characters
    /// for both documents, and uses the standard [`MEMORY_MD_LIMIT`] / [`USER_MD_LIMIT`].
    fn memory_stats(&self) -> MemoryStats {
        MemoryStats {
            facts: self.all_facts().len(),
            memory_chars: 0,
            memory_limit: MEMORY_MD_LIMIT,
            user_chars: 0,
            user_limit: USER_MD_LIMIT,
        }
    }
    /// Backend-defined consolidation of stored memory. Default: "not supported" error.
    fn consolidate_memory(&self) -> anyhow::Result<()> {
        anyhow::bail!("memory consolidation is not supported by this memory backend")
    }
    /// Caches the model names discovered for `provider_id`.
    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()>;
    /// Returns the model names cached for `provider_id`.
    fn get_discovered_models(&self, provider_id: &str) -> Vec<String>;
    /// Inserts or updates a knowledge-graph node. Default: "not supported" error.
    fn upsert_graph_node(&self, _node: GraphNode) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Inserts or updates a knowledge-graph edge. Default: "not supported" error.
    fn upsert_graph_edge(&self, _edge: GraphEdge) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Returns the node with `id`. Default: `None`.
    fn graph_node(&self, _id: &str) -> Option<GraphNode> {
        None
    }
    /// Returns up to `limit` `(edge, neighbouring node)` pairs for the node `id`, following
    /// edges in `direction`. Default: empty.
    fn graph_neighbors(
        &self,
        _id: &str,
        _direction: GraphDirection,
        _limit: usize,
    ) -> Vec<(GraphEdge, GraphNode)> {
        Vec::new()
    }
    /// Returns up to `limit` nodes matching `query`. Default: empty.
    fn search_graph(&self, _query: &str, _limit: usize) -> Vec<GraphNode> {
        Vec::new()
    }
    /// Exports every node and edge. Default: an empty graph.
    fn graph_export(&self) -> KnowledgeGraph {
        KnowledgeGraph::default()
    }
    /// Deletes the node with `id`. Default: "not supported" error.
    fn delete_graph_node(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
    /// Deletes the edge with `id`. Default: "not supported" error.
    fn delete_graph_edge(&self, _id: &str) -> anyhow::Result<()> {
        anyhow::bail!("knowledge graph is not supported by this memory backend")
    }
}
435
/// Pluggable front end for fact and memory-document storage.
///
/// Implemented in this module by `LocalMemoryProvider`, which delegates to a [`Memory`], and
/// `ExternalMemoryProvider`, a placeholder for hosted services. Unlike [`Memory`], every data
/// operation here is fallible.
pub trait MemoryProvider: Send + Sync {
    /// Short identifier of the provider (e.g. `"local"`, `"mem0"`).
    fn name(&self) -> &str;
    /// Stores a fact.
    fn remember(&self, fact: Fact) -> anyhow::Result<()>;
    /// Returns up to `limit` facts relevant to `query`.
    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>>;
    /// Fetches a memory document; `Ok(None)` when none is available.
    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>>;
    /// Replaces the content of a memory document.
    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()>;
}
443
444pub struct LocalMemoryProvider {
445    memory: Arc<dyn Memory>,
446}
447
448impl LocalMemoryProvider {
449    pub fn new(memory: Arc<dyn Memory>) -> Self {
450        Self { memory }
451    }
452}
453
454impl MemoryProvider for LocalMemoryProvider {
455    fn name(&self) -> &str {
456        "local"
457    }
458
459    fn remember(&self, fact: Fact) -> anyhow::Result<()> {
460        self.memory.remember(fact)
461    }
462
463    fn recall(&self, query: &str, limit: usize) -> anyhow::Result<Vec<Fact>> {
464        Ok(self.memory.recall(query, limit))
465    }
466
467    fn memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
468        Ok(self.memory.memory_doc(kind))
469    }
470
471    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
472        self.memory.upsert_memory_doc(kind, content)
473    }
474}
475
476pub struct ExternalMemoryProvider {
477    name: String,
478}
479
480impl ExternalMemoryProvider {
481    pub fn mem0() -> Self {
482        Self {
483            name: "mem0".into(),
484        }
485    }
486
487    pub fn honcho() -> Self {
488        Self {
489            name: "honcho".into(),
490        }
491    }
492
493    pub fn supermemory() -> Self {
494        Self {
495            name: "supermemory".into(),
496        }
497    }
498
499    fn not_configured(&self) -> anyhow::Error {
500        anyhow::anyhow!(
501            "external memory provider '{}' is not configured; use local SQLite memory or configure a connector first",
502            self.name
503        )
504    }
505}
506
507impl MemoryProvider for ExternalMemoryProvider {
508    fn name(&self) -> &str {
509        &self.name
510    }
511
512    fn remember(&self, _fact: Fact) -> anyhow::Result<()> {
513        Err(self.not_configured())
514    }
515
516    fn recall(&self, _query: &str, _limit: usize) -> anyhow::Result<Vec<Fact>> {
517        Err(self.not_configured())
518    }
519
520    fn memory_doc(&self, _kind: MemoryDocKind) -> anyhow::Result<Option<MemoryDoc>> {
521        Err(self.not_configured())
522    }
523
524    fn upsert_memory_doc(&self, _kind: MemoryDocKind, _content: &str) -> anyhow::Result<()> {
525        Err(self.not_configured())
526    }
527}
528
529// ─── SQLite-backed memory implementation ────────────────────────────────────────
530
531pub struct SqliteMemory {
532    conn: Mutex<Connection>,
533}
534
535impl SqliteMemory {
536    pub fn open(db_path: &Path) -> anyhow::Result<Self> {
537        if let Some(parent) = db_path.parent() {
538            std::fs::create_dir_all(parent)?;
539        }
540        let conn = Connection::open(db_path)?;
541        let memory = Self {
542            conn: Mutex::new(conn),
543        };
544        memory.migrate()?;
545        Ok(memory)
546    }
547
548    fn migrate(&self) -> anyhow::Result<()> {
549        let conn = self.conn.lock().unwrap();
550        conn.execute_batch(
551            "
552            CREATE TABLE IF NOT EXISTS identities (
553                agent TEXT PRIMARY KEY,
554                name TEXT NOT NULL,
555                role TEXT NOT NULL,
556                personality TEXT NOT NULL
557            );
558            CREATE TABLE IF NOT EXISTS tasks (
559                run_id TEXT PRIMARY KEY,
560                messages_json TEXT NOT NULL,
561                created_at TEXT NOT NULL DEFAULT (datetime('now'))
562            );
563            CREATE TABLE IF NOT EXISTS signals (
564                id TEXT PRIMARY KEY,
565                from_agent TEXT NOT NULL,
566                to_agent TEXT NOT NULL,
567                content TEXT NOT NULL,
568                timestamp TEXT NOT NULL DEFAULT (datetime('now'))
569            );
570            CREATE TABLE IF NOT EXISTS working_docs (
571                id TEXT PRIMARY KEY,
572                title TEXT NOT NULL,
573                content TEXT NOT NULL,
574                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
575            );
576            CREATE TABLE IF NOT EXISTS facts (
577                id TEXT PRIMARY KEY,
578                key TEXT NOT NULL UNIQUE,
579                value TEXT NOT NULL,
580                created_at TEXT NOT NULL DEFAULT (datetime('now')),
581                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
582            );
583            CREATE TABLE IF NOT EXISTS discovered_models (
584                provider_id TEXT NOT NULL,
585                model_name TEXT NOT NULL,
586                fetched_at TEXT NOT NULL,
587                PRIMARY KEY (provider_id, model_name)
588            );
589            CREATE TABLE IF NOT EXISTS memory_docs (
590                kind TEXT PRIMARY KEY,
591                content TEXT NOT NULL,
592                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
593            );
594            CREATE TABLE IF NOT EXISTS kg_nodes (
595                id TEXT PRIMARY KEY,
596                label TEXT NOT NULL,
597                kind TEXT NOT NULL,
598                properties_json TEXT NOT NULL DEFAULT '{}',
599                created_at TEXT NOT NULL DEFAULT (datetime('now')),
600                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
601            );
602            CREATE TABLE IF NOT EXISTS kg_edges (
603                id TEXT PRIMARY KEY,
604                from_id TEXT NOT NULL,
605                to_id TEXT NOT NULL,
606                relation TEXT NOT NULL,
607                weight REAL NOT NULL DEFAULT 1.0,
608                properties_json TEXT NOT NULL DEFAULT '{}',
609                created_at TEXT NOT NULL DEFAULT (datetime('now')),
610                updated_at TEXT NOT NULL DEFAULT (datetime('now')),
611                FOREIGN KEY(from_id) REFERENCES kg_nodes(id) ON DELETE CASCADE,
612                FOREIGN KEY(to_id) REFERENCES kg_nodes(id) ON DELETE CASCADE
613            );
614            CREATE INDEX IF NOT EXISTS kg_nodes_label_idx ON kg_nodes(label);
615            CREATE INDEX IF NOT EXISTS kg_nodes_kind_idx ON kg_nodes(kind);
616            CREATE INDEX IF NOT EXISTS kg_edges_from_idx ON kg_edges(from_id);
617            CREATE INDEX IF NOT EXISTS kg_edges_to_idx ON kg_edges(to_id);
618            CREATE INDEX IF NOT EXISTS kg_edges_relation_idx ON kg_edges(relation);
619            -- FTS5 full-text search for memory recall (M1)
620            CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts USING fts5(
621                key, value, content='facts', content_rowid='rowid'
622            );
623            -- Triggers to keep FTS5 index in sync
624            CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
625                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
626            END;
627            CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
628                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
629            END;
630            CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN
631                INSERT INTO facts_fts(facts_fts, rowid, key, value) VALUES ('delete', old.rowid, old.key, old.value);
632                INSERT INTO facts_fts(rowid, key, value) VALUES (new.rowid, new.key, new.value);
633            END;
634            ",
635        )?;
636        Ok(())
637    }
638}
639
640fn graph_node_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphNode> {
641    let properties_json: String = row.get(3)?;
642    Ok(GraphNode {
643        id: row.get(0)?,
644        label: row.get(1)?,
645        kind: row.get(2)?,
646        properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
647        created_at: row.get(4)?,
648        updated_at: row.get(5)?,
649    })
650}
651
652fn graph_edge_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GraphEdge> {
653    let properties_json: String = row.get(5)?;
654    Ok(GraphEdge {
655        id: row.get(0)?,
656        from_id: row.get(1)?,
657        to_id: row.get(2)?,
658        relation: row.get(3)?,
659        weight: row.get(4)?,
660        properties: serde_json::from_str(&properties_json).unwrap_or(serde_json::Value::Null),
661        created_at: row.get(6)?,
662        updated_at: row.get(7)?,
663    })
664}
665
666impl Memory for SqliteMemory {
    /// Builds a fresh [`RepoMap`] by scanning `root` on every call; the database is not
    /// consulted.
    fn repo_map(&self, root: &Path) -> RepoMap {
        RepoMap::scan(root)
    }
670
671    fn identity(&self, agent: &str) -> Option<Identity> {
672        let conn = self.conn.lock().unwrap();
673        conn.query_row(
674            "SELECT name, role, personality FROM identities WHERE agent = ?1",
675            params![agent],
676            |row| {
677                Ok(Identity {
678                    name: row.get(0)?,
679                    role: row.get(1)?,
680                    personality: row.get(2)?,
681                })
682            },
683        )
684        .ok()
685    }
686
687    fn save_identity(&self, agent: &str, identity: &Identity) -> anyhow::Result<()> {
688        let conn = self.conn.lock().unwrap();
689        conn.execute(
690            "INSERT OR REPLACE INTO identities (agent, name, role, personality) VALUES (?1, ?2, ?3, ?4)",
691            params![agent, identity.name, identity.role, identity.personality],
692        )?;
693        Ok(())
694    }
695
696    fn task(&self, run: &RunId) -> Option<TaskMem> {
697        let conn = self.conn.lock().unwrap();
698        conn.query_row(
699            "SELECT run_id, messages_json, created_at FROM tasks WHERE run_id = ?1",
700            params![run.0],
701            |row| {
702                let messages_json: String = row.get(1)?;
703                let messages: Vec<Msg> = serde_json::from_str(&messages_json).unwrap_or_default();
704                Ok(TaskMem {
705                    run_id: row.get(0)?,
706                    messages,
707                    created_at: row.get(2)?,
708                })
709            },
710        )
711        .ok()
712    }
713
714    fn save_task(&self, task: &TaskMem) -> anyhow::Result<()> {
715        let conn = self.conn.lock().unwrap();
716        let messages_json = serde_json::to_string(&task.messages)?;
717        conn.execute(
718            "INSERT OR REPLACE INTO tasks (run_id, messages_json, created_at) VALUES (?1, ?2, ?3)",
719            params![task.run_id, messages_json, task.created_at],
720        )?;
721        Ok(())
722    }
723
724    fn shared_signals(&self) -> Vec<SharedSignal> {
725        let conn = self.conn.lock().unwrap();
726        let mut stmt = conn
727            .prepare("SELECT id, from_agent, to_agent, content, timestamp FROM signals ORDER BY timestamp DESC")
728            .unwrap();
729        let signals = stmt
730            .query_map([], |row| {
731                Ok(SharedSignal {
732                    id: row.get(0)?,
733                    from_agent: row.get(1)?,
734                    to_agent: row.get(2)?,
735                    content: row.get(3)?,
736                    timestamp: row.get(4)?,
737                })
738            })
739            .unwrap()
740            .filter_map(|r| r.ok())
741            .collect();
742        signals
743    }
744
745    fn shared_docs(&self) -> Vec<WorkingDoc> {
746        let conn = self.conn.lock().unwrap();
747        let mut stmt = conn
748            .prepare(
749                "SELECT id, title, content, updated_at FROM working_docs ORDER BY updated_at DESC",
750            )
751            .unwrap();
752        let docs = stmt
753            .query_map([], |row| {
754                Ok(WorkingDoc {
755                    id: row.get(0)?,
756                    title: row.get(1)?,
757                    content: row.get(2)?,
758                    updated_at: row.get(3)?,
759                })
760            })
761            .unwrap()
762            .filter_map(|r| r.ok())
763            .collect();
764        docs
765    }
766
767    fn post_signal(&self, signal: SharedSignal) -> anyhow::Result<()> {
768        let conn = self.conn.lock().unwrap();
769        conn.execute(
770            "INSERT INTO signals (id, from_agent, to_agent, content, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)",
771            params![signal.id, signal.from_agent, signal.to_agent, signal.content, signal.timestamp],
772        )?;
773        Ok(())
774    }
775
776    fn upsert_doc(&self, doc: WorkingDoc) -> anyhow::Result<()> {
777        let conn = self.conn.lock().unwrap();
778        conn.execute(
779            "INSERT OR REPLACE INTO working_docs (id, title, content, updated_at) VALUES (?1, ?2, ?3, ?4)",
780            params![doc.id, doc.title, doc.content, doc.updated_at],
781        )?;
782        Ok(())
783    }
784
785    fn remember(&self, fact: Fact) -> anyhow::Result<()> {
786        let redaction = RedactionFilter::new();
787        validate_memory_text("fact key", &fact.key, 256)?;
788        validate_memory_text("fact value", &fact.value, 1200)?;
789        let safe_value = redaction.redact_str(&fact.value);
790        let safe_key = redaction.redact_str(&fact.key);
791        if redaction.contains_secret(&fact.value) {
792            tracing::warn!("Redacted secret from memory fact: {}", fact.key);
793        }
794        let conn = self.conn.lock().unwrap();
795        let existing: Option<String> = conn
796            .query_row(
797                "SELECT id FROM facts WHERE key = ?1",
798                params![safe_key.clone()],
799                |row| row.get(0),
800            )
801            .ok();
802        if existing.as_deref().is_some_and(|id| id != fact.id) {
803            anyhow::bail!(
804                "memory fact '{}' already exists; use replace/consolidate",
805                safe_key
806            );
807        }
808        conn.execute(
809            "INSERT OR REPLACE INTO facts (id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5)",
810            params![fact.id, safe_key, safe_value, fact.created_at, fact.updated_at],
811        )?;
812        Ok(())
813    }
814
815    fn recall(&self, q: &str, k: usize) -> Vec<Fact> {
816        let conn = self.conn.lock().unwrap();
817
818        // Build a safe FTS5 MATCH expression. FTS5 chokes on unescaped tokens
819        // containing punctuation, hyphens, quotes, or its reserved keywords
820        // (AND, OR, NOT, NEAR). Strategy: split on whitespace, drop non-word
821        // chars from each token, double-quote it, and append a prefix wildcard.
822        // Example: `users.id rm -rf` -> `"usersid"* "rm"* "rf"*`.
823        let tokens: Vec<String> = q
824            .split_whitespace()
825            .map(|w| {
826                w.chars()
827                    .filter(|c| c.is_alphanumeric() || *c == '_')
828                    .collect::<String>()
829            })
830            .filter(|w| !w.is_empty())
831            .map(|w| format!("\"{}\"*", w))
832            .collect();
833
834        // No usable tokens (e.g. query was only punctuation) — go straight to LIKE.
835        let try_fts = !tokens.is_empty();
836        if try_fts {
837            let pattern = tokens.join(" ");
838            if let Ok(mut stmt) = conn.prepare(
839                "SELECT f.id, f.key, f.value, f.created_at, f.updated_at FROM facts f
840                 INNER JOIN facts_fts ft ON f.rowid = ft.rowid
841                 WHERE facts_fts MATCH ?1 ORDER BY rank LIMIT ?2",
842            ) {
843                if let Ok(rows) = stmt.query_map(params![pattern, k as i64], |row| {
844                    Ok(Fact {
845                        id: row.get(0)?,
846                        key: row.get(1)?,
847                        value: row.get(2)?,
848                        created_at: row.get(3)?,
849                        updated_at: row.get(4)?,
850                    })
851                }) {
852                    let facts: Vec<Fact> = rows.filter_map(|r| r.ok()).collect();
853                    if !facts.is_empty() {
854                        return facts;
855                    }
856                    // Fall through to LIKE if FTS returned nothing — LIKE catches
857                    // substring matches inside larger tokens that FTS won't.
858                }
859            }
860        }
861
862        // LIKE fallback. Escape % and _ in the user pattern so they don't act
863        // as wildcards. Use ESCAPE '\' so the literal characters survive.
864        let escaped = q
865            .replace('\\', "\\\\")
866            .replace('%', "\\%")
867            .replace('_', "\\_");
868        let like_pattern = format!("%{}%", escaped);
869        let Ok(mut stmt) = conn.prepare(
870            "SELECT id, key, value, created_at, updated_at FROM facts \
871             WHERE key LIKE ?1 ESCAPE '\\' OR value LIKE ?1 ESCAPE '\\' LIMIT ?2",
872        ) else {
873            return Vec::new();
874        };
875        let Ok(rows) = stmt.query_map(params![like_pattern, k as i64], |row| {
876            Ok(Fact {
877                id: row.get(0)?,
878                key: row.get(1)?,
879                value: row.get(2)?,
880                created_at: row.get(3)?,
881                updated_at: row.get(4)?,
882            })
883        }) else {
884            return Vec::new();
885        };
886        rows.filter_map(|r| r.ok()).collect()
887    }
888
889    fn all_facts(&self) -> Vec<Fact> {
890        let conn = self.conn.lock().unwrap();
891        let mut stmt = conn
892            .prepare(
893                "SELECT id, key, value, created_at, updated_at FROM facts ORDER BY updated_at DESC",
894            )
895            .unwrap();
896        stmt.query_map([], |row| {
897            Ok(Fact {
898                id: row.get(0)?,
899                key: row.get(1)?,
900                value: row.get(2)?,
901                created_at: row.get(3)?,
902                updated_at: row.get(4)?,
903            })
904        })
905        .unwrap()
906        .filter_map(|r| r.ok())
907        .collect()
908    }
909
910    fn forget(&self, id: &str) -> anyhow::Result<()> {
911        let conn = self.conn.lock().unwrap();
912        conn.execute("DELETE FROM facts WHERE id = ?1", params![id])?;
913        Ok(())
914    }
915
916    fn memory_doc(&self, kind: MemoryDocKind) -> Option<MemoryDoc> {
917        let conn = self.conn.lock().unwrap();
918        conn.query_row(
919            "SELECT content, updated_at FROM memory_docs WHERE kind = ?1",
920            params![kind.as_str()],
921            |row| {
922                Ok(MemoryDoc {
923                    kind,
924                    content: row.get(0)?,
925                    updated_at: row.get(1)?,
926                })
927            },
928        )
929        .ok()
930    }
931
932    fn upsert_memory_doc(&self, kind: MemoryDocKind, content: &str) -> anyhow::Result<()> {
933        validate_memory_text(kind.as_str(), content, kind.limit())?;
934        let conn = self.conn.lock().unwrap();
935        conn.execute(
936            "INSERT OR REPLACE INTO memory_docs (kind, content, updated_at)
937             VALUES (?1, ?2, datetime('now'))",
938            params![kind.as_str(), content],
939        )?;
940        Ok(())
941    }
942
943    fn remove_memory_doc(&self, kind: MemoryDocKind) -> anyhow::Result<()> {
944        let conn = self.conn.lock().unwrap();
945        conn.execute(
946            "DELETE FROM memory_docs WHERE kind = ?1",
947            params![kind.as_str()],
948        )?;
949        Ok(())
950    }
951
952    fn memory_stats(&self) -> MemoryStats {
953        let memory_chars = self
954            .memory_doc(MemoryDocKind::Memory)
955            .map(|doc| doc.content.chars().count())
956            .unwrap_or(0);
957        let user_chars = self
958            .memory_doc(MemoryDocKind::User)
959            .map(|doc| doc.content.chars().count())
960            .unwrap_or(0);
961        MemoryStats {
962            facts: self.all_facts().len(),
963            memory_chars,
964            memory_limit: MEMORY_MD_LIMIT,
965            user_chars,
966            user_limit: USER_MD_LIMIT,
967        }
968    }
969
970    fn consolidate_memory(&self) -> anyhow::Result<()> {
971        let facts = self.all_facts();
972        let mut memory_lines = Vec::new();
973        memory_lines.push("# MEMORY.md".to_string());
974        memory_lines.push("Durable Sparrow memory distilled from accepted facts.".to_string());
975        for fact in facts.iter().take(40) {
976            memory_lines.push(format!("- {}: {}", fact.key, fact.value));
977        }
978        let memory = truncate_chars(&memory_lines.join("\n"), MEMORY_MD_LIMIT);
979        self.upsert_memory_doc(MemoryDocKind::Memory, &memory)?;
980
981        let mut user_lines = Vec::new();
982        user_lines.push("# USER.md".to_string());
983        for fact in facts
984            .iter()
985            .filter(|fact| fact.key.starts_with("user"))
986            .take(24)
987        {
988            user_lines.push(format!("- {}: {}", fact.key, fact.value));
989        }
990        if user_lines.len() > 1 {
991            let user = truncate_chars(&user_lines.join("\n"), USER_MD_LIMIT);
992            self.upsert_memory_doc(MemoryDocKind::User, &user)?;
993        }
994        Ok(())
995    }
996
997    fn cache_discovered_models(&self, provider_id: &str, models: &[String]) -> anyhow::Result<()> {
998        let mut conn = self.conn.lock().unwrap();
999        let tx = conn.transaction()?;
1000        tx.execute(
1001            "DELETE FROM discovered_models WHERE provider_id = ?1",
1002            params![provider_id],
1003        )?;
1004        let fetched_at = chrono::Utc::now().to_rfc3339();
1005        for model in models {
1006            let model = model.trim();
1007            if model.is_empty() {
1008                continue;
1009            }
1010            tx.execute(
1011                "INSERT OR REPLACE INTO discovered_models (provider_id, model_name, fetched_at)
1012                 VALUES (?1, ?2, ?3)",
1013                params![provider_id, model, fetched_at],
1014            )?;
1015        }
1016        tx.commit()?;
1017        Ok(())
1018    }
1019
1020    fn get_discovered_models(&self, provider_id: &str) -> Vec<String> {
1021        let conn = self.conn.lock().unwrap();
1022        // Discovered model lists are stable for weeks at a time — vendors don't
1023        // rotate the catalogue daily. The previous 24-hour TTL silently dropped
1024        // a working scan after one day, forcing the user to redo it. Keep the
1025        // cache for 30 days; an explicit `sparrow scan` always overwrites it.
1026        let Ok(mut stmt) = conn.prepare(
1027            "SELECT model_name FROM discovered_models
1028             WHERE provider_id = ?1
1029               AND datetime(fetched_at) >= datetime('now', '-30 days')
1030             ORDER BY model_name ASC",
1031        ) else {
1032            return Vec::new();
1033        };
1034        let Ok(rows) = stmt.query_map(params![provider_id], |row| row.get::<_, String>(0)) else {
1035            return Vec::new();
1036        };
1037        rows.filter_map(|row| row.ok()).collect()
1038    }
1039
1040    fn upsert_graph_node(&self, node: GraphNode) -> anyhow::Result<()> {
1041        validate_memory_text("graph node id", &node.id, 160)?;
1042        validate_memory_text("graph node label", &node.label, 512)?;
1043        validate_memory_text("graph node kind", &node.kind, 80)?;
1044        validate_memory_text("graph node properties", &node.properties.to_string(), 4000)?;
1045        let redaction = RedactionFilter::new();
1046        let safe_id = redaction.redact_str(&node.id);
1047        let safe_label = redaction.redact_str(&node.label);
1048        let safe_kind = redaction.redact_str(&node.kind);
1049        let safe_properties = serde_json::to_string(&node.properties)?;
1050        let safe_properties = redaction.redact_str(&safe_properties);
1051        let now = chrono::Utc::now().to_rfc3339();
1052        let created_at = if node.created_at.trim().is_empty() {
1053            now.clone()
1054        } else {
1055            node.created_at
1056        };
1057        let updated_at = if node.updated_at.trim().is_empty() {
1058            now
1059        } else {
1060            node.updated_at
1061        };
1062        let conn = self.conn.lock().unwrap();
1063        conn.execute(
1064            "INSERT INTO kg_nodes (id, label, kind, properties_json, created_at, updated_at)
1065             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1066             ON CONFLICT(id) DO UPDATE SET
1067                label = excluded.label,
1068                kind = excluded.kind,
1069                properties_json = excluded.properties_json,
1070                updated_at = excluded.updated_at",
1071            params![
1072                safe_id,
1073                safe_label,
1074                safe_kind,
1075                safe_properties,
1076                created_at,
1077                updated_at
1078            ],
1079        )?;
1080        Ok(())
1081    }
1082
1083    fn upsert_graph_edge(&self, edge: GraphEdge) -> anyhow::Result<()> {
1084        validate_memory_text("graph edge id", &edge.id, 160)?;
1085        validate_memory_text("graph edge from_id", &edge.from_id, 160)?;
1086        validate_memory_text("graph edge to_id", &edge.to_id, 160)?;
1087        validate_memory_text("graph edge relation", &edge.relation, 120)?;
1088        validate_memory_text("graph edge properties", &edge.properties.to_string(), 4000)?;
1089        let redaction = RedactionFilter::new();
1090        let safe_properties = serde_json::to_string(&edge.properties)?;
1091        let safe_properties = redaction.redact_str(&safe_properties);
1092        let now = chrono::Utc::now().to_rfc3339();
1093        let created_at = if edge.created_at.trim().is_empty() {
1094            now.clone()
1095        } else {
1096            edge.created_at
1097        };
1098        let updated_at = if edge.updated_at.trim().is_empty() {
1099            now
1100        } else {
1101            edge.updated_at
1102        };
1103        let conn = self.conn.lock().unwrap();
1104        conn.execute(
1105            "INSERT INTO kg_edges (id, from_id, to_id, relation, weight, properties_json, created_at, updated_at)
1106             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1107             ON CONFLICT(id) DO UPDATE SET
1108                from_id = excluded.from_id,
1109                to_id = excluded.to_id,
1110                relation = excluded.relation,
1111                weight = excluded.weight,
1112                properties_json = excluded.properties_json,
1113                updated_at = excluded.updated_at",
1114            params![
1115                redaction.redact_str(&edge.id),
1116                redaction.redact_str(&edge.from_id),
1117                redaction.redact_str(&edge.to_id),
1118                redaction.redact_str(&edge.relation),
1119                edge.weight,
1120                safe_properties,
1121                created_at,
1122                updated_at
1123            ],
1124        )?;
1125        Ok(())
1126    }
1127
1128    fn graph_node(&self, id: &str) -> Option<GraphNode> {
1129        let conn = self.conn.lock().unwrap();
1130        conn.query_row(
1131            "SELECT id, label, kind, properties_json, created_at, updated_at
1132             FROM kg_nodes WHERE id = ?1",
1133            params![id],
1134            graph_node_from_row,
1135        )
1136        .ok()
1137    }
1138
1139    fn graph_neighbors(
1140        &self,
1141        id: &str,
1142        direction: GraphDirection,
1143        limit: usize,
1144    ) -> Vec<(GraphEdge, GraphNode)> {
1145        let conn = self.conn.lock().unwrap();
1146        let limit = limit.clamp(1, 100) as i64;
1147        let sql = match direction {
1148            GraphDirection::Outgoing => {
1149                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1150                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1151                 FROM kg_edges e
1152                 JOIN kg_nodes n ON n.id = e.to_id
1153                 WHERE e.from_id = ?1
1154                 ORDER BY e.weight DESC, e.updated_at DESC
1155                 LIMIT ?2"
1156            }
1157            GraphDirection::Incoming => {
1158                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1159                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1160                 FROM kg_edges e
1161                 JOIN kg_nodes n ON n.id = e.from_id
1162                 WHERE e.to_id = ?1
1163                 ORDER BY e.weight DESC, e.updated_at DESC
1164                 LIMIT ?2"
1165            }
1166            GraphDirection::Both => {
1167                "SELECT e.id, e.from_id, e.to_id, e.relation, e.weight, e.properties_json, e.created_at, e.updated_at,
1168                        n.id, n.label, n.kind, n.properties_json, n.created_at, n.updated_at
1169                 FROM kg_edges e
1170                 JOIN kg_nodes n ON n.id = CASE WHEN e.from_id = ?1 THEN e.to_id ELSE e.from_id END
1171                 WHERE e.from_id = ?1 OR e.to_id = ?1
1172                 ORDER BY e.weight DESC, e.updated_at DESC
1173                 LIMIT ?2"
1174            }
1175        };
1176        let Ok(mut stmt) = conn.prepare(sql) else {
1177            return Vec::new();
1178        };
1179        let Ok(rows) = stmt.query_map(params![id, limit], |row| {
1180            let edge = graph_edge_from_row(row)?;
1181            let properties_json: String = row.get(11)?;
1182            let node = GraphNode {
1183                id: row.get(8)?,
1184                label: row.get(9)?,
1185                kind: row.get(10)?,
1186                properties: serde_json::from_str(&properties_json)
1187                    .unwrap_or(serde_json::Value::Null),
1188                created_at: row.get(12)?,
1189                updated_at: row.get(13)?,
1190            };
1191            Ok((edge, node))
1192        }) else {
1193            return Vec::new();
1194        };
1195        rows.filter_map(|row| row.ok()).collect()
1196    }
1197
1198    fn search_graph(&self, query: &str, limit: usize) -> Vec<GraphNode> {
1199        let query = query.trim();
1200        if query.is_empty() {
1201            return Vec::new();
1202        }
1203        let escaped = query
1204            .replace('\\', "\\\\")
1205            .replace('%', "\\%")
1206            .replace('_', "\\_");
1207        let pattern = format!("%{}%", escaped);
1208        let conn = self.conn.lock().unwrap();
1209        let Ok(mut stmt) = conn.prepare(
1210            "SELECT id, label, kind, properties_json, created_at, updated_at
1211             FROM kg_nodes
1212             WHERE id LIKE ?1 ESCAPE '\\'
1213                OR label LIKE ?1 ESCAPE '\\'
1214                OR kind LIKE ?1 ESCAPE '\\'
1215                OR properties_json LIKE ?1 ESCAPE '\\'
1216             ORDER BY updated_at DESC
1217             LIMIT ?2",
1218        ) else {
1219            return Vec::new();
1220        };
1221        let Ok(rows) = stmt.query_map(
1222            params![pattern, limit.clamp(1, 100) as i64],
1223            graph_node_from_row,
1224        ) else {
1225            return Vec::new();
1226        };
1227        rows.filter_map(|row| row.ok()).collect()
1228    }
1229
1230    fn graph_export(&self) -> KnowledgeGraph {
1231        let conn = self.conn.lock().unwrap();
1232        let nodes = conn
1233            .prepare(
1234                "SELECT id, label, kind, properties_json, created_at, updated_at
1235                 FROM kg_nodes ORDER BY kind, label",
1236            )
1237            .and_then(|mut stmt| {
1238                let rows = stmt.query_map([], graph_node_from_row)?;
1239                Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1240            })
1241            .unwrap_or_default();
1242        let edges = conn
1243            .prepare(
1244                "SELECT id, from_id, to_id, relation, weight, properties_json, created_at, updated_at
1245                 FROM kg_edges ORDER BY relation, from_id, to_id",
1246            )
1247            .and_then(|mut stmt| {
1248                let rows = stmt.query_map([], graph_edge_from_row)?;
1249                Ok(rows.filter_map(|row| row.ok()).collect::<Vec<_>>())
1250            })
1251            .unwrap_or_default();
1252        KnowledgeGraph { nodes, edges }
1253    }
1254
1255    fn delete_graph_node(&self, id: &str) -> anyhow::Result<()> {
1256        let conn = self.conn.lock().unwrap();
1257        conn.execute(
1258            "DELETE FROM kg_edges WHERE from_id = ?1 OR to_id = ?1",
1259            params![id],
1260        )?;
1261        conn.execute("DELETE FROM kg_nodes WHERE id = ?1", params![id])?;
1262        Ok(())
1263    }
1264
1265    fn delete_graph_edge(&self, id: &str) -> anyhow::Result<()> {
1266        let conn = self.conn.lock().unwrap();
1267        conn.execute("DELETE FROM kg_edges WHERE id = ?1", params![id])?;
1268        Ok(())
1269    }
1270}