Skip to main content

flow_db/
migration.rs

1use flow_core::Result;
2use rusqlite::Connection;
3
4const CURRENT_VERSION: i32 = 1;
5
6/// Run database migrations to bring the schema up to date.
7pub fn run_migrations(conn: &Connection) -> Result<()> {
8    let version: i32 = conn
9        .pragma_query_value(None, "user_version", |row| row.get(0))
10        .map_err(|e| {
11            flow_core::FlowError::Database(format!("failed to get user_version: {e}"))
12        })?;
13
14    if version < 1 {
15        migrate_v1(conn)?;
16    }
17
18    // Future migrations would go here
19    // if version < 2 { migrate_v2(conn)?; }
20
21    Ok(())
22}
23
24fn migrate_v1(conn: &Connection) -> Result<()> {
25    tracing::info!("Running migration v1: creating initial schema");
26
27    // Create features table
28    conn.execute(
29        r"
30        CREATE TABLE IF NOT EXISTS features (
31            id INTEGER PRIMARY KEY AUTOINCREMENT,
32            priority INTEGER NOT NULL DEFAULT 0,
33            category TEXT NOT NULL DEFAULT '',
34            name TEXT NOT NULL,
35            description TEXT NOT NULL DEFAULT '',
36            steps TEXT NOT NULL DEFAULT '[]',
37            passes INTEGER NOT NULL DEFAULT 0,
38            in_progress INTEGER NOT NULL DEFAULT 0,
39            dependencies TEXT NOT NULL DEFAULT '[]',
40            created_at TEXT NOT NULL DEFAULT (datetime('now')),
41            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
42        )
43        ",
44        [],
45    )
46    .map_err(|e| {
47        flow_core::FlowError::Database(format!("failed to create features table: {e}"))
48    })?;
49
50    // Create change_events table
51    conn.execute(
52        r"
53        CREATE TABLE IF NOT EXISTS change_events (
54            id INTEGER PRIMARY KEY AUTOINCREMENT,
55            feature_id INTEGER NOT NULL,
56            event_type TEXT NOT NULL,
57            field TEXT,
58            old_value TEXT,
59            new_value TEXT,
60            agent TEXT,
61            source TEXT NOT NULL,
62            created_at TEXT NOT NULL DEFAULT (datetime('now')),
63            FOREIGN KEY (feature_id) REFERENCES features(id) ON DELETE CASCADE
64        )
65        ",
66        [],
67    )
68    .map_err(|e| {
69        flow_core::FlowError::Database(format!("failed to create change_events table: {e}"))
70    })?;
71
72    // Create indexes for common queries
73    conn.execute(
74        "CREATE INDEX IF NOT EXISTS ix_feature_status ON features(passes, in_progress)",
75        [],
76    )
77    .map_err(|e| {
78        flow_core::FlowError::Database(format!("failed to create status index: {e}"))
79    })?;
80
81    conn.execute(
82        "CREATE INDEX IF NOT EXISTS ix_feature_priority ON features(priority)",
83        [],
84    )
85    .map_err(|e| {
86        flow_core::FlowError::Database(format!("failed to create priority index: {e}"))
87    })?;
88
89    conn.execute(
90        "CREATE INDEX IF NOT EXISTS ix_change_events_feature ON change_events(feature_id)",
91        [],
92    )
93    .map_err(|e| {
94        flow_core::FlowError::Database(format!("failed to create events index: {e}"))
95    })?;
96
97    // Update version
98    conn.pragma_update(None, "user_version", CURRENT_VERSION)
99        .map_err(|e| {
100            flow_core::FlowError::Database(format!("failed to update user_version: {e}"))
101        })?;
102
103    tracing::info!("Migration v1 complete");
104    Ok(())
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110
111    #[test]
112    fn test_migrations() {
113        let conn = Connection::open_in_memory().unwrap();
114
115        // Initially version should be 0
116        let version: i32 = conn
117            .pragma_query_value(None, "user_version", |row| row.get(0))
118            .unwrap();
119        assert_eq!(version, 0);
120
121        // Run migrations
122        run_migrations(&conn).unwrap();
123
124        // Version should now be CURRENT_VERSION
125        let version: i32 = conn
126            .pragma_query_value(None, "user_version", |row| row.get(0))
127            .unwrap();
128        assert_eq!(version, CURRENT_VERSION);
129
130        // Tables should exist
131        let tables: Vec<String> = conn
132            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
133            .unwrap()
134            .query_map([], |row| row.get(0))
135            .unwrap()
136            .collect::<std::result::Result<Vec<_>, _>>()
137            .unwrap();
138
139        assert!(tables.contains(&"features".to_string()));
140        assert!(tables.contains(&"change_events".to_string()));
141
142        // Running migrations again should be idempotent
143        run_migrations(&conn).unwrap();
144        let version: i32 = conn
145            .pragma_query_value(None, "user_version", |row| row.get(0))
146            .unwrap();
147        assert_eq!(version, CURRENT_VERSION);
148    }
149
150    #[test]
151    fn test_feature_table_schema() {
152        let conn = Connection::open_in_memory().unwrap();
153        run_migrations(&conn).unwrap();
154
155        // Verify we can insert and retrieve a feature
156        conn.execute(
157            r#"
158            INSERT INTO features (name, description, priority, category, steps, dependencies)
159            VALUES (?, ?, ?, ?, ?, ?)
160            "#,
161            rusqlite::params![
162                "Test Feature",
163                "Test Description",
164                1,
165                "Test",
166                "[]",
167                "[1, 2]"
168            ],
169        )
170        .unwrap();
171
172        let (name, deps): (String, String) = conn
173            .query_row("SELECT name, dependencies FROM features WHERE id = 1", [], |row| {
174                Ok((row.get(0)?, row.get(1)?))
175            })
176            .unwrap();
177
178        assert_eq!(name, "Test Feature");
179        assert_eq!(deps, "[1, 2]");
180    }
181}