//! intent_engine/db/mod.rs — SQLite pool construction and schema migrations.

1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8    let options = SqliteConnectOptions::new()
9        .filename(db_path)
10        .create_if_missing(true)
11        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12        .busy_timeout(std::time::Duration::from_millis(5000));
13
14    let pool = SqlitePoolOptions::new()
15        .max_connections(5)
16        .connect_with(options)
17        .await?;
18
19    Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23    // Enable FTS5
24    sqlx::query("PRAGMA journal_mode=WAL;")
25        .execute(pool)
26        .await?;
27
28    // Create tasks table
29    sqlx::query(
30        r#"
31        CREATE TABLE IF NOT EXISTS tasks (
32            id INTEGER PRIMARY KEY AUTOINCREMENT,
33            parent_id INTEGER,
34            name TEXT NOT NULL,
35            spec TEXT,
36            status TEXT NOT NULL DEFAULT 'todo',
37            complexity INTEGER,
38            priority INTEGER DEFAULT 0,
39            first_todo_at DATETIME,
40            first_doing_at DATETIME,
41            first_done_at DATETIME,
42            FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
43            CHECK (status IN ('todo', 'doing', 'done'))
44        )
45        "#,
46    )
47    .execute(pool)
48    .await?;
49
50    // Create FTS5 virtual table for tasks
51    sqlx::query(
52        r#"
53        CREATE VIRTUAL TABLE IF NOT EXISTS tasks_fts USING fts5(
54            name,
55            spec,
56            content=tasks,
57            content_rowid=id
58        )
59        "#,
60    )
61    .execute(pool)
62    .await?;
63
64    // Create triggers to keep FTS in sync
65    sqlx::query(
66        r#"
67        CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
68            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
69        END
70        "#,
71    )
72    .execute(pool)
73    .await?;
74
75    sqlx::query(
76        r#"
77        CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
78            DELETE FROM tasks_fts WHERE rowid = old.id;
79        END
80        "#,
81    )
82    .execute(pool)
83    .await?;
84
85    sqlx::query(
86        r#"
87        CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
88            UPDATE tasks_fts SET name = new.name, spec = new.spec WHERE rowid = old.id;
89        END
90        "#,
91    )
92    .execute(pool)
93    .await?;
94
95    // Create events table
96    sqlx::query(
97        r#"
98        CREATE TABLE IF NOT EXISTS events (
99            id INTEGER PRIMARY KEY AUTOINCREMENT,
100            task_id INTEGER NOT NULL,
101            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
102            log_type TEXT NOT NULL,
103            discussion_data TEXT NOT NULL,
104            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
105        )
106        "#,
107    )
108    .execute(pool)
109    .await?;
110
111    // Create index on task_id for events
112    sqlx::query(
113        r#"
114        CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
115        "#,
116    )
117    .execute(pool)
118    .await?;
119
120    // Create FTS5 virtual table for events
121    sqlx::query(
122        r#"
123        CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
124            discussion_data,
125            content=events,
126            content_rowid=id
127        )
128        "#,
129    )
130    .execute(pool)
131    .await?;
132
133    // Create triggers for events FTS
134    sqlx::query(
135        r#"
136        CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
137            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
138        END
139        "#,
140    )
141    .execute(pool)
142    .await?;
143
144    sqlx::query(
145        r#"
146        CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
147            DELETE FROM events_fts WHERE rowid = old.id;
148        END
149        "#,
150    )
151    .execute(pool)
152    .await?;
153
154    sqlx::query(
155        r#"
156        CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
157            UPDATE events_fts SET discussion_data = new.discussion_data WHERE rowid = old.id;
158        END
159        "#,
160    )
161    .execute(pool)
162    .await?;
163
164    // Create workspace_state table
165    sqlx::query(
166        r#"
167        CREATE TABLE IF NOT EXISTS workspace_state (
168            key TEXT PRIMARY KEY,
169            value TEXT NOT NULL
170        )
171        "#,
172    )
173    .execute(pool)
174    .await?;
175
176    // Create dependencies table for v0.2.0
177    sqlx::query(
178        r#"
179        CREATE TABLE IF NOT EXISTS dependencies (
180            id INTEGER PRIMARY KEY AUTOINCREMENT,
181            blocking_task_id INTEGER NOT NULL,
182            blocked_task_id INTEGER NOT NULL,
183            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
184            FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
185            FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
186            UNIQUE(blocking_task_id, blocked_task_id),
187            CHECK(blocking_task_id != blocked_task_id)
188        )
189        "#,
190    )
191    .execute(pool)
192    .await?;
193
194    // Create indexes for dependencies table
195    sqlx::query(
196        r#"
197        CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
198        ON dependencies(blocking_task_id)
199        "#,
200    )
201    .execute(pool)
202    .await?;
203
204    sqlx::query(
205        r#"
206        CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
207        ON dependencies(blocked_task_id)
208        "#,
209    )
210    .execute(pool)
211    .await?;
212
213    // Create composite index for event filtering (v0.2.0)
214    sqlx::query(
215        r#"
216        CREATE INDEX IF NOT EXISTS idx_events_task_type_time
217        ON events(task_id, log_type, timestamp)
218        "#,
219    )
220    .execute(pool)
221    .await?;
222
223    // Update schema version to 0.2.0
224    sqlx::query(
225        r#"
226        INSERT INTO workspace_state (key, value)
227        VALUES ('schema_version', '0.2.0')
228        ON CONFLICT(key) DO UPDATE SET value = '0.2.0'
229        "#,
230    )
231    .execute(pool)
232    .await?;
233
234    Ok(())
235}
236
#[cfg(test)]
mod tests {
    // Each test builds a fresh SQLite database in its own temp directory,
    // so tests are independent and leave nothing behind (TempDir cleans up
    // on drop).
    use super::*;
    use tempfile::TempDir;

    // Pool creation succeeds and the connection is usable.
    #[tokio::test]
    async fn test_create_pool_success() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let pool = create_pool(&db_path).await.unwrap();

        // Verify we can execute a query
        let result: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(result, 1);
    }

    // All core tables appear in sqlite_master after migration.
    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();

        run_migrations(&pool).await.unwrap();

        // Verify tables were created
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.contains(&"tasks".to_string()));
        assert!(tables.contains(&"events".to_string()));
        assert!(tables.contains(&"workspace_state".to_string()));
    }

    // Both FTS5 virtual tables exist (their shadow tables also match
    // '%_fts%', hence the LIKE filter on the exact '_fts' suffix).
    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();

        run_migrations(&pool).await.unwrap();

        // Verify FTS tables were created
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"tasks_fts".to_string()));
        assert!(tables.contains(&"events_fts".to_string()));
    }

    // All six FTS-sync triggers (insert/delete/update for tasks and
    // events) are registered.
    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();

        run_migrations(&pool).await.unwrap();

        // Verify triggers were created
        let triggers: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='trigger'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(triggers.contains(&"tasks_ai".to_string()));
        assert!(triggers.contains(&"tasks_ad".to_string()));
        assert!(triggers.contains(&"tasks_au".to_string()));
        assert!(triggers.contains(&"events_ai".to_string()));
        assert!(triggers.contains(&"events_ad".to_string()));
        assert!(triggers.contains(&"events_au".to_string()));
    }

    // Running migrations twice must not error (IF NOT EXISTS / upserts).
    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();

        // Run migrations twice
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Should not fail - migrations are idempotent
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.len() >= 3);
    }

    // An insert into `tasks` becomes searchable via tasks_fts (the
    // AFTER INSERT trigger keeps the index in sync).
    #[tokio::test]
    async fn test_fts_triggers_work() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Insert a task
        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Verify FTS was updated
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(count, 1);
    }

    // workspace_state acts as a simple key/value store.
    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Insert and retrieve workspace state
        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let value: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(value, "test_value");
    }

    // The CHECK constraint restricts status to 'todo' | 'doing' | 'done'.
    #[tokio::test]
    async fn test_task_status_constraint() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Try to insert task with invalid status
        let result = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        // Should fail due to CHECK constraint
        assert!(result.is_err());
    }

    // v0.2.0 Migration Tests

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify dependencies table exists
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify indexes exist
        let indexes: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(indexes.contains(&"idx_dependencies_blocking".to_string()));
        assert!(indexes.contains(&"idx_dependencies_blocked".to_string()));
        assert!(indexes.contains(&"idx_events_task_type_time".to_string()));
    }

    // CHECK(blocking_task_id != blocked_task_id) rejects self-dependencies.
    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Create a task
        sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Task 1")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Try to create self-dependency (should fail)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(1)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    // UNIQUE(blocking_task_id, blocked_task_id) rejects duplicate edges.
    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Create tasks
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Try to create duplicate dependency (should fail)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(2)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    // Deleting a task removes its dependency rows via ON DELETE CASCADE
    // (requires foreign_keys to be enabled on the connection).
    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Create tasks
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency exists
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 1);

        // Delete blocking task
        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency was cascade deleted
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify schema version is set to 0.2.0
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.2.0");
    }

    // Triple-run idempotency for the v0.2.0 additions specifically.
    #[tokio::test]
    async fn test_migration_idempotency_v0_2_0() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();

        // Run migrations multiple times
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify dependencies table exists and is functional
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));

        // Verify schema version is still correct
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.2.0");
    }
}
607}