1pub mod models;
2
3use crate::error::Result;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
5use std::path::Path;
6
7pub async fn create_pool(db_path: &Path) -> Result<SqlitePool> {
8 let options = SqliteConnectOptions::new()
9 .filename(db_path)
10 .create_if_missing(true)
11 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
12 .busy_timeout(std::time::Duration::from_millis(5000));
13
14 let pool = SqlitePoolOptions::new()
15 .max_connections(5)
16 .connect_with(options)
17 .await?;
18
19 Ok(pool)
20}
21
22pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
23 sqlx::query("PRAGMA journal_mode=WAL;")
25 .execute(pool)
26 .await?;
27
28 sqlx::query(
30 r#"
31 CREATE TABLE IF NOT EXISTS tasks (
32 id INTEGER PRIMARY KEY AUTOINCREMENT,
33 parent_id INTEGER,
34 name TEXT NOT NULL,
35 spec TEXT,
36 status TEXT NOT NULL DEFAULT 'todo',
37 complexity INTEGER,
38 priority INTEGER DEFAULT 0,
39 first_todo_at DATETIME,
40 first_doing_at DATETIME,
41 first_done_at DATETIME,
42 FOREIGN KEY (parent_id) REFERENCES tasks(id) ON DELETE CASCADE,
43 CHECK (status IN ('todo', 'doing', 'done'))
44 )
45 "#,
46 )
47 .execute(pool)
48 .await?;
49
50 sqlx::query(
52 r#"
53 CREATE VIRTUAL TABLE IF NOT EXISTS tasks_fts USING fts5(
54 name,
55 spec,
56 content=tasks,
57 content_rowid=id
58 )
59 "#,
60 )
61 .execute(pool)
62 .await?;
63
64 sqlx::query(
66 r#"
67 CREATE TRIGGER IF NOT EXISTS tasks_ai AFTER INSERT ON tasks BEGIN
68 INSERT INTO tasks_fts(rowid, name, spec) VALUES (new.id, new.name, new.spec);
69 END
70 "#,
71 )
72 .execute(pool)
73 .await?;
74
75 sqlx::query(
76 r#"
77 CREATE TRIGGER IF NOT EXISTS tasks_ad AFTER DELETE ON tasks BEGIN
78 DELETE FROM tasks_fts WHERE rowid = old.id;
79 END
80 "#,
81 )
82 .execute(pool)
83 .await?;
84
85 sqlx::query(
86 r#"
87 CREATE TRIGGER IF NOT EXISTS tasks_au AFTER UPDATE ON tasks BEGIN
88 UPDATE tasks_fts SET name = new.name, spec = new.spec WHERE rowid = old.id;
89 END
90 "#,
91 )
92 .execute(pool)
93 .await?;
94
95 sqlx::query(
97 r#"
98 CREATE TABLE IF NOT EXISTS events (
99 id INTEGER PRIMARY KEY AUTOINCREMENT,
100 task_id INTEGER NOT NULL,
101 timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
102 log_type TEXT NOT NULL,
103 discussion_data TEXT NOT NULL,
104 FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
105 )
106 "#,
107 )
108 .execute(pool)
109 .await?;
110
111 sqlx::query(
113 r#"
114 CREATE INDEX IF NOT EXISTS idx_events_task_id ON events(task_id)
115 "#,
116 )
117 .execute(pool)
118 .await?;
119
120 sqlx::query(
122 r#"
123 CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
124 discussion_data,
125 content=events,
126 content_rowid=id
127 )
128 "#,
129 )
130 .execute(pool)
131 .await?;
132
133 sqlx::query(
135 r#"
136 CREATE TRIGGER IF NOT EXISTS events_ai AFTER INSERT ON events BEGIN
137 INSERT INTO events_fts(rowid, discussion_data) VALUES (new.id, new.discussion_data);
138 END
139 "#,
140 )
141 .execute(pool)
142 .await?;
143
144 sqlx::query(
145 r#"
146 CREATE TRIGGER IF NOT EXISTS events_ad AFTER DELETE ON events BEGIN
147 DELETE FROM events_fts WHERE rowid = old.id;
148 END
149 "#,
150 )
151 .execute(pool)
152 .await?;
153
154 sqlx::query(
155 r#"
156 CREATE TRIGGER IF NOT EXISTS events_au AFTER UPDATE ON events BEGIN
157 UPDATE events_fts SET discussion_data = new.discussion_data WHERE rowid = old.id;
158 END
159 "#,
160 )
161 .execute(pool)
162 .await?;
163
164 sqlx::query(
166 r#"
167 CREATE TABLE IF NOT EXISTS workspace_state (
168 key TEXT PRIMARY KEY,
169 value TEXT NOT NULL
170 )
171 "#,
172 )
173 .execute(pool)
174 .await?;
175
176 sqlx::query(
178 r#"
179 CREATE TABLE IF NOT EXISTS dependencies (
180 id INTEGER PRIMARY KEY AUTOINCREMENT,
181 blocking_task_id INTEGER NOT NULL,
182 blocked_task_id INTEGER NOT NULL,
183 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
184 FOREIGN KEY (blocking_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
185 FOREIGN KEY (blocked_task_id) REFERENCES tasks(id) ON DELETE CASCADE,
186 UNIQUE(blocking_task_id, blocked_task_id),
187 CHECK(blocking_task_id != blocked_task_id)
188 )
189 "#,
190 )
191 .execute(pool)
192 .await?;
193
194 sqlx::query(
196 r#"
197 CREATE INDEX IF NOT EXISTS idx_dependencies_blocking
198 ON dependencies(blocking_task_id)
199 "#,
200 )
201 .execute(pool)
202 .await?;
203
204 sqlx::query(
205 r#"
206 CREATE INDEX IF NOT EXISTS idx_dependencies_blocked
207 ON dependencies(blocked_task_id)
208 "#,
209 )
210 .execute(pool)
211 .await?;
212
213 sqlx::query(
215 r#"
216 CREATE INDEX IF NOT EXISTS idx_events_task_type_time
217 ON events(task_id, log_type, timestamp)
218 "#,
219 )
220 .execute(pool)
221 .await?;
222
223 sqlx::query(
225 r#"
226 INSERT INTO workspace_state (key, value)
227 VALUES ('schema_version', '0.2.0')
228 ON CONFLICT(key) DO UPDATE SET value = '0.2.0'
229 "#,
230 )
231 .execute(pool)
232 .await?;
233
234 Ok(())
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a pool on a new database file inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the pool so the caller keeps it
    /// alive; dropping it removes the directory that holds the database. Bind
    /// it to a named variable such as `_temp_dir`, never to a bare `_`.
    async fn fresh_pool() -> (TempDir, SqlitePool) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let pool = create_pool(&db_path).await.unwrap();
        (temp_dir, pool)
    }

    /// Same as [`fresh_pool`], with the migrations already applied once.
    async fn migrated_pool() -> (TempDir, SqlitePool) {
        let (temp_dir, pool) = fresh_pool().await;
        run_migrations(&pool).await.unwrap();
        (temp_dir, pool)
    }

    /// Names of all `sqlite_master` entries of the given type
    /// (`"table"`, `"index"` or `"trigger"`), sorted by name.
    async fn schema_object_names(pool: &SqlitePool, object_type: &str) -> Vec<String> {
        sqlx::query_scalar("SELECT name FROM sqlite_master WHERE type = ? ORDER BY name")
            .bind(object_type)
            .fetch_all(pool)
            .await
            .unwrap()
    }

    /// Asserts that every name in `expected` occurs in `actual`.
    fn assert_contains_all(actual: &[String], expected: &[&str]) {
        for name in expected {
            assert!(
                actual.iter().any(|candidate| candidate == name),
                "expected `{}` in {:?}",
                name,
                actual
            );
        }
    }

    /// Inserts `count` tasks named "Task 1", "Task 2", ... with status `todo`.
    async fn insert_todo_tasks(pool: &SqlitePool, count: usize) {
        for i in 1..=count {
            sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
                .bind(format!("Task {}", i))
                .bind("todo")
                .execute(pool)
                .await
                .unwrap();
        }
    }

    /// Attempts to insert a dependency row and hands back the outcome so the
    /// caller can assert on either success or constraint failure.
    async fn add_dependency(
        pool: &SqlitePool,
        blocking_task_id: i64,
        blocked_task_id: i64,
    ) -> std::result::Result<(), sqlx::Error> {
        sqlx::query("INSERT INTO dependencies (blocking_task_id, blocked_task_id) VALUES (?, ?)")
            .bind(blocking_task_id)
            .bind(blocked_task_id)
            .execute(pool)
            .await
            .map(|_| ())
    }

    /// Number of rows currently stored in `dependencies`.
    async fn dependency_count(pool: &SqlitePool) -> i64 {
        sqlx::query_scalar("SELECT COUNT(*) FROM dependencies")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Value stored under the `schema_version` key of `workspace_state`.
    async fn stored_schema_version(pool: &SqlitePool) -> String {
        sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'schema_version'")
            .fetch_one(pool)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_create_pool_success() {
        let (_temp_dir, pool) = fresh_pool().await;

        let result: i64 = sqlx::query_scalar("SELECT 1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(result, 1);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_tables() {
        let (_temp_dir, pool) = migrated_pool().await;

        let tables = schema_object_names(&pool, "table").await;

        assert_contains_all(&tables, &["tasks", "events", "workspace_state"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_fts_tables() {
        let (_temp_dir, pool) = migrated_pool().await;

        let tables = schema_object_names(&pool, "table").await;

        assert_contains_all(&tables, &["tasks_fts", "events_fts"]);
    }

    #[tokio::test]
    async fn test_run_migrations_creates_triggers() {
        let (_temp_dir, pool) = migrated_pool().await;

        let triggers = schema_object_names(&pool, "trigger").await;

        assert_contains_all(
            &triggers,
            &[
                "tasks_ai",
                "tasks_ad",
                "tasks_au",
                "events_ai",
                "events_ad",
                "events_au",
            ],
        );
    }

    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let (_temp_dir, pool) = fresh_pool().await;

        // A second run over an already migrated database must succeed as well.
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        let tables = schema_object_names(&pool, "table").await;

        assert!(tables.len() >= 3);
    }

    #[tokio::test]
    async fn test_fts_triggers_work() {
        let (_temp_dir, pool) = migrated_pool().await;

        sqlx::query("INSERT INTO tasks (name, spec, status) VALUES (?, ?, ?)")
            .bind("Test task")
            .bind("Test spec")
            .bind("todo")
            .execute(&pool)
            .await
            .unwrap();

        // The insert trigger must have indexed the new row.
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM tasks_fts WHERE name MATCH 'Test'")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn test_workspace_state_table_structure() {
        let (_temp_dir, pool) = migrated_pool().await;

        sqlx::query("INSERT INTO workspace_state (key, value) VALUES (?, ?)")
            .bind("test_key")
            .bind("test_value")
            .execute(&pool)
            .await
            .unwrap();

        let value: String = sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = ?")
            .bind("test_key")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(value, "test_value");
    }

    #[tokio::test]
    async fn test_task_status_constraint() {
        let (_temp_dir, pool) = migrated_pool().await;

        // Only 'todo', 'doing' and 'done' pass the CHECK constraint.
        let result = sqlx::query("INSERT INTO tasks (name, status) VALUES (?, ?)")
            .bind("Test")
            .bind("invalid_status")
            .execute(&pool)
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_table_created() {
        let (_temp_dir, pool) = migrated_pool().await;

        let tables = schema_object_names(&pool, "table").await;

        assert_contains_all(&tables, &["dependencies"]);
    }

    #[tokio::test]
    async fn test_dependencies_indexes_created() {
        let (_temp_dir, pool) = migrated_pool().await;

        let indexes = schema_object_names(&pool, "index").await;

        assert_contains_all(
            &indexes,
            &[
                "idx_dependencies_blocking",
                "idx_dependencies_blocked",
                "idx_events_task_type_time",
            ],
        );
    }

    #[tokio::test]
    async fn test_dependencies_self_dependency_constraint() {
        let (_temp_dir, pool) = migrated_pool().await;
        insert_todo_tasks(&pool, 1).await;

        // A task blocking itself violates the CHECK constraint.
        let result = add_dependency(&pool, 1, 1).await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_unique_constraint() {
        let (_temp_dir, pool) = migrated_pool().await;
        insert_todo_tasks(&pool, 2).await;

        add_dependency(&pool, 1, 2).await.unwrap();

        // The same pair a second time violates the UNIQUE constraint.
        let result = add_dependency(&pool, 1, 2).await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_dependencies_cascade_delete() {
        let (_temp_dir, pool) = migrated_pool().await;
        insert_todo_tasks(&pool, 2).await;

        add_dependency(&pool, 1, 2).await.unwrap();
        assert_eq!(dependency_count(&pool).await, 1);

        sqlx::query("DELETE FROM tasks WHERE id = ?")
            .bind(1)
            .execute(&pool)
            .await
            .unwrap();

        // Deleting the blocking task must remove the dependency row with it.
        assert_eq!(dependency_count(&pool).await, 0);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let (_temp_dir, pool) = migrated_pool().await;

        assert_eq!(stored_schema_version(&pool).await, "0.2.0");
    }

    #[tokio::test]
    async fn test_migration_idempotency_v0_2_0() {
        let (_temp_dir, pool) = fresh_pool().await;

        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();
        run_migrations(&pool).await.unwrap();

        let tables = schema_object_names(&pool, "table").await;
        assert_contains_all(&tables, &["dependencies"]);

        assert_eq!(stored_schema_version(&pool).await, "0.2.0");
    }
}