Skip to main content

flow_db/
schema.rs

1use flow_core::Result;
2use rusqlite::Connection;
3use std::time::Duration;
4
5/// Initialize a database connection with performance optimizations.
6pub fn init_connection(conn: &Connection) -> Result<()> {
7    // Enable WAL mode for better concurrency
8    conn.pragma_update(None, "journal_mode", "WAL")
9        .map_err(|e| flow_core::FlowError::Database(format!("failed to set WAL mode: {e}")))?;
10
11    // Synchronous = NORMAL for better performance (still safe with WAL)
12    conn.pragma_update(None, "synchronous", "NORMAL")
13        .map_err(|e| {
14            flow_core::FlowError::Database(format!("failed to set synchronous mode: {e}"))
15        })?;
16
17    // Cache size: 64MB (negative value means kibibytes)
18    conn.pragma_update(None, "cache_size", -64000)
19        .map_err(|e| flow_core::FlowError::Database(format!("failed to set cache size: {e}")))?;
20
21    // Store temp tables in memory for speed
22    conn.pragma_update(None, "temp_store", "MEMORY")
23        .map_err(|e| flow_core::FlowError::Database(format!("failed to set temp_store: {e}")))?;
24
25    // Enable memory-mapped I/O (30GB max)
26    conn.pragma_update(None, "mmap_size", 30_000_000_000_i64)
27        .map_err(|e| flow_core::FlowError::Database(format!("failed to set mmap_size: {e}")))?;
28
29    // Set busy handler: retry up to 100 times with 10ms sleep
30    conn.busy_timeout(Duration::from_millis(1000))
31        .map_err(|e| flow_core::FlowError::Database(format!("failed to set busy timeout: {e}")))?;
32
33    Ok(())
34}
35
#[cfg(test)]
mod tests {
    use super::*;

    /// `init_connection` succeeds on an in-memory database and the settings
    /// that can be observed there have the expected values.
    #[test]
    fn test_init_connection() {
        let db = Connection::open_in_memory().unwrap();
        init_connection(&db).unwrap();

        // WAL only applies to file-backed databases; an in-memory database
        // stays in the MEMORY journal mode even after WAL was requested.
        let mode = db
            .pragma_query_value(None, "journal_mode", |row| row.get::<_, String>(0))
            .unwrap();
        assert_eq!(mode.to_uppercase(), "MEMORY");

        // The cache size reads back exactly as it was configured.
        let configured_cache = db
            .pragma_query_value(None, "cache_size", |row| row.get::<_, i32>(0))
            .unwrap();
        assert_eq!(configured_cache, -64000);
    }
}