Skip to main content

flow_db/
lib.rs

1pub mod event_log;
2pub mod feature;
3pub mod migration;
4pub mod schema;
5pub mod task_sync;
6
7pub use feature::FeatureStore;
8
9use flow_core::Result;
10use rusqlite::Connection;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13
14/// Database handle with thread-safe write connection and ability to spawn read-only connections.
15pub struct Database {
16    writer: Arc<Mutex<Connection>>,
17    db_path: std::path::PathBuf,
18}
19
20impl Database {
21    /// Open a database at the specified path, creating it if necessary.
22    /// Runs migrations and applies performance optimizations.
23    pub fn open(path: &Path) -> Result<Self> {
24        let conn = Connection::open(path).map_err(|e| {
25            flow_core::FlowError::Database(format!("failed to open database: {e}"))
26        })?;
27
28        schema::init_connection(&conn)?;
29        migration::run_migrations(&conn)?;
30
31        Ok(Self {
32            writer: Arc::new(Mutex::new(conn)),
33            db_path: path.to_path_buf(),
34        })
35    }
36
37    /// Open an in-memory database for testing.
38    pub fn open_in_memory() -> Result<Self> {
39        let conn = Connection::open_in_memory().map_err(|e| {
40            flow_core::FlowError::Database(format!("failed to open in-memory database: {e}"))
41        })?;
42
43        schema::init_connection(&conn)?;
44        migration::run_migrations(&conn)?;
45
46        Ok(Self {
47            writer: Arc::new(Mutex::new(conn)),
48            db_path: std::path::PathBuf::from(":memory:"),
49        })
50    }
51
52    /// Get the shared writer connection (behind a mutex).
53    pub const fn writer(&self) -> &Arc<Mutex<Connection>> {
54        &self.writer
55    }
56
57    /// Create a new read-only connection to the same database.
58    pub fn new_reader(&self) -> Result<Connection> {
59        if self.db_path.to_str() == Some(":memory:") {
60            return Err(flow_core::FlowError::Database(
61                "cannot create reader for in-memory database".to_string(),
62            ));
63        }
64
65        let conn = Connection::open_with_flags(
66            &self.db_path,
67            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
68        )
69        .map_err(|e| {
70            flow_core::FlowError::Database(format!("failed to open read-only connection: {e}"))
71        })?;
72
73        schema::init_connection(&conn)?;
74        Ok(conn)
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening an in-memory database must leave the expected tables in place.
    #[test]
    fn test_open_in_memory() {
        let db = Database::open_in_memory().unwrap();
        let conn = db.writer().lock().unwrap();

        // Collect the name of every table recorded in the schema catalog.
        let mut stmt = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
            .unwrap();
        let rows = stmt.query_map([], |row| row.get(0)).unwrap();
        let tables: Vec<String> = rows
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert!(tables.iter().any(|name| name == "features"));
        assert!(tables.iter().any(|name| name == "change_events"));
    }

    /// A feature written under one lock acquisition is visible under the next one.
    #[test]
    fn test_concurrent_access() {
        let db = Database::open_in_memory().unwrap();

        // First lock scope: insert a single feature through the writer connection.
        {
            let input = flow_core::CreateFeatureInput {
                name: "Test Feature".to_string(),
                description: "Test".to_string(),
                priority: Some(1),
                category: "Test".to_string(),
                steps: vec![],
                dependencies: vec![],
            };
            let conn = db.writer().lock().unwrap();
            FeatureStore::create(&conn, &input).unwrap();
        }

        // Second lock scope: the previous guard was dropped, so the writer can be
        // locked again. The read goes through the writer connection as well, since
        // `new_reader` rejects in-memory databases.
        {
            let conn = db.writer().lock().unwrap();
            let features = FeatureStore::get_all(&conn).unwrap();
            assert_eq!(features.len(), 1);
            assert_eq!(features[0].name, "Test Feature");
        }
    }
}