Skip to main content

flow_db/
schema.rs

1use flow_core::Result;
2use rusqlite::Connection;
3use std::time::Duration;
4
5/// Initialize a database connection with performance optimizations.
6pub fn init_connection(conn: &Connection) -> Result<()> {
7    // Enable WAL mode for better concurrency
8    conn.pragma_update(None, "journal_mode", "WAL")
9        .map_err(|e| flow_core::FlowError::Database(format!("failed to set WAL mode: {e}")))?;
10
11    // Synchronous = NORMAL for better performance (still safe with WAL)
12    conn.pragma_update(None, "synchronous", "NORMAL")
13        .map_err(|e| {
14            flow_core::FlowError::Database(format!("failed to set synchronous mode: {e}"))
15        })?;
16
17    // Cache size: 64MB (negative value means kibibytes)
18    conn.pragma_update(None, "cache_size", -64000)
19        .map_err(|e| {
20            flow_core::FlowError::Database(format!("failed to set cache size: {e}"))
21        })?;
22
23    // Store temp tables in memory for speed
24    conn.pragma_update(None, "temp_store", "MEMORY")
25        .map_err(|e| {
26            flow_core::FlowError::Database(format!("failed to set temp_store: {e}"))
27        })?;
28
29    // Enable memory-mapped I/O (30GB max)
30    conn.pragma_update(None, "mmap_size", 30_000_000_000_i64)
31        .map_err(|e| {
32            flow_core::FlowError::Database(format!("failed to set mmap_size: {e}"))
33        })?;
34
35    // Set busy handler: retry up to 100 times with 10ms sleep
36    conn.busy_timeout(Duration::from_millis(1000))
37        .map_err(|e| {
38            flow_core::FlowError::Database(format!("failed to set busy timeout: {e}"))
39        })?;
40
41    Ok(())
42}
43
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `init_connection` succeeds on an in-memory database and that
    /// the resulting journal mode and cache size are the expected ones.
    #[test]
    fn test_init_connection() {
        let db = Connection::open_in_memory().unwrap();
        init_connection(&db).unwrap();

        // Read back both settings before asserting on them.
        let mode: String = db
            .pragma_query_value(None, "journal_mode", |r| r.get(0))
            .unwrap();
        let pages: i32 = db
            .pragma_query_value(None, "cache_size", |r| r.get(0))
            .unwrap();

        // WAL needs a file-backed database; an in-memory one keeps reporting
        // MEMORY even after WAL was requested.
        assert_eq!(mode.to_uppercase(), "MEMORY");

        // The cache size must be exactly the value that init_connection set.
        assert_eq!(pages, -64000);
    }
}