1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8 let options = SqliteConnectOptions::new()
9 .filename(db_path)
10 .create_if_missing(true)
11 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12 .busy_timeout(std::time::Duration::from_millis(5000));
13
14 let pool = SqlitePoolOptions::new()
15 .max_connections(5)
16 .connect_with(options)
17 .await?;
18
19 Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23 sqlx::query("PRAGMA journal_mode=WAL;")
25 .execute(pool)
26 .await?;
27
28 sqlx::query(
30 r#"
31 CREATE TABLE IF NOT EXISTS tasks (
32 id INTEGER PRIMARY KEY AUTOINCREMENT,
33 parent_id INTEGER,
34 name TEXT NOT NULL,
35 spec TEXT,
36 status TEXT NOT NULL DEFAULT 'todo',
37 complexity INTEGER,
38 priority INTEGER DEFAULT 0,
39 first_todo_at DATETIME,
40 first_doing_at DATETIME,
41 first_done_at DATETIME,
42 active_form TEXT,
43 owner TEXT NOT NULL DEFAULT 'human',
44 FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
45 CHECK (status IN ('todo', 'doing', 'done')),
46 CHECK (owner IN ('human', 'ai'))
47 )
48 "#,
49 )
50 .execute(pool)
51 .await?;
52
53 let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN active_form TEXT")
56 .execute(pool)
57 .await; let _ = sqlx::query("DROP TABLE IF EXISTS tasks_fts")
62 .execute(pool)
63 .await; sqlx::query(
66 r#"
67 CREATE VIRTUAL TABLE tasks_fts USING fts5(
68 name,
69 spec,
70 content=tasks,
71 content_rowid=id,
72 tokenize='trigram'
73 )
74 "#,
75 )
76 .execute(pool)
77 .await?;
78
79 sqlx::query(
81 r#"
82 CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
83 INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
84 END
85 "#,
86 )
87 .execute(pool)
88 .await?;
89
90 sqlx::query(
91 r#"
92 CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
93 DELETE FROM tasks_fts WHERE rowid = old.id;
94 END
95 "#,
96 )
97 .execute(pool)
98 .await?;
99
100 let _ = sqlx::query("DROP TRIGGER IF EXISTS tasks_au")
104 .execute(pool)
105 .await; sqlx::query(
108 r#"
109 CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
110 INSERT INTO tasks_fts(tasks_fts, rowid, name, spec) VALUES('delete', old.id, old.name, old.spec);
111 INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
112 END
113 "#,
114 )
115 .execute(pool)
116 .await?;
117
118 sqlx::query(
120 r#"
121 INSERT INTO tasks_fts(rowid, name, spec)
122 SELECT id, name, spec FROM tasks
123 "#,
124 )
125 .execute(pool)
126 .await?;
127
128 sqlx::query(
130 r#"
131 CREATE TABLE IF NOT EXISTS events (
132 id INTEGER PRIMARY KEY AUTOINCREMENT,
133 task_id INTEGER NOT NULL,
134 timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
135 log_type TEXT NOT NULL,
136 discussion_data TEXT NOT NULL,
137 FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
138 )
139 "#,
140 )
141 .execute(pool)
142 .await?;
143
144 sqlx::query(
146 r#"
147 CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
148 "#,
149 )
150 .execute(pool)
151 .await?;
152
153 sqlx::query(
155 r#"
156 CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
157 discussion_data,
158 content=events,
159 content_rowid=id
160 )
161 "#,
162 )
163 .execute(pool)
164 .await?;
165
166 sqlx::query(
168 r#"
169 CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
170 INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
171 END
172 "#,
173 )
174 .execute(pool)
175 .await?;
176
177 sqlx::query(
178 r#"
179 CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
180 DELETE FROM events_fts WHERE rowid = old.id;
181 END
182 "#,
183 )
184 .execute(pool)
185 .await?;
186
187 let _ = sqlx::query("DROP TRIGGER IF EXISTS events_au")
189 .execute(pool)
190 .await; sqlx::query(
193 r#"
194 CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
195 INSERT INTO events_fts(events_fts, rowid, discussion_data) VALUES('delete', old.id, old.discussion_data);
196 INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
197 END
198 "#,
199 )
200 .execute(pool)
201 .await?;
202
203 sqlx::query(
205 r#"
206 CREATE TABLE IF NOT EXISTS workspace_state (
207 key TEXT PRIMARY KEY,
208 value TEXT NOT NULL
209 )
210 "#,
211 )
212 .execute(pool)
213 .await?;
214
215 sqlx::query(
217 r#"
218 CREATE TABLE IF NOT EXISTS dependencies (
219 id INTEGER PRIMARY KEY AUTOINCREMENT,
220 blocking_task_id INTEGER NOT NULL,
221 blocked_task_id INTEGER NOT NULL,
222 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
223 FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
224 FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
225 UNIQUE(blocking_task_id, blocked_task_id),
226 CHECK(blocking_task_id != blocked_task_id)
227 )
228 "#,
229 )
230 .execute(pool)
231 .await?;
232
233 sqlx::query(
235 r#"
236 CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
237 ON dependencies(blocking_task_id)
238 "#,
239 )
240 .execute(pool)
241 .await?;
242
243 sqlx::query(
244 r#"
245 CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
246 ON dependencies(blocked_task_id)
247 "#,
248 )
249 .execute(pool)
250 .await?;
251
252 sqlx::query(
254 r#"
255 CREATE INDEX IF NOT EXISTS idx_events_task_type_time
256 ON events(task_id, log_type, timestamp)
257 "#,
258 )
259 .execute(pool)
260 .await?;
261
262 sqlx::query(
265 r#"
266 CREATE INDEX IF NOT EXISTS idx_tasks_status_parent_priority
267 ON tasks(status, parent_id, priority, id)
268 "#,
269 )
270 .execute(pool)
271 .await?;
272
273 sqlx::query(
275 r#"
276 CREATE INDEX IF NOT EXISTS idx_tasks_priority_complexity
277 ON tasks(priority, complexity, id)
278 "#,
279 )
280 .execute(pool)
281 .await?;
282
283 sqlx::query(
285 r#"
286 CREATE INDEX IF NOT EXISTS idx_tasks_doing_at
287 ON tasks(first_doing_at)
288 WHERE status = 'doing'
289 "#,
290 )
291 .execute(pool)
292 .await?;
293
294 let _ = sqlx::query("ALTER TABLE tasks ADD COLUMN owner TEXT NOT NULL DEFAULT 'human'")
298 .execute(pool)
299 .await; sqlx::query(
303 r#"
304 INSERT INTO workspace_state (key, value)
305 VALUES ('schema_version', '0.9.0')
306 ON CONFLICT(key) DO UPDATE SET value = '0.9.0'
307 "#,
308 )
309 .execute(pool)
310 .await?;
311
312 Ok(())
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318 use tempfile::TempDir;
319
320 #[tokio::test]
321 async fn test_create_pool_success() {
322 let temp_dir = TempDir::new().unwrap();
323 let db_path = temp_dir.path().join("test.db");
324
325 let pool = create_pool(&db_path).await.unwrap();
326
327 let result: i64 = sqlx::query_scalar("SELECT 1")
329 .fetch_one(&pool)
330 .await
331 .unwrap();
332
333 assert_eq!(result, 1);
334 }
335
336 #[tokio::test]
337 async fn test_run_migrations_creates_tables() {
338 let temp_dir = TempDir::new().unwrap();
339 let db_path = temp_dir.path().join("test.db");
340 let pool = create_pool(&db_path).await.unwrap();
341
342 run_migrations(&pool).await.unwrap();
343
344 let tables: Vec<String> =
346 sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
347 .fetch_all(&pool)
348 .await
349 .unwrap();
350
351 assert!(tables.contains(&"tasks".to_string()));
352 assert!(tables.contains(&"events".to_string()));
353 assert!(tables.contains(&"workspace_state".to_string()));
354 }
355
356 #[tokio::test]
357 async fn test_run_migrations_creates_fts_tables() {
358 let temp_dir = TempDir::new().unwrap();
359 let db_path = temp_dir.path().join("test.db");
360 let pool = create_pool(&db_path).await.unwrap();
361
362 run_migrations(&pool).await.unwrap();
363
364 let tables: Vec<String> = sqlx::query_scalar(
366 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%_fts'",
367 )
368 .fetch_all(&pool)
369 .await
370 .unwrap();
371
372 assert!(tables.contains(&"tasks_fts".to_string()));
373 assert!(tables.contains(&"events_fts".to_string()));
374 }
375
376 #[tokio::test]
377 async fn test_run_migrations_creates_triggers() {
378 let temp_dir = TempDir::new().unwrap();
379 let db_path = temp_dir.path().join("test.db");
380 let pool = create_pool(&db_path).await.unwrap();
381
382 run_migrations(&pool).await.unwrap();
383
384 let triggers: Vec<String> =
386 sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='trigger'")
387 .fetch_all(&pool)
388 .await
389 .unwrap();
390
391 assert!(triggers.contains(&"tasks_ai".to_string()));
392 assert!(triggers.contains(&"tasks_ad".to_string()));
393 assert!(triggers.contains(&"tasks_au".to_string()));
394 assert!(triggers.contains(&"events_ai".to_string()));
395 assert!(triggers.contains(&"events_ad".to_string()));
396 assert!(triggers.contains(&"events_au".to_string()));
397 }
398
399 #[tokio::test]
400 async fn test_run_migrations_idempotent() {
401 let temp_dir = TempDir::new().unwrap();
402 let db_path = temp_dir.path().join("test.db");
403 let pool = create_pool(&db_path).await.unwrap();
404
405 run_migrations(&pool).await.unwrap();
407 run_migrations(&pool).await.unwrap();
408
409 let tables: Vec<String> =
411 sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type='table'")
412 .fetch_all(&pool)
413 .await
414 .unwrap();
415
416 assert!(tables.len() >= 3);
417 }
418
419 #[tokio::test]
420 async fn test_fts_triggers_work() {
421 let temp_dir = TempDir::new().unwrap();
422 let db_path = temp_dir.path().join("test.db");
423 let pool = create_pool(&db_path).await.unwrap();
424 run_migrations(&pool).await.unwrap();
425
426 sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
428 .bind("Test task")
429 .bind("Test spec")
430 .bind("todo")
431 .execute(&pool)
432 .await
433 .unwrap();
434
435 let count: i64 =
437 sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
438 .fetch_one(&pool)
439 .await
440 .unwrap();
441
442 assert_eq!(count, 1);
443 }
444
445 #[tokio::test]
446 async fn test_workspace_state_table_structure() {
447 let temp_dir = TempDir::new().unwrap();
448 let db_path = temp_dir.path().join("test.db");
449 let pool = create_pool(&db_path).await.unwrap();
450 run_migrations(&pool).await.unwrap();
451
452 sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
454 .bind("test_key")
455 .bind("test_value")
456 .execute(&pool)
457 .await
458 .unwrap();
459
460 let value: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
461 .bind("test_key")
462 .fetch_one(&pool)
463 .await
464 .unwrap();
465
466 assert_eq!(value, "test_value");
467 }
468
469 #[tokio::test]
470 async fn test_task_status_constraint() {
471 let temp_dir = TempDir::new().unwrap();
472 let db_path = temp_dir.path().join("test.db");
473 let pool = create_pool(&db_path).await.unwrap();
474 run_migrations(&pool).await.unwrap();
475
476 let result = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
478 .bind("Test")
479 .bind("invalid_status")
480 .execute(&pool)
481 .await;
482
483 assert!(result.is_err());
485 }
486
487 #[tokio::test]
490 async fn test_dependencies_table_created() {
491 let temp_dir = TempDir::new().unwrap();
492 let db_path = temp_dir.path().join("test.db");
493 let pool = create_pool(&db_path).await.unwrap();
494 run_migrations(&pool).await.unwrap();
495
496 let tables: Vec<String> = sqlx::query_scalar(
498 "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
499 )
500 .fetch_all(&pool)
501 .await
502 .unwrap();
503
504 assert!(tables.contains(&"dependencies".to_string()));
505 }
506
507 #[tokio::test]
508 async fn test_dependencies_indexes_created() {
509 let temp_dir = TempDir::new().unwrap();
510 let db_path = temp_dir.path().join("test.db");
511 let pool = create_pool(&db_path).await.unwrap();
512 run_migrations(&pool).await.unwrap();
513
514 let indexes: Vec<String> = sqlx::query_scalar(
516 "SELECT name FROM sqlite_master WHERE type='index' AND name IN ('idx_dependencies_blocking', 'idx_dependencies_blocked', 'idx_events_task_type_time')",
517 )
518 .fetch_all(&pool)
519 .await
520 .unwrap();
521
522 assert!(indexes.contains(&"idx_dependencies_blocking".to_string()));
523 assert!(indexes.contains(&"idx_dependencies_blocked".to_string()));
524 assert!(indexes.contains(&"idx_events_task_type_time".to_string()));
525 }
526
527 #[tokio::test]
528 async fn test_dependencies_self_dependency_constraint() {
529 let temp_dir = TempDir::new().unwrap();
530 let db_path = temp_dir.path().join("test.db");
531 let pool = create_pool(&db_path).await.unwrap();
532 run_migrations(&pool).await.unwrap();
533
534 sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
536 .bind("Task 1")
537 .bind("todo")
538 .execute(&pool)
539 .await
540 .unwrap();
541
542 let result = sqlx::query(
544 "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
545 )
546 .bind(1)
547 .bind(1)
548 .execute(&pool)
549 .await;
550
551 assert!(result.is_err());
552 }
553
554 #[tokio::test]
555 async fn test_dependencies_unique_constraint() {
556 let temp_dir = TempDir::new().unwrap();
557 let db_path = temp_dir.path().join("test.db");
558 let pool = create_pool(&db_path).await.unwrap();
559 run_migrations(&pool).await.unwrap();
560
561 for i in 1..=2 {
563 sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
564 .bind(format!("Task {}", i))
565 .bind("todo")
566 .execute(&pool)
567 .await
568 .unwrap();
569 }
570
571 sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
573 .bind(1)
574 .bind(2)
575 .execute(&pool)
576 .await
577 .unwrap();
578
579 let result = sqlx::query(
581 "INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)",
582 )
583 .bind(1)
584 .bind(2)
585 .execute(&pool)
586 .await;
587
588 assert!(result.is_err());
589 }
590
591 #[tokio::test]
592 async fn test_dependencies_cascade_delete() {
593 let temp_dir = TempDir::new().unwrap();
594 let db_path = temp_dir.path().join("test.db");
595 let pool = create_pool(&db_path).await.unwrap();
596 run_migrations(&pool).await.unwrap();
597
598 for i in 1..=2 {
600 sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
601 .bind(format!("Task {}", i))
602 .bind("todo")
603 .execute(&pool)
604 .await
605 .unwrap();
606 }
607
608 sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
610 .bind(1)
611 .bind(2)
612 .execute(&pool)
613 .await
614 .unwrap();
615
616 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
618 .fetch_one(&pool)
619 .await
620 .unwrap();
621 assert_eq!(count, 1);
622
623 sqlx::query("DELETE FROM tasks WHERE id = ?")
625 .bind(1)
626 .execute(&pool)
627 .await
628 .unwrap();
629
630 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
632 .fetch_one(&pool)
633 .await
634 .unwrap();
635 assert_eq!(count, 0);
636 }
637
638 #[tokio::test]
639 async fn test_schema_version_tracking() {
640 let temp_dir = TempDir::new().unwrap();
641 let db_path = temp_dir.path().join("test.db");
642 let pool = create_pool(&db_path).await.unwrap();
643 run_migrations(&pool).await.unwrap();
644
645 let version: String =
647 sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
648 .fetch_one(&pool)
649 .await
650 .unwrap();
651
652 assert_eq!(version, "0.9.0");
653 }
654
655 #[tokio::test]
656 async fn test_migration_idempotency_v0_9_0() {
657 let temp_dir = TempDir::new().unwrap();
658 let db_path = temp_dir.path().join("test.db");
659 let pool = create_pool(&db_path).await.unwrap();
660
661 run_migrations(&pool).await.unwrap();
663 run_migrations(&pool).await.unwrap();
664 run_migrations(&pool).await.unwrap();
665
666 let tables: Vec<String> = sqlx::query_scalar(
668 "SELECT name FROM sqlite_master WHERE type='table' AND name='dependencies'",
669 )
670 .fetch_all(&pool)
671 .await
672 .unwrap();
673
674 assert!(tables.contains(&"dependencies".to_string()));
675
676 let version: String =
678 sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
679 .fetch_one(&pool)
680 .await
681 .unwrap();
682
683 assert_eq!(version, "0.9.0");
684 }
685}