//! Database connection pooling and schema migrations (`intent_engine/db/mod.rs`).

1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8    let options = SqliteConnectOptions::new()
9        .filename(db_path)
10        .create_if_missing(true)
11        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12        .busy_timeout(std::time::Duration::from_millis(5000));
13
14    let pool = SqlitePoolOptions::new()
15        .max_connections(5)
16        .connect_with(options)
17        .await?;
18
19    Ok(pool)
20}
21
/// Creates or upgrades the full database schema on `pool`.
///
/// Designed to be safe to run on every startup: table/index/trigger
/// creation uses `IF NOT EXISTS`, `ALTER TABLE` errors for columns that
/// already exist are deliberately ignored, and the tasks FTS index is
/// dropped and rebuilt from the `tasks` table on each invocation.
/// Finally records the schema version ('0.9.0') in `workspace_state`.
pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
    // Re-assert WAL journal mode. WAL is already configured on the
    // connection options in `create_pool`, so this PRAGMA is a harmless
    // redundancy — it does NOT enable FTS5 (FTS5 availability depends on
    // how the bundled SQLite was compiled).
    sqlx::query("PRAGMA journal_mode=WAL;")
        .execute(pool)
        .await?;

    // Create tasks table. CHECK constraints restrict status/owner to the
    // enumerated values; parent_id forms a self-referential tree with
    // cascade delete of subtasks.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            parent_id INTEGER,
            name TEXT NOT NULL,
            spec TEXT,
            status TEXT NOT NULL DEFAULT 'todo',
            complexity INTEGER,
            priority INTEGER DEFAULT 0,
            first_todo_at DATETIME,
            first_doing_at DATETIME,
            first_done_at DATETIME,
            active_form TEXT,
            owner TEXT NOT NULL DEFAULT 'human',
            FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
            CHECK (status IN ('todo', 'doing', 'done')),
            CHECK (owner IN ('human', 'ai'))
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Add active_form column if it doesn't exist (migration for databases
    // created before the column was added to CREATE TABLE above). The
    // column stores the present-progressive form of the task description
    // for UI display.
    let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN active_form TEXT")
        .execute(pool)
        .await; // Ignore "duplicate column" error if column already exists

    // Recreate the FTS5 virtual table for tasks with the trigram tokenizer
    // for better CJK support. The drop is unconditional, so the FTS index
    // is rebuilt from scratch on every startup (the base `tasks` table is
    // untouched; the index is repopulated below).
    let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
        .execute(pool)
        .await; // Ignore error if table doesn't exist

    // External-content FTS table: rows mirror `tasks` (content=tasks) and
    // share its rowid (content_rowid=id), so only the index is stored here.
    sqlx::query(
        r#"
        CREATE VIRTUAL TABLE tasks_fts USING fts5(
            name,
            spec,
            content=tasks,
            content_rowid=id,
            tokenize='trigram'
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create triggers to keep FTS in sync with inserts/deletes on tasks.
    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
        END
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
            DELETE FROM tasks_fts WHERE rowid = old.id;
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Recreate update trigger with correct FTS5 external-content syntax
    // (the special 'delete' insert) — drop-then-create migrates databases
    // that still carry the buggy trigger, since SQLite has no
    // CREATE OR REPLACE TRIGGER.
    let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
        .execute(pool)
        .await; // Ignore error if trigger doesn't exist

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
            INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Repopulate the freshly recreated FTS index from the existing rows in
    // the tasks table (required because tasks_fts was dropped above).
    sqlx::query(
        r#"
        INSERT INTO tasks_fts(rowid, name, spec)
        SELECT id, name, spec FROM tasks
        "#,
    )
    .execute(pool)
    .await?;

    // Create events table: append-only log entries attached to a task,
    // cascade-deleted with it.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER NOT NULL,
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            log_type TEXT NOT NULL,
            discussion_data TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create index on task_id for events
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
        "#,
    )
    .execute(pool)
    .await?;

    // Create FTS5 virtual table for events (default tokenizer; unlike
    // tasks_fts it is kept across restarts — IF NOT EXISTS, no drop).
    sqlx::query(
        r#"
        CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
            discussion_data,
            content=events,
            content_rowid=id
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create triggers for events FTS
    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
        END
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
            DELETE FROM events_fts WHERE rowid = old.id;
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Recreate update trigger with correct FTS5 syntax (same drop-then-create
    // migration pattern as tasks_au above).
    let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
        .execute(pool)
        .await; // Ignore error if trigger doesn't exist

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
            INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Create workspace_state table: simple key/value store (also holds the
    // schema version written at the end of this function).
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS workspace_state (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create dependencies table for v0.2.0: blocking_task_id must complete
    // before blocked_task_id. UNIQUE prevents duplicate edges; CHECK
    // prevents self-dependencies; both ends cascade-delete with their task.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS dependencies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            blocking_task_id INTEGER NOT NULL,
            blocked_task_id INTEGER NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            UNIQUE(blocking_task_id, blocked_task_id),
            CHECK(blocking_task_id != blocked_task_id)
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create indexes for dependencies table (one per lookup direction)
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
        ON dependencies(blocking_task_id)
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
        ON dependencies(blocked_task_id)
        "#,
    )
    .execute(pool)
    .await?;

    // Create composite index for event filtering (v0.2.0)
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_events_task_type_time
        ON events(task_id, log_type, timestamp)
        "#,
    )
    .execute(pool)
    .await?;

    // Create indexes for task sorting and filtering (v0.7.2 - Phase 1)
    // Index 1: Support FocusAware and Priority sorting with status/parent filtering
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_status_parent_priority
        ON tasks(status, parent_id, priority, id)
        "#,
    )
    .execute(pool)
    .await?;

    // Index 2: Support Priority sorting mode (aligned with pick_next)
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_priority_complexity
        ON tasks(priority, complexity, id)
        "#,
    )
    .execute(pool)
    .await?;

    // Index 3: Support Time sorting and FocusAware secondary sorting
    // (partial index: only rows with status = 'doing', keeping it small)
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_doing_at
        ON tasks(first_doing_at)
        WHERE status = 'doing'
        "#,
    )
    .execute(pool)
    .await?;

    // Add owner column to tasks table (v0.9.0 - Human Task Protection).
    // 'human' = created by human (CLI/Dashboard), 'ai' = created by AI (MCP).
    // Note: SQLite doesn't support CHECK constraints in ALTER TABLE, so
    // pre-existing databases get the column without the owner CHECK that
    // fresh databases receive from CREATE TABLE above.
    let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN owner TEXT NOT NULL DEFAULT 'human'")
        .execute(pool)
        .await; // Ignore error if column already exists

    // Record schema version 0.9.0 (upsert keeps this idempotent)
    sqlx::query(
        r#"
        INSERT INTO workspace_state (key, value)
        VALUES ('schema_version', '0.9.0')
        ON CONFLICT(key) DO UPDATE SET value = '0.9.0'
        "#,
    )
    .execute(pool)
    .await?;

    Ok(())
}
314
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates an on-disk SQLite database in a fresh temp directory.
    ///
    /// Returns the `TempDir` guard together with the pool; callers must
    /// keep the guard bound (e.g. `let (_dir, pool) = ...`) so the
    /// directory — and the database file inside it — outlives the pool.
    async fn setup_pool() -> (TempDir, SqlitePool) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        (temp_dir, pool)
    }

    /// Like [`setup_pool`], but also runs all migrations.
    async fn setup_migrated_pool() -> (TempDir, SqlitePool) {
        let (temp_dir, pool) = setup_pool().await;
        run_migrations(&pool).await.unwrap();
        (temp_dir, pool)
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        let (_dir, pool) = setup_pool().await;

        // Verify we can execute a query
        let result: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(result, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify tables were created
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.contains(&"tasks".to_string()));
        assert!(tables.contains(&"events".to_string()));
        assert!(tables.contains(&"workspace_state".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify FTS tables were created
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"tasks_fts".to_string()));
        assert!(tables.contains(&"events_fts".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify triggers were created
        let triggers: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='trigger'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(triggers.contains(&"tasks_ai".to_string()));
        assert!(triggers.contains(&"tasks_ad".to_string()));
        assert!(triggers.contains(&"tasks_au".to_string()));
        assert!(triggers.contains(&"events_ai".to_string()));
        assert!(triggers.contains(&"events_ad".to_string()));
        assert!(triggers.contains(&"events_au".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        // Run migrations twice — second run must not fail
        let (_dir, pool) = setup_migrated_pool().await;
        run_migrations(&pool).await.unwrap();

        // Should not fail - migrations are idempotent
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Insert a task
        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Verify FTS was updated by the tasks_ai insert trigger
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Insert and retrieve workspace state
        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let value: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(value, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Try to insert task with invalid status
        let result = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        // Should fail due to CHECK constraint
        assert!(result.is_err());
    }

    // v0.2.0 Migration Tests

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify dependencies table exists
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify indexes exist
        let indexes: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(indexes.contains(&"idx_dependencies_blocking".to_string()));
        assert!(indexes.contains(&"idx_dependencies_blocked".to_string()));
        assert!(indexes.contains(&"idx_events_task_type_time".to_string()));
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Create a task (first AUTOINCREMENT id in a fresh DB is 1)
        sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Task 1")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Try to create self-dependency (should fail the CHECK constraint)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(1)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Create tasks (ids 1 and 2 in a fresh DB)
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Try to create duplicate dependency (should fail the UNIQUE constraint)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(2)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Create tasks (ids 1 and 2 in a fresh DB)
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency exists
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 1);

        // Delete blocking task
        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency was cascade deleted
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_dir, pool) = setup_migrated_pool().await;

        // Verify schema version is set to 0.9.0
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.9.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_9_0() {
        // Run migrations multiple times
        let (_dir, pool) = setup_migrated_pool().await;
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify dependencies table exists and is functional
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));

        // Verify schema version is still correct
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.9.0");
    }
}