1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use chrono::{DateTime, Utc};
5use kanban_core::graph::{Edge, Graph};
6use kanban_domain::data_store::DataStore;
7use kanban_domain::{
8 ArchivedCard, Board, Card, CardEdgeType, Column, DependencyGraph, KanbanError, KanbanResult,
9 Snapshot, Sprint, SprintLog,
10};
11use kanban_persistence::{
12 PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore, StoreSnapshot,
13};
14use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
15use sqlx::{Pool, Row, Sqlite};
16use uuid::Uuid;
17
/// SQL text of `schema.sql`, embedded into the binary at compile time via `include_str!`.
const SCHEMA: &str = include_str!("schema.sql");
19
/// Store handle that bundles a SQLite connection pool with the database path and an
/// instance identifier.
pub struct SqliteStore {
    /// Connection pool that the queries in this file run against.
    pool: Pool<Sqlite>,
    /// Filesystem path the store was opened with.
    path: PathBuf,
    /// Identifier loaded from (or inserted into) the `metadata` row with `id = 1` on open.
    instance_id: Uuid,
}
26
27fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
28 let handle = tokio::runtime::Handle::current();
29 debug_assert!(
30 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
31 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
32 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
33 because synchronous DataStore methods need to block on async SQLite I/O."
34 );
35 tokio::task::block_in_place(|| handle.block_on(f))
36}
37
38fn db_err(e: sqlx::Error) -> KanbanError {
39 KanbanError::Database(e.to_string())
40}
41
42fn ser_err(msg: impl std::fmt::Display) -> KanbanError {
43 KanbanError::Serialization(msg.to_string())
44}
45
46fn p_uuid(s: &str) -> KanbanResult<Uuid> {
47 Uuid::parse_str(s).map_err(ser_err)
48}
49
50fn p_dt(s: &str) -> KanbanResult<DateTime<Utc>> {
51 DateTime::parse_from_rfc3339(s)
52 .map_err(ser_err)
53 .map(|dt| dt.with_timezone(&Utc))
54}
55
56fn p_enum<T: serde::de::DeserializeOwned>(s: &str, label: &str) -> KanbanResult<T> {
57 serde_json::from_value(serde_json::Value::String(s.to_owned()))
58 .map_err(|_| ser_err(format!("unknown {label} variant: {s}")))
59}
60
61fn fmt_dt(dt: &DateTime<Utc>) -> String {
62 dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
63}
64
65fn opt_dt(dt: &Option<DateTime<Utc>>) -> Option<String> {
66 dt.as_ref().map(fmt_dt)
67}
68
69fn row_to_board(
72 row: &SqliteRow,
73 sprint_names: Vec<String>,
74 sprint_counters: HashMap<String, u32>,
75) -> KanbanResult<Board> {
76 let id_str: String = row.try_get("id").map_err(db_err)?;
77 let active_sprint_id_str: Option<String> = row.try_get("active_sprint_id").map_err(db_err)?;
78 let completion_column_id_str: Option<String> =
79 row.try_get("completion_column_id").map_err(db_err)?;
80 let task_sort_field_str: String = row.try_get("task_sort_field").map_err(db_err)?;
81 let task_sort_order_str: String = row.try_get("task_sort_order").map_err(db_err)?;
82 let task_list_view_str: String = row.try_get("task_list_view").map_err(db_err)?;
83 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
84 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
85 let sprint_duration_days_raw: Option<i32> =
86 row.try_get("sprint_duration_days").map_err(db_err)?;
87
88 Ok(Board {
89 id: p_uuid(&id_str)?,
90 name: row.try_get("name").map_err(db_err)?,
91 description: row.try_get("description").map_err(db_err)?,
92 sprint_prefix: row.try_get("sprint_prefix").map_err(db_err)?,
93 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
94 task_sort_field: p_enum(&task_sort_field_str, "task_sort_field")?,
95 task_sort_order: p_enum(&task_sort_order_str, "task_sort_order")?,
96 sprint_duration_days: sprint_duration_days_raw.map(|v| v as u32),
97 sprint_names,
98 sprint_name_used_count: row
99 .try_get::<i32, _>("sprint_name_used_count")
100 .map_err(db_err)? as usize,
101 next_sprint_number: row
102 .try_get::<i32, _>("next_sprint_number")
103 .map_err(db_err)? as u32,
104 active_sprint_id: active_sprint_id_str.as_deref().map(p_uuid).transpose()?,
105 task_list_view: p_enum(&task_list_view_str, "task_list_view")?,
106 card_counter: row.try_get::<i32, _>("card_counter").map_err(db_err)? as u32,
107 sprint_counters,
108 completion_column_id: completion_column_id_str
109 .as_deref()
110 .map(p_uuid)
111 .transpose()?,
112 position: row.try_get::<i32, _>("position").map_err(db_err)?,
113 created_at: p_dt(&created_at_str)?,
114 updated_at: p_dt(&updated_at_str)?,
115 })
116}
117
118fn row_to_column(row: &SqliteRow) -> KanbanResult<Column> {
119 let id_str: String = row.try_get("id").map_err(db_err)?;
120 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
121 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
122 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
123
124 Ok(Column {
125 id: p_uuid(&id_str)?,
126 board_id: p_uuid(&board_id_str)?,
127 name: row.try_get("name").map_err(db_err)?,
128 position: row.try_get("position").map_err(db_err)?,
129 wip_limit: row.try_get("wip_limit").map_err(db_err)?,
130 created_at: p_dt(&created_at_str)?,
131 updated_at: p_dt(&updated_at_str)?,
132 })
133}
134
135fn row_to_card(row: &SqliteRow, sprint_logs: Vec<SprintLog>) -> KanbanResult<Card> {
136 let id_str: String = row.try_get("id").map_err(db_err)?;
137 let column_id_str: String = row.try_get("column_id").map_err(db_err)?;
138 let sprint_id_str: Option<String> = row.try_get("sprint_id").map_err(db_err)?;
139 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
140 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
141 let completed_at_str: Option<String> = row.try_get("completed_at").map_err(db_err)?;
142 let due_date_str: Option<String> = row.try_get("due_date").map_err(db_err)?;
143 let priority_str: String = row.try_get("priority").map_err(db_err)?;
144 let status_str: String = row.try_get("status").map_err(db_err)?;
145 let points_raw: Option<i32> = row.try_get("points").map_err(db_err)?;
146
147 Ok(Card {
148 id: p_uuid(&id_str)?,
149 column_id: p_uuid(&column_id_str)?,
150 title: row.try_get("title").map_err(db_err)?,
151 description: row.try_get("description").map_err(db_err)?,
152 priority: p_enum(&priority_str, "priority")?,
153 status: p_enum(&status_str, "status")?,
154 position: row.try_get("position").map_err(db_err)?,
155 due_date: due_date_str.as_deref().map(p_dt).transpose()?,
156 points: points_raw
157 .map(|v| u8::try_from(v).map_err(|_| ser_err(format!("points {v} out of range"))))
158 .transpose()?,
159 card_number: row.try_get::<i32, _>("card_number").map_err(db_err)? as u32,
160 sprint_id: sprint_id_str.as_deref().map(p_uuid).transpose()?,
161 created_at: p_dt(&created_at_str)?,
162 updated_at: p_dt(&updated_at_str)?,
163 completed_at: completed_at_str.as_deref().map(p_dt).transpose()?,
164 sprint_logs,
165 })
166}
167
168fn row_to_sprint(row: &SqliteRow) -> KanbanResult<Sprint> {
169 let id_str: String = row.try_get("id").map_err(db_err)?;
170 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
171 let status_str: String = row.try_get("status").map_err(db_err)?;
172 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
173 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
174 let start_date_str: Option<String> = row.try_get("start_date").map_err(db_err)?;
175 let end_date_str: Option<String> = row.try_get("end_date").map_err(db_err)?;
176 let name_index_raw: Option<i32> = row.try_get("name_index").map_err(db_err)?;
177
178 Ok(Sprint {
179 id: p_uuid(&id_str)?,
180 board_id: p_uuid(&board_id_str)?,
181 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
182 name_index: name_index_raw.map(|v| v as usize),
183 prefix: row.try_get("prefix").map_err(db_err)?,
184 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
185 status: p_enum(&status_str, "sprint status")?,
186 start_date: start_date_str.as_deref().map(p_dt).transpose()?,
187 end_date: end_date_str.as_deref().map(p_dt).transpose()?,
188 created_at: p_dt(&created_at_str)?,
189 updated_at: p_dt(&updated_at_str)?,
190 })
191}
192
193fn rows_to_graph(rows: &[SqliteRow]) -> KanbanResult<DependencyGraph> {
194 let mut graph: Graph<CardEdgeType> = Graph::new();
195 for row in rows {
196 let source_str: String = row.try_get("source_id").map_err(db_err)?;
197 let target_str: String = row.try_get("target_id").map_err(db_err)?;
198 let edge_type_str: String = row.try_get("edge_type").map_err(db_err)?;
199 let direction_str: String = row.try_get("direction").map_err(db_err)?;
200 let weight: Option<f64> = row.try_get("weight").map_err(db_err)?;
201 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
202 let archived_at_str: Option<String> = row.try_get("archived_at").map_err(db_err)?;
203
204 graph.add_edge(Edge {
205 source: p_uuid(&source_str)?,
206 target: p_uuid(&target_str)?,
207 edge_type: p_enum(&edge_type_str, "edge_type")?,
208 direction: p_enum(&direction_str, "edge direction")?,
209 weight: weight.map(|w| w as f32),
210 created_at: p_dt(&created_at_str)?,
211 archived_at: archived_at_str.as_deref().map(p_dt).transpose()?,
212 });
213 }
214 Ok(DependencyGraph { cards: graph })
215}
216
217fn row_to_sprint_log(row: &SqliteRow) -> KanbanResult<SprintLog> {
218 let sprint_id_str: String = row.try_get("sprint_id").map_err(db_err)?;
219 let started_at_str: String = row.try_get("started_at").map_err(db_err)?;
220 let ended_at_str: Option<String> = row.try_get("ended_at").map_err(db_err)?;
221
222 Ok(SprintLog {
223 sprint_id: p_uuid(&sprint_id_str)?,
224 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
225 sprint_name: row.try_get("sprint_name").map_err(db_err)?,
226 started_at: p_dt(&started_at_str)?,
227 ended_at: ended_at_str.as_deref().map(p_dt).transpose()?,
228 status: row.try_get("status").map_err(db_err)?,
229 })
230}
231
232impl SqliteStore {
235 pub async fn open(path: impl AsRef<Path>) -> KanbanResult<Self> {
236 let handle = tokio::runtime::Handle::current();
237 if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
238 return Err(KanbanError::Database(
239 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
240 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
241 because synchronous DataStore methods need to block on async SQLite I/O."
242 .to_string(),
243 ));
244 }
245
246 let path_buf = path.as_ref().to_path_buf();
247
248 let options = SqliteConnectOptions::new()
249 .filename(&path_buf)
250 .create_if_missing(true)
251 .foreign_keys(true)
252 .pragma("journal_mode", "wal");
253
254 let pool = SqlitePoolOptions::new()
255 .max_connections(2)
256 .connect_with(options)
257 .await
258 .map_err(|e| KanbanError::Database(e.to_string()))?;
259
260 sqlx::raw_sql(SCHEMA)
261 .execute(&pool)
262 .await
263 .map_err(|e| KanbanError::Database(e.to_string()))?;
264
265 Self::migrate(&pool).await?;
266
267 let instance_id = Self::load_or_create_instance_id(&pool).await?;
268
269 Ok(Self {
270 pool,
271 path: path_buf,
272 instance_id,
273 })
274 }
275
276 async fn load_or_create_instance_id(pool: &Pool<Sqlite>) -> KanbanResult<Uuid> {
277 let row: Option<String> =
278 sqlx::query_scalar("SELECT instance_id FROM metadata WHERE id = 1")
279 .fetch_optional(pool)
280 .await
281 .map_err(db_err)?;
282 match row {
283 Some(s) => p_uuid(&s),
284 None => {
285 let id = Uuid::new_v4();
286 let now = Utc::now().to_rfc3339();
287 sqlx::query(
288 "INSERT INTO metadata (id, instance_id, saved_at, schema_version) VALUES (1, ?, ?, 1)",
289 )
290 .bind(id.to_string())
291 .bind(&now)
292 .execute(pool)
293 .await
294 .map_err(db_err)?;
295 Ok(id)
296 }
297 }
298 }
299
300 async fn migrate(pool: &Pool<Sqlite>) -> KanbanResult<()> {
301 let has_command_log: bool = sqlx::query_scalar(
304 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='command_log'",
305 )
306 .fetch_one(pool)
307 .await
308 .map_err(db_err)?;
309
310 if has_command_log {
311 tracing::info!(
312 "dropping legacy command_log table: undo history is now in-session only and cannot be carried back to pre-KAN-405 builds"
313 );
314 sqlx::raw_sql("DROP TABLE IF EXISTS command_log")
315 .execute(pool)
316 .await
317 .map_err(db_err)?;
318 }
319
320 let has_undo_state: bool = sqlx::query_scalar(
321 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='undo_state'",
322 )
323 .fetch_one(pool)
324 .await
325 .map_err(db_err)?;
326
327 if has_undo_state {
328 tracing::info!(
329 "dropping legacy undo_state table: undo cursor is now in-session only and cannot be carried back to pre-KAN-405 builds"
330 );
331 sqlx::raw_sql("DROP TABLE IF EXISTS undo_state")
332 .execute(pool)
333 .await
334 .map_err(db_err)?;
335 }
336
337 let has_position_col: bool = sqlx::query_scalar(
338 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'position'",
339 )
340 .fetch_one(pool)
341 .await
342 .map_err(db_err)?;
343
344 if !has_position_col {
345 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN position INTEGER NOT NULL DEFAULT 0")
346 .execute(pool)
347 .await
348 .map_err(db_err)?;
349 }
350
351 let has_card_counter_col: bool = sqlx::query_scalar(
352 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'card_counter'",
353 )
354 .fetch_one(pool)
355 .await
356 .map_err(db_err)?;
357
358 if !has_card_counter_col {
359 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN card_counter INTEGER NOT NULL DEFAULT 1")
360 .execute(pool)
361 .await
362 .map_err(db_err)?;
363 }
364
365 Ok(())
366 }
367
    /// Returns a shared reference to the underlying SQLite connection pool.
    pub fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
371
372 pub async fn checkpoint(&self) -> KanbanResult<()> {
373 sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
374 .execute(&self.pool)
375 .await
376 .map_err(|e| KanbanError::Database(e.to_string()))?;
377 Ok(())
378 }
379
380 async fn fetch_board_aux(
381 &self,
382 board_id: &str,
383 ) -> KanbanResult<(Vec<String>, HashMap<String, u32>)> {
384 let name_rows =
385 sqlx::query("SELECT name FROM board_sprint_names WHERE board_id = ? ORDER BY position")
386 .bind(board_id)
387 .fetch_all(&self.pool)
388 .await
389 .map_err(db_err)?;
390 let sprint_names: Vec<String> = name_rows
391 .iter()
392 .map(|r| r.try_get("name").map_err(db_err))
393 .collect::<KanbanResult<_>>()?;
394
395 let counter_rows =
396 sqlx::query("SELECT prefix, counter FROM board_sprint_counters WHERE board_id = ?")
397 .bind(board_id)
398 .fetch_all(&self.pool)
399 .await
400 .map_err(db_err)?;
401 let mut sprint_counters = HashMap::new();
402 for row in &counter_rows {
403 let prefix: String = row.try_get("prefix").map_err(db_err)?;
404 let counter: i32 = row.try_get("counter").map_err(db_err)?;
405 sprint_counters.insert(prefix, counter as u32);
406 }
407
408 Ok((sprint_names, sprint_counters))
409 }
410
411 async fn write_board_with_conn(
412 conn: &mut sqlx::SqliteConnection,
413 board: &Board,
414 ) -> KanbanResult<()> {
415 let id = board.id.to_string();
416
417 sqlx::query(
418 "INSERT INTO boards (id, name, description, sprint_prefix, card_prefix,
419 task_sort_field, task_sort_order, sprint_duration_days,
420 sprint_name_used_count, next_sprint_number, active_sprint_id,
421 task_list_view, card_counter, completion_column_id, position,
422 created_at, updated_at)
423 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
424 ON CONFLICT(id) DO UPDATE SET
425 name=excluded.name, description=excluded.description,
426 sprint_prefix=excluded.sprint_prefix, card_prefix=excluded.card_prefix,
427 task_sort_field=excluded.task_sort_field, task_sort_order=excluded.task_sort_order,
428 sprint_duration_days=excluded.sprint_duration_days,
429 sprint_name_used_count=excluded.sprint_name_used_count,
430 next_sprint_number=excluded.next_sprint_number,
431 active_sprint_id=excluded.active_sprint_id,
432 task_list_view=excluded.task_list_view, card_counter=excluded.card_counter,
433 completion_column_id=excluded.completion_column_id,
434 position=excluded.position,
435 updated_at=excluded.updated_at",
436 )
437 .bind(&id)
438 .bind(&board.name)
439 .bind(&board.description)
440 .bind(&board.sprint_prefix)
441 .bind(&board.card_prefix)
442 .bind(format!("{:?}", board.task_sort_field))
443 .bind(format!("{:?}", board.task_sort_order))
444 .bind(board.sprint_duration_days.map(|v| v as i32))
445 .bind(board.sprint_name_used_count as i32)
446 .bind(board.next_sprint_number as i32)
447 .bind(board.active_sprint_id.map(|id| id.to_string()))
448 .bind(format!("{:?}", board.task_list_view))
449 .bind(board.card_counter as i32)
450 .bind(board.completion_column_id.map(|id| id.to_string()))
451 .bind(board.position)
452 .bind(fmt_dt(&board.created_at))
453 .bind(fmt_dt(&board.updated_at))
454 .execute(&mut *conn)
455 .await
456 .map_err(db_err)?;
457
458 sqlx::query("DELETE FROM board_sprint_names WHERE board_id = ?")
459 .bind(&id)
460 .execute(&mut *conn)
461 .await
462 .map_err(db_err)?;
463 for (i, name) in board.sprint_names.iter().enumerate() {
464 sqlx::query(
465 "INSERT INTO board_sprint_names (board_id, position, name) VALUES (?, ?, ?)",
466 )
467 .bind(&id)
468 .bind(i as i32)
469 .bind(name)
470 .execute(&mut *conn)
471 .await
472 .map_err(db_err)?;
473 }
474
475 sqlx::query("DELETE FROM board_sprint_counters WHERE board_id = ?")
476 .bind(&id)
477 .execute(&mut *conn)
478 .await
479 .map_err(db_err)?;
480 for (prefix, counter) in &board.sprint_counters {
481 sqlx::query(
482 "INSERT INTO board_sprint_counters (board_id, prefix, counter) VALUES (?, ?, ?)",
483 )
484 .bind(&id)
485 .bind(prefix)
486 .bind(*counter as i32)
487 .execute(&mut *conn)
488 .await
489 .map_err(db_err)?;
490 }
491
492 Ok(())
493 }
494
495 async fn write_board_async(&self, board: &Board) -> KanbanResult<()> {
496 let mut tx = self.pool.begin().await.map_err(db_err)?;
497 Self::write_board_with_conn(&mut tx, board).await?;
498 tx.commit().await.map_err(db_err)?;
499 Ok(())
500 }
501
502 async fn fetch_sprint_logs_for_card(&self, card_id: &str) -> KanbanResult<Vec<SprintLog>> {
503 let rows = sqlx::query(
504 "SELECT sprint_id, sprint_number, sprint_name, started_at, ended_at, status
505 FROM sprint_logs WHERE card_id = ? ORDER BY id",
506 )
507 .bind(card_id)
508 .fetch_all(&self.pool)
509 .await
510 .map_err(db_err)?;
511 rows.iter().map(row_to_sprint_log).collect()
512 }
513
514 async fn write_card_with_conn(
515 conn: &mut sqlx::SqliteConnection,
516 card: &Card,
517 ) -> KanbanResult<()> {
518 let id = card.id.to_string();
519
520 sqlx::query(
521 "INSERT INTO cards (id, column_id, title, description, priority, status, position,
522 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at)
523 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
524 ON CONFLICT(id) DO UPDATE SET
525 column_id=excluded.column_id, title=excluded.title,
526 description=excluded.description, priority=excluded.priority,
527 status=excluded.status, position=excluded.position,
528 due_date=excluded.due_date, points=excluded.points,
529 card_number=excluded.card_number, sprint_id=excluded.sprint_id,
530 updated_at=excluded.updated_at, completed_at=excluded.completed_at",
531 )
532 .bind(&id)
533 .bind(card.column_id.to_string())
534 .bind(&card.title)
535 .bind(&card.description)
536 .bind(format!("{:?}", card.priority))
537 .bind(format!("{:?}", card.status))
538 .bind(card.position)
539 .bind(opt_dt(&card.due_date))
540 .bind(card.points.map(|v| v as i32))
541 .bind(card.card_number as i32)
542 .bind(card.sprint_id.map(|id| id.to_string()))
543 .bind(fmt_dt(&card.created_at))
544 .bind(fmt_dt(&card.updated_at))
545 .bind(opt_dt(&card.completed_at))
546 .execute(&mut *conn)
547 .await
548 .map_err(db_err)?;
549
550 sqlx::query("DELETE FROM sprint_logs WHERE card_id = ?")
551 .bind(&id)
552 .execute(&mut *conn)
553 .await
554 .map_err(db_err)?;
555 for log in &card.sprint_logs {
556 sqlx::query(
557 "INSERT INTO sprint_logs (card_id, sprint_id, sprint_number, sprint_name,
558 started_at, ended_at, status)
559 VALUES (?, ?, ?, ?, ?, ?, ?)",
560 )
561 .bind(&id)
562 .bind(log.sprint_id.to_string())
563 .bind(log.sprint_number as i32)
564 .bind(&log.sprint_name)
565 .bind(fmt_dt(&log.started_at))
566 .bind(opt_dt(&log.ended_at))
567 .bind(&log.status)
568 .execute(&mut *conn)
569 .await
570 .map_err(db_err)?;
571 }
572
573 Ok(())
574 }
575
576 async fn write_card_async(&self, card: &Card) -> KanbanResult<()> {
577 let mut tx = self.pool.begin().await.map_err(db_err)?;
578 Self::write_card_with_conn(&mut tx, card).await?;
579 tx.commit().await.map_err(db_err)?;
580 Ok(())
581 }
582
583 async fn fetch_sprint_logs_batch(
584 &self,
585 card_ids: &[String],
586 ) -> KanbanResult<HashMap<String, Vec<SprintLog>>> {
587 if card_ids.is_empty() {
588 return Ok(HashMap::new());
589 }
590 let placeholders = card_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
591 let sql = format!(
592 "SELECT card_id, sprint_id, sprint_number, sprint_name, started_at, ended_at, status
593 FROM sprint_logs WHERE card_id IN ({placeholders}) ORDER BY id"
594 );
595 let mut query = sqlx::query(&sql);
596 for id in card_ids {
597 query = query.bind(id);
598 }
599 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
600 let mut map: HashMap<String, Vec<SprintLog>> = HashMap::new();
601 for row in &rows {
602 let card_id: String = row.try_get("card_id").map_err(db_err)?;
603 let log = row_to_sprint_log(row)?;
604 map.entry(card_id).or_default().push(log);
605 }
606 Ok(map)
607 }
608
609 async fn fetch_cards_with_filter(
610 &self,
611 where_clause: &str,
612 binds: &[String],
613 ) -> KanbanResult<Vec<Card>> {
614 let sql = format!(
615 "SELECT id, column_id, title, description, priority, status, position,
616 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at
617 FROM cards WHERE id NOT IN (SELECT card_id FROM archived_cards) {}
618 ORDER BY position ASC, created_at ASC",
619 where_clause
620 );
621 let mut query = sqlx::query(&sql);
622 for b in binds {
623 query = query.bind(b);
624 }
625 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
626
627 let card_ids: Vec<String> = rows
628 .iter()
629 .map(|r| r.try_get("id").map_err(db_err))
630 .collect::<KanbanResult<_>>()?;
631 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
632
633 let mut cards = Vec::with_capacity(rows.len());
634 for row in &rows {
635 let id_str: String = row.try_get("id").map_err(db_err)?;
636 let logs = logs_map.remove(&id_str).unwrap_or_default();
637 cards.push(row_to_card(row, logs)?);
638 }
639 Ok(cards)
640 }
641
642 async fn get_graph_with_conn(
643 conn: &mut sqlx::SqliteConnection,
644 ) -> KanbanResult<DependencyGraph> {
645 let rows = sqlx::query(
646 "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
647 FROM card_edges",
648 )
649 .fetch_all(&mut *conn)
650 .await
651 .map_err(db_err)?;
652 rows_to_graph(&rows)
653 }
654
655 async fn modify_graph_async(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
656 let mut tx = self.pool.begin().await.map_err(db_err)?;
657 let mut graph = Self::get_graph_with_conn(&mut tx).await?;
658 f(&mut graph)?;
659 Self::write_graph_with_conn(&mut tx, &graph).await?;
660 tx.commit().await.map_err(db_err)?;
661 Ok(())
662 }
663
664 async fn write_graph_with_conn(
665 conn: &mut sqlx::SqliteConnection,
666 graph: &DependencyGraph,
667 ) -> KanbanResult<()> {
668 sqlx::query("DELETE FROM card_edges")
669 .execute(&mut *conn)
670 .await
671 .map_err(db_err)?;
672
673 for edge in graph.cards.edges() {
674 sqlx::query(
675 "INSERT INTO card_edges
676 (source_id, target_id, edge_type, direction, weight, created_at, archived_at)
677 VALUES (?, ?, ?, ?, ?, ?, ?)",
678 )
679 .bind(edge.source.to_string())
680 .bind(edge.target.to_string())
681 .bind(format!("{:?}", edge.edge_type))
682 .bind(format!("{:?}", edge.direction))
683 .bind(edge.weight.map(|w| w as f64))
684 .bind(fmt_dt(&edge.created_at))
685 .bind(opt_dt(&edge.archived_at))
686 .execute(&mut *conn)
687 .await
688 .map_err(db_err)?;
689 }
690
691 Ok(())
692 }
693
694 async fn write_graph_async(&self, graph: &DependencyGraph) -> KanbanResult<()> {
695 let mut tx = self.pool.begin().await.map_err(db_err)?;
696 Self::write_graph_with_conn(&mut tx, graph).await?;
697 tx.commit().await.map_err(db_err)?;
698 Ok(())
699 }
700
701 async fn snapshot_async(&self) -> KanbanResult<Snapshot> {
702 let boards = self.list_boards_async().await?;
703 let columns = self.list_all_columns_async().await?;
704 let cards = self.fetch_cards_with_filter("", &[]).await?;
705 let archived_cards = self.list_archived_cards_async().await?;
706 let sprints = self.list_all_sprints_async().await?;
707 let graph = self.get_graph_async().await?;
708 Ok(Snapshot::from_data(
709 boards,
710 columns,
711 cards,
712 archived_cards,
713 sprints,
714 graph,
715 ))
716 }
717
718 async fn apply_snapshot_async(&self, snapshot: Snapshot) -> KanbanResult<()> {
719 let mut tx = self.pool.begin().await.map_err(db_err)?;
720
721 sqlx::query("PRAGMA defer_foreign_keys = ON")
722 .execute(&mut *tx)
723 .await
724 .map_err(db_err)?;
725
726 sqlx::query("DELETE FROM card_edges")
727 .execute(&mut *tx)
728 .await
729 .map_err(db_err)?;
730 sqlx::query("DELETE FROM archived_cards")
731 .execute(&mut *tx)
732 .await
733 .map_err(db_err)?;
734 sqlx::query("DELETE FROM sprint_logs")
735 .execute(&mut *tx)
736 .await
737 .map_err(db_err)?;
738 sqlx::query("DELETE FROM cards")
739 .execute(&mut *tx)
740 .await
741 .map_err(db_err)?;
742 sqlx::query("DELETE FROM sprints")
743 .execute(&mut *tx)
744 .await
745 .map_err(db_err)?;
746 sqlx::query("DELETE FROM board_sprint_names")
747 .execute(&mut *tx)
748 .await
749 .map_err(db_err)?;
750 sqlx::query("DELETE FROM board_sprint_counters")
751 .execute(&mut *tx)
752 .await
753 .map_err(db_err)?;
754 sqlx::query("DELETE FROM columns")
755 .execute(&mut *tx)
756 .await
757 .map_err(db_err)?;
758 sqlx::query("DELETE FROM boards")
759 .execute(&mut *tx)
760 .await
761 .map_err(db_err)?;
762
763 for board in &snapshot.boards {
764 Self::write_board_with_conn(&mut tx, board).await?;
765 }
766 for column in &snapshot.columns {
767 Self::write_column_with_conn(&mut tx, column).await?;
768 }
769 for sprint in &snapshot.sprints {
770 Self::write_sprint_with_conn(&mut tx, sprint).await?;
771 }
772 for card in &snapshot.cards {
773 Self::write_card_with_conn(&mut tx, card).await?;
774 }
775 for ac in &snapshot.archived_cards {
776 Self::write_archived_card_with_conn(&mut tx, ac).await?;
777 }
778 Self::write_graph_with_conn(&mut tx, &snapshot.graph).await?;
779
780 tx.commit().await.map_err(db_err)?;
781 Ok(())
782 }
783
784 async fn fetch_all_board_aux(
785 &self,
786 ) -> KanbanResult<(
787 HashMap<String, Vec<String>>,
788 HashMap<String, HashMap<String, u32>>,
789 )> {
790 let name_rows = sqlx::query(
791 "SELECT board_id, name FROM board_sprint_names ORDER BY board_id, position",
792 )
793 .fetch_all(&self.pool)
794 .await
795 .map_err(db_err)?;
796 let mut names_map: HashMap<String, Vec<String>> = HashMap::new();
797 for row in &name_rows {
798 let board_id: String = row.try_get("board_id").map_err(db_err)?;
799 let name: String = row.try_get("name").map_err(db_err)?;
800 names_map.entry(board_id).or_default().push(name);
801 }
802
803 let counter_rows =
804 sqlx::query("SELECT board_id, prefix, counter FROM board_sprint_counters")
805 .fetch_all(&self.pool)
806 .await
807 .map_err(db_err)?;
808 let mut counters_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
809 for row in &counter_rows {
810 let board_id: String = row.try_get("board_id").map_err(db_err)?;
811 let prefix: String = row.try_get("prefix").map_err(db_err)?;
812 let counter: i32 = row.try_get("counter").map_err(db_err)?;
813 counters_map
814 .entry(board_id)
815 .or_default()
816 .insert(prefix, counter as u32);
817 }
818
819 Ok((names_map, counters_map))
820 }
821
822 async fn list_boards_async(&self) -> KanbanResult<Vec<Board>> {
823 let rows = sqlx::query(
824 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
825 task_sort_order, sprint_duration_days, sprint_name_used_count,
826 next_sprint_number, active_sprint_id, task_list_view,
827 COALESCE(card_counter, 1) as card_counter,
828 completion_column_id, position, created_at, updated_at
829 FROM boards ORDER BY position ASC",
830 )
831 .fetch_all(&self.pool)
832 .await
833 .map_err(db_err)?;
834
835 let (mut names_map, mut counters_map) = self.fetch_all_board_aux().await?;
836
837 let mut boards = Vec::with_capacity(rows.len());
838 for row in &rows {
839 let id_str: String = row.try_get("id").map_err(db_err)?;
840 let names = names_map.remove(&id_str).unwrap_or_default();
841 let counters = counters_map.remove(&id_str).unwrap_or_default();
842 boards.push(row_to_board(row, names, counters)?);
843 }
844 Ok(boards)
845 }
846
847 async fn list_all_columns_async(&self) -> KanbanResult<Vec<Column>> {
848 let rows = sqlx::query(
849 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
850 FROM columns ORDER BY position",
851 )
852 .fetch_all(&self.pool)
853 .await
854 .map_err(db_err)?;
855 rows.iter().map(row_to_column).collect()
856 }
857
858 async fn list_all_sprints_async(&self) -> KanbanResult<Vec<Sprint>> {
859 let rows = sqlx::query(
860 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
861 status, start_date, end_date, created_at, updated_at
862 FROM sprints ORDER BY sprint_number",
863 )
864 .fetch_all(&self.pool)
865 .await
866 .map_err(db_err)?;
867 rows.iter().map(row_to_sprint).collect()
868 }
869
870 async fn list_archived_cards_async(&self) -> KanbanResult<Vec<ArchivedCard>> {
871 let rows = sqlx::query(
872 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
873 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
874 c.created_at, c.updated_at, c.completed_at,
875 ac.archived_at, ac.original_column_id, ac.original_position
876 FROM archived_cards ac
877 JOIN cards c ON ac.card_id = c.id
878 ORDER BY ac.archived_at",
879 )
880 .fetch_all(&self.pool)
881 .await
882 .map_err(db_err)?;
883
884 let card_ids: Vec<String> = rows
885 .iter()
886 .map(|r| r.try_get("id").map_err(db_err))
887 .collect::<KanbanResult<_>>()?;
888 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
889
890 let mut result = Vec::with_capacity(rows.len());
891 for row in &rows {
892 let id_str: String = row.try_get("id").map_err(db_err)?;
893 let logs = logs_map.remove(&id_str).unwrap_or_default();
894 let card = row_to_card(row, logs)?;
895 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
896 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
897 result.push(ArchivedCard {
898 card,
899 archived_at: p_dt(&archived_at_str)?,
900 original_column_id: p_uuid(&orig_col_str)?,
901 original_position: row.try_get("original_position").map_err(db_err)?,
902 });
903 }
904 Ok(result)
905 }
906
907 async fn get_graph_async(&self) -> KanbanResult<DependencyGraph> {
908 let rows = sqlx::query(
909 "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
910 FROM card_edges",
911 )
912 .fetch_all(&self.pool)
913 .await
914 .map_err(db_err)?;
915 rows_to_graph(&rows)
916 }
917
918 async fn write_column_with_conn(
919 conn: &mut sqlx::SqliteConnection,
920 column: &Column,
921 ) -> KanbanResult<()> {
922 sqlx::query(
923 "INSERT INTO columns (id, board_id, name, position, wip_limit, created_at, updated_at)
924 VALUES (?, ?, ?, ?, ?, ?, ?)
925 ON CONFLICT(id) DO UPDATE SET
926 board_id=excluded.board_id, name=excluded.name,
927 position=excluded.position, wip_limit=excluded.wip_limit,
928 updated_at=excluded.updated_at",
929 )
930 .bind(column.id.to_string())
931 .bind(column.board_id.to_string())
932 .bind(&column.name)
933 .bind(column.position)
934 .bind(column.wip_limit)
935 .bind(fmt_dt(&column.created_at))
936 .bind(fmt_dt(&column.updated_at))
937 .execute(&mut *conn)
938 .await
939 .map_err(db_err)?;
940 Ok(())
941 }
942
943 async fn write_column_async(&self, column: &Column) -> KanbanResult<()> {
944 Self::write_column_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, column).await
945 }
946
947 async fn write_sprint_with_conn(
948 conn: &mut sqlx::SqliteConnection,
949 sprint: &Sprint,
950 ) -> KanbanResult<()> {
951 sqlx::query(
952 "INSERT INTO sprints (id, board_id, sprint_number, name_index, prefix, card_prefix,
953 status, start_date, end_date, created_at, updated_at)
954 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
955 ON CONFLICT(id) DO UPDATE SET
956 board_id=excluded.board_id, sprint_number=excluded.sprint_number,
957 name_index=excluded.name_index, prefix=excluded.prefix,
958 card_prefix=excluded.card_prefix, status=excluded.status,
959 start_date=excluded.start_date, end_date=excluded.end_date,
960 updated_at=excluded.updated_at",
961 )
962 .bind(sprint.id.to_string())
963 .bind(sprint.board_id.to_string())
964 .bind(sprint.sprint_number as i32)
965 .bind(sprint.name_index.map(|v| v as i32))
966 .bind(&sprint.prefix)
967 .bind(&sprint.card_prefix)
968 .bind(format!("{:?}", sprint.status))
969 .bind(opt_dt(&sprint.start_date))
970 .bind(opt_dt(&sprint.end_date))
971 .bind(fmt_dt(&sprint.created_at))
972 .bind(fmt_dt(&sprint.updated_at))
973 .execute(&mut *conn)
974 .await
975 .map_err(db_err)?;
976 Ok(())
977 }
978
979 async fn write_sprint_async(&self, sprint: &Sprint) -> KanbanResult<()> {
980 Self::write_sprint_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, sprint).await
981 }
982
983 async fn write_archived_card_with_conn(
984 conn: &mut sqlx::SqliteConnection,
985 ac: &ArchivedCard,
986 ) -> KanbanResult<()> {
987 Self::write_card_with_conn(conn, &ac.card).await?;
988 sqlx::query(
989 "INSERT INTO archived_cards (card_id, archived_at, original_column_id, original_position)
990 VALUES (?, ?, ?, ?)
991 ON CONFLICT(card_id) DO UPDATE SET
992 archived_at=excluded.archived_at,
993 original_column_id=excluded.original_column_id,
994 original_position=excluded.original_position",
995 )
996 .bind(ac.card.id.to_string())
997 .bind(fmt_dt(&ac.archived_at))
998 .bind(ac.original_column_id.to_string())
999 .bind(ac.original_position)
1000 .execute(&mut *conn)
1001 .await
1002 .map_err(db_err)?;
1003 Ok(())
1004 }
1005
1006 async fn write_archived_card_async(&self, ac: &ArchivedCard) -> KanbanResult<()> {
1007 let mut tx = self.pool.begin().await.map_err(db_err)?;
1008 Self::write_archived_card_with_conn(&mut tx, ac).await?;
1009 tx.commit().await.map_err(db_err)?;
1010 Ok(())
1011 }
1012}
1013
1014impl DataStore for SqliteStore {
1015 fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
1018 run(async {
1019 let id_str = id.to_string();
1020 let row = sqlx::query(
1021 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1022 task_sort_order, sprint_duration_days, sprint_name_used_count,
1023 next_sprint_number, active_sprint_id, task_list_view,
1024 COALESCE(card_counter, 1) as card_counter,
1025 completion_column_id, position, created_at, updated_at
1026 FROM boards WHERE id = ?",
1027 )
1028 .bind(&id_str)
1029 .fetch_optional(&self.pool)
1030 .await
1031 .map_err(db_err)?;
1032
1033 match row {
1034 Some(row) => {
1035 let (names, counters) = self.fetch_board_aux(&id_str).await?;
1036 Ok(Some(row_to_board(&row, names, counters)?))
1037 }
1038 None => Ok(None),
1039 }
1040 })
1041 }
1042
1043 fn list_boards(&self) -> KanbanResult<Vec<Board>> {
1044 run(self.list_boards_async())
1045 }
1046
1047 fn upsert_board(&self, board: Board) -> KanbanResult<()> {
1048 run(self.write_board_async(&board))
1049 }
1050
1051 fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
1052 run(async {
1053 sqlx::query("DELETE FROM boards WHERE id = ?")
1054 .bind(id.to_string())
1055 .execute(&self.pool)
1056 .await
1057 .map_err(db_err)?;
1058 Ok(())
1059 })
1060 }
1061
1062 fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
1065 run(async {
1066 let row = sqlx::query(
1067 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1068 FROM columns WHERE id = ?",
1069 )
1070 .bind(id.to_string())
1071 .fetch_optional(&self.pool)
1072 .await
1073 .map_err(db_err)?;
1074 row.as_ref().map(row_to_column).transpose()
1075 })
1076 }
1077
1078 fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
1079 run(async {
1080 let rows = sqlx::query(
1081 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1082 FROM columns WHERE board_id = ? ORDER BY position",
1083 )
1084 .bind(board_id.to_string())
1085 .fetch_all(&self.pool)
1086 .await
1087 .map_err(db_err)?;
1088 rows.iter().map(row_to_column).collect()
1089 })
1090 }
1091
1092 fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
1093 run(self.list_all_columns_async())
1094 }
1095
1096 fn upsert_column(&self, column: Column) -> KanbanResult<()> {
1097 run(self.write_column_async(&column))
1098 }
1099
1100 fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
1101 run(async {
1102 sqlx::query("DELETE FROM columns WHERE id = ?")
1103 .bind(id.to_string())
1104 .execute(&self.pool)
1105 .await
1106 .map_err(db_err)?;
1107 Ok(())
1108 })
1109 }
1110
1111 fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1112 run(async {
1113 sqlx::query("DELETE FROM columns WHERE board_id = ?")
1114 .bind(board_id.to_string())
1115 .execute(&self.pool)
1116 .await
1117 .map_err(db_err)?;
1118 Ok(())
1119 })
1120 }
1121
1122 fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
1125 run(async {
1126 let id_str = id.to_string();
1127 let row = sqlx::query(
1128 "SELECT id, column_id, title, description, priority, status, position,
1129 due_date, points, card_number, sprint_id, created_at, updated_at,
1130 completed_at
1131 FROM cards
1132 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1133 )
1134 .bind(&id_str)
1135 .fetch_optional(&self.pool)
1136 .await
1137 .map_err(db_err)?;
1138
1139 match row {
1140 Some(row) => {
1141 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1142 Ok(Some(row_to_card(&row, logs)?))
1143 }
1144 None => Ok(None),
1145 }
1146 })
1147 }
1148
1149 fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
1150 run(self.fetch_cards_with_filter("", &[]))
1151 }
1152
1153 fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
1154 run(self.fetch_cards_with_filter("AND column_id = ?", &[column_id.to_string()]))
1155 }
1156
1157 fn list_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<Vec<Card>> {
1158 if column_ids.is_empty() {
1159 return Ok(Vec::new());
1160 }
1161 let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1162 let where_clause = format!("AND column_id IN ({placeholders})");
1163 let binds: Vec<String> = column_ids.iter().map(|id| id.to_string()).collect();
1164 run(self.fetch_cards_with_filter(&where_clause, &binds))
1165 }
1166
1167 fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
1168 run(self.fetch_cards_with_filter("AND sprint_id = ?", &[sprint_id.to_string()]))
1169 }
1170
1171 fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
1172 run(async {
1173 let row = sqlx::query(
1174 "SELECT COUNT(*) as cnt FROM cards
1175 WHERE column_id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1176 )
1177 .bind(column_id.to_string())
1178 .fetch_one(&self.pool)
1179 .await
1180 .map_err(db_err)?;
1181 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1182 })
1183 }
1184
1185 fn count_cards_in_column_excluding(
1186 &self,
1187 column_id: Uuid,
1188 exclude: &[Uuid],
1189 ) -> KanbanResult<usize> {
1190 run(async {
1191 if exclude.is_empty() {
1192 return self.count_cards_in_column(column_id);
1193 }
1194 let placeholders = exclude.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1195 let sql = format!(
1196 "SELECT COUNT(*) as cnt FROM cards
1197 WHERE column_id = ?
1198 AND id NOT IN (SELECT card_id FROM archived_cards)
1199 AND id NOT IN ({placeholders})"
1200 );
1201 let mut query = sqlx::query(&sql).bind(column_id.to_string());
1202 for id in exclude {
1203 query = query.bind(id.to_string());
1204 }
1205 let row = query.fetch_one(&self.pool).await.map_err(db_err)?;
1206 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1207 })
1208 }
1209
1210 fn upsert_card(&self, card: Card) -> KanbanResult<()> {
1211 run(self.write_card_async(&card))
1212 }
1213
1214 fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
1215 run(async {
1216 sqlx::query(
1217 "DELETE FROM cards
1218 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1219 )
1220 .bind(id.to_string())
1221 .execute(&self.pool)
1222 .await
1223 .map_err(db_err)?;
1224 Ok(())
1225 })
1226 }
1227
1228 fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
1229 run(async {
1230 if column_ids.is_empty() {
1231 return Ok(());
1232 }
1233 let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1234 let sql = format!(
1235 "DELETE FROM cards
1236 WHERE column_id IN ({placeholders})
1237 AND id NOT IN (SELECT card_id FROM archived_cards)"
1238 );
1239 let mut query = sqlx::query(&sql);
1240 for id in column_ids {
1241 query = query.bind(id.to_string());
1242 }
1243 query.execute(&self.pool).await.map_err(db_err)?;
1244 Ok(())
1245 })
1246 }
1247
1248 fn clear_sprint_from_cards(
1249 &self,
1250 sprint_id: Uuid,
1251 timestamp: DateTime<Utc>,
1252 ) -> KanbanResult<()> {
1253 run(async {
1254 let now = fmt_dt(×tamp);
1255 sqlx::query(
1256 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1257 WHERE sprint_id = ?
1258 AND id NOT IN (SELECT card_id FROM archived_cards)",
1259 )
1260 .bind(&now)
1261 .bind(sprint_id.to_string())
1262 .execute(&self.pool)
1263 .await
1264 .map_err(db_err)?;
1265 Ok(())
1266 })
1267 }
1268
1269 fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
1272 run(async {
1273 let id_str = card_id.to_string();
1274 let row = sqlx::query(
1275 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1276 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1277 c.created_at, c.updated_at, c.completed_at,
1278 ac.archived_at, ac.original_column_id, ac.original_position
1279 FROM archived_cards ac
1280 JOIN cards c ON ac.card_id = c.id
1281 WHERE ac.card_id = ?",
1282 )
1283 .bind(&id_str)
1284 .fetch_optional(&self.pool)
1285 .await
1286 .map_err(db_err)?;
1287
1288 match row {
1289 Some(row) => {
1290 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1291 let card = row_to_card(&row, logs)?;
1292 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1293 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1294 Ok(Some(ArchivedCard {
1295 card,
1296 archived_at: p_dt(&archived_at_str)?,
1297 original_column_id: p_uuid(&orig_col_str)?,
1298 original_position: row.try_get("original_position").map_err(db_err)?,
1299 }))
1300 }
1301 None => Ok(None),
1302 }
1303 })
1304 }
1305
1306 fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
1307 run(self.list_archived_cards_async())
1308 }
1309
1310 fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
1311 run(self.write_archived_card_async(&ac))
1312 }
1313
1314 fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
1315 run(async {
1316 let mut tx = self.pool.begin().await.map_err(db_err)?;
1317 sqlx::query("DELETE FROM archived_cards WHERE card_id = ?")
1318 .bind(card_id.to_string())
1319 .execute(&mut *tx)
1320 .await
1321 .map_err(db_err)?;
1322 sqlx::query("DELETE FROM cards WHERE id = ?")
1323 .bind(card_id.to_string())
1324 .execute(&mut *tx)
1325 .await
1326 .map_err(db_err)?;
1327 tx.commit().await.map_err(db_err)
1328 })
1329 }
1330
1331 fn list_archived_cards_by_columns(
1332 &self,
1333 column_ids: &[Uuid],
1334 ) -> KanbanResult<Vec<ArchivedCard>> {
1335 if column_ids.is_empty() {
1336 return Ok(Vec::new());
1337 }
1338 run(async {
1339 let placeholders: Vec<&str> = column_ids.iter().map(|_| "?").collect();
1340 let sql = format!(
1341 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1342 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1343 c.created_at, c.updated_at, c.completed_at,
1344 ac.archived_at, ac.original_column_id, ac.original_position
1345 FROM archived_cards ac
1346 JOIN cards c ON ac.card_id = c.id
1347 WHERE ac.original_column_id IN ({})
1348 ORDER BY ac.archived_at",
1349 placeholders.join(", ")
1350 );
1351 let mut query = sqlx::query(&sql);
1352 for id in column_ids {
1353 query = query.bind(id.to_string());
1354 }
1355 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
1356
1357 let card_ids: Vec<String> = rows
1358 .iter()
1359 .map(|r| r.try_get("id").map_err(db_err))
1360 .collect::<KanbanResult<_>>()?;
1361 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1362
1363 let mut result = Vec::with_capacity(rows.len());
1364 for row in &rows {
1365 let id_str: String = row.try_get("id").map_err(db_err)?;
1366 let logs = logs_map.remove(&id_str).unwrap_or_default();
1367 let card = row_to_card(row, logs)?;
1368 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1369 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1370 result.push(ArchivedCard {
1371 card,
1372 archived_at: p_dt(&archived_at_str)?,
1373 original_column_id: p_uuid(&orig_col_str)?,
1374 original_position: row.try_get("original_position").map_err(db_err)?,
1375 });
1376 }
1377 Ok(result)
1378 })
1379 }
1380
1381 fn clear_sprint_from_archived_cards(
1382 &self,
1383 sprint_id: Uuid,
1384 timestamp: DateTime<Utc>,
1385 ) -> KanbanResult<()> {
1386 run(async {
1387 let now = fmt_dt(×tamp);
1388 sqlx::query(
1389 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1390 WHERE sprint_id = ?
1391 AND id IN (SELECT card_id FROM archived_cards)",
1392 )
1393 .bind(&now)
1394 .bind(sprint_id.to_string())
1395 .execute(&self.pool)
1396 .await
1397 .map_err(db_err)?;
1398 Ok(())
1399 })
1400 }
1401
1402 fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
1405 run(async {
1406 let row = sqlx::query(
1407 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1408 status, start_date, end_date, created_at, updated_at
1409 FROM sprints WHERE id = ?",
1410 )
1411 .bind(id.to_string())
1412 .fetch_optional(&self.pool)
1413 .await
1414 .map_err(db_err)?;
1415 row.as_ref().map(row_to_sprint).transpose()
1416 })
1417 }
1418
1419 fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
1420 run(async {
1421 let rows = sqlx::query(
1422 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1423 status, start_date, end_date, created_at, updated_at
1424 FROM sprints WHERE board_id = ? ORDER BY sprint_number",
1425 )
1426 .bind(board_id.to_string())
1427 .fetch_all(&self.pool)
1428 .await
1429 .map_err(db_err)?;
1430 rows.iter().map(row_to_sprint).collect()
1431 })
1432 }
1433
1434 fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
1435 run(self.list_all_sprints_async())
1436 }
1437
1438 fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
1439 run(self.write_sprint_async(&sprint))
1440 }
1441
1442 fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
1443 run(async {
1444 sqlx::query("DELETE FROM sprints WHERE id = ?")
1445 .bind(id.to_string())
1446 .execute(&self.pool)
1447 .await
1448 .map_err(db_err)?;
1449 Ok(())
1450 })
1451 }
1452
1453 fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1454 run(async {
1455 sqlx::query("DELETE FROM sprints WHERE board_id = ?")
1456 .bind(board_id.to_string())
1457 .execute(&self.pool)
1458 .await
1459 .map_err(db_err)?;
1460 Ok(())
1461 })
1462 }
1463
1464 fn get_graph(&self) -> KanbanResult<DependencyGraph> {
1467 run(self.get_graph_async())
1468 }
1469
1470 fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
1471 run(self.write_graph_async(&graph))
1472 }
1473
1474 fn modify_graph(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
1475 run(self.modify_graph_async(f))
1476 }
1477
1478 fn snapshot(&self) -> KanbanResult<Snapshot> {
1481 run(self.snapshot_async())
1482 }
1483
1484 fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
1485 run(self.apply_snapshot_async(snapshot))
1486 }
1487}
1488
1489#[async_trait::async_trait]
1490impl PersistenceStore for SqliteStore {
1491 async fn save(&self, snapshot: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
1492 let domain_snapshot: Snapshot = serde_json::from_slice(&snapshot.data)
1493 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1494 self.apply_snapshot_async(domain_snapshot)
1495 .await
1496 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1497 self.checkpoint()
1498 .await
1499 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1500 Ok(PersistenceMetadata::new(self.instance_id))
1501 }
1502
1503 async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
1504 let domain_snapshot = self
1505 .snapshot_async()
1506 .await
1507 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1508 let data = serde_json::to_vec(&domain_snapshot)
1509 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1510 let meta = PersistenceMetadata::new(self.instance_id);
1511 Ok((
1512 StoreSnapshot {
1513 data,
1514 metadata: meta.clone(),
1515 },
1516 meta,
1517 ))
1518 }
1519
1520 async fn exists(&self) -> bool {
1521 self.path.exists()
1522 }
1523
1524 fn path(&self) -> &std::path::Path {
1525 &self.path
1526 }
1527
1528 fn instance_id(&self) -> Uuid {
1529 self.instance_id
1530 }
1531
1532 async fn close(&self) {
1533 self.pool.close().await;
1534 }
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use super::*;
1540 use tempfile::TempDir;
1541
1542 fn make_rt() -> tokio::runtime::Runtime {
1543 tokio::runtime::Builder::new_multi_thread()
1544 .enable_all()
1545 .build()
1546 .unwrap()
1547 }
1548
1549 #[test]
1550 fn test_sqlitestore_path_is_preserved() {
1551 let dir = TempDir::new().unwrap();
1552 let path = dir.path().join("test.db");
1553 let rt = make_rt();
1554 let store = rt.block_on(SqliteStore::open(&path)).unwrap();
1555 assert_eq!(store.path(), path.as_path());
1556 }
1557
1558 #[test]
1559 fn test_sqlitestore_instance_id_persists_across_reopen() {
1560 let dir = TempDir::new().unwrap();
1561 let path = dir.path().join("test.db");
1562 let rt = make_rt();
1563 let id1 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1564 let id2 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1565 assert_eq!(id1, id2, "instance_id must be stable across reopens");
1566 }
1567
1568 #[test]
1569 fn test_sqlitestore_persistence_save_load_roundtrip() {
1570 use kanban_domain::{Board, DependencyGraph};
1571 use kanban_persistence::snapshot_to_json_bytes;
1572
1573 let dir = TempDir::new().unwrap();
1574 let path = dir.path().join("test.db");
1575 let rt = make_rt();
1576
1577 rt.block_on(async {
1578 let store = SqliteStore::open(&path).await.unwrap();
1579 let board = Board::new("Test Board".to_string(), None);
1580 let snapshot = Snapshot::from_data(
1581 vec![board],
1582 vec![],
1583 vec![],
1584 vec![],
1585 vec![],
1586 DependencyGraph::new(),
1587 );
1588 let data = snapshot_to_json_bytes(&snapshot).unwrap();
1589 let meta = PersistenceMetadata::new(store.instance_id());
1590 let store_snap = StoreSnapshot {
1591 data,
1592 metadata: meta,
1593 };
1594
1595 PersistenceStore::save(&store, store_snap).await.unwrap();
1596
1597 let (loaded_snap, _meta) = PersistenceStore::load(&store).await.unwrap();
1598 let loaded: Snapshot = serde_json::from_slice(&loaded_snap.data).unwrap();
1599 assert_eq!(loaded.boards.len(), 1);
1600 assert_eq!(loaded.boards[0].name, "Test Board");
1601 });
1602 }
1603
1604 #[test]
1605 fn test_sqlitestore_exists_returns_true_after_open() {
1606 let dir = TempDir::new().unwrap();
1607 let path = dir.path().join("test.db");
1608 let rt = make_rt();
1609 rt.block_on(async {
1610 let store = SqliteStore::open(&path).await.unwrap();
1611 assert!(PersistenceStore::exists(&store).await);
1612 });
1613 }
1614
1615 #[test]
1616 fn test_checkpoint_executes_without_error() {
1617 let dir = TempDir::new().unwrap();
1618 let path = dir.path().join("test.sqlite3");
1619 let rt = make_rt();
1620 rt.block_on(async {
1621 let store = SqliteStore::open(&path).await.unwrap();
1622 store.checkpoint().await.unwrap();
1623 });
1624 }
1625
1626 #[test]
1627 fn test_save_checkpoints_wal_file_stays_minimal() {
1628 let dir = TempDir::new().unwrap();
1629 let path = dir.path().join("test.sqlite3");
1630 let rt = make_rt();
1631 rt.block_on(async {
1632 let store = SqliteStore::open(&path).await.unwrap();
1633 let (snapshot, _) = PersistenceStore::load(&store).await.unwrap();
1634 PersistenceStore::save(&store, snapshot).await.unwrap();
1635 let wal_path = path.with_extension("sqlite3-wal");
1636 if wal_path.exists() {
1637 assert!(
1638 wal_path.metadata().unwrap().len() < 32 * 1024,
1639 "WAL file should be minimal after save+checkpoint"
1640 );
1641 }
1642 });
1643 }
1644
1645 #[test]
1646 fn test_delete_archived_card_orphaned_cards_row_is_still_cleaned_up() {
1647 use kanban_domain::data_store::DataStore;
1648 let dir = TempDir::new().unwrap();
1649 let path = dir.path().join("test.sqlite3");
1650 let rt = make_rt();
1651 rt.block_on(async {
1652 let store = SqliteStore::open(&path).await.unwrap();
1653
1654 let mut board = kanban_domain::Board::new("B".to_string(), None);
1655 let column = kanban_domain::Column::new(board.id, "Col".to_string(), 0);
1656 let card = kanban_domain::Card::new(&mut board, column.id, "Task".to_string(), 0);
1657 let card_id = card.id;
1658 let column_id = column.id;
1659 store.upsert_board(board).unwrap();
1660 store.upsert_column(column).unwrap();
1661 store.upsert_card(card.clone()).unwrap();
1662
1663 let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
1666 store.insert_archived_card(archived).unwrap();
1667
1668 store.delete_archived_card(card_id).unwrap();
1669
1670 assert!(
1671 store.list_archived_cards().unwrap().is_empty(),
1672 "card should be gone from archived_cards"
1673 );
1674 assert!(
1675 store.list_all_cards().unwrap().is_empty(),
1676 "orphaned cards row should also be removed by delete_archived_card"
1677 );
1678 });
1679 }
1680
1681 #[test]
1682 fn test_delete_archived_card_removes_from_cards_table() {
1683 use kanban_domain::data_store::DataStore;
1684 let dir = TempDir::new().unwrap();
1685 let path = dir.path().join("test.sqlite3");
1686 let rt = make_rt();
1687 rt.block_on(async {
1688 let store = SqliteStore::open(&path).await.unwrap();
1689
1690 let mut board = kanban_domain::Board::new("B".to_string(), None);
1691 let column = kanban_domain::Column::new(board.id, "Col".to_string(), 0);
1692 let card = kanban_domain::Card::new(&mut board, column.id, "Task".to_string(), 0);
1693 let card_id = card.id;
1694 let column_id = column.id;
1695 store.upsert_board(board).unwrap();
1696 store.upsert_column(column).unwrap();
1697 store.upsert_card(card.clone()).unwrap();
1698
1699 let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
1700 store.insert_archived_card(archived).unwrap();
1701 store.delete_card(card_id).unwrap();
1702
1703 assert_eq!(store.list_archived_cards().unwrap().len(), 1);
1704
1705 store.delete_archived_card(card_id).unwrap();
1706
1707 assert!(
1708 store.list_archived_cards().unwrap().is_empty(),
1709 "card should be gone from archived_cards"
1710 );
1711 assert!(
1712 store.list_all_cards().unwrap().is_empty(),
1713 "card should also be gone from cards table, not restored as active"
1714 );
1715 assert!(
1716 store.get_card(card_id).unwrap().is_none(),
1717 "get_card should return None for permanently deleted card"
1718 );
1719 });
1720 }
1721}