1use std::path::Path;
10use std::sync::{Arc, Mutex};
11
12use rusqlite::Connection;
13use thiserror::Error;
14use tracing::info;
15use uuid::Uuid;
16
17#[cfg(feature = "encryption")]
18use crate::encryption::Encryptor;
19
20#[derive(Debug, Error)]
22pub enum SqliteError {
23 #[error("SQLite error: {0}")]
24 Rusqlite(#[from] rusqlite::Error),
25
26 #[error("Lock poisoned")]
27 LockPoisoned,
28
29 #[error("Migration failed: {0}")]
30 Migration(String),
31}
32
33#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub struct ExportedFact {
36 pub id: String,
37 pub namespace: String,
38 pub category: String,
39 pub subject: String,
40 pub predicate: String,
41 pub object: String,
42 pub confidence: f64,
43 pub source_episode_id: Option<String>,
44}
45
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48pub struct ExportedEpisode {
49 pub id: String,
50 pub session_id: String,
51 pub session_channel: String,
52 #[serde(default = "default_namespace")]
53 pub namespace: String,
54 pub role: String,
55 pub content: String,
56 pub timestamp: String,
57 pub importance: f64,
58 pub reinforcement_count: i32,
59}
60
61fn default_namespace() -> String {
62 "personal".to_string()
63}
64
65#[derive(Debug, Clone)]
67pub struct Notification {
68 pub id: String,
69 pub content: String,
70 pub priority: i32,
71 pub triggered_by: String,
72 pub created_at: String,
73 pub delivered_at: Option<String>,
74 pub channel: Option<String>,
75}
76
77#[derive(Clone)]
86pub struct SqlitePool {
87 conn: Arc<Mutex<Connection>>,
88 #[cfg(feature = "encryption")]
89 encryptor: Option<Arc<Encryptor>>,
90}
91
92#[derive(Debug, Clone)]
94pub struct ScheduledIntent {
95 pub id: String,
96 pub description: String,
97 pub cron: Option<String>,
98 pub namespace: String,
99 pub created_at: String,
100 pub status: String,
101 pub metadata: Option<String>,
102}
103
104impl SqlitePool {
105 pub fn open(path: &Path) -> Result<Self, SqliteError> {
112 if let Some(parent) = path.parent() {
114 std::fs::create_dir_all(parent).map_err(|e| {
115 SqliteError::Migration(format!("Cannot create directory {}: {e}", parent.display()))
116 })?;
117 }
118
119 let conn = Connection::open(path)?;
120
121 conn.execute_batch(
123 "
124 PRAGMA journal_mode = WAL;
125 PRAGMA synchronous = NORMAL;
126 PRAGMA foreign_keys = ON;
127 PRAGMA busy_timeout = 5000;
128 PRAGMA cache_size = -8000;
129 ",
130 )?;
131
132 let pool = Self {
133 conn: Arc::new(Mutex::new(conn)),
134 #[cfg(feature = "encryption")]
135 encryptor: None,
136 };
137
138 pool.migrate()?;
140
141 info!("SQLite database opened at {}", path.display());
142 Ok(pool)
143 }
144
145 pub fn open_memory() -> Result<Self, SqliteError> {
147 let conn = Connection::open_in_memory()?;
148 conn.execute_batch(
149 "
150 PRAGMA journal_mode = WAL;
151 PRAGMA foreign_keys = ON;
152 ",
153 )?;
154
155 let pool = Self {
156 conn: Arc::new(Mutex::new(conn)),
157 #[cfg(feature = "encryption")]
158 encryptor: None,
159 };
160
161 pool.migrate()?;
162 Ok(pool)
163 }
164
165 pub fn with_conn<F, T>(&self, f: F) -> Result<T, SqliteError>
167 where
168 F: FnOnce(&Connection) -> Result<T, SqliteError>,
169 {
170 let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?;
171 f(&conn)
172 }
173
174 #[cfg(feature = "encryption")]
179 pub fn with_encryptor(mut self, enc: Encryptor) -> Self {
180 self.encryptor = Some(Arc::new(enc));
181 self
182 }
183
184 pub fn is_encrypted(&self) -> bool {
186 #[cfg(feature = "encryption")]
187 {
188 self.encryptor.is_some()
189 }
190 #[cfg(not(feature = "encryption"))]
191 {
192 false
193 }
194 }
195
196 pub fn encrypt_content(&self, plaintext: &str) -> String {
198 #[cfg(feature = "encryption")]
199 {
200 if let Some(enc) = &self.encryptor {
201 return enc
202 .encrypt_string(plaintext)
203 .unwrap_or_else(|_| plaintext.to_string());
204 }
205 }
206 plaintext.to_string()
207 }
208
209 pub fn decrypt_content(&self, maybe_ciphertext: &str) -> String {
214 #[cfg(feature = "encryption")]
215 {
216 if let Some(enc) = &self.encryptor {
217 return enc
218 .decrypt_string(maybe_ciphertext)
219 .unwrap_or_else(|_| maybe_ciphertext.to_string());
220 }
221 }
222 maybe_ciphertext.to_string()
223 }
224
225 pub fn try_decrypt_content(&self, maybe_ciphertext: &str) -> Option<String> {
231 #[cfg(feature = "encryption")]
232 {
233 if let Some(enc) = &self.encryptor {
234 return enc.decrypt_string(maybe_ciphertext).ok();
235 }
236 }
237 Some(maybe_ciphertext.to_string())
238 }
239
240 pub fn wal_checkpoint(&self) -> Result<(), SqliteError> {
246 self.with_conn(|conn| {
247 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
248 Ok(())
249 })
250 }
251
252 pub fn insert_scheduled_intent(
254 &self,
255 description: &str,
256 cron: Option<&str>,
257 namespace: &str,
258 metadata: Option<&str>,
259 ) -> Result<String, SqliteError> {
260 let id = Uuid::new_v4().to_string();
261 self.with_conn(|conn| {
262 conn.execute(
263 "INSERT INTO scheduled_intents (id, description, cron, namespace, metadata)
264 VALUES (?1, ?2, ?3, ?4, ?5)",
265 rusqlite::params![id, description, cron, namespace, metadata],
266 )?;
267 Ok(())
268 })?;
269 Ok(id)
270 }
271
272 pub fn list_scheduled_intents(
274 &self,
275 namespace: Option<&str>,
276 ) -> Result<Vec<ScheduledIntent>, SqliteError> {
277 self.with_conn(|conn| {
278 let mut intents = Vec::new();
279 if let Some(ns) = namespace {
280 let mut stmt = conn.prepare(
281 "SELECT id, description, cron, namespace, created_at, status, metadata
282 FROM scheduled_intents
283 WHERE namespace = ?1 OR namespace LIKE ?2
284 ORDER BY created_at DESC",
285 )?;
286 let prefix = format!("{}/%", ns);
287 let rows = stmt.query_map([ns, &prefix], |row| {
288 Ok(ScheduledIntent {
289 id: row.get(0)?,
290 description: row.get(1)?,
291 cron: row.get(2)?,
292 namespace: row.get(3)?,
293 created_at: row.get(4)?,
294 status: row.get(5)?,
295 metadata: row.get(6)?,
296 })
297 })?;
298 for row in rows {
299 intents.push(row?);
300 }
301 } else {
302 let mut stmt = conn.prepare(
303 "SELECT id, description, cron, namespace, created_at, status, metadata
304 FROM scheduled_intents
305 ORDER BY created_at DESC",
306 )?;
307 let rows = stmt.query_map([], |row| {
308 Ok(ScheduledIntent {
309 id: row.get(0)?,
310 description: row.get(1)?,
311 cron: row.get(2)?,
312 namespace: row.get(3)?,
313 created_at: row.get(4)?,
314 status: row.get(5)?,
315 metadata: row.get(6)?,
316 })
317 })?;
318 for row in rows {
319 intents.push(row?);
320 }
321 }
322 Ok(intents)
323 })
324 }
325
326 pub fn update_scheduled_intent_status(
328 &self,
329 id: &str,
330 status: &str,
331 ) -> Result<bool, SqliteError> {
332 self.with_conn(|conn| {
333 let affected = conn.execute(
334 "UPDATE scheduled_intents SET status = ?2 WHERE id = ?1",
335 rusqlite::params![id, status],
336 )?;
337 Ok(affected > 0)
338 })
339 }
340
341 pub fn cancel_scheduled_intent(&self, id: &str) -> Result<bool, SqliteError> {
343 self.update_scheduled_intent_status(id, "cancelled")
344 }
345
346 pub fn due_scheduled_intents(&self) -> Result<Vec<ScheduledIntent>, SqliteError> {
348 self.with_conn(|conn| {
349 let mut stmt = conn.prepare(
350 "SELECT id, description, cron, namespace, created_at, status, metadata
351 FROM scheduled_intents
352 WHERE status = 'scheduled'
353 ORDER BY created_at ASC",
354 )?;
355 let rows = stmt.query_map([], |row| {
356 Ok(ScheduledIntent {
357 id: row.get(0)?,
358 description: row.get(1)?,
359 cron: row.get(2)?,
360 namespace: row.get(3)?,
361 created_at: row.get(4)?,
362 status: row.get(5)?,
363 metadata: row.get(6)?,
364 })
365 })?;
366 Ok(rows.filter_map(|r| r.ok()).collect())
367 })
368 }
369
370 fn migrate(&self) -> Result<(), SqliteError> {
372 self.with_conn(|conn| {
373 conn.execute_batch(
375 "CREATE TABLE IF NOT EXISTS _migrations (
376 version INTEGER PRIMARY KEY,
377 name TEXT NOT NULL,
378 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
379 );",
380 )?;
381
382 let current_version: i64 = conn
383 .query_row(
384 "SELECT COALESCE(MAX(version), 0) FROM _migrations",
385 [],
386 |row| row.get(0),
387 )
388 .unwrap_or(0);
389
390 let migrations = Self::migrations();
391
392 for (version, name, sql) in &migrations {
393 if *version > current_version {
394 info!("Running migration {version}: {name}");
395 conn.execute_batch(sql).map_err(|e| {
396 SqliteError::Migration(format!("Migration {version} ({name}) failed: {e}"))
397 })?;
398
399 conn.execute(
400 "INSERT INTO _migrations (version, name) VALUES (?1, ?2)",
401 rusqlite::params![version, name],
402 )?;
403 }
404 }
405
406 if current_version < migrations.last().map_or(0, |m| m.0) {
407 info!(
408 "Migrations complete (v{current_version} → v{})",
409 migrations.last().expect("BUG: migrations list is empty").0
410 );
411 }
412
413 Ok(())
414 })
415 }
416
417 fn migrations() -> Vec<(i64, &'static str, &'static str)> {
419 vec![
420 (
421 1,
422 "create_sessions",
423 "
424 CREATE TABLE IF NOT EXISTS sessions (
425 id TEXT PRIMARY KEY,
426 started_at TEXT NOT NULL DEFAULT (datetime('now')),
427 ended_at TEXT,
428 channel TEXT NOT NULL DEFAULT 'cli',
429 metadata TEXT
430 );
431 ",
432 ),
433 (
434 2,
435 "create_episodes",
436 "
437 CREATE TABLE IF NOT EXISTS episodes (
438 id TEXT PRIMARY KEY,
439 session_id TEXT NOT NULL REFERENCES sessions(id),
440 role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system')),
441 content TEXT NOT NULL,
442 timestamp TEXT NOT NULL DEFAULT (datetime('now')),
443 importance REAL NOT NULL DEFAULT 0.5,
444 decay_rate REAL NOT NULL DEFAULT 0.1,
445 reinforcement_count INTEGER NOT NULL DEFAULT 0,
446 last_accessed TEXT,
447 metadata TEXT
448 );
449
450 CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id);
451 CREATE INDEX IF NOT EXISTS idx_episodes_timestamp ON episodes(timestamp DESC);
452 CREATE INDEX IF NOT EXISTS idx_episodes_importance ON episodes(importance DESC);
453 ",
454 ),
455 (
456 3,
457 "create_episodes_fts",
458 "
459 CREATE VIRTUAL TABLE IF NOT EXISTS episodes_fts USING fts5(
460 content,
461 content_rowid='rowid',
462 tokenize='porter unicode61'
463 );
464 ",
465 ),
466 (
467 4,
468 "create_semantic_facts",
469 "
470 CREATE TABLE IF NOT EXISTS semantic_facts (
471 id TEXT PRIMARY KEY,
472 category TEXT NOT NULL,
473 subject TEXT NOT NULL,
474 predicate TEXT NOT NULL,
475 object TEXT NOT NULL,
476 confidence REAL NOT NULL DEFAULT 1.0,
477 source_episode_id TEXT REFERENCES episodes(id) ON DELETE SET NULL,
478 created_at TEXT NOT NULL DEFAULT (datetime('now')),
479 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
480 superseded_by TEXT REFERENCES semantic_facts(id) ON DELETE SET NULL
481 );
482
483 CREATE INDEX IF NOT EXISTS idx_facts_category ON semantic_facts(category);
484 CREATE INDEX IF NOT EXISTS idx_facts_subject ON semantic_facts(subject);
485 ",
486 ),
487 (
488 5,
489 "create_user_profile",
490 "
491 CREATE TABLE IF NOT EXISTS user_profile (
492 key TEXT PRIMARY KEY,
493 value TEXT NOT NULL,
494 source TEXT,
495 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
496 );
497 ",
498 ),
499 (
500 6,
501 "create_procedures",
502 "
503 CREATE TABLE IF NOT EXISTS procedures (
504 id TEXT PRIMARY KEY,
505 trigger_pattern TEXT NOT NULL,
506 steps_json TEXT NOT NULL DEFAULT '[]',
507 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
508 updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
509 use_count INTEGER NOT NULL DEFAULT 0
510 );
511 CREATE INDEX IF NOT EXISTS idx_procedures_trigger
512 ON procedures(trigger_pattern);
513 ",
514 ),
515 (
516 7,
517 "create_audit_log",
518 "
519 CREATE TABLE IF NOT EXISTS audit_log (
520 id INTEGER PRIMARY KEY AUTOINCREMENT,
521 timestamp TEXT NOT NULL DEFAULT (datetime('now')),
522 action TEXT NOT NULL,
523 details TEXT,
524 prev_hash TEXT,
525 hash TEXT NOT NULL
526 );
527
528 CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC);
529 ",
530 ),
531 (
532 10,
533 "add_namespace_to_semantic_facts",
534 "
535 ALTER TABLE semantic_facts ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
536 CREATE INDEX IF NOT EXISTS idx_facts_namespace ON semantic_facts(namespace);
537 ",
538 ),
539 (
540 11,
541 "add_namespace_to_episodes",
542 "
543 ALTER TABLE episodes ADD COLUMN namespace TEXT NOT NULL DEFAULT 'personal';
544 CREATE INDEX IF NOT EXISTS idx_episodes_namespace ON episodes(namespace);
545 ",
546 ),
547 (
548 12,
549 "rebuild_procedures_table",
550 "
551 DROP TABLE IF EXISTS procedures;
552 CREATE TABLE IF NOT EXISTS procedures (
553 id TEXT PRIMARY KEY,
554 trigger_pattern TEXT NOT NULL,
555 steps_json TEXT NOT NULL DEFAULT '[]',
556 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
557 updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
558 use_count INTEGER NOT NULL DEFAULT 0
559 );
560 CREATE INDEX IF NOT EXISTS idx_procedures_trigger
561 ON procedures(trigger_pattern);
562 ",
563 ),
564 (
565 13,
566 "create_episode_promotions",
567 "
568 CREATE TABLE IF NOT EXISTS episode_promotions (
569 episode_id TEXT PRIMARY KEY REFERENCES episodes(id) ON DELETE CASCADE,
570 fact_id TEXT NOT NULL REFERENCES semantic_facts(id) ON DELETE CASCADE,
571 promoted_at TEXT NOT NULL DEFAULT (datetime('now'))
572 );
573 ",
574 ),
575 (
576 14,
577 "create_scheduled_intents",
578 "
579 CREATE TABLE IF NOT EXISTS scheduled_intents (
580 id TEXT PRIMARY KEY,
581 description TEXT NOT NULL,
582 cron TEXT,
583 namespace TEXT NOT NULL DEFAULT 'personal',
584 created_at TEXT NOT NULL DEFAULT (datetime('now')),
585 status TEXT NOT NULL DEFAULT 'scheduled',
586 metadata TEXT
587 );
588 CREATE INDEX IF NOT EXISTS idx_scheduled_intents_namespace
589 ON scheduled_intents(namespace);
590 CREATE INDEX IF NOT EXISTS idx_scheduled_intents_status
591 ON scheduled_intents(status);
592 ",
593 ),
594 (
595 15,
596 "create_notification_outbox",
597 "
598 CREATE TABLE IF NOT EXISTS notification_outbox (
599 id TEXT PRIMARY KEY,
600 content TEXT NOT NULL,
601 priority INTEGER NOT NULL DEFAULT 1,
602 triggered_by TEXT NOT NULL DEFAULT '',
603 created_at TEXT NOT NULL DEFAULT (datetime('now')),
604 delivered_at TEXT,
605 channel TEXT
606 );
607 CREATE INDEX IF NOT EXISTS idx_outbox_pending
608 ON notification_outbox(delivered_at, priority, created_at)
609 WHERE delivered_at IS NULL;
610 ",
611 ),
612 (
613 16,
614 "add_agent_column",
615 "
616 ALTER TABLE episodes ADD COLUMN agent TEXT;
617 ALTER TABLE semantic_facts ADD COLUMN agent TEXT;
618 ",
619 ),
620 (
621 17,
622 "fix_orphaned_facts",
623 "
624 -- Clear orphaned source_episode_id references
625 UPDATE semantic_facts SET source_episode_id = NULL
626 WHERE source_episode_id NOT IN (SELECT id FROM episodes);
627 -- Clear orphaned superseded_by references
628 UPDATE semantic_facts SET superseded_by = NULL
629 WHERE superseded_by NOT IN (SELECT id FROM semantic_facts);
630 ",
631 ),
632 ]
633 }
634
635 pub fn schema_version(&self) -> Result<i64, SqliteError> {
637 self.with_conn(|conn| {
638 let version: i64 = conn
639 .query_row(
640 "SELECT COALESCE(MAX(version), 0) FROM _migrations",
641 [],
642 |row| row.get(0),
643 )
644 .unwrap_or(0);
645 Ok(version)
646 })
647 }
648
649 pub fn insert_notification(
651 &self,
652 content: &str,
653 priority: i32,
654 triggered_by: &str,
655 channel: Option<&str>,
656 ) -> Result<String, SqliteError> {
657 let id = Uuid::new_v4().to_string();
658 self.with_conn(|conn| {
659 conn.execute(
660 "INSERT INTO notification_outbox (id, content, priority, triggered_by, channel)
661 VALUES (?1, ?2, ?3, ?4, ?5)",
662 rusqlite::params![id, content, priority, triggered_by, channel],
663 )?;
664 Ok(())
665 })?;
666 Ok(id)
667 }
668
669 pub fn pending_notifications(&self, limit: usize) -> Result<Vec<Notification>, SqliteError> {
671 self.with_conn(|conn| {
672 let mut stmt = conn.prepare(
673 "SELECT id, content, priority, triggered_by, created_at, delivered_at, channel
674 FROM notification_outbox
675 WHERE delivered_at IS NULL
676 ORDER BY priority DESC, created_at ASC
677 LIMIT ?1",
678 )?;
679 let rows = stmt
680 .query_map([limit as i64], |row| {
681 Ok(Notification {
682 id: row.get(0)?,
683 content: row.get(1)?,
684 priority: row.get(2)?,
685 triggered_by: row.get(3)?,
686 created_at: row.get(4)?,
687 delivered_at: row.get(5)?,
688 channel: row.get(6)?,
689 })
690 })?
691 .collect::<Result<Vec<_>, _>>()?;
692 Ok(rows)
693 })
694 }
695
696 pub fn mark_notification_delivered(&self, id: &str) -> Result<bool, SqliteError> {
698 self.with_conn(|conn| {
699 let affected = conn.execute(
700 "UPDATE notification_outbox SET delivered_at = datetime('now') WHERE id = ?1 AND delivered_at IS NULL",
701 [id],
702 )?;
703 Ok(affected > 0)
704 })
705 }
706
707 pub fn prune_notifications(&self, max_age_days: u32) -> Result<usize, SqliteError> {
709 self.with_conn(|conn| {
710 let deleted = conn.execute(
711 "DELETE FROM notification_outbox
712 WHERE (delivered_at IS NOT NULL AND created_at < datetime('now', ?1))
713 OR created_at < datetime('now', ?1)",
714 [format!("-{max_age_days} days")],
715 )?;
716 Ok(deleted)
717 })
718 }
719
720 pub fn export_all_facts(&self) -> Result<Vec<ExportedFact>, SqliteError> {
724 self.with_conn(|conn| {
725 let mut stmt = conn.prepare(
726 "SELECT id, namespace, category, subject, predicate, object,
727 confidence, source_episode_id
728 FROM semantic_facts
729 ORDER BY id ASC",
730 )?;
731 let rows = stmt
732 .query_map([], |row| {
733 Ok(ExportedFact {
734 id: row.get(0)?,
735 namespace: row.get(1)?,
736 category: row.get(2)?,
737 subject: row.get(3)?,
738 predicate: row.get(4)?,
739 object: row.get(5)?,
740 confidence: row.get(6)?,
741 source_episode_id: row.get(7)?,
742 })
743 })?
744 .collect::<Result<Vec<_>, _>>()?;
745 Ok(rows)
746 })
747 }
748
749 pub fn export_all_episodes(&self) -> Result<Vec<ExportedEpisode>, SqliteError> {
751 self.with_conn(|conn| {
752 let mut stmt = conn.prepare(
753 "SELECT e.id, e.session_id, COALESCE(s.channel, 'cli'),
754 e.namespace, e.role, e.content, e.timestamp,
755 e.importance, e.reinforcement_count
756 FROM episodes e
757 LEFT JOIN sessions s ON s.id = e.session_id
758 ORDER BY e.timestamp ASC",
759 )?;
760 let rows = stmt
761 .query_map([], |row| {
762 Ok(ExportedEpisode {
763 id: row.get(0)?,
764 session_id: row.get(1)?,
765 session_channel: row.get(2)?,
766 namespace: row.get(3)?,
767 role: row.get(4)?,
768 content: row.get(5)?,
769 timestamp: row.get(6)?,
770 importance: row.get(7)?,
771 reinforcement_count: row.get(8)?,
772 })
773 })?
774 .collect::<Result<Vec<_>, _>>()?;
775 Ok(rows)
776 })
777 }
778
779 pub fn import_facts(&self, facts: &[ExportedFact]) -> Result<(usize, Vec<usize>), SqliteError> {
781 self.with_conn(|conn| {
782 let mut imported = 0usize;
783 let mut new_indices = Vec::new();
784 for (idx, f) in facts.iter().enumerate() {
785 let n = conn.execute(
786 "INSERT INTO semantic_facts
787 (id, namespace, category, subject, predicate, object,
788 confidence, source_episode_id)
789 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
790 ON CONFLICT(id) DO NOTHING",
791 rusqlite::params![
792 f.id,
793 f.namespace,
794 f.category,
795 f.subject,
796 f.predicate,
797 f.object,
798 f.confidence,
799 f.source_episode_id
800 ],
801 )?;
802 if n > 0 {
803 new_indices.push(idx);
804 }
805 imported += n;
806 }
807 Ok((imported, new_indices))
808 })
809 }
810
811 pub fn import_episodes(&self, episodes: &[ExportedEpisode]) -> Result<usize, SqliteError> {
813 self.with_conn(|conn| {
814 let mut sessions: std::collections::HashMap<String, String> =
816 std::collections::HashMap::new();
817 for ep in episodes {
818 sessions
819 .entry(ep.session_id.clone())
820 .or_insert_with(|| ep.session_channel.clone());
821 }
822 for (sid, channel) in &sessions {
823 conn.execute(
824 "INSERT INTO sessions (id, channel) VALUES (?1, ?2)
825 ON CONFLICT(id) DO NOTHING",
826 rusqlite::params![sid, channel],
827 )?;
828 }
829
830 let mut imported = 0usize;
831 for e in episodes {
832 let n = conn.execute(
833 "INSERT INTO episodes
834 (id, session_id, namespace, role, content, timestamp,
835 importance, reinforcement_count)
836 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
837 ON CONFLICT(id) DO NOTHING",
838 rusqlite::params![
839 e.id,
840 e.session_id,
841 e.namespace,
842 e.role,
843 e.content,
844 e.timestamp,
845 e.importance,
846 e.reinforcement_count
847 ],
848 )?;
849 imported += n;
850 }
851 Ok(imported)
852 })
853 }
854
855 pub fn table_stats(&self) -> Result<Vec<(String, i64)>, SqliteError> {
857 self.with_conn(|conn| {
858 let mut stats = Vec::new();
859 for table in &[
862 "sessions",
863 "episodes",
864 "semantic_facts",
865 "episode_promotions",
866 "scheduled_intents",
867 "notification_outbox",
868 "user_profile",
869 "procedures",
870 "audit_log",
871 ] {
872 let sql = match *table {
873 "sessions" => "SELECT COUNT(*) FROM sessions",
874 "episodes" => "SELECT COUNT(*) FROM episodes",
875 "semantic_facts" => "SELECT COUNT(*) FROM semantic_facts",
876 "episode_promotions" => "SELECT COUNT(*) FROM episode_promotions",
877 "scheduled_intents" => "SELECT COUNT(*) FROM scheduled_intents",
878 "notification_outbox" => "SELECT COUNT(*) FROM notification_outbox",
879 "user_profile" => "SELECT COUNT(*) FROM user_profile",
880 "procedures" => "SELECT COUNT(*) FROM procedures",
881 "audit_log" => "SELECT COUNT(*) FROM audit_log",
882 _ => continue,
883 };
884 let count: i64 = conn.query_row(sql, [], |row| row.get(0)).unwrap_or(0);
885 stats.push((table.to_string(), count));
886 }
887
888 Ok(stats)
889 })
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896
897 #[test]
898 fn test_open_memory() {
899 let pool = SqlitePool::open_memory().unwrap();
900 let version = pool.schema_version().unwrap();
901 assert_eq!(version, 17); }
903
904 #[test]
905 fn test_migrations_idempotent() {
906 let pool = SqlitePool::open_memory().unwrap();
907 pool.migrate().unwrap();
909 assert_eq!(pool.schema_version().unwrap(), 17);
910 }
911
912 #[test]
913 fn test_table_stats_empty() {
914 let pool = SqlitePool::open_memory().unwrap();
915 let stats = pool.table_stats().unwrap();
916 assert_eq!(stats.len(), 9);
917 for (_, count) in &stats {
918 assert_eq!(*count, 0);
919 }
920 }
921
922 #[test]
923 fn test_scheduled_intent_lifecycle() {
924 let pool = SqlitePool::open_memory().unwrap();
925 let id = pool
926 .insert_scheduled_intent(
927 "deploy release",
928 Some("0 9 * * 1-5"),
929 "work",
930 Some(r#"{"source":"test"}"#),
931 )
932 .unwrap();
933
934 let all = pool.list_scheduled_intents(None).unwrap();
935 assert_eq!(all.len(), 1);
936 assert_eq!(all[0].id, id);
937 assert_eq!(all[0].namespace, "work");
938 assert_eq!(all[0].status, "scheduled");
939
940 let personal = pool.list_scheduled_intents(Some("personal")).unwrap();
941 assert!(personal.is_empty());
942
943 let work = pool.list_scheduled_intents(Some("work")).unwrap();
944 assert_eq!(work.len(), 1);
945 assert_eq!(work[0].description, "deploy release");
946 assert_eq!(work[0].cron.as_deref(), Some("0 9 * * 1-5"));
947 assert!(work[0].created_at.contains(':'));
948 assert_eq!(work[0].metadata.as_deref(), Some(r#"{"source":"test"}"#));
949
950 let updated = pool
951 .update_scheduled_intent_status(&id, "cancelled")
952 .unwrap();
953 assert!(updated);
954
955 let work_after = pool.list_scheduled_intents(Some("work")).unwrap();
956 assert_eq!(work_after[0].status, "cancelled");
957 }
958
959 #[test]
960 fn test_insert_and_query_session() {
961 let pool = SqlitePool::open_memory().unwrap();
962 pool.with_conn(|conn| {
963 conn.execute(
964 "INSERT INTO sessions (id, channel) VALUES (?1, ?2)",
965 rusqlite::params!["sess001", "cli"],
966 )?;
967
968 let channel: String = conn.query_row(
969 "SELECT channel FROM sessions WHERE id = ?1",
970 ["sess001"],
971 |row| row.get(0),
972 )?;
973 assert_eq!(channel, "cli");
974 Ok(())
975 })
976 .unwrap();
977 }
978
979 #[test]
980 fn test_insert_episode_with_fk() {
981 let pool = SqlitePool::open_memory().unwrap();
982 pool.with_conn(|conn| {
983 conn.execute("INSERT INTO sessions (id) VALUES (?1)", ["sess001"])?;
985
986 conn.execute(
987 "INSERT INTO episodes (id, session_id, role, content)
988 VALUES (?1, ?2, ?3, ?4)",
989 rusqlite::params!["ep001", "sess001", "user", "Hello Brain!"],
990 )?;
991
992 let content: String = conn.query_row(
993 "SELECT content FROM episodes WHERE id = ?1",
994 ["ep001"],
995 |row| row.get(0),
996 )?;
997 assert_eq!(content, "Hello Brain!");
998 Ok(())
999 })
1000 .unwrap();
1001 }
1002
1003 #[test]
1004 fn test_fk_constraint_enforced() {
1005 let pool = SqlitePool::open_memory().unwrap();
1006 let result = pool.with_conn(|conn| {
1007 conn.execute(
1009 "INSERT INTO episodes (id, session_id, role, content)
1010 VALUES (?1, ?2, ?3, ?4)",
1011 rusqlite::params!["ep001", "nonexistent", "user", "Hello"],
1012 )?;
1013 Ok(())
1014 });
1015 assert!(result.is_err());
1016 }
1017
1018 #[test]
1019 fn test_semantic_fact_insert() {
1020 let pool = SqlitePool::open_memory().unwrap();
1021 pool.with_conn(|conn| {
1022 conn.execute(
1023 "INSERT INTO semantic_facts (id, category, subject, predicate, object)
1024 VALUES (?1, ?2, ?3, ?4, ?5)",
1025 rusqlite::params!["fact001", "personal", "user", "name_is", "Keshav"],
1026 )?;
1027
1028 let obj: String = conn.query_row(
1029 "SELECT object FROM semantic_facts WHERE subject = ?1 AND predicate = ?2",
1030 rusqlite::params!["user", "name_is"],
1031 |row| row.get(0),
1032 )?;
1033 assert_eq!(obj, "Keshav");
1034 Ok(())
1035 })
1036 .unwrap();
1037 }
1038
1039 #[test]
1040 fn test_namespace_column_on_semantic_facts() {
1041 let pool = SqlitePool::open_memory().unwrap();
1042 pool.with_conn(|conn| {
1043 conn.execute(
1045 "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1046 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1047 rusqlite::params!["factw1", "work", "user", "role_is", "developer", "work"],
1048 )?;
1049 conn.execute(
1050 "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1051 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1052 rusqlite::params!["factp1", "personal", "user", "name_is", "Keshav", "personal"],
1053 )?;
1054
1055 let count: i64 = conn.query_row(
1057 "SELECT COUNT(*) FROM semantic_facts WHERE namespace = 'work'",
1058 [],
1059 |row| row.get(0),
1060 )?;
1061 assert_eq!(count, 1, "work namespace should have 1 fact");
1062
1063 let count: i64 = conn.query_row(
1065 "SELECT COUNT(*) FROM semantic_facts WHERE namespace = 'personal'",
1066 [],
1067 |row| row.get(0),
1068 )?;
1069 assert_eq!(count, 1, "personal namespace should have 1 fact");
1070
1071 let found: bool = conn
1073 .query_row(
1074 "SELECT COUNT(*) > 0 FROM semantic_facts
1075 WHERE namespace = 'work' AND predicate = 'name_is'",
1076 [],
1077 |row| row.get(0),
1078 )
1079 .unwrap_or(false);
1080 assert!(!found, "work namespace must not contain personal facts");
1081
1082 Ok(())
1083 })
1084 .unwrap();
1085 }
1086
1087 #[test]
1088 fn test_namespace_default_is_personal() {
1089 let pool = SqlitePool::open_memory().unwrap();
1090 pool.with_conn(|conn| {
1091 conn.execute(
1093 "INSERT INTO semantic_facts (id, category, subject, predicate, object)
1094 VALUES (?1, ?2, ?3, ?4, ?5)",
1095 rusqlite::params!["factdefault", "personal", "user", "likes", "Rust"],
1096 )?;
1097
1098 let ns: String = conn.query_row(
1099 "SELECT namespace FROM semantic_facts WHERE id = 'factdefault'",
1100 [],
1101 |row| row.get(0),
1102 )?;
1103 assert_eq!(ns, "personal", "default namespace should be 'personal'");
1104 Ok(())
1105 })
1106 .unwrap();
1107 }
1108
1109 #[test]
1110 fn test_notification_outbox_lifecycle() {
1111 let pool = SqlitePool::open_memory().unwrap();
1112
1113 let id1 = pool
1115 .insert_notification("Low priority nudge", 1, "habit:morning_review", None)
1116 .unwrap();
1117 let id2 = pool
1118 .insert_notification("High priority reminder", 3, "open_loop:todo", Some("slack"))
1119 .unwrap();
1120
1121 let pending = pool.pending_notifications(10).unwrap();
1123 assert_eq!(pending.len(), 2);
1124 assert_eq!(pending[0].id, id2, "higher priority should come first");
1125 assert_eq!(pending[1].id, id1);
1126 assert!(pending[0].delivered_at.is_none());
1127 assert_eq!(pending[1].channel, None);
1128 assert_eq!(pending[0].channel.as_deref(), Some("slack"));
1129
1130 assert!(pool.mark_notification_delivered(&id2).unwrap());
1132 let pending = pool.pending_notifications(10).unwrap();
1133 assert_eq!(pending.len(), 1);
1134 assert_eq!(pending[0].id, id1);
1135
1136 assert!(!pool.mark_notification_delivered(&id2).unwrap());
1138 }
1139
1140 #[test]
1141 fn test_notification_prune() {
1142 let pool = SqlitePool::open_memory().unwrap();
1143 let id = pool.insert_notification("test", 1, "test", None).unwrap();
1144 pool.mark_notification_delivered(&id).unwrap();
1145
1146 let pruned = pool.prune_notifications(365).unwrap();
1148 assert_eq!(pruned, 0, "recently delivered notifications should be kept");
1149
1150 pool.with_conn(|conn| {
1152 conn.execute(
1153 "UPDATE notification_outbox SET created_at = datetime('now', '-400 days') WHERE id = ?1",
1154 [&id],
1155 )?;
1156 Ok(())
1157 })
1158 .unwrap();
1159 let pruned = pool.prune_notifications(365).unwrap();
1160 assert_eq!(pruned, 1, "old delivered notification should be pruned");
1161 }
1162
1163 #[test]
1164 fn test_list_namespaces_with_counts() {
1165 let pool = SqlitePool::open_memory().unwrap();
1166 pool.with_conn(|conn| {
1167 for i in 0..3 {
1169 conn.execute(
1170 "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1171 VALUES (?1, 'personal', 'user', 'fact', ?2, 'personal')",
1172 rusqlite::params![format!("p{i}"), format!("val{i}")],
1173 )?;
1174 }
1175 conn.execute(
1176 "INSERT INTO semantic_facts (id, category, subject, predicate, object, namespace)
1177 VALUES ('w1', 'work', 'user', 'role', 'dev', 'work')",
1178 [],
1179 )?;
1180
1181 let mut stmt = conn.prepare(
1183 "SELECT namespace, COUNT(*) as cnt FROM semantic_facts
1184 WHERE superseded_by IS NULL
1185 GROUP BY namespace ORDER BY namespace",
1186 )?;
1187 let rows: Vec<(String, i64)> = stmt
1188 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1189 .collect::<Result<Vec<_>, _>>()?;
1190
1191 assert_eq!(rows.len(), 2, "should have 2 namespaces");
1192 let personal = rows.iter().find(|(ns, _)| ns == "personal").unwrap();
1193 assert_eq!(personal.1, 3, "personal should have 3 facts");
1194 let work = rows.iter().find(|(ns, _)| ns == "work").unwrap();
1195 assert_eq!(work.1, 1, "work should have 1 fact");
1196
1197 Ok(())
1198 })
1199 .unwrap();
1200 }
1201}