Skip to main content

flow_db/
lib.rs

1//! `SQLite` database layer for Agent Flow feature management.
2//!
3//! Provides thread-safe database access with WAL mode for concurrent reads,
4//! automatic schema migrations, and event logging for audit trails.
5//!
6//! # Architecture
7//!
8//! - Single write connection behind `Arc<Mutex<Connection>>` for thread safety
9//! - Read-only connection spawning for parallel queries
10//! - Automatic WAL mode and performance pragmas
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use flow_db::Database;
16//!
17//! let db = Database::open(std::path::Path::new("flow.db")).unwrap();
18//! let features = flow_db::FeatureStore::get_all(&db.writer().lock().unwrap()).unwrap();
19//! ```
20
21pub mod event_log;
22pub mod feature;
23pub mod migration;
24pub mod schema;
25pub mod task_sync;
26
27pub use feature::FeatureStore;
28
29use flow_core::Result;
30use rusqlite::Connection;
31use std::path::Path;
32use std::sync::{Arc, Mutex};
33
34/// Database handle with thread-safe write connection and ability to spawn read-only connections.
35pub struct Database {
36    writer: Arc<Mutex<Connection>>,
37    db_path: std::path::PathBuf,
38}
39
40impl Database {
41    /// Open a database at the specified path, creating it if necessary.
42    /// Runs migrations and applies performance optimizations.
43    pub fn open(path: &Path) -> Result<Self> {
44        let conn = Connection::open(path)
45            .map_err(|e| flow_core::FlowError::Database(format!("failed to open database: {e}")))?;
46
47        schema::init_connection(&conn)?;
48        migration::run_migrations(&conn)?;
49
50        Ok(Self {
51            writer: Arc::new(Mutex::new(conn)),
52            db_path: path.to_path_buf(),
53        })
54    }
55
56    /// Open an in-memory database for testing.
57    pub fn open_in_memory() -> Result<Self> {
58        let conn = Connection::open_in_memory().map_err(|e| {
59            flow_core::FlowError::Database(format!("failed to open in-memory database: {e}"))
60        })?;
61
62        schema::init_connection(&conn)?;
63        migration::run_migrations(&conn)?;
64
65        Ok(Self {
66            writer: Arc::new(Mutex::new(conn)),
67            db_path: std::path::PathBuf::from(":memory:"),
68        })
69    }
70
71    /// Get the shared writer connection (behind a mutex).
72    pub const fn writer(&self) -> &Arc<Mutex<Connection>> {
73        &self.writer
74    }
75
76    /// Create a new read-only connection to the same database.
77    pub fn new_reader(&self) -> Result<Connection> {
78        if self.db_path.to_str() == Some(":memory:") {
79            return Err(flow_core::FlowError::Database(
80                "cannot create reader for in-memory database".to_string(),
81            ));
82        }
83
84        let conn =
85            Connection::open_with_flags(&self.db_path, rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)
86                .map_err(|e| {
87                    flow_core::FlowError::Database(format!(
88                        "failed to open read-only connection: {e}"
89                    ))
90                })?;
91
92        schema::init_connection(&conn)?;
93        Ok(conn)
94    }
95}
96
97#[cfg(test)]
98#[allow(clippy::significant_drop_tightening)]
99mod tests {
100    use super::*;
101
102    #[test]
103    fn test_open_in_memory() {
104        let db = Database::open_in_memory().unwrap();
105        let conn = db.writer().lock().unwrap();
106
107        // Verify tables exist
108        let mut stmt = conn
109            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
110            .unwrap();
111        let tables: Vec<String> = stmt
112            .query_map([], |row| row.get(0))
113            .unwrap()
114            .collect::<std::result::Result<Vec<_>, _>>()
115            .unwrap();
116
117        assert!(tables.contains(&"features".to_string()));
118        assert!(tables.contains(&"change_events".to_string()));
119    }
120
121    #[test]
122    fn test_concurrent_access() {
123        let db = Database::open_in_memory().unwrap();
124
125        // Writer creates a feature
126        {
127            let conn = db.writer().lock().unwrap();
128            FeatureStore::create(
129                &conn,
130                &flow_core::CreateFeatureInput {
131                    name: "Test Feature".to_string(),
132                    description: "Test".to_string(),
133                    priority: Some(1),
134                    category: "Test".to_string(),
135                    steps: vec![],
136                    dependencies: vec![],
137                },
138            )
139            .unwrap();
140        }
141
142        // Reader can read it
143        {
144            let conn = db.writer().lock().unwrap();
145            let features = FeatureStore::get_all(&conn).unwrap();
146            assert_eq!(features.len(), 1);
147            assert_eq!(features[0].name, "Test Feature");
148        }
149    }
150}