intent_engine/db/
mod.rs

1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8    let options = SqliteConnectOptions::new()
9        .filename(db_path)
10        .create_if_missing(true)
11        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12        .busy_timeout(std::time::Duration::from_millis(5000));
13
14    let pool = SqlitePoolOptions::new()
15        .max_connections(5)
16        .connect_with(options)
17        .await?;
18
19    Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23    // Enable FTS5
24    sqlx::query("PRAGMA journal_mode=WAL;")
25        .execute(pool)
26        .await?;
27
28    // Create tasks table
29    sqlx::query(
30        r#"
31        CREATE TABLE IF NOT EXISTS tasks (
32            id INTEGER PRIMARY KEY AUTOINCREMENT,
33            parent_id INTEGER,
34            name TEXT NOT NULL,
35            spec TEXT,
36            status TEXT NOT NULL DEFAULT 'todo',
37            complexity INTEGER,
38            priority INTEGER DEFAULT 0,
39            first_todo_at DATETIME,
40            first_doing_at DATETIME,
41            first_done_at DATETIME,
42            FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
43            CHECK (status IN ('todo', 'doing', 'done'))
44        )
45        "#,
46    )
47    .execute(pool)
48    .await?;
49
50    // Create FTS5 virtual table for tasks with trigram tokenizer for better CJK support
51    // For existing databases, we need to drop and recreate if tokenizer changed
52    let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
53        .execute(pool)
54        .await; // Ignore error if table doesn't exist
55
56    sqlx::query(
57        r#"
58        CREATE VIRTUAL TABLE tasks_fts USING fts5(
59            name,
60            spec,
61            content=tasks,
62            content_rowid=id,
63            tokenize='trigram'
64        )
65        "#,
66    )
67    .execute(pool)
68    .await?;
69
70    // Create triggers to keep FTS in sync
71    sqlx::query(
72        r#"
73        CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
74            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
75        END
76        "#,
77    )
78    .execute(pool)
79    .await?;
80
81    sqlx::query(
82        r#"
83        CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
84            DELETE FROM tasks_fts WHERE rowid = old.id;
85        END
86        "#,
87    )
88    .execute(pool)
89    .await?;
90
91    // Recreate trigger with correct FTS5 syntax (drop and create for migration from buggy version)
92    // Note: We always drop first because SQLite doesn't support CREATE OR REPLACE TRIGGER,
93    // and we need to update existing databases that have the buggy trigger.
94    let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
95        .execute(pool)
96        .await; // Ignore error if trigger doesn't exist
97
98    sqlx::query(
99        r#"
100        CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
101            INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
102            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
103        END
104        "#,
105    )
106    .execute(pool)
107    .await?;
108
109    // Rebuild FTS index with existing data from tasks table
110    sqlx::query(
111        r#"
112        INSERT INTO tasks_fts(rowid, name, spec)
113        SELECT id, name, spec FROM tasks
114        "#,
115    )
116    .execute(pool)
117    .await?;
118
119    // Create events table
120    sqlx::query(
121        r#"
122        CREATE TABLE IF NOT EXISTS events (
123            id INTEGER PRIMARY KEY AUTOINCREMENT,
124            task_id INTEGER NOT NULL,
125            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
126            log_type TEXT NOT NULL,
127            discussion_data TEXT NOT NULL,
128            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
129        )
130        "#,
131    )
132    .execute(pool)
133    .await?;
134
135    // Create index on task_id for events
136    sqlx::query(
137        r#"
138        CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
139        "#,
140    )
141    .execute(pool)
142    .await?;
143
144    // Create FTS5 virtual table for events
145    sqlx::query(
146        r#"
147        CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
148            discussion_data,
149            content=events,
150            content_rowid=id
151        )
152        "#,
153    )
154    .execute(pool)
155    .await?;
156
157    // Create triggers for events FTS
158    sqlx::query(
159        r#"
160        CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
161            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
162        END
163        "#,
164    )
165    .execute(pool)
166    .await?;
167
168    sqlx::query(
169        r#"
170        CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
171            DELETE FROM events_fts WHERE rowid = old.id;
172        END
173        "#,
174    )
175    .execute(pool)
176    .await?;
177
178    // Recreate trigger with correct FTS5 syntax (drop and create for migration from buggy version)
179    let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
180        .execute(pool)
181        .await; // Ignore error if trigger doesn't exist
182
183    sqlx::query(
184        r#"
185        CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
186            INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
187            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
188        END
189        "#,
190    )
191    .execute(pool)
192    .await?;
193
194    // Create workspace_state table
195    sqlx::query(
196        r#"
197        CREATE TABLE IF NOT EXISTS workspace_state (
198            key TEXT PRIMARY KEY,
199            value TEXT NOT NULL
200        )
201        "#,
202    )
203    .execute(pool)
204    .await?;
205
206    // Create dependencies table for v0.2.0
207    sqlx::query(
208        r#"
209        CREATE TABLE IF NOT EXISTS dependencies (
210            id INTEGER PRIMARY KEY AUTOINCREMENT,
211            blocking_task_id INTEGER NOT NULL,
212            blocked_task_id INTEGER NOT NULL,
213            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
214            FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
215            FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
216            UNIQUE(blocking_task_id, blocked_task_id),
217            CHECK(blocking_task_id != blocked_task_id)
218        )
219        "#,
220    )
221    .execute(pool)
222    .await?;
223
224    // Create indexes for dependencies table
225    sqlx::query(
226        r#"
227        CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
228        ON dependencies(blocking_task_id)
229        "#,
230    )
231    .execute(pool)
232    .await?;
233
234    sqlx::query(
235        r#"
236        CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
237        ON dependencies(blocked_task_id)
238        "#,
239    )
240    .execute(pool)
241    .await?;
242
243    // Create composite index for event filtering (v0.2.0)
244    sqlx::query(
245        r#"
246        CREATE INDEX IF NOT EXISTS idx_events_task_type_time
247        ON events(task_id, log_type, timestamp)
248        "#,
249    )
250    .execute(pool)
251    .await?;
252
253    // Update schema version to 0.2.0
254    sqlx::query(
255        r#"
256        INSERT INTO workspace_state (key, value)
257        VALUES ('schema_version', '0.2.0')
258        ON CONFLICT(key) DO UPDATE SET value = '0.2.0'
259        "#,
260    )
261    .execute(pool)
262    .await?;
263
264    Ok(())
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a pool on `test.db` inside a brand-new temporary directory.
    ///
    /// The directory guard is returned with the pool; callers bind it to a
    /// named variable so the directory outlives the test body.
    async fn open_pool() -> (TempDir, SqlitePool) {
        let dir = TempDir::new().unwrap();
        let pool = create_pool(&dir.path().join("test.db")).await.unwrap();
        (dir, pool)
    }

    /// Same as [`open_pool`], with `run_migrations` applied once.
    async fn open_migrated_pool() -> (TempDir, SqlitePool) {
        let (dir, pool) = open_pool().await;
        run_migrations(&pool).await.unwrap();
        (dir, pool)
    }

    /// Runs a single-column text query and collects every row.
    async fn fetch_names(pool: &SqlitePool, sql: &'static str) -> Vec<String> {
        sqlx::query_scalar(sql).fetch_all(pool).await.unwrap()
    }

    /// Asserts that each expected name occurs in `names`.
    fn assert_all_present(names: &[String], expected: &[&str]) {
        for wanted in expected {
            assert!(names.iter().any(|name| name == wanted));
        }
    }

    /// Inserts a task with the given name and status, returning the raw outcome.
    async fn insert_task(pool: &SqlitePool, name: &str, status: &str) -> sqlx::Result<()> {
        sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind(name)
            .bind(status)
            .execute(pool)
            .await
            .map(drop)
    }

    /// Inserts a dependency edge, returning the raw outcome.
    async fn insert_dependency(pool: &SqlitePool, blocking: i32, blocked: i32) -> sqlx::Result<()> {
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(blocking)
            .bind(blocked)
            .execute(pool)
            .await
            .map(drop)
    }

    /// Counts the rows currently stored in `dependencies`.
    async fn dependency_count(pool: &SqlitePool) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Reads the `schema_version` entry from `workspace_state`.
    async fn stored_schema_version(pool: &SqlitePool) -> String {
        sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        let (_dir, pool) = open_pool().await;

        // A trivial query proves the pool hands out a usable connection.
        let one: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(one, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_dir, pool) = open_migrated_pool().await;

        let tables = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name",
        )
        .await;

        assert_all_present(&tables, &["tasks", "events", "workspace_state"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_dir, pool) = open_migrated_pool().await;

        let fts_tables = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .await;

        assert_all_present(&fts_tables, &["tasks_fts", "events_fts"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_dir, pool) = open_migrated_pool().await;

        let triggers =
            fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='trigger'").await;

        assert_all_present(
            &triggers,
            &[
                "tasks_ai",
                "tasks_ad",
                "tasks_au",
                "events_ai",
                "events_ad",
                "events_au",
            ],
        );
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let (_dir, pool) = open_pool().await;

        // A second run over the same database must succeed as well.
        for _ in 0..2 {
            run_migrations(&pool).await.unwrap();
        }

        let tables =
            fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='table'").await;

        assert!(tables.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_dir, pool) = open_migrated_pool().await;

        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // The insert trigger should have made the new row searchable.
        let hits: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(hits, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_dir, pool) = open_migrated_pool().await;

        // Round-trip one key/value pair.
        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let stored: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(stored, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_dir, pool) = open_migrated_pool().await;

        // A status outside the CHECK list must be rejected.
        let outcome = insert_task(&pool, "Test", "invalid_status").await;

        assert!(outcome.is_err());
    }

    // v0.2.0 Migration Tests

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_dir, pool) = open_migrated_pool().await;

        let tables = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;

        assert_all_present(&tables, &["dependencies"]);
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_dir, pool) = open_migrated_pool().await;

        let indexes = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .await;

        assert_all_present(
            &indexes,
            &[
                "idx_dependencies_blocking",
                "idx_dependencies_blocked",
                "idx_events_task_type_time",
            ],
        );
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_dir, pool) = open_migrated_pool().await;

        insert_task(&pool, "Task 1", "todo").await.unwrap();

        // A task blocking itself must be rejected.
        let outcome = insert_dependency(&pool, 1, 1).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_dir, pool) = open_migrated_pool().await;

        for i in 1..=2 {
            insert_task(&pool, &format!("Task {}", i), "todo")
                .await
                .unwrap();
        }

        insert_dependency(&pool, 1, 2).await.unwrap();

        // The same edge a second time must be rejected.
        let outcome = insert_dependency(&pool, 1, 2).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_dir, pool) = open_migrated_pool().await;

        for i in 1..=2 {
            insert_task(&pool, &format!("Task {}", i), "todo")
                .await
                .unwrap();
        }

        insert_dependency(&pool, 1, 2).await.unwrap();
        assert_eq!(dependency_count(&pool).await, 1);

        // Removing the blocking task should take the edge with it.
        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        assert_eq!(dependency_count(&pool).await, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_dir, pool) = open_migrated_pool().await;

        assert_eq!(stored_schema_version(&pool).await, "0.2.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_2_0() {
        let (_dir, pool) = open_pool().await;

        // Three consecutive runs over the same database.
        for _ in 0..3 {
            run_migrations(&pool).await.unwrap();
        }

        let tables = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;
        assert_all_present(&tables, &["dependencies"]);

        // The recorded version is unaffected by the repeated runs.
        assert_eq!(stored_schema_version(&pool).await, "0.2.0");
    }
}