1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8 let options = SqliteConnectOptions::new()
9 .filename(db_path)
10 .create_if_missing(true)
11 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12 .busy_timeout(std::time::Duration::from_millis(5000));
13
14 let pool = SqlitePoolOptions::new()
15 .max_connections(5)
16 .connect_with(options)
17 .await?;
18
19 Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23 sqlx::query("PRAGMA journal_mode=WAL;")
25 .execute(pool)
26 .await?;
27
28 sqlx::query(
30 r#"
31 CREATE TABLE IF NOT EXISTS tasks (
32 id INTEGER PRIMARY KEY AUTOINCREMENT,
33 parent_id INTEGER,
34 name TEXT NOT NULL,
35 spec TEXT,
36 status TEXT NOT NULL DEFAULT 'todo',
37 complexity INTEGER,
38 priority INTEGER DEFAULT 0,
39 first_todo_at DATETIME,
40 first_doing_at DATETIME,
41 first_done_at DATETIME,
42 active_form TEXT,
43 owner TEXT NOT NULL DEFAULT 'human',
44 FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
45 CHECK (status IN ('todo', 'doing', 'done')),
46 CHECK (owner IN ('human', 'ai'))
47 )
48 "#,
49 )
50 .execute(pool)
51 .await?;
52
53 let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN active_form TEXT")
56 .execute(pool)
57 .await; let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
62 .execute(pool)
63 .await; sqlx::query(
66 r#"
67 CREATE VIRTUAL TABLE tasks_fts USING fts5(
68 name,
69 spec,
70 content=tasks,
71 content_rowid=id,
72 tokenize='trigram'
73 )
74 "#,
75 )
76 .execute(pool)
77 .await?;
78
79 sqlx::query(
81 r#"
82 CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
83 INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
84 END
85 "#,
86 )
87 .execute(pool)
88 .await?;
89
90 sqlx::query(
91 r#"
92 CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
93 DELETE FROM tasks_fts WHERE rowid = old.id;
94 END
95 "#,
96 )
97 .execute(pool)
98 .await?;
99
100 let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
104 .execute(pool)
105 .await; sqlx::query(
108 r#"
109 CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
110 INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
111 INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
112 END
113 "#,
114 )
115 .execute(pool)
116 .await?;
117
118 sqlx::query(
120 r#"
121 INSERT INTO tasks_fts(rowid, name, spec)
122 SELECT id, name, spec FROM tasks
123 "#,
124 )
125 .execute(pool)
126 .await?;
127
128 sqlx::query(
130 r#"
131 CREATE TABLE IF NOT EXISTS events (
132 id INTEGER PRIMARY KEY AUTOINCREMENT,
133 task_id INTEGER NOT NULL,
134 timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
135 log_type TEXT NOT NULL,
136 discussion_data TEXT NOT NULL,
137 FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
138 )
139 "#,
140 )
141 .execute(pool)
142 .await?;
143
144 sqlx::query(
146 r#"
147 CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
148 "#,
149 )
150 .execute(pool)
151 .await?;
152
153 sqlx::query(
155 r#"
156 CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
157 discussion_data,
158 content=events,
159 content_rowid=id
160 )
161 "#,
162 )
163 .execute(pool)
164 .await?;
165
166 sqlx::query(
168 r#"
169 CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
170 INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
171 END
172 "#,
173 )
174 .execute(pool)
175 .await?;
176
177 sqlx::query(
178 r#"
179 CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
180 DELETE FROM events_fts WHERE rowid = old.id;
181 END
182 "#,
183 )
184 .execute(pool)
185 .await?;
186
187 let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
189 .execute(pool)
190 .await; sqlx::query(
193 r#"
194 CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
195 INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
196 INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
197 END
198 "#,
199 )
200 .execute(pool)
201 .await?;
202
203 sqlx::query(
205 r#"
206 CREATE TABLE IF NOT EXISTS workspace_state (
207 key TEXT PRIMARY KEY,
208 value TEXT NOT NULL
209 )
210 "#,
211 )
212 .execute(pool)
213 .await?;
214
215 sqlx::query(
218 r#"
219 CREATE TABLE IF NOT EXISTS sessions (
220 session_id TEXT PRIMARY KEY,
221 current_task_id INTEGER,
222 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
223 last_active_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
224 FOREIGN KEY (current_task_id) REFERENCES tasks(id) ON DELETE SET NULL
225 )
226 "#,
227 )
228 .execute(pool)
229 .await?;
230
231 sqlx::query(
233 r#"
234 CREATE INDEX IF NOT EXISTS idx_sessions_last_active
235 ON sessions(last_active_at)
236 "#,
237 )
238 .execute(pool)
239 .await?;
240
241 sqlx::query(
244 r#"
245 INSERT OR IGNORE INTO sessions (session_id, current_task_id, created_at, last_active_at)
246 SELECT '-1', CAST(value AS INTEGER), datetime('now'), datetime('now')
247 FROM workspace_state
248 WHERE key = 'current_task_id' AND value IS NOT NULL AND value != ''
249 "#,
250 )
251 .execute(pool)
252 .await?;
253
254 sqlx::query(
256 r#"
257 CREATE TABLE IF NOT EXISTS dependencies (
258 id INTEGER PRIMARY KEY AUTOINCREMENT,
259 blocking_task_id INTEGER NOT NULL,
260 blocked_task_id INTEGER NOT NULL,
261 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
262 FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
263 FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
264 UNIQUE(blocking_task_id, blocked_task_id),
265 CHECK(blocking_task_id != blocked_task_id)
266 )
267 "#,
268 )
269 .execute(pool)
270 .await?;
271
272 sqlx::query(
274 r#"
275 CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
276 ON dependencies(blocking_task_id)
277 "#,
278 )
279 .execute(pool)
280 .await?;
281
282 sqlx::query(
283 r#"
284 CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
285 ON dependencies(blocked_task_id)
286 "#,
287 )
288 .execute(pool)
289 .await?;
290
291 sqlx::query(
293 r#"
294 CREATE INDEX IF NOT EXISTS idx_events_task_type_time
295 ON events(task_id, log_type, timestamp)
296 "#,
297 )
298 .execute(pool)
299 .await?;
300
301 sqlx::query(
304 r#"
305 CREATE INDEX IF NOT EXISTS idx_tasks_status_parent_priority
306 ON tasks(status, parent_id, priority, id)
307 "#,
308 )
309 .execute(pool)
310 .await?;
311
312 sqlx::query(
314 r#"
315 CREATE INDEX IF NOT EXISTS idx_tasks_priority_complexity
316 ON tasks(priority, complexity, id)
317 "#,
318 )
319 .execute(pool)
320 .await?;
321
322 sqlx::query(
324 r#"
325 CREATE INDEX IF NOT EXISTS idx_tasks_doing_at
326 ON tasks(first_doing_at)
327 WHERE status = 'doing'
328 "#,
329 )
330 .execute(pool)
331 .await?;
332
333 let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN owner TEXT NOT NULL DEFAULT 'human'")
337 .execute(pool)
338 .await; sqlx::query(
342 r#"
343 INSERT INTO workspace_state (key, value)
344 VALUES ('schema_version', '0.11.0')
345 ON CONFLICT(key) DO UPDATE SET value = '0.11.0'
346 "#,
347 )
348 .execute(pool)
349 .await?;
350
351 Ok(())
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a pool on a new `test.db` inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is handed back so the caller keeps the directory
    /// alive for as long as the pool is in use.
    async fn fresh_pool() -> (TempDir, SqlitePool) {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("test.db");
        let pool = create_pool(&file).await.unwrap();
        (dir, pool)
    }

    /// Like [`fresh_pool`], with the migrations already applied once.
    async fn migrated_pool() -> (TempDir, SqlitePool) {
        let (dir, pool) = fresh_pool().await;
        run_migrations(&pool).await.unwrap();
        (dir, pool)
    }

    /// Runs a query that yields one text column and collects every row.
    async fn fetch_names(pool: &SqlitePool, sql: &str) -> Vec<String> {
        sqlx::query_scalar(sql).fetch_all(pool).await.unwrap()
    }

    /// Reports whether `wanted` occurs in `names`.
    fn has(names: &[String], wanted: &str) -> bool {
        names.iter().any(|candidate| candidate == wanted)
    }

    /// Inserts `count` tasks named "Task 1", "Task 2", ... with status `todo`.
    async fn seed_todo_tasks(pool: &SqlitePool, count: i32) {
        for n in 1..=count {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", n))
                .bind("todo")
                .execute(pool)
                .await
                .unwrap();
        }
    }

    /// Attempts to record that task `blocking` blocks task `blocked`.
    async fn try_add_dependency(
        pool: &SqlitePool,
        blocking: i32,
        blocked: i32,
    ) -> std::result::Result<(), sqlx::Error> {
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(blocking)
            .bind(blocked)
            .execute(pool)
            .await
            .map(|_| ())
    }

    /// Counts the rows currently stored in `dependencies`.
    async fn dependency_count(pool: &SqlitePool) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Reads the `schema_version` entry from `workspace_state`.
    async fn stored_schema_version(pool: &SqlitePool) -> String {
        sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        let (_dir, pool) = fresh_pool().await;

        // A trivial round trip proves the pool hands out a usable connection.
        let one: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(one, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name",
        )
        .await;

        assert!(has(&found, "tasks"));
        assert!(has(&found, "events"));
        assert!(has(&found, "workspace_state"));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
        )
        .await;

        assert!(has(&found, "tasks_fts"));
        assert!(has(&found, "events_fts"));
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='trigger'").await;

        assert!(has(&found, "tasks_ai"));
        assert!(has(&found, "tasks_ad"));
        assert!(has(&found, "tasks_au"));
        assert!(has(&found, "events_ai"));
        assert!(has(&found, "events_ad"));
        assert!(has(&found, "events_au"));
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let (_dir, pool) = migrated_pool().await;

        // Second application must succeed on the already-migrated database.
        run_migrations(&pool).await.unwrap();

        let found = fetch_names(&pool, "SELECT name FROM sqlite_master WHERE type='table'").await;

        assert!(found.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_dir, pool) = migrated_pool().await;

        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // The insert trigger should have indexed the new task.
        let hits: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(hits, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_dir, pool) = migrated_pool().await;

        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let stored: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(stored, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_dir, pool) = migrated_pool().await;

        // A status outside the CHECK list must be rejected.
        let outcome = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;

        assert!(has(&found, "dependencies"));
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_dir, pool) = migrated_pool().await;

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
        )
        .await;

        assert!(has(&found, "idx_dependencies_blocking"));
        assert!(has(&found, "idx_dependencies_blocked"));
        assert!(has(&found, "idx_events_task_type_time"));
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_dir, pool) = migrated_pool().await;
        seed_todo_tasks(&pool, 1).await;

        // A task may not block itself.
        let outcome = try_add_dependency(&pool, 1, 1).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_dir, pool) = migrated_pool().await;
        seed_todo_tasks(&pool, 2).await;

        try_add_dependency(&pool, 1, 2).await.unwrap();

        // The identical pair a second time must violate the UNIQUE constraint.
        let outcome = try_add_dependency(&pool, 1, 2).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_dir, pool) = migrated_pool().await;
        seed_todo_tasks(&pool, 2).await;

        try_add_dependency(&pool, 1, 2).await.unwrap();
        assert_eq!(dependency_count(&pool).await, 1);

        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        // Removing the blocking task must take the dependency row with it.
        assert_eq!(dependency_count(&pool).await, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_dir, pool) = migrated_pool().await;

        assert_eq!(stored_schema_version(&pool).await, "0.11.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_11_0() {
        let (_dir, pool) = migrated_pool().await;

        // Two further applications on top of the first one.
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        let found = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
        )
        .await;
        assert!(has(&found, "dependencies"));

        assert_eq!(stored_schema_version(&pool).await, "0.11.0");
    }

    #[tokio::test]
    async fn test_sessions_table_created() {
        let (_dir, pool) = migrated_pool().await;

        let tables = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'",
        )
        .await;
        assert!(has(&tables, "sessions"));

        let indices = fetch_names(
            &pool,
            "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_sessions_last_active'",
        )
        .await;
        assert!(has(&indices, "idx_sessions_last_active"));
    }
}