intent_engine/db/
mod.rs

1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8    let options = SqliteConnectOptions::new()
9        .filename(db_path)
10        .create_if_missing(true)
11        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12        .busy_timeout(std::time::Duration::from_millis(5000));
13
14    let pool = SqlitePoolOptions::new()
15        .max_connections(5)
16        .connect_with(options)
17        .await?;
18
19    Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23    // Enable FTS5
24    sqlx::query("PRAGMA journal_mode=WAL;")
25        .execute(pool)
26        .await?;
27
28    // Create tasks table
29    sqlx::query(
30        r#"
31        CREATE TABLE IF NOT EXISTS tasks (
32            id INTEGER PRIMARY KEY AUTOINCREMENT,
33            parent_id INTEGER,
34            name TEXT NOT NULL,
35            spec TEXT,
36            status TEXT NOT NULL DEFAULT 'todo',
37            complexity INTEGER,
38            priority INTEGER DEFAULT 0,
39            first_todo_at DATETIME,
40            first_doing_at DATETIME,
41            first_done_at DATETIME,
42            active_form TEXT,
43            FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
44            CHECK (status IN ('todo', 'doing', 'done'))
45        )
46        "#,
47    )
48    .execute(pool)
49    .await?;
50
51    // Add active_form column if it doesn't exist (migration for existing databases)
52    // This column stores the present progressive form of task description for UI display
53    let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN active_form TEXT")
54        .execute(pool)
55        .await; // Ignore error if column already exists
56
57    // Create FTS5 virtual table for tasks with trigram tokenizer for better CJK support
58    // For existing databases, we need to drop and recreate if tokenizer changed
59    let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
60        .execute(pool)
61        .await; // Ignore error if table doesn't exist
62
63    sqlx::query(
64        r#"
65        CREATE VIRTUAL TABLE tasks_fts USING fts5(
66            name,
67            spec,
68            content=tasks,
69            content_rowid=id,
70            tokenize='trigram'
71        )
72        "#,
73    )
74    .execute(pool)
75    .await?;
76
77    // Create triggers to keep FTS in sync
78    sqlx::query(
79        r#"
80        CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
81            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
82        END
83        "#,
84    )
85    .execute(pool)
86    .await?;
87
88    sqlx::query(
89        r#"
90        CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
91            DELETE FROM tasks_fts WHERE rowid = old.id;
92        END
93        "#,
94    )
95    .execute(pool)
96    .await?;
97
98    // Recreate trigger with correct FTS5 syntax (drop and create for migration from buggy version)
99    // Note: We always drop first because SQLite doesn't support CREATE OR REPLACE TRIGGER,
100    // and we need to update existing databases that have the buggy trigger.
101    let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
102        .execute(pool)
103        .await; // Ignore error if trigger doesn't exist
104
105    sqlx::query(
106        r#"
107        CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
108            INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
109            INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
110        END
111        "#,
112    )
113    .execute(pool)
114    .await?;
115
116    // Rebuild FTS index with existing data from tasks table
117    sqlx::query(
118        r#"
119        INSERT INTO tasks_fts(rowid, name, spec)
120        SELECT id, name, spec FROM tasks
121        "#,
122    )
123    .execute(pool)
124    .await?;
125
126    // Create events table
127    sqlx::query(
128        r#"
129        CREATE TABLE IF NOT EXISTS events (
130            id INTEGER PRIMARY KEY AUTOINCREMENT,
131            task_id INTEGER NOT NULL,
132            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
133            log_type TEXT NOT NULL,
134            discussion_data TEXT NOT NULL,
135            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
136        )
137        "#,
138    )
139    .execute(pool)
140    .await?;
141
142    // Create index on task_id for events
143    sqlx::query(
144        r#"
145        CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
146        "#,
147    )
148    .execute(pool)
149    .await?;
150
151    // Create FTS5 virtual table for events
152    sqlx::query(
153        r#"
154        CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
155            discussion_data,
156            content=events,
157            content_rowid=id
158        )
159        "#,
160    )
161    .execute(pool)
162    .await?;
163
164    // Create triggers for events FTS
165    sqlx::query(
166        r#"
167        CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
168            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
169        END
170        "#,
171    )
172    .execute(pool)
173    .await?;
174
175    sqlx::query(
176        r#"
177        CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
178            DELETE FROM events_fts WHERE rowid = old.id;
179        END
180        "#,
181    )
182    .execute(pool)
183    .await?;
184
185    // Recreate trigger with correct FTS5 syntax (drop and create for migration from buggy version)
186    let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
187        .execute(pool)
188        .await; // Ignore error if trigger doesn't exist
189
190    sqlx::query(
191        r#"
192        CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
193            INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
194            INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
195        END
196        "#,
197    )
198    .execute(pool)
199    .await?;
200
201    // Create workspace_state table
202    sqlx::query(
203        r#"
204        CREATE TABLE IF NOT EXISTS workspace_state (
205            key TEXT PRIMARY KEY,
206            value TEXT NOT NULL
207        )
208        "#,
209    )
210    .execute(pool)
211    .await?;
212
213    // Create dependencies table for v0.2.0
214    sqlx::query(
215        r#"
216        CREATE TABLE IF NOT EXISTS dependencies (
217            id INTEGER PRIMARY KEY AUTOINCREMENT,
218            blocking_task_id INTEGER NOT NULL,
219            blocked_task_id INTEGER NOT NULL,
220            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
221            FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
222            FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
223            UNIQUE(blocking_task_id, blocked_task_id),
224            CHECK(blocking_task_id != blocked_task_id)
225        )
226        "#,
227    )
228    .execute(pool)
229    .await?;
230
231    // Create indexes for dependencies table
232    sqlx::query(
233        r#"
234        CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
235        ON dependencies(blocking_task_id)
236        "#,
237    )
238    .execute(pool)
239    .await?;
240
241    sqlx::query(
242        r#"
243        CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
244        ON dependencies(blocked_task_id)
245        "#,
246    )
247    .execute(pool)
248    .await?;
249
250    // Create composite index for event filtering (v0.2.0)
251    sqlx::query(
252        r#"
253        CREATE INDEX IF NOT EXISTS idx_events_task_type_time
254        ON events(task_id, log_type, timestamp)
255        "#,
256    )
257    .execute(pool)
258    .await?;
259
260    // Update schema version to 0.2.0
261    sqlx::query(
262        r#"
263        INSERT INTO workspace_state (key, value)
264        VALUES ('schema_version', '0.2.0')
265        ON CONFLICT(key) DO UPDATE SET value = '0.2.0'
266        "#,
267    )
268    .execute(pool)
269    .await?;
270
271    Ok(())
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a pool on `test.db` inside a brand-new temporary directory.
    ///
    /// The `TempDir` is handed back with the pool because dropping it removes
    /// the directory; each test keeps it bound until the test finishes.
    async fn fresh_pool() -> (TempDir, sqlx::SqlitePool) {
        let dir = TempDir::new().unwrap();
        let db_file = dir.path().join("test.db");
        let pool = create_pool(&db_file).await.unwrap();
        (dir, pool)
    }

    /// Like [`fresh_pool`], with `run_migrations` already applied once.
    async fn migrated_pool() -> (TempDir, sqlx::SqlitePool) {
        let (dir, pool) = fresh_pool().await;
        run_migrations(&pool).await.unwrap();
        (dir, pool)
    }

    /// Runs a single-column text query and collects every row.
    async fn fetch_names(pool: &sqlx::SqlitePool, sql: &'static str) -> Vec<String> {
        sqlx::query_scalar(sql).fetch_all(pool).await.unwrap()
    }

    /// Asserts that every entry of `expected` occurs in `found`.
    fn assert_all_present(found: &[String], expected: &[&str]) {
        for wanted in expected {
            assert!(
                found.iter().any(|name| name.as_str() == *wanted),
                "missing {}",
                wanted
            );
        }
    }

    /// Inserts a task with the given name and status `todo`.
    async fn insert_todo_task(pool: &sqlx::SqlitePool, name: &str) {
        sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind(name)
            .bind("todo")
            .execute(pool)
            .await
            .unwrap();
    }

    /// Attempts to insert a dependency row and returns the raw outcome so
    /// callers can assert on success or failure.
    async fn add_dependency(
        pool: &sqlx::SqlitePool,
        blocking: i32,
        blocked: i32,
    ) -> std::result::Result<sqlx::sqlite::SqliteQueryResult, sqlx::Error> {
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(blocking)
            .bind(blocked)
            .execute(pool)
            .await
    }

    /// Number of rows currently stored in `dependencies`.
    async fn dependency_count(pool: &sqlx::SqlitePool) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Reads the `schema_version` entry from `workspace_state`.
    async fn schema_version(pool: &sqlx::SqlitePool) -> String {
        sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        let (_dir, pool) = fresh_pool().await;

        // A trivial query proves the pool hands out a working connection.
        let one: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(one, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name",
        )
        .await;

        assert_all_present(&found, &["tasks", "events", "workspace_state"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .await;

        assert_all_present(&found, &["tasks_fts", "events_fts"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='trigger'").await;

        assert_all_present(
            &found,
            &[
                "tasks_ai",
                "tasks_ad",
                "tasks_au",
                "events_ai",
                "events_ad",
                "events_au",
            ],
        );
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let (_dir, pool) = fresh_pool().await;

        // Two consecutive runs must both succeed.
        for _ in 0..2 {
            run_migrations(&pool).await.unwrap();
        }

        let found = fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='table'").await;

        assert!(found.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_dir, pool) = migrated_pool().await;

        // Inserting into tasks should populate tasks_fts through the insert trigger.
        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        let hits: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(hits, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_dir, pool) = migrated_pool().await;

        // Round-trip one key/value pair.
        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let stored: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(stored, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_dir, pool) = migrated_pool().await;

        // A status outside the CHECK list must be rejected.
        let outcome = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        assert!(outcome.is_err());
    }

    // v0.2.0 Migration Tests

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;

        assert_all_present(&found, &["dependencies"]);
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .await;

        assert_all_present(
            &found,
            &[
                "idx_dependencies_blocking",
                "idx_dependencies_blocked",
                "idx_events_task_type_time",
            ],
        );
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_dir, pool) = migrated_pool().await;

        insert_todo_task(&pool, "Task 1").await;

        // A task may not block itself.
        let outcome = add_dependency(&pool, 1, 1).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_dir, pool) = migrated_pool().await;

        for i in 1..=2 {
            insert_todo_task(&pool, &format!("Task {}", i)).await;
        }

        // The first link is accepted ...
        add_dependency(&pool, 1, 2).await.unwrap();

        // ... and an identical second one is refused.
        let outcome = add_dependency(&pool, 1, 2).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_dir, pool) = migrated_pool().await;

        for i in 1..=2 {
            insert_todo_task(&pool, &format!("Task {}", i)).await;
        }

        add_dependency(&pool, 1, 2).await.unwrap();
        assert_eq!(dependency_count(&pool).await, 1);

        // Removing the blocking task should take the dependency row with it.
        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        assert_eq!(dependency_count(&pool).await, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_dir, pool) = migrated_pool().await;

        assert_eq!(schema_version(&pool).await, "0.2.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_2_0() {
        let (_dir, pool) = fresh_pool().await;

        // Three runs in a row must all succeed.
        for _ in 0..3 {
            run_migrations(&pool).await.unwrap();
        }

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;
        assert_all_present(&found, &["dependencies"]);

        // The recorded version is unchanged by the repeated runs.
        assert_eq!(schema_version(&pool).await, "0.2.0");
    }
}