intent_engine/db/
mod.rs

1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8    let options = SqliteConnectOptions::new()
9        .filename(db_path)
10        .create_if_missing(true)
11        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12        .busy_timeout(std::time::Duration::from_millis(5000));
13
14    let pool = SqlitePoolOptions::new()
15        .max_connections(5)
16        .connect_with(options)
17        .await?;
18
19    Ok(pool)
20}
21
/// Create or upgrade the workspace schema on `pool`.
///
/// Executes a fixed sequence of DDL statements. Each statement is written to
/// be idempotent (`IF NOT EXISTS`, drop-then-recreate, or deliberately
/// ignored errors), so this function is safe to run on every startup against
/// both fresh and pre-existing databases.
///
/// # Errors
/// Returns the first database error raised by a non-optional statement.
pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
    // NOTE(review): this PRAGMA sets WAL journaling — it does NOT enable FTS5
    // (the previous comment here was wrong). It is also redundant with the
    // journal_mode(Wal) connect option set in create_pool, but harmless.
    sqlx::query("PRAGMA journal_mode=WAL;")
        .execute(pool)
        .await?;

    // tasks: the core table. Hierarchical via parent_id (cascade delete of
    // subtrees); status is constrained to the three-state lifecycle by CHECK.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            parent_id INTEGER,
            name TEXT NOT NULL,
            spec TEXT,
            status TEXT NOT NULL DEFAULT 'todo',
            complexity INTEGER,
            priority INTEGER,
            first_todo_at DATETIME,
            first_doing_at DATETIME,
            first_done_at DATETIME,
            active_form TEXT,
            FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
            CHECK (status IN ('todo', 'doing', 'done'))
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Add active_form column if it doesn't exist (migration for existing databases)
    // This column stores the present progressive form of task description for UI display.
    // The error is deliberately ignored: on databases created by the CREATE TABLE
    // above the column already exists and this ALTER fails.
    let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN active_form TEXT")
        .execute(pool)
        .await; // Ignore error if column already exists

    // Create FTS5 virtual table for tasks with trigram tokenizer for better CJK support.
    // Drop-then-recreate (rather than IF NOT EXISTS) so existing databases pick up
    // a tokenizer change; the index is repopulated from `tasks` further below.
    // NOTE(review): this rebuilds the tasks FTS index on every startup — presumably
    // acceptable for expected table sizes; confirm if task counts grow large.
    let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
        .execute(pool)
        .await; // Ignore error if table doesn't exist

    // External-content FTS5 table: rows mirror `tasks` via content=/content_rowid=,
    // so the FTS index stores only tokens, not a second copy of the text.
    sqlx::query(
        r#"
        CREATE VIRTUAL TABLE tasks_fts USING fts5(
            name,
            spec,
            content=tasks,
            content_rowid=id,
            tokenize='trigram'
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Triggers keep tasks_fts in sync with tasks: insert mirrors the new row...
    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
        END
        "#,
    )
    .execute(pool)
    .await?;

    // ...delete removes the mirrored row...
    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
            DELETE FROM tasks_fts WHERE rowid = old.id;
        END
        "#,
    )
    .execute(pool)
    .await?;

    // ...and update is modeled as FTS5 'delete' of the old row + insert of the new.
    // Note: We always drop first because SQLite doesn't support CREATE OR REPLACE TRIGGER,
    // and we need to update existing databases that have the buggy trigger.
    let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
        .execute(pool)
        .await; // Ignore error if trigger doesn't exist

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
            INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Repopulate the freshly recreated tasks_fts from the tasks table. This is
    // safe to run unconditionally only because tasks_fts is always dropped and
    // recreated empty above; otherwise it would duplicate index entries.
    sqlx::query(
        r#"
        INSERT INTO tasks_fts(rowid, name, spec)
        SELECT id, name, spec FROM tasks
        "#,
    )
    .execute(pool)
    .await?;

    // events: append-only log entries attached to a task; rows vanish with
    // their task via ON DELETE CASCADE.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER NOT NULL,
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            log_type TEXT NOT NULL,
            discussion_data TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Create index on task_id for events
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
        "#,
    )
    .execute(pool)
    .await?;

    // FTS over event text. Unlike tasks_fts this uses IF NOT EXISTS (default
    // tokenizer, never changed), so existing indexes survive restarts.
    sqlx::query(
        r#"
        CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
            discussion_data,
            content=events,
            content_rowid=id
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Sync triggers for events_fts, mirroring the tasks_fts trio above.
    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
        END
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
            DELETE FROM events_fts WHERE rowid = old.id;
        END
        "#,
    )
    .execute(pool)
    .await?;

    // Recreate update trigger with correct FTS5 'delete' syntax (migration from
    // a buggy earlier version; SQLite has no CREATE OR REPLACE TRIGGER).
    let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
        .execute(pool)
        .await; // Ignore error if trigger doesn't exist

    sqlx::query(
        r#"
        CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
            INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
        END
        "#,
    )
    .execute(pool)
    .await?;

    // workspace_state: simple key/value store (used below for schema_version).
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS workspace_state (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )
        "#,
    )
    .execute(pool)
    .await?;

    // dependencies (v0.2.0): blocking edges between tasks. UNIQUE forbids
    // duplicate edges; CHECK forbids self-dependencies; both FKs cascade.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS dependencies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            blocking_task_id INTEGER NOT NULL,
            blocked_task_id INTEGER NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            UNIQUE(blocking_task_id, blocked_task_id),
            CHECK(blocking_task_id != blocked_task_id)
        )
        "#,
    )
    .execute(pool)
    .await?;

    // Indexes for dependency lookups in both directions.
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
        ON dependencies(blocking_task_id)
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
        ON dependencies(blocked_task_id)
        "#,
    )
    .execute(pool)
    .await?;

    // Composite index for event filtering (v0.2.0).
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_events_task_type_time
        ON events(task_id, log_type, timestamp)
        "#,
    )
    .execute(pool)
    .await?;

    // Indexes for task sorting and filtering (v0.7.2 - Phase 1).
    // Index 1: supports FocusAware and Priority sorting with status/parent filtering.
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_status_parent_priority
        ON tasks(status, parent_id, priority, id)
        "#,
    )
    .execute(pool)
    .await?;

    // Index 2: supports Priority sorting mode (aligned with pick_next).
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_priority_complexity
        ON tasks(priority, complexity, id)
        "#,
    )
    .execute(pool)
    .await?;

    // Index 3: supports Time sorting and FocusAware secondary sorting.
    // Partial index: only rows currently in 'doing' are indexed.
    sqlx::query(
        r#"
        CREATE INDEX IF NOT EXISTS idx_tasks_doing_at
        ON tasks(first_doing_at)
        WHERE status = 'doing'
        "#,
    )
    .execute(pool)
    .await?;

    // Record the schema version (upsert).
    // NOTE(review): the version string is still '0.2.0' even though v0.7.2
    // indexes were added above — confirm whether this should be bumped.
    sqlx::query(
        r#"
        INSERT INTO workspace_state (key, value)
        VALUES ('schema_version', '0.2.0')
        ON CONFLICT(key) DO UPDATE SET value = '0.2.0'
        "#,
    )
    .execute(pool)
    .await?;

    Ok(())
}
305
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Common fixture: a pool on a fresh temp-directory database with
    /// migrations applied.
    ///
    /// Returns the `TempDir` guard alongside the pool so the directory (and
    /// the database file inside it) stays alive for the duration of the test;
    /// dropping the guard deletes the directory.
    async fn setup_db() -> (TempDir, SqlitePool) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        run_migrations(&pool).await.unwrap();
        (temp_dir, pool)
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        // Exercises create_pool in isolation (no migrations), so it builds
        // its own pool instead of using setup_db().
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let pool = create_pool(&db_path).await.unwrap();

        // Verify we can execute a query
        let result: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(result, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_tmp, pool) = setup_db().await;

        // Verify tables were created
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.contains(&"tasks".to_string()));
        assert!(tables.contains(&"events".to_string()));
        assert!(tables.contains(&"workspace_state".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_tmp, pool) = setup_db().await;

        // Verify FTS tables were created
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"tasks_fts".to_string()));
        assert!(tables.contains(&"events_fts".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_tmp, pool) = setup_db().await;

        // Verify all six FTS-sync triggers were created
        let triggers: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='trigger'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(triggers.contains(&"tasks_ai".to_string()));
        assert!(triggers.contains(&"tasks_ad".to_string()));
        assert!(triggers.contains(&"tasks_au".to_string()));
        assert!(triggers.contains(&"events_ai".to_string()));
        assert!(triggers.contains(&"events_ad".to_string()));
        assert!(triggers.contains(&"events_au".to_string()));
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        // setup_db() already ran migrations once; run a second time.
        let (_tmp, pool) = setup_db().await;
        run_migrations(&pool).await.unwrap();

        // Should not fail - migrations are idempotent
        let tables: Vec<String> =
            sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table'")
                .fetch_all(&pool)
                .await
                .unwrap();

        assert!(tables.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_tmp, pool) = setup_db().await;

        // Insert a task
        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Verify the AFTER INSERT trigger populated the FTS index
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_tmp, pool) = setup_db().await;

        // Insert and retrieve workspace state
        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let value: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(value, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_tmp, pool) = setup_db().await;

        // Try to insert task with invalid status
        let result = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        // Should fail due to CHECK constraint
        assert!(result.is_err());
    }

    // v0.2.0 Migration Tests

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_tmp, pool) = setup_db().await;

        // Verify dependencies table exists
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_tmp, pool) = setup_db().await;

        // Verify indexes exist
        let indexes: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(indexes.contains(&"idx_dependencies_blocking".to_string()));
        assert!(indexes.contains(&"idx_dependencies_blocked".to_string()));
        assert!(indexes.contains(&"idx_events_task_type_time".to_string()));
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_tmp, pool) = setup_db().await;

        // Create a task
        sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Task 1")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // Try to create self-dependency (should fail the CHECK constraint)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(1)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_tmp, pool) = setup_db().await;

        // Create tasks
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Try to create duplicate dependency (should fail the UNIQUE constraint)
        let result = sqlx::query(
            "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
        )
        .bind(1)
        .bind(2)
        .execute(&pool)
        .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_tmp, pool) = setup_db().await;

        // Create tasks
        for i in 1..=2 {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(&pool)
                .await
                .unwrap();
        }

        // Create dependency
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(1)
            .bind(2)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency exists
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 1);

        // Delete blocking task
        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        // Verify dependency was cascade deleted
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_tmp, pool) = setup_db().await;

        // Verify schema version is set to 0.2.0
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.2.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_2_0() {
        // setup_db() ran migrations once; run twice more (three total, as before).
        let (_tmp, pool) = setup_db().await;
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // Verify dependencies table exists and is functional
        let tables: Vec<String> = sqlx::query_scalar(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .fetch_all(&pool)
        .await
        .unwrap();

        assert!(tables.contains(&"dependencies".to_string()));

        // Verify schema version is still correct
        let version: String =
            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(version, "0.2.0");
    }
}