1pub mod event_log;
22pub mod feature;
23pub mod migration;
24pub mod schema;
25pub mod task_sync;
26
27pub use feature::FeatureStore;
28
29use flow_core::Result;
30use rusqlite::Connection;
31use std::path::Path;
32use std::sync::{Arc, Mutex};
33
34pub struct Database {
36 writer: Arc<Mutex<Connection>>,
37 db_path: std::path::PathBuf,
38}
39
40impl Database {
41 pub fn open(path: &Path) -> Result<Self> {
44 let conn = Connection::open(path)
45 .map_err(|e| flow_core::FlowError::Database(format!("failed to open database: {e}")))?;
46
47 schema::init_connection(&conn)?;
48 migration::run_migrations(&conn)?;
49
50 Ok(Self {
51 writer: Arc::new(Mutex::new(conn)),
52 db_path: path.to_path_buf(),
53 })
54 }
55
56 pub fn open_in_memory() -> Result<Self> {
58 let conn = Connection::open_in_memory().map_err(|e| {
59 flow_core::FlowError::Database(format!("failed to open in-memory database: {e}"))
60 })?;
61
62 schema::init_connection(&conn)?;
63 migration::run_migrations(&conn)?;
64
65 Ok(Self {
66 writer: Arc::new(Mutex::new(conn)),
67 db_path: std::path::PathBuf::from(":memory:"),
68 })
69 }
70
71 pub const fn writer(&self) -> &Arc<Mutex<Connection>> {
73 &self.writer
74 }
75
76 pub fn new_reader(&self) -> Result<Connection> {
78 if self.db_path.to_str() == Some(":memory:") {
79 return Err(flow_core::FlowError::Database(
80 "cannot create reader for in-memory database".to_string(),
81 ));
82 }
83
84 let conn =
85 Connection::open_with_flags(&self.db_path, rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)
86 .map_err(|e| {
87 flow_core::FlowError::Database(format!(
88 "failed to open read-only connection: {e}"
89 ))
90 })?;
91
92 schema::init_connection(&conn)?;
93 Ok(conn)
94 }
95}
96
#[cfg(test)]
// The tests deliberately keep the writer's mutex guard alive for as long as
// they use the connection, so the "tighten the drop" lint is just noise here.
#[allow(clippy::significant_drop_tightening)]
mod tests {
    use super::*;

    /// A freshly opened in-memory database must already contain the
    /// `features` and `change_events` tables.
    #[test]
    fn test_open_in_memory() {
        let db = Database::open_in_memory().unwrap();
        let guard = db.writer().lock().unwrap();

        // List every table name known to SQLite.
        let mut table_query = guard
            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
            .unwrap();
        let rows = table_query.query_map([], |row| row.get(0)).unwrap();
        let tables: Vec<String> = rows
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        for expected in ["features", "change_events"] {
            assert!(tables.contains(&expected.to_string()));
        }
    }

    /// The writer lock can be taken, released and re-taken, and data written
    /// under the first lock is visible under the second.
    #[test]
    fn test_concurrent_access() {
        let db = Database::open_in_memory().unwrap();

        let input = flow_core::CreateFeatureInput {
            name: "Test Feature".to_string(),
            description: "Test".to_string(),
            priority: Some(1),
            category: "Test".to_string(),
            steps: vec![],
            dependencies: vec![],
        };

        // Write phase: insert one feature, then release the lock explicitly.
        let write_guard = db.writer().lock().unwrap();
        FeatureStore::create(&write_guard, &input).unwrap();
        drop(write_guard);

        // Read phase: re-acquire the lock and confirm the feature is stored.
        let read_guard = db.writer().lock().unwrap();
        let features = FeatureStore::get_all(&read_guard).unwrap();
        assert_eq!(features.len(), 1);
        assert_eq!(features[0].name, "Test Feature");
    }
}
150}