Skip to main content

brainos_storage/
sqlite.rs

1//! SQLite storage backend.
2//!
3//! Provides connection management, schema migrations,
4//! and typed CRUD operations for all Brain data:
5//! - Episodes (conversations)
6//! - Semantic facts (user model, extracted knowledge)
7//! - Sessions (conversation grouping)
8
9use std::path::Path;
10use std::sync::{Arc, Mutex};
11
12use rusqlite::Connection;
13use thiserror::Error;
14use tracing::info;
15use uuid::Uuid;
16
17#[cfg(feature = "encryption")]
18use crate::encryption::Encryptor;
19
20/// Errors from the SQLite storage layer.
21#[derive(Debug, Error)]
22pub enum SqliteError {
23    #[error("SQLite error: {0}")]
24    Rusqlite(#[from] rusqlite::Error),
25
26    #[error("Lock poisoned")]
27    LockPoisoned,
28
29    #[error("Migration failed: {0}")]
30    Migration(String),
31}
32
33/// A semantic fact for export/import operations.
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub struct ExportedFact {
36    pub id: String,
37    pub namespace: String,
38    pub category: String,
39    pub subject: String,
40    pub predicate: String,
41    pub object: String,
42    pub confidence: f64,
43    pub source_episode_id: Option<String>,
44}
45
46/// An episodic memory entry for export/import operations.
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48pub struct ExportedEpisode {
49    pub id: String,
50    pub session_id: String,
51    pub session_channel: String,
52    #[serde(default = "default_namespace")]
53    pub namespace: String,
54    pub role: String,
55    pub content: String,
56    pub timestamp: String,
57    pub importance: f64,
58    pub reinforcement_count: i32,
59}
60
61fn default_namespace() -> String {
62    "personal".to_string()
63}
64
65/// A notification queued for delivery to the user.
66#[derive(Debug, Clone)]
67pub struct Notification {
68    pub id: String,
69    pub content: String,
70    pub priority: i32,
71    pub triggered_by: String,
72    pub created_at: String,
73    pub delivered_at: Option<String>,
74    pub channel: Option<String>,
75}
76
77/// Thread-safe SQLite connection wrapper.
78///
79/// Uses a `Mutex<Connection>` — sufficient for our single-process,
80/// moderate-write workload. If we ever need concurrent writers,
81/// switch to `r2d2` or WAL mode (already enabled).
82///
83/// When an `Encryptor` is set, `content` columns are transparently
84/// encrypted on write and decrypted on read by the store layers.
85#[derive(Clone)]
86pub struct SqlitePool {
87    conn: Arc<Mutex<Connection>>,
88    #[cfg(feature = "encryption")]
89    encryptor: Option<Arc<Encryptor>>,
90}
91
92/// Persisted scheduling intent (persist-only mode, no internal runtime).
93#[derive(Debug, Clone)]
94pub struct ScheduledIntent {
95    pub id: String,
96    pub description: String,
97    pub cron: Option<String>,
98    pub namespace: String,
99    pub created_at: String,
100    pub status: String,
101    pub metadata: Option<String>,
102}
103
104impl SqlitePool {
105    /// Open a new SQLite database at the given path.
106    ///
107    /// - Creates the file if it doesn't exist
108    /// - Enables WAL mode for concurrent reads
109    /// - Enables foreign keys
110    /// - Runs all schema migrations
111    pub fn open(path: &Path) -> Result<Self, SqliteError> {
112        // Ensure parent directory exists
113        if let Some(parent) = path.parent() {
114            std::fs::create_dir_all(parent).map_err(|e| {
115                SqliteError::Migration(format!("Cannot create directory {}: {e}", parent.display()))
116            })?;
117        }
118
119        let conn = Connection::open(path)?;
120
121        // Performance and safety pragmas — foreign_keys enforced by SQLite.
122        conn.execute_batch(
123            "
124            PRAGMA journal_mode = WAL;
125            PRAGMA synchronous = NORMAL;
126            PRAGMA foreign_keys = ON;
127            PRAGMA busy_timeout = 5000;
128            PRAGMA cache_size = -8000;
129            ",
130        )?;
131
132        let pool = Self {
133            conn: Arc::new(Mutex::new(conn)),
134            #[cfg(feature = "encryption")]
135            encryptor: None,
136        };
137
138        // Run migrations
139        pool.migrate()?;
140
141        info!("SQLite database opened at {}", path.display());
142        Ok(pool)
143    }
144
145    /// Open an in-memory database (for testing).
146    pub fn open_memory() -> Result<Self, SqliteError> {
147        let conn = Connection::open_in_memory()?;
148        conn.execute_batch(
149            "
150            PRAGMA journal_mode = WAL;
151            PRAGMA foreign_keys = ON;
152            ",
153        )?;
154
155        let pool = Self {
156            conn: Arc::new(Mutex::new(conn)),
157            #[cfg(feature = "encryption")]
158            encryptor: None,
159        };
160
161        pool.migrate()?;
162        Ok(pool)
163    }
164
165    /// Execute a closure with an exclusive lock on the connection.
166    pub fn with_conn<F, T>(&self, f: F) -> Result<T, SqliteError>
167    where
168        F: FnOnce(&Connection) -> Result<T, SqliteError>,
169    {
170        let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?;
171        f(&conn)
172    }
173
174    /// Attach an encryptor to this pool (builder pattern).
175    ///
176    /// Once set, `encrypt_content` / `decrypt_content` are active on all
177    /// store layers that use this pool.
178    #[cfg(feature = "encryption")]
179    pub fn with_encryptor(mut self, enc: Encryptor) -> Self {
180        self.encryptor = Some(Arc::new(enc));
181        self
182    }
183
184    /// Returns true if an encryptor is active.
185    pub fn is_encrypted(&self) -> bool {
186        #[cfg(feature = "encryption")]
187        {
188            self.encryptor.is_some()
189        }
190        #[cfg(not(feature = "encryption"))]
191        {
192            false
193        }
194    }
195
196    /// Encrypt a string if encryption is enabled, otherwise return as-is.
197    pub fn encrypt_content(&self, plaintext: &str) -> String {
198        #[cfg(feature = "encryption")]
199        {
200            if let Some(enc) = &self.encryptor {
201                return enc
202                    .encrypt_string(plaintext)
203                    .unwrap_or_else(|_| plaintext.to_string());
204            }
205        }
206        plaintext.to_string()
207    }
208
209    /// Decrypt a string if encryption is enabled.
210    ///
211    /// Falls back to returning the input unchanged if decryption fails
212    /// (e.g. legacy plaintext rows written before encryption was enabled).
213    pub fn decrypt_content(&self, maybe_ciphertext: &str) -> String {
214        #[cfg(feature = "encryption")]
215        {
216            if let Some(enc) = &self.encryptor {
217                return enc
218                    .decrypt_string(maybe_ciphertext)
219                    .unwrap_or_else(|_| maybe_ciphertext.to_string());
220            }
221        }
222        maybe_ciphertext.to_string()
223    }
224
225    /// Try to decrypt a string, returning `None` if decryption fails.
226    ///
227    /// Unlike `decrypt_content`, this does NOT fall back to returning raw
228    /// ciphertext. Use this at read boundaries to filter out rows that
229    /// were encrypted with a different key or are corrupted.
230    pub fn try_decrypt_content(&self, maybe_ciphertext: &str) -> Option<String> {
231        #[cfg(feature = "encryption")]
232        {
233            if let Some(enc) = &self.encryptor {
234                return enc.decrypt_string(maybe_ciphertext).ok();
235            }
236        }
237        Some(maybe_ciphertext.to_string())
238    }
239
240    /// Flush the WAL file into the main database file.
241    ///
242    /// Should be called on graceful shutdown to ensure all committed writes are
243    /// fully persisted and the WAL file is clean. Uses `TRUNCATE` mode which
244    /// also resets the WAL to zero size.
245    pub fn wal_checkpoint(&self) -> Result<(), SqliteError> {
246        self.with_conn(|conn| {
247            conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
248            Ok(())
249        })
250    }
251
252    /// Persist a scheduled intent and return its generated ID.
253    pub fn insert_scheduled_intent(
254        &self,
255        description: &str,
256        cron: Option<&str>,
257        namespace: &str,
258        metadata: Option<&str>,
259    ) -> Result<String, SqliteError> {
260        let id = Uuid::new_v4().to_string();
261        self.with_conn(|conn| {
262            conn.execute(
263                "INSERT INTO scheduled_intents (id, description, cron, namespace, metadata)
264                 VALUES (?1, ?2, ?3, ?4, ?5)",
265                rusqlite::params![id, description, cron, namespace, metadata],
266            )?;
267            Ok(())
268        })?;
269        Ok(id)
270    }
271
272    /// List scheduled intents, optionally filtered by namespace.
273    pub fn list_scheduled_intents(
274        &self,
275        namespace: Option<&str>,
276    ) -> Result<Vec<ScheduledIntent>, SqliteError> {
277        self.with_conn(|conn| {
278            let mut intents = Vec::new();
279            if let Some(ns) = namespace {
280                let mut stmt = conn.prepare(
281                    "SELECT id, description, cron, namespace, created_at, status, metadata
282                     FROM scheduled_intents
283                     WHERE namespace = ?1 OR namespace LIKE ?2
284                     ORDER BY created_at DESC",
285                )?;
286                let prefix = format!("{}/%", ns);
287                let rows = stmt.query_map([ns, &prefix], |row| {
288                    Ok(ScheduledIntent {
289                        id: row.get(0)?,
290                        description: row.get(1)?,
291                        cron: row.get(2)?,
292                        namespace: row.get(3)?,
293                        created_at: row.get(4)?,
294                        status: row.get(5)?,
295                        metadata: row.get(6)?,
296                    })
297                })?;
298                for row in rows {
299                    intents.push(row?);
300                }
301            } else {
302                let mut stmt = conn.prepare(
303                    "SELECT id, description, cron, namespace, created_at, status, metadata
304                     FROM scheduled_intents
305                     ORDER BY created_at DESC",
306                )?;
307                let rows = stmt.query_map([], |row| {
308                    Ok(ScheduledIntent {
309                        id: row.get(0)?,
310                        description: row.get(1)?,
311                        cron: row.get(2)?,
312                        namespace: row.get(3)?,
313                        created_at: row.get(4)?,
314                        status: row.get(5)?,
315                        metadata: row.get(6)?,
316                    })
317                })?;
318                for row in rows {
319                    intents.push(row?);
320                }
321            }
322            Ok(intents)
323        })
324    }
325
326    /// Update a scheduled intent status. Returns true when a row was updated.
327    pub fn update_scheduled_intent_status(
328        &self,
329        id: &str,
330        status: &str,
331    ) -> Result<bool, SqliteError> {
332        self.with_conn(|conn| {
333            let affected = conn.execute(
334                "UPDATE scheduled_intents SET status = ?2 WHERE id = ?1",
335                rusqlite::params![id, status],
336            )?;
337            Ok(affected > 0)
338        })
339    }
340
341    /// Cancel a scheduled intent (set status to "cancelled").
342    pub fn cancel_scheduled_intent(&self, id: &str) -> Result<bool, SqliteError> {
343        self.update_scheduled_intent_status(id, "cancelled")
344    }
345
346    /// Return all scheduled intents with status `"scheduled"` (i.e. pending execution).
347    pub fn due_scheduled_intents(&self) -> Result<Vec<ScheduledIntent>, SqliteError> {
348        self.with_conn(|conn| {
349            let mut stmt = conn.prepare(
350                "SELECT id, description, cron, namespace, created_at, status, metadata
351                 FROM scheduled_intents
352                 WHERE status = 'scheduled'
353                 ORDER BY created_at ASC",
354            )?;
355            let rows = stmt.query_map([], |row| {
356                Ok(ScheduledIntent {
357                    id: row.get(0)?,
358                    description: row.get(1)?,
359                    cron: row.get(2)?,
360                    namespace: row.get(3)?,
361                    created_at: row.get(4)?,
362                    status: row.get(5)?,
363                    metadata: row.get(6)?,
364                })
365            })?;
366            Ok(rows.filter_map(|r| r.ok()).collect())
367        })
368    }
369
370    /// Run all schema migrations.
371    fn migrate(&self) -> Result<(), SqliteError> {
372        self.with_conn(|conn| {
373            // Create migrations tracking table
374            conn.execute_batch(
375                "CREATE TABLE IF NOT EXISTS _migrations (
376                    version INTEGER PRIMARY KEY,
377                    name TEXT NOT NULL,
378                    applied_at TEXT NOT NULL DEFAULT (datetime('now'))
379                );",
380            )?;
381
382            let current_version: i64 = conn
383                .query_row(
384                    "SELECT COALESCE(MAX(version), 0) FROM _migrations",
385                    [],
386                    |row| row.get(0),
387                )
388                .unwrap_or(0);
389
390            let migrations = Self::migrations();
391
392            for (version, name, sql) in &migrations {
393                if *version > current_version {
394                    info!("Running migration {version}: {name}");
395                    conn.execute_batch(sql).map_err(|e| {
396                        SqliteError::Migration(format!("Migration {version} ({name}) failed: {e}"))
397                    })?;
398
399                    conn.execute(
400                        "INSERT INTO _migrations (version, name) VALUES (?1, ?2)",
401                        rusqlite::params![version, name],
402                    )?;
403                }
404            }
405
406            if current_version < migrations.last().map_or(0, |m| m.0) {
407                info!(
408                    "Migrations complete (v{current_version} → v{})",
409                    migrations.last().expect("BUG: migrations list is empty").0
410                );
411            }
412
413            Ok(())
414        })
415    }
416
417    /// All schema migrations in order.
418    fn migrations() -> Vec<(i64, &'static str, &'static str)> {
419        vec![
420            (
421                1,
422                "create_sessions",
423                "
424                CREATE TABLE IF NOT EXISTS sessions (
425                    id TEXT PRIMARY KEY,
426                    started_at TEXT NOT NULL DEFAULT (datetime('now')),
427                    ended_at TEXT,
428                    channel TEXT NOT NULL DEFAULT 'cli',
429                    metadata TEXT
430                );
431            ",
432            ),
433            (
434                2,
435                "create_episodes",
436                "
437                CREATE TABLE IF NOT EXISTS episodes (
438                    id TEXT PRIMARY KEY,
439                    session_id TEXT NOT NULL REFERENCES sessions(id),
440                    role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')),
441                    content TEXT NOT NULL,
442                    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
443                    importance REAL NOT NULL DEFAULT 0.5,
444                    decay_rate REAL NOT NULL DEFAULT 0.1,
445                    reinforcement_count INTEGER NOT NULL DEFAULT 0,
446                    last_accessed TEXT,
447                    metadata TEXT
448                );
449
450                CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id);
451                CREATE INDEX IF NOT EXISTS idx_episodes_timestamp ON episodes(timestamp DESC);
452                CREATE INDEX IF NOT EXISTS idx_episodes_importance ON episodes(importance DESC);
453            ",
454            ),
455            (
456                3,
457                "create_episodes_fts",
458                "
459                CREATE VIRTUAL TABLE IF NOT EXISTS episodes_fts USING fts5(
460                    content,
461                    content_rowid='rowid',
462                    tokenize='porter unicode61'
463                );
464            ",
465            ),
466            (
467                4,
468                "create_semantic_facts",
469                "
470                CREATE TABLE IF NOT EXISTS semantic_facts (
471                    id TEXT PRIMARY KEY,
472                    category TEXT NOT NULL,
473                    subject TEXT NOT NULL,
474                    predicate TEXT NOT NULL,
475                    object TEXT NOT NULL,
476                    confidence REAL NOT NULL DEFAULT 1.0,
477                    source_episode_id TEXT REFERENCES episodes(id) ON DELETE SET NULL,
478                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
479                    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
480                    superseded_by TEXT REFERENCES semantic_facts(id) ON DELETE SET NULL
481                );
482
483                CREATE INDEX IF NOT EXISTS idx_facts_category ON semantic_facts(category);
484                CREATE INDEX IF NOT EXISTS idx_facts_subject ON semantic_facts(subject);
485            ",
486            ),
487            (
488                5,
489                "create_user_profile",
490                "
491                CREATE TABLE IF NOT EXISTS user_profile (
492                    key TEXT PRIMARY KEY,
493                    value TEXT NOT NULL,
494                    source TEXT,
495                    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
496                );
497            ",
498            ),
499            (
500                6,
501                "create_procedures",
502                "
503                CREATE TABLE IF NOT EXISTS procedures (
504                    id              TEXT PRIMARY KEY,
505                    trigger_pattern TEXT NOT NULL,
506                    steps_json      TEXT NOT NULL DEFAULT '[]',
507                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
508                    updated_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
509                    use_count       INTEGER NOT NULL DEFAULT 0
510                );
511                CREATE INDEX IF NOT EXISTS idx_procedures_trigger
512                    ON procedures(trigger_pattern);
513            ",
514            ),
515            (
516                7,
517                "create_audit_log",
518                "
519                CREATE TABLE IF NOT EXISTS audit_log (
520                    id INTEGER PRIMARY KEY AUTOINCREMENT,
521                    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
522                    action TEXT NOT NULL,
523                    details TEXT,
524                    prev_hash TEXT,
525                    hash TEXT NOT NULL
526                );
527
528                CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
529            ",
530            ),
531            (
532                10,
533                "add_namespace_to_semantic_facts",
534                "
535                ALTER TABLE semantic_facts ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
536                CREATE INDEX IF NOT EXISTS idx_facts_namespace ON semantic_facts(namespace);
537            ",
538            ),
539            (
540                11,
541                "add_namespace_to_episodes",
542                "
543                ALTER TABLE episodes ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
544                CREATE INDEX IF NOT EXISTS idx_episodes_namespace ON episodes(namespace);
545            ",
546            ),
547            (
548                12,
549                "rebuild_procedures_table",
550                "
551                DROP TABLE IF EXISTS procedures;
552                CREATE TABLE IF NOT EXISTS procedures (
553                    id              TEXT PRIMARY KEY,
554                    trigger_pattern TEXT NOT NULL,
555                    steps_json      TEXT NOT NULL DEFAULT '[]',
556                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
557                    updated_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
558                    use_count       INTEGER NOT NULL DEFAULT 0
559                );
560                CREATE INDEX IF NOT EXISTS idx_procedures_trigger
561                    ON procedures(trigger_pattern);
562            ",
563            ),
564            (
565                13,
566                "create_episode_promotions",
567                "
568                CREATE TABLE IF NOT EXISTS episode_promotions (
569                    episode_id TEXT PRIMARY KEY REFERENCES episodes(id) ON DELETE CASCADE,
570                    fact_id TEXT NOT NULL REFERENCES semantic_facts(id) ON DELETE CASCADE,
571                    promoted_at TEXT NOT NULL DEFAULT (datetime('now'))
572                );
573            ",
574            ),
575            (
576                14,
577                "create_scheduled_intents",
578                "
579                CREATE TABLE IF NOT EXISTS scheduled_intents (
580                    id TEXT PRIMARY KEY,
581                    description TEXT NOT NULL,
582                    cron TEXT,
583                    namespace TEXT NOT NULL DEFAULT 'personal',
584                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
585                    status TEXT NOT NULL DEFAULT 'scheduled',
586                    metadata TEXT
587                );
588                CREATE INDEX IF NOT EXISTS idx_scheduled_intents_namespace
589                    ON scheduled_intents(namespace);
590                CREATE INDEX IF NOT EXISTS idx_scheduled_intents_status
591                    ON scheduled_intents(status);
592            ",
593            ),
594            (
595                15,
596                "create_notification_outbox",
597                "
598                CREATE TABLE IF NOT EXISTS notification_outbox (
599                    id TEXT PRIMARY KEY,
600                    content TEXT NOT NULL,
601                    priority INTEGER NOT NULL DEFAULT 1,
602                    triggered_by TEXT NOT NULL DEFAULT '',
603                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
604                    delivered_at TEXT,
605                    channel TEXT
606                );
607                CREATE INDEX IF NOT EXISTS idx_outbox_pending
608                    ON notification_outbox(delivered_at, priority, created_at)
609                    WHERE delivered_at IS NULL;
610            ",
611            ),
612            (
613                16,
614                "add_agent_column",
615                "
616                ALTER TABLE episodes ADD COLUMN agent TEXT;
617                ALTER TABLE semantic_facts ADD COLUMN agent TEXT;
618            ",
619            ),
620            (
621                17,
622                "fix_orphaned_facts",
623                "
624                -- Clear orphaned source_episode_id references
625                UPDATE semantic_facts SET source_episode_id = NULL 
626                WHERE source_episode_id NOT IN (SELECT id FROM episodes);
627                -- Clear orphaned superseded_by references
628                UPDATE semantic_facts SET superseded_by = NULL 
629                WHERE superseded_by NOT IN (SELECT id FROM semantic_facts);
630            ",
631            ),
632        ]
633    }
634
635    /// Get the current schema version.
636    pub fn schema_version(&self) -> Result<i64, SqliteError> {
637        self.with_conn(|conn| {
638            let version: i64 = conn
639                .query_row(
640                    "SELECT COALESCE(MAX(version), 0) FROM _migrations",
641                    [],
642                    |row| row.get(0),
643                )
644                .unwrap_or(0);
645            Ok(version)
646        })
647    }
648
649    /// Insert a notification into the outbox for later delivery.
650    pub fn insert_notification(
651        &self,
652        content: &str,
653        priority: i32,
654        triggered_by: &str,
655        channel: Option<&str>,
656    ) -> Result<String, SqliteError> {
657        let id = Uuid::new_v4().to_string();
658        self.with_conn(|conn| {
659            conn.execute(
660                "INSERT INTO notification_outbox (id, content, priority, triggered_by, channel)
661                 VALUES (?1, ?2, ?3, ?4, ?5)",
662                rusqlite::params![id, content, priority, triggered_by, channel],
663            )?;
664            Ok(())
665        })?;
666        Ok(id)
667    }
668
669    /// Fetch all pending (undelivered) notifications, ordered by priority then age.
670    pub fn pending_notifications(&self, limit: usize) -> Result<Vec<Notification>, SqliteError> {
671        self.with_conn(|conn| {
672            let mut stmt = conn.prepare(
673                "SELECT id, content, priority, triggered_by, created_at, delivered_at, channel
674                 FROM notification_outbox
675                 WHERE delivered_at IS NULL
676                 ORDER BY priority DESC, created_at ASC
677                 LIMIT ?1",
678            )?;
679            let rows = stmt
680                .query_map([limit as i64], |row| {
681                    Ok(Notification {
682                        id: row.get(0)?,
683                        content: row.get(1)?,
684                        priority: row.get(2)?,
685                        triggered_by: row.get(3)?,
686                        created_at: row.get(4)?,
687                        delivered_at: row.get(5)?,
688                        channel: row.get(6)?,
689                    })
690                })?
691                .collect::<Result<Vec<_>, _>>()?;
692            Ok(rows)
693        })
694    }
695
696    /// Mark a notification as delivered (sets `delivered_at` to now).
697    pub fn mark_notification_delivered(&self, id: &str) -> Result<bool, SqliteError> {
698        self.with_conn(|conn| {
699            let affected = conn.execute(
700                "UPDATE notification_outbox SET delivered_at = datetime('now') WHERE id = ?1 AND delivered_at IS NULL",
701                [id],
702            )?;
703            Ok(affected > 0)
704        })
705    }
706
707    /// Prune old delivered notifications and stale undelivered ones.
708    pub fn prune_notifications(&self, max_age_days: u32) -> Result<usize, SqliteError> {
709        self.with_conn(|conn| {
710            let deleted = conn.execute(
711                "DELETE FROM notification_outbox
712                 WHERE (delivered_at IS NOT NULL AND created_at < datetime('now', ?1))
713                    OR created_at < datetime('now', ?1)",
714                [format!("-{max_age_days} days")],
715            )?;
716            Ok(deleted)
717        })
718    }
719
720    // ── Export / Import ──────────────────────────────────────────────────────
721
722    /// Export all semantic facts ordered by ID.
723    pub fn export_all_facts(&self) -> Result<Vec<ExportedFact>, SqliteError> {
724        self.with_conn(|conn| {
725            let mut stmt = conn.prepare(
726                "SELECT id, namespace, category, subject, predicate, object,
727                        confidence, source_episode_id
728                 FROM semantic_facts
729                 ORDER BY id ASC",
730            )?;
731            let rows = stmt
732                .query_map([], |row| {
733                    Ok(ExportedFact {
734                        id: row.get(0)?,
735                        namespace: row.get(1)?,
736                        category: row.get(2)?,
737                        subject: row.get(3)?,
738                        predicate: row.get(4)?,
739                        object: row.get(5)?,
740                        confidence: row.get(6)?,
741                        source_episode_id: row.get(7)?,
742                    })
743                })?
744                .collect::<Result<Vec<_>, _>>()?;
745            Ok(rows)
746        })
747    }
748
749    /// Export all episodes with session info, ordered by timestamp.
750    pub fn export_all_episodes(&self) -> Result<Vec<ExportedEpisode>, SqliteError> {
751        self.with_conn(|conn| {
752            let mut stmt = conn.prepare(
753                "SELECT e.id, e.session_id, COALESCE(s.channel, 'cli'),
754                        e.namespace, e.role, e.content, e.timestamp,
755                        e.importance, e.reinforcement_count
756                 FROM episodes e
757                 LEFT JOIN sessions s ON s.id = e.session_id
758                 ORDER BY e.timestamp ASC",
759            )?;
760            let rows = stmt
761                .query_map([], |row| {
762                    Ok(ExportedEpisode {
763                        id: row.get(0)?,
764                        session_id: row.get(1)?,
765                        session_channel: row.get(2)?,
766                        namespace: row.get(3)?,
767                        role: row.get(4)?,
768                        content: row.get(5)?,
769                        timestamp: row.get(6)?,
770                        importance: row.get(7)?,
771                        reinforcement_count: row.get(8)?,
772                    })
773                })?
774                .collect::<Result<Vec<_>, _>>()?;
775            Ok(rows)
776        })
777    }
778
779    /// Import facts (ON CONFLICT DO NOTHING). Returns (imported_count, new_indices).
780    pub fn import_facts(&self, facts: &[ExportedFact]) -> Result<(usize, Vec<usize>), SqliteError> {
781        self.with_conn(|conn| {
782            let mut imported = 0usize;
783            let mut new_indices = Vec::new();
784            for (idx, f) in facts.iter().enumerate() {
785                let n = conn.execute(
786                    "INSERT INTO semantic_facts
787                        (id, namespace, category, subject, predicate, object,
788                         confidence, source_episode_id)
789                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
790                     ON CONFLICT(id) DO NOTHING",
791                    rusqlite::params![
792                        f.id,
793                        f.namespace,
794                        f.category,
795                        f.subject,
796                        f.predicate,
797                        f.object,
798                        f.confidence,
799                        f.source_episode_id
800                    ],
801                )?;
802                if n > 0 {
803                    new_indices.push(idx);
804                }
805                imported += n;
806            }
807            Ok((imported, new_indices))
808        })
809    }
810
811    /// Import episodes (ON CONFLICT DO NOTHING). Returns count of newly imported episodes.
812    pub fn import_episodes(&self, episodes: &[ExportedEpisode]) -> Result<usize, SqliteError> {
813        self.with_conn(|conn| {
814            // Ensure sessions exist first
815            let mut sessions: std::collections::HashMap<String, String> =
816                std::collections::HashMap::new();
817            for ep in episodes {
818                sessions
819                    .entry(ep.session_id.clone())
820                    .or_insert_with(|| ep.session_channel.clone());
821            }
822            for (sid, channel) in &sessions {
823                conn.execute(
824                    "INSERT INTO sessions (id, channel) VALUES (?1, ?2)
825                     ON CONFLICT(id) DO NOTHING",
826                    rusqlite::params![sid, channel],
827                )?;
828            }
829
830            let mut imported = 0usize;
831            for e in episodes {
832                let n = conn.execute(
833                    "INSERT INTO episodes
834                        (id, session_id, namespace, role, content, timestamp,
835                         importance, reinforcement_count)
836                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
837                     ON CONFLICT(id) DO NOTHING",
838                    rusqlite::params![
839                        e.id,
840                        e.session_id,
841                        e.namespace,
842                        e.role,
843                        e.content,
844                        e.timestamp,
845                        e.importance,
846                        e.reinforcement_count
847                    ],
848                )?;
849                imported += n;
850            }
851            Ok(imported)
852        })
853    }
854
855    /// Get table row counts for status display.
856    pub fn table_stats(&self) -> Result<Vec<(String, i64)>, SqliteError> {
857        self.with_conn(|conn| {
858            let mut stats = Vec::new();
859            // Whitelist approach: each match arm is both the table name and its SQL,
860            // preventing any possibility of SQL injection via table name interpolation.
861            for table in &[
862                "sessions",
863                "episodes",
864                "semantic_facts",
865                "episode_promotions",
866                "scheduled_intents",
867                "notification_outbox",
868                "user_profile",
869                "procedures",
870                "audit_log",
871            ] {
872                let sql = match *table {
873                    "sessions" => "SELECT COUNT(*) FROM sessions",
874                    "episodes" => "SELECT COUNT(*) FROM episodes",
875                    "semantic_facts" => "SELECT COUNT(*) FROM semantic_facts",
876                    "episode_promotions" => "SELECT COUNT(*) FROM episode_promotions",
877                    "scheduled_intents" => "SELECT COUNT(*) FROM scheduled_intents",
878                    "notification_outbox" => "SELECT COUNT(*) FROM notification_outbox",
879                    "user_profile" => "SELECT COUNT(*) FROM user_profile",
880                    "procedures" => "SELECT COUNT(*) FROM procedures",
881                    "audit_log" => "SELECT COUNT(*) FROM audit_log",
882                    _ => continue,
883                };
884                let count: i64 = conn.query_row(sql, [], |row| row.get(0)).unwrap_or(0);
885                stats.push((table.to_string(), count));
886            }
887
888            Ok(stats)
889        })
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use super::*;
896
897    #[test]
898    fn test_open_memory() {
899        let pool = SqlitePool::open_memory().unwrap();
900        let version = pool.schema_version().unwrap();
901        assert_eq!(version, 17); // All migrations applied
902    }
903
904    #[test]
905    fn test_migrations_idempotent() {
906        let pool = SqlitePool::open_memory().unwrap();
907        // Running migrate again should be a no-op
908        pool.migrate().unwrap();
909        assert_eq!(pool.schema_version().unwrap(), 17);
910    }
911
912    #[test]
913    fn test_table_stats_empty() {
914        let pool = SqlitePool::open_memory().unwrap();
915        let stats = pool.table_stats().unwrap();
916        assert_eq!(stats.len(), 9);
917        for (_, count) in &stats {
918            assert_eq!(*count, 0);
919        }
920    }
921
922    #[test]
923    fn test_scheduled_intent_lifecycle() {
924        let pool = SqlitePool::open_memory().unwrap();
925        let id = pool
926            .insert_scheduled_intent(
927                "deploy release",
928                Some("0 9 * * 1-5"),
929                "work",
930                Some(r#"{"source":"test"}"#),
931            )
932            .unwrap();
933
934        let all = pool.list_scheduled_intents(None).unwrap();
935        assert_eq!(all.len(), 1);
936        assert_eq!(all[0].id, id);
937        assert_eq!(all[0].namespace, "work");
938        assert_eq!(all[0].status, "scheduled");
939
940        let personal = pool.list_scheduled_intents(Some("personal")).unwrap();
941        assert!(personal.is_empty());
942
943        let work = pool.list_scheduled_intents(Some("work")).unwrap();
944        assert_eq!(work.len(), 1);
945        assert_eq!(work[0].description, "deploy release");
946        assert_eq!(work[0].cron.as_deref(), Some("0 9 * * 1-5"));
947        assert!(work[0].created_at.contains(':'));
948        assert_eq!(work[0].metadata.as_deref(), Some(r#"{"source":"test"}"#));
949
950        let updated = pool
951            .update_scheduled_intent_status(&id, "cancelled")
952            .unwrap();
953        assert!(updated);
954
955        let work_after = pool.list_scheduled_intents(Some("work")).unwrap();
956        assert_eq!(work_after[0].status, "cancelled");
957    }
958
959    #[test]
960    fn test_insert_and_query_session() {
961        let pool = SqlitePool::open_memory().unwrap();
962        pool.with_conn(|conn| {
963            conn.execute(
964                "INSERT INTO sessions (id, channel) VALUES (?1, ?2)",
965                rusqlite::params!["sess001", "cli"],
966            )?;
967
968            let channel: String = conn.query_row(
969                "SELECT channel FROM sessions WHERE id = ?1",
970                ["sess001"],
971                |row| row.get(0),
972            )?;
973            assert_eq!(channel, "cli");
974            Ok(())
975        })
976        .unwrap();
977    }
978
979    #[test]
980    fn test_insert_episode_with_fk() {
981        let pool = SqlitePool::open_memory().unwrap();
982        pool.with_conn(|conn| {
983            // Insert session first (FK constraint)
984            conn.execute("INSERT INTO sessions (id) VALUES (?1)", ["sess001"])?;
985
986            conn.execute(
987                "INSERT INTO episodes (id, session_id, role, content)
988                 VALUES (?1, ?2, ?3, ?4)",
989                rusqlite::params!["ep001", "sess001", "user", "Hello Brain!"],
990            )?;
991
992            let content: String = conn.query_row(
993                "SELECT content FROM episodes WHERE id = ?1",
994                ["ep001"],
995                |row| row.get(0),
996            )?;
997            assert_eq!(content, "Hello Brain!");
998            Ok(())
999        })
1000        .unwrap();
1001    }
1002
1003    #[test]
1004    fn test_fk_constraint_enforced() {
1005        let pool = SqlitePool::open_memory().unwrap();
1006        let result = pool.with_conn(|conn| {
1007            // Insert episode without session — should fail
1008            conn.execute(
1009                "INSERT INTO episodes (id, session_id, role, content)
1010                 VALUES (?1, ?2, ?3, ?4)",
1011                rusqlite::params!["ep001", "nonexistent", "user", "Hello"],
1012            )?;
1013            Ok(())
1014        });
1015        assert!(result.is_err());
1016    }
1017
1018    #[test]
1019    fn test_semantic_fact_insert() {
1020        let pool = SqlitePool::open_memory().unwrap();
1021        pool.with_conn(|conn| {
1022            conn.execute(
1023                "INSERT INTO semantic_facts (id, category, subject, predicate, object)
1024                 VALUES (?1, ?2, ?3, ?4, ?5)",
1025                rusqlite::params!["fact001", "personal", "user", "name_is", "Keshav"],
1026            )?;
1027
1028            let obj: String = conn.query_row(
1029                "SELECT object FROM semantic_facts WHERE subject = ?1 AND predicate = ?2",
1030                rusqlite::params!["user", "name_is"],
1031                |row| row.get(0),
1032            )?;
1033            assert_eq!(obj, "Keshav");
1034            Ok(())
1035        })
1036        .unwrap();
1037    }
1038
1039    #[test]
1040    fn test_namespace_column_on_semantic_facts() {
1041        let pool = SqlitePool::open_memory().unwrap();
1042        pool.with_conn(|conn| {
1043            // Insert facts in two different namespaces
1044            conn.execute(
1045                "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1046                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1047                rusqlite::params!["factw1", "work", "user", "role_is", "developer", "work"],
1048            )?;
1049            conn.execute(
1050                "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1051                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1052                rusqlite::params!["factp1", "personal", "user", "name_is", "Keshav", "personal"],
1053            )?;
1054
1055            // Query work namespace — should only return work fact
1056            let count: i64 = conn.query_row(
1057                "SELECT COUNT(*) FROM semantic_facts WHERE namespace = 'work'",
1058                [],
1059                |row| row.get(0),
1060            )?;
1061            assert_eq!(count, 1, "work namespace should have 1 fact");
1062
1063            // Query personal namespace — should only return personal fact
1064            let count: i64 = conn.query_row(
1065                "SELECT COUNT(*) FROM semantic_facts WHERE namespace = 'personal'",
1066                [],
1067                |row| row.get(0),
1068            )?;
1069            assert_eq!(count, 1, "personal namespace should have 1 fact");
1070
1071            // Namespace isolation: work search should not return personal facts
1072            let found: bool = conn
1073                .query_row(
1074                    "SELECT COUNT(*) > 0 FROM semantic_facts
1075                     WHERE namespace = 'work' AND predicate = 'name_is'",
1076                    [],
1077                    |row| row.get(0),
1078                )
1079                .unwrap_or(false);
1080            assert!(!found, "work namespace must not contain personal facts");
1081
1082            Ok(())
1083        })
1084        .unwrap();
1085    }
1086
1087    #[test]
1088    fn test_namespace_default_is_personal() {
1089        let pool = SqlitePool::open_memory().unwrap();
1090        pool.with_conn(|conn| {
1091            // Insert without specifying namespace — should default to 'personal'
1092            conn.execute(
1093                "INSERT INTO semantic_facts (id, category, subject, predicate, object)
1094                 VALUES (?1, ?2, ?3, ?4, ?5)",
1095                rusqlite::params!["factdefault", "personal", "user", "likes", "Rust"],
1096            )?;
1097
1098            let ns: String = conn.query_row(
1099                "SELECT namespace FROM semantic_facts WHERE id = 'factdefault'",
1100                [],
1101                |row| row.get(0),
1102            )?;
1103            assert_eq!(ns, "personal", "default namespace should be 'personal'");
1104            Ok(())
1105        })
1106        .unwrap();
1107    }
1108
1109    #[test]
1110    fn test_notification_outbox_lifecycle() {
1111        let pool = SqlitePool::open_memory().unwrap();
1112
1113        // Insert two notifications at different priorities
1114        let id1 = pool
1115            .insert_notification("Low priority nudge", 1, "habit:morning_review", None)
1116            .unwrap();
1117        let id2 = pool
1118            .insert_notification("High priority reminder", 3, "open_loop:todo", Some("slack"))
1119            .unwrap();
1120
1121        // Pending should return both, highest priority first
1122        let pending = pool.pending_notifications(10).unwrap();
1123        assert_eq!(pending.len(), 2);
1124        assert_eq!(pending[0].id, id2, "higher priority should come first");
1125        assert_eq!(pending[1].id, id1);
1126        assert!(pending[0].delivered_at.is_none());
1127        assert_eq!(pending[1].channel, None);
1128        assert_eq!(pending[0].channel.as_deref(), Some("slack"));
1129
1130        // Mark one as delivered
1131        assert!(pool.mark_notification_delivered(&id2).unwrap());
1132        let pending = pool.pending_notifications(10).unwrap();
1133        assert_eq!(pending.len(), 1);
1134        assert_eq!(pending[0].id, id1);
1135
1136        // Idempotency: marking the same one again returns false
1137        assert!(!pool.mark_notification_delivered(&id2).unwrap());
1138    }
1139
1140    #[test]
1141    fn test_notification_prune() {
1142        let pool = SqlitePool::open_memory().unwrap();
1143        let id = pool.insert_notification("test", 1, "test", None).unwrap();
1144        pool.mark_notification_delivered(&id).unwrap();
1145
1146        // Recently delivered notifications should NOT be pruned (M11 fix)
1147        let pruned = pool.prune_notifications(365).unwrap();
1148        assert_eq!(pruned, 0, "recently delivered notifications should be kept");
1149
1150        // Backdate the notification to simulate aging, then prune should work
1151        pool.with_conn(|conn| {
1152            conn.execute(
1153                "UPDATE notification_outbox SET created_at = datetime('now', '-400 days') WHERE id = ?1",
1154                [&id],
1155            )?;
1156            Ok(())
1157        })
1158        .unwrap();
1159        let pruned = pool.prune_notifications(365).unwrap();
1160        assert_eq!(pruned, 1, "old delivered notification should be pruned");
1161    }
1162
1163    #[test]
1164    fn test_list_namespaces_with_counts() {
1165        let pool = SqlitePool::open_memory().unwrap();
1166        pool.with_conn(|conn| {
1167            // Insert facts across three namespaces
1168            for i in 0..3 {
1169                conn.execute(
1170                    "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1171                     VALUES (?1, 'personal', 'user', 'fact', ?2, 'personal')",
1172                    rusqlite::params![format!("p{i}"), format!("val{i}")],
1173                )?;
1174            }
1175            conn.execute(
1176                "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1177                 VALUES ('w1', 'work', 'user', 'role', 'dev', 'work')",
1178                [],
1179            )?;
1180
1181            // Count facts per namespace
1182            let mut stmt = conn.prepare(
1183                "SELECT namespace, COUNT(*) as cnt FROM semantic_facts
1184                 WHERE superseded_by IS NULL
1185                 GROUP BY namespace ORDER BY namespace",
1186            )?;
1187            let rows: Vec<(String, i64)> = stmt
1188                .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1189                .collect::<Result<Vec<_>, _>>()?;
1190
1191            assert_eq!(rows.len(), 2, "should have 2 namespaces");
1192            let personal = rows.iter().find(|(ns, _)| ns == "personal").unwrap();
1193            assert_eq!(personal.1, 3, "personal should have 3 facts");
1194            let work = rows.iter().find(|(ns, _)| ns == "work").unwrap();
1195            assert_eq!(work.1, 1, "work should have 1 fact");
1196
1197            Ok(())
1198        })
1199        .unwrap();
1200    }
1201}