1pub mod event_log;
2pub mod feature;
3pub mod migration;
4pub mod schema;
5pub mod task_sync;
6
7pub use feature::FeatureStore;
8
9use flow_core::Result;
10use rusqlite::Connection;
11use std::path::Path;
12use std::sync::{Arc, Mutex};
13
14pub struct Database {
16 writer: Arc<Mutex<Connection>>,
17 db_path: std::path::PathBuf,
18}
19
20impl Database {
21 pub fn open(path: &Path) -> Result<Self> {
24 let conn = Connection::open(path).map_err(|e| {
25 flow_core::FlowError::Database(format!("failed to open database: {e}"))
26 })?;
27
28 schema::init_connection(&conn)?;
29 migration::run_migrations(&conn)?;
30
31 Ok(Self {
32 writer: Arc::new(Mutex::new(conn)),
33 db_path: path.to_path_buf(),
34 })
35 }
36
37 pub fn open_in_memory() -> Result<Self> {
39 let conn = Connection::open_in_memory().map_err(|e| {
40 flow_core::FlowError::Database(format!("failed to open in-memory database: {e}"))
41 })?;
42
43 schema::init_connection(&conn)?;
44 migration::run_migrations(&conn)?;
45
46 Ok(Self {
47 writer: Arc::new(Mutex::new(conn)),
48 db_path: std::path::PathBuf::from(":memory:"),
49 })
50 }
51
52 pub const fn writer(&self) -> &Arc<Mutex<Connection>> {
54 &self.writer
55 }
56
57 pub fn new_reader(&self) -> Result<Connection> {
59 if self.db_path.to_str() == Some(":memory:") {
60 return Err(flow_core::FlowError::Database(
61 "cannot create reader for in-memory database".to_string(),
62 ));
63 }
64
65 let conn = Connection::open_with_flags(
66 &self.db_path,
67 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
68 )
69 .map_err(|e| {
70 flow_core::FlowError::Database(format!("failed to open read-only connection: {e}"))
71 })?;
72
73 schema::init_connection(&conn)?;
74 Ok(conn)
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use super::*;
81
82 #[test]
83 fn test_open_in_memory() {
84 let db = Database::open_in_memory().unwrap();
85 let conn = db.writer().lock().unwrap();
86
87 let mut stmt = conn
89 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
90 .unwrap();
91 let tables: Vec<String> = stmt
92 .query_map([], |row| row.get(0))
93 .unwrap()
94 .collect::<std::result::Result<Vec<_>, _>>()
95 .unwrap();
96
97 assert!(tables.contains(&"features".to_string()));
98 assert!(tables.contains(&"change_events".to_string()));
99 }
100
101 #[test]
102 fn test_concurrent_access() {
103 let db = Database::open_in_memory().unwrap();
104
105 {
107 let conn = db.writer().lock().unwrap();
108 FeatureStore::create(
109 &conn,
110 &flow_core::CreateFeatureInput {
111 name: "Test Feature".to_string(),
112 description: "Test".to_string(),
113 priority: Some(1),
114 category: "Test".to_string(),
115 steps: vec![],
116 dependencies: vec![],
117 },
118 )
119 .unwrap();
120 }
121
122 {
124 let conn = db.writer().lock().unwrap();
125 let features = FeatureStore::get_all(&conn).unwrap();
126 assert_eq!(features.len(), 1);
127 assert_eq!(features[0].name, "Test Feature");
128 }
129 }
130}