1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use chrono::{DateTime, Utc};
5use kanban_core::graph::{Edge, Graph};
6use kanban_domain::command_store::CommandStore;
7use kanban_domain::commands::Command;
8use kanban_domain::data_store::DataStore;
9use kanban_domain::{
10 ArchivedCard, Board, Card, CardEdgeType, Column, DependencyGraph, KanbanError, KanbanResult,
11 Snapshot, Sprint, SprintLog,
12};
13use kanban_persistence::{
14 PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore, StoreSnapshot,
15};
16use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
17use sqlx::{Pool, Row, Sqlite};
18use uuid::Uuid;
19
/// SQL text of `schema.sql`, embedded at compile time; `SqliteStore::open` executes it on the
/// freshly connected pool before running migrations.
const SCHEMA: &str = include_str!("schema.sql");
21
/// Store backed by a SQLite database accessed through an `sqlx` connection pool.
pub struct SqliteStore {
    /// Connection pool used by the query helpers of this store.
    pool: Pool<Sqlite>,
    /// Path that was passed to `open` when the store was created.
    path: PathBuf,
    /// Identifier read from (or first inserted into) the `metadata` table during `open`.
    instance_id: Uuid,
}
28
29fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
30 let handle = tokio::runtime::Handle::current();
31 debug_assert!(
32 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
33 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
34 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
35 because synchronous DataStore methods need to block on async SQLite I/O."
36 );
37 tokio::task::block_in_place(|| handle.block_on(f))
38}
39
40fn db_err(e: sqlx::Error) -> KanbanError {
41 KanbanError::Database(e.to_string())
42}
43
44fn ser_err(msg: impl std::fmt::Display) -> KanbanError {
45 KanbanError::Serialization(msg.to_string())
46}
47
48fn p_uuid(s: &str) -> KanbanResult<Uuid> {
49 Uuid::parse_str(s).map_err(ser_err)
50}
51
52fn p_dt(s: &str) -> KanbanResult<DateTime<Utc>> {
53 DateTime::parse_from_rfc3339(s)
54 .map_err(ser_err)
55 .map(|dt| dt.with_timezone(&Utc))
56}
57
58fn p_enum<T: serde::de::DeserializeOwned>(s: &str, label: &str) -> KanbanResult<T> {
59 serde_json::from_value(serde_json::Value::String(s.to_owned()))
60 .map_err(|_| ser_err(format!("unknown {label} variant: {s}")))
61}
62
63fn fmt_dt(dt: &DateTime<Utc>) -> String {
64 dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
65}
66
67fn opt_dt(dt: &Option<DateTime<Utc>>) -> Option<String> {
68 dt.as_ref().map(fmt_dt)
69}
70
71fn row_to_board(
74 row: &SqliteRow,
75 sprint_names: Vec<String>,
76 sprint_counters: HashMap<String, u32>,
77) -> KanbanResult<Board> {
78 let id_str: String = row.try_get("id").map_err(db_err)?;
79 let active_sprint_id_str: Option<String> = row.try_get("active_sprint_id").map_err(db_err)?;
80 let completion_column_id_str: Option<String> =
81 row.try_get("completion_column_id").map_err(db_err)?;
82 let task_sort_field_str: String = row.try_get("task_sort_field").map_err(db_err)?;
83 let task_sort_order_str: String = row.try_get("task_sort_order").map_err(db_err)?;
84 let task_list_view_str: String = row.try_get("task_list_view").map_err(db_err)?;
85 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
86 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
87 let sprint_duration_days_raw: Option<i32> =
88 row.try_get("sprint_duration_days").map_err(db_err)?;
89
90 Ok(Board {
91 id: p_uuid(&id_str)?,
92 name: row.try_get("name").map_err(db_err)?,
93 description: row.try_get("description").map_err(db_err)?,
94 sprint_prefix: row.try_get("sprint_prefix").map_err(db_err)?,
95 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
96 task_sort_field: p_enum(&task_sort_field_str, "task_sort_field")?,
97 task_sort_order: p_enum(&task_sort_order_str, "task_sort_order")?,
98 sprint_duration_days: sprint_duration_days_raw.map(|v| v as u32),
99 sprint_names,
100 sprint_name_used_count: row
101 .try_get::<i32, _>("sprint_name_used_count")
102 .map_err(db_err)? as usize,
103 next_sprint_number: row
104 .try_get::<i32, _>("next_sprint_number")
105 .map_err(db_err)? as u32,
106 active_sprint_id: active_sprint_id_str.as_deref().map(p_uuid).transpose()?,
107 task_list_view: p_enum(&task_list_view_str, "task_list_view")?,
108 card_counter: row.try_get::<i32, _>("card_counter").map_err(db_err)? as u32,
109 sprint_counters,
110 completion_column_id: completion_column_id_str
111 .as_deref()
112 .map(p_uuid)
113 .transpose()?,
114 position: row.try_get::<i32, _>("position").map_err(db_err)?,
115 created_at: p_dt(&created_at_str)?,
116 updated_at: p_dt(&updated_at_str)?,
117 })
118}
119
120fn row_to_column(row: &SqliteRow) -> KanbanResult<Column> {
121 let id_str: String = row.try_get("id").map_err(db_err)?;
122 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
123 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
124 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
125
126 Ok(Column {
127 id: p_uuid(&id_str)?,
128 board_id: p_uuid(&board_id_str)?,
129 name: row.try_get("name").map_err(db_err)?,
130 position: row.try_get("position").map_err(db_err)?,
131 wip_limit: row.try_get("wip_limit").map_err(db_err)?,
132 created_at: p_dt(&created_at_str)?,
133 updated_at: p_dt(&updated_at_str)?,
134 })
135}
136
137fn row_to_card(row: &SqliteRow, sprint_logs: Vec<SprintLog>) -> KanbanResult<Card> {
138 let id_str: String = row.try_get("id").map_err(db_err)?;
139 let column_id_str: String = row.try_get("column_id").map_err(db_err)?;
140 let sprint_id_str: Option<String> = row.try_get("sprint_id").map_err(db_err)?;
141 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
142 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
143 let completed_at_str: Option<String> = row.try_get("completed_at").map_err(db_err)?;
144 let due_date_str: Option<String> = row.try_get("due_date").map_err(db_err)?;
145 let priority_str: String = row.try_get("priority").map_err(db_err)?;
146 let status_str: String = row.try_get("status").map_err(db_err)?;
147 let points_raw: Option<i32> = row.try_get("points").map_err(db_err)?;
148
149 Ok(Card {
150 id: p_uuid(&id_str)?,
151 column_id: p_uuid(&column_id_str)?,
152 title: row.try_get("title").map_err(db_err)?,
153 description: row.try_get("description").map_err(db_err)?,
154 priority: p_enum(&priority_str, "priority")?,
155 status: p_enum(&status_str, "status")?,
156 position: row.try_get("position").map_err(db_err)?,
157 due_date: due_date_str.as_deref().map(p_dt).transpose()?,
158 points: points_raw
159 .map(|v| u8::try_from(v).map_err(|_| ser_err(format!("points {v} out of range"))))
160 .transpose()?,
161 card_number: row.try_get::<i32, _>("card_number").map_err(db_err)? as u32,
162 sprint_id: sprint_id_str.as_deref().map(p_uuid).transpose()?,
163 created_at: p_dt(&created_at_str)?,
164 updated_at: p_dt(&updated_at_str)?,
165 completed_at: completed_at_str.as_deref().map(p_dt).transpose()?,
166 sprint_logs,
167 })
168}
169
170fn row_to_sprint(row: &SqliteRow) -> KanbanResult<Sprint> {
171 let id_str: String = row.try_get("id").map_err(db_err)?;
172 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
173 let status_str: String = row.try_get("status").map_err(db_err)?;
174 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
175 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
176 let start_date_str: Option<String> = row.try_get("start_date").map_err(db_err)?;
177 let end_date_str: Option<String> = row.try_get("end_date").map_err(db_err)?;
178 let name_index_raw: Option<i32> = row.try_get("name_index").map_err(db_err)?;
179
180 Ok(Sprint {
181 id: p_uuid(&id_str)?,
182 board_id: p_uuid(&board_id_str)?,
183 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
184 name_index: name_index_raw.map(|v| v as usize),
185 prefix: row.try_get("prefix").map_err(db_err)?,
186 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
187 status: p_enum(&status_str, "sprint status")?,
188 start_date: start_date_str.as_deref().map(p_dt).transpose()?,
189 end_date: end_date_str.as_deref().map(p_dt).transpose()?,
190 created_at: p_dt(&created_at_str)?,
191 updated_at: p_dt(&updated_at_str)?,
192 })
193}
194
195fn rows_to_graph(rows: &[SqliteRow]) -> KanbanResult<DependencyGraph> {
196 let mut graph: Graph<CardEdgeType> = Graph::new();
197 for row in rows {
198 let source_str: String = row.try_get("source_id").map_err(db_err)?;
199 let target_str: String = row.try_get("target_id").map_err(db_err)?;
200 let edge_type_str: String = row.try_get("edge_type").map_err(db_err)?;
201 let direction_str: String = row.try_get("direction").map_err(db_err)?;
202 let weight: Option<f64> = row.try_get("weight").map_err(db_err)?;
203 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
204 let archived_at_str: Option<String> = row.try_get("archived_at").map_err(db_err)?;
205
206 graph.add_edge(Edge {
207 source: p_uuid(&source_str)?,
208 target: p_uuid(&target_str)?,
209 edge_type: p_enum(&edge_type_str, "edge_type")?,
210 direction: p_enum(&direction_str, "edge direction")?,
211 weight: weight.map(|w| w as f32),
212 created_at: p_dt(&created_at_str)?,
213 archived_at: archived_at_str.as_deref().map(p_dt).transpose()?,
214 });
215 }
216 Ok(DependencyGraph { cards: graph })
217}
218
219fn row_to_sprint_log(row: &SqliteRow) -> KanbanResult<SprintLog> {
220 let sprint_id_str: String = row.try_get("sprint_id").map_err(db_err)?;
221 let started_at_str: String = row.try_get("started_at").map_err(db_err)?;
222 let ended_at_str: Option<String> = row.try_get("ended_at").map_err(db_err)?;
223
224 Ok(SprintLog {
225 sprint_id: p_uuid(&sprint_id_str)?,
226 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
227 sprint_name: row.try_get("sprint_name").map_err(db_err)?,
228 started_at: p_dt(&started_at_str)?,
229 ended_at: ended_at_str.as_deref().map(p_dt).transpose()?,
230 status: row.try_get("status").map_err(db_err)?,
231 })
232}
233
234impl SqliteStore {
237 pub async fn open(path: impl AsRef<Path>) -> KanbanResult<Self> {
238 let handle = tokio::runtime::Handle::current();
239 if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
240 return Err(KanbanError::Database(
241 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
242 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
243 because synchronous DataStore methods need to block on async SQLite I/O."
244 .to_string(),
245 ));
246 }
247
248 let path_buf = path.as_ref().to_path_buf();
249
250 let options = SqliteConnectOptions::new()
251 .filename(&path_buf)
252 .create_if_missing(true)
253 .foreign_keys(true)
254 .pragma("journal_mode", "wal");
255
256 let pool = SqlitePoolOptions::new()
257 .max_connections(2)
258 .connect_with(options)
259 .await
260 .map_err(|e| KanbanError::Database(e.to_string()))?;
261
262 sqlx::raw_sql(SCHEMA)
263 .execute(&pool)
264 .await
265 .map_err(|e| KanbanError::Database(e.to_string()))?;
266
267 Self::migrate(&pool).await?;
268
269 let instance_id = Self::load_or_create_instance_id(&pool).await?;
270
271 Ok(Self {
272 pool,
273 path: path_buf,
274 instance_id,
275 })
276 }
277
278 async fn load_or_create_instance_id(pool: &Pool<Sqlite>) -> KanbanResult<Uuid> {
279 let row: Option<String> =
280 sqlx::query_scalar("SELECT instance_id FROM metadata WHERE id = 1")
281 .fetch_optional(pool)
282 .await
283 .map_err(db_err)?;
284 match row {
285 Some(s) => p_uuid(&s),
286 None => {
287 let id = Uuid::new_v4();
288 let now = Utc::now().to_rfc3339();
289 sqlx::query(
290 "INSERT INTO metadata (id, instance_id, saved_at, schema_version) VALUES (1, ?, ?, 1)",
291 )
292 .bind(id.to_string())
293 .bind(&now)
294 .execute(pool)
295 .await
296 .map_err(db_err)?;
297 Ok(id)
298 }
299 }
300 }
301
302 async fn migrate(pool: &Pool<Sqlite>) -> KanbanResult<()> {
303 let has_snapshot_col: bool = sqlx::query_scalar(
304 "SELECT COUNT(*) > 0 FROM pragma_table_info('command_log') WHERE name = 'snapshot_data'",
305 )
306 .fetch_one(pool)
307 .await
308 .map_err(db_err)?;
309
310 if !has_snapshot_col {
311 sqlx::raw_sql("ALTER TABLE command_log ADD COLUMN snapshot_data BLOB")
312 .execute(pool)
313 .await
314 .map_err(db_err)?;
315 }
316
317 let has_position_col: bool = sqlx::query_scalar(
318 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'position'",
319 )
320 .fetch_one(pool)
321 .await
322 .map_err(db_err)?;
323
324 if !has_position_col {
325 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN position INTEGER NOT NULL DEFAULT 0")
326 .execute(pool)
327 .await
328 .map_err(db_err)?;
329 }
330
331 let has_card_counter_col: bool = sqlx::query_scalar(
332 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'card_counter'",
333 )
334 .fetch_one(pool)
335 .await
336 .map_err(db_err)?;
337
338 if !has_card_counter_col {
339 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN card_counter INTEGER NOT NULL DEFAULT 1")
340 .execute(pool)
341 .await
342 .map_err(db_err)?;
343 }
344
345 Ok(())
346 }
347
    /// Returns a reference to the underlying `sqlx` connection pool.
    pub fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
351
352 pub async fn checkpoint(&self) -> KanbanResult<()> {
353 sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
354 .execute(&self.pool)
355 .await
356 .map_err(|e| KanbanError::Database(e.to_string()))?;
357 Ok(())
358 }
359
360 async fn fetch_board_aux(
361 &self,
362 board_id: &str,
363 ) -> KanbanResult<(Vec<String>, HashMap<String, u32>)> {
364 let name_rows =
365 sqlx::query("SELECT name FROM board_sprint_names WHERE board_id = ? ORDER BY position")
366 .bind(board_id)
367 .fetch_all(&self.pool)
368 .await
369 .map_err(db_err)?;
370 let sprint_names: Vec<String> = name_rows
371 .iter()
372 .map(|r| r.try_get("name").map_err(db_err))
373 .collect::<KanbanResult<_>>()?;
374
375 let counter_rows =
376 sqlx::query("SELECT prefix, counter FROM board_sprint_counters WHERE board_id = ?")
377 .bind(board_id)
378 .fetch_all(&self.pool)
379 .await
380 .map_err(db_err)?;
381 let mut sprint_counters = HashMap::new();
382 for row in &counter_rows {
383 let prefix: String = row.try_get("prefix").map_err(db_err)?;
384 let counter: i32 = row.try_get("counter").map_err(db_err)?;
385 sprint_counters.insert(prefix, counter as u32);
386 }
387
388 Ok((sprint_names, sprint_counters))
389 }
390
391 async fn write_board_with_conn(
392 conn: &mut sqlx::SqliteConnection,
393 board: &Board,
394 ) -> KanbanResult<()> {
395 let id = board.id.to_string();
396
397 sqlx::query(
398 "INSERT INTO boards (id, name, description, sprint_prefix, card_prefix,
399 task_sort_field, task_sort_order, sprint_duration_days,
400 sprint_name_used_count, next_sprint_number, active_sprint_id,
401 task_list_view, card_counter, completion_column_id, position,
402 created_at, updated_at)
403 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
404 ON CONFLICT(id) DO UPDATE SET
405 name=excluded.name, description=excluded.description,
406 sprint_prefix=excluded.sprint_prefix, card_prefix=excluded.card_prefix,
407 task_sort_field=excluded.task_sort_field, task_sort_order=excluded.task_sort_order,
408 sprint_duration_days=excluded.sprint_duration_days,
409 sprint_name_used_count=excluded.sprint_name_used_count,
410 next_sprint_number=excluded.next_sprint_number,
411 active_sprint_id=excluded.active_sprint_id,
412 task_list_view=excluded.task_list_view, card_counter=excluded.card_counter,
413 completion_column_id=excluded.completion_column_id,
414 position=excluded.position,
415 updated_at=excluded.updated_at",
416 )
417 .bind(&id)
418 .bind(&board.name)
419 .bind(&board.description)
420 .bind(&board.sprint_prefix)
421 .bind(&board.card_prefix)
422 .bind(format!("{:?}", board.task_sort_field))
423 .bind(format!("{:?}", board.task_sort_order))
424 .bind(board.sprint_duration_days.map(|v| v as i32))
425 .bind(board.sprint_name_used_count as i32)
426 .bind(board.next_sprint_number as i32)
427 .bind(board.active_sprint_id.map(|id| id.to_string()))
428 .bind(format!("{:?}", board.task_list_view))
429 .bind(board.card_counter as i32)
430 .bind(board.completion_column_id.map(|id| id.to_string()))
431 .bind(board.position)
432 .bind(fmt_dt(&board.created_at))
433 .bind(fmt_dt(&board.updated_at))
434 .execute(&mut *conn)
435 .await
436 .map_err(db_err)?;
437
438 sqlx::query("DELETE FROM board_sprint_names WHERE board_id = ?")
439 .bind(&id)
440 .execute(&mut *conn)
441 .await
442 .map_err(db_err)?;
443 for (i, name) in board.sprint_names.iter().enumerate() {
444 sqlx::query(
445 "INSERT INTO board_sprint_names (board_id, position, name) VALUES (?, ?, ?)",
446 )
447 .bind(&id)
448 .bind(i as i32)
449 .bind(name)
450 .execute(&mut *conn)
451 .await
452 .map_err(db_err)?;
453 }
454
455 sqlx::query("DELETE FROM board_sprint_counters WHERE board_id = ?")
456 .bind(&id)
457 .execute(&mut *conn)
458 .await
459 .map_err(db_err)?;
460 for (prefix, counter) in &board.sprint_counters {
461 sqlx::query(
462 "INSERT INTO board_sprint_counters (board_id, prefix, counter) VALUES (?, ?, ?)",
463 )
464 .bind(&id)
465 .bind(prefix)
466 .bind(*counter as i32)
467 .execute(&mut *conn)
468 .await
469 .map_err(db_err)?;
470 }
471
472 Ok(())
473 }
474
475 async fn write_board_async(&self, board: &Board) -> KanbanResult<()> {
476 let mut tx = self.pool.begin().await.map_err(db_err)?;
477 Self::write_board_with_conn(&mut tx, board).await?;
478 tx.commit().await.map_err(db_err)?;
479 Ok(())
480 }
481
482 async fn fetch_sprint_logs_for_card(&self, card_id: &str) -> KanbanResult<Vec<SprintLog>> {
483 let rows = sqlx::query(
484 "SELECT sprint_id, sprint_number, sprint_name, started_at, ended_at, status
485 FROM sprint_logs WHERE card_id = ? ORDER BY id",
486 )
487 .bind(card_id)
488 .fetch_all(&self.pool)
489 .await
490 .map_err(db_err)?;
491 rows.iter().map(row_to_sprint_log).collect()
492 }
493
494 async fn write_card_with_conn(
495 conn: &mut sqlx::SqliteConnection,
496 card: &Card,
497 ) -> KanbanResult<()> {
498 let id = card.id.to_string();
499
500 sqlx::query(
501 "INSERT INTO cards (id, column_id, title, description, priority, status, position,
502 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at)
503 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
504 ON CONFLICT(id) DO UPDATE SET
505 column_id=excluded.column_id, title=excluded.title,
506 description=excluded.description, priority=excluded.priority,
507 status=excluded.status, position=excluded.position,
508 due_date=excluded.due_date, points=excluded.points,
509 card_number=excluded.card_number, sprint_id=excluded.sprint_id,
510 updated_at=excluded.updated_at, completed_at=excluded.completed_at",
511 )
512 .bind(&id)
513 .bind(card.column_id.to_string())
514 .bind(&card.title)
515 .bind(&card.description)
516 .bind(format!("{:?}", card.priority))
517 .bind(format!("{:?}", card.status))
518 .bind(card.position)
519 .bind(opt_dt(&card.due_date))
520 .bind(card.points.map(|v| v as i32))
521 .bind(card.card_number as i32)
522 .bind(card.sprint_id.map(|id| id.to_string()))
523 .bind(fmt_dt(&card.created_at))
524 .bind(fmt_dt(&card.updated_at))
525 .bind(opt_dt(&card.completed_at))
526 .execute(&mut *conn)
527 .await
528 .map_err(db_err)?;
529
530 sqlx::query("DELETE FROM sprint_logs WHERE card_id = ?")
531 .bind(&id)
532 .execute(&mut *conn)
533 .await
534 .map_err(db_err)?;
535 for log in &card.sprint_logs {
536 sqlx::query(
537 "INSERT INTO sprint_logs (card_id, sprint_id, sprint_number, sprint_name,
538 started_at, ended_at, status)
539 VALUES (?, ?, ?, ?, ?, ?, ?)",
540 )
541 .bind(&id)
542 .bind(log.sprint_id.to_string())
543 .bind(log.sprint_number as i32)
544 .bind(&log.sprint_name)
545 .bind(fmt_dt(&log.started_at))
546 .bind(opt_dt(&log.ended_at))
547 .bind(&log.status)
548 .execute(&mut *conn)
549 .await
550 .map_err(db_err)?;
551 }
552
553 Ok(())
554 }
555
556 async fn write_card_async(&self, card: &Card) -> KanbanResult<()> {
557 let mut tx = self.pool.begin().await.map_err(db_err)?;
558 Self::write_card_with_conn(&mut tx, card).await?;
559 tx.commit().await.map_err(db_err)?;
560 Ok(())
561 }
562
563 async fn fetch_sprint_logs_batch(
564 &self,
565 card_ids: &[String],
566 ) -> KanbanResult<HashMap<String, Vec<SprintLog>>> {
567 if card_ids.is_empty() {
568 return Ok(HashMap::new());
569 }
570 let placeholders = card_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
571 let sql = format!(
572 "SELECT card_id, sprint_id, sprint_number, sprint_name, started_at, ended_at, status
573 FROM sprint_logs WHERE card_id IN ({placeholders}) ORDER BY id"
574 );
575 let mut query = sqlx::query(&sql);
576 for id in card_ids {
577 query = query.bind(id);
578 }
579 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
580 let mut map: HashMap<String, Vec<SprintLog>> = HashMap::new();
581 for row in &rows {
582 let card_id: String = row.try_get("card_id").map_err(db_err)?;
583 let log = row_to_sprint_log(row)?;
584 map.entry(card_id).or_default().push(log);
585 }
586 Ok(map)
587 }
588
589 async fn fetch_cards_with_filter(
590 &self,
591 where_clause: &str,
592 binds: &[String],
593 ) -> KanbanResult<Vec<Card>> {
594 let sql = format!(
595 "SELECT id, column_id, title, description, priority, status, position,
596 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at
597 FROM cards WHERE id NOT IN (SELECT card_id FROM archived_cards) {}
598 ORDER BY position ASC, created_at ASC",
599 where_clause
600 );
601 let mut query = sqlx::query(&sql);
602 for b in binds {
603 query = query.bind(b);
604 }
605 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
606
607 let card_ids: Vec<String> = rows
608 .iter()
609 .map(|r| r.try_get("id").map_err(db_err))
610 .collect::<KanbanResult<_>>()?;
611 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
612
613 let mut cards = Vec::with_capacity(rows.len());
614 for row in &rows {
615 let id_str: String = row.try_get("id").map_err(db_err)?;
616 let logs = logs_map.remove(&id_str).unwrap_or_default();
617 cards.push(row_to_card(row, logs)?);
618 }
619 Ok(cards)
620 }
621
622 async fn get_graph_with_conn(
623 conn: &mut sqlx::SqliteConnection,
624 ) -> KanbanResult<DependencyGraph> {
625 let rows = sqlx::query(
626 "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
627 FROM card_edges",
628 )
629 .fetch_all(&mut *conn)
630 .await
631 .map_err(db_err)?;
632 rows_to_graph(&rows)
633 }
634
635 async fn modify_graph_async(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
636 let mut tx = self.pool.begin().await.map_err(db_err)?;
637 let mut graph = Self::get_graph_with_conn(&mut tx).await?;
638 f(&mut graph)?;
639 Self::write_graph_with_conn(&mut tx, &graph).await?;
640 tx.commit().await.map_err(db_err)?;
641 Ok(())
642 }
643
644 async fn write_graph_with_conn(
645 conn: &mut sqlx::SqliteConnection,
646 graph: &DependencyGraph,
647 ) -> KanbanResult<()> {
648 sqlx::query("DELETE FROM card_edges")
649 .execute(&mut *conn)
650 .await
651 .map_err(db_err)?;
652
653 for edge in graph.cards.edges() {
654 sqlx::query(
655 "INSERT INTO card_edges
656 (source_id, target_id, edge_type, direction, weight, created_at, archived_at)
657 VALUES (?, ?, ?, ?, ?, ?, ?)",
658 )
659 .bind(edge.source.to_string())
660 .bind(edge.target.to_string())
661 .bind(format!("{:?}", edge.edge_type))
662 .bind(format!("{:?}", edge.direction))
663 .bind(edge.weight.map(|w| w as f64))
664 .bind(fmt_dt(&edge.created_at))
665 .bind(opt_dt(&edge.archived_at))
666 .execute(&mut *conn)
667 .await
668 .map_err(db_err)?;
669 }
670
671 Ok(())
672 }
673
674 async fn write_graph_async(&self, graph: &DependencyGraph) -> KanbanResult<()> {
675 let mut tx = self.pool.begin().await.map_err(db_err)?;
676 Self::write_graph_with_conn(&mut tx, graph).await?;
677 tx.commit().await.map_err(db_err)?;
678 Ok(())
679 }
680
681 async fn snapshot_async(&self) -> KanbanResult<Snapshot> {
682 let boards = self.list_boards_async().await?;
683 let columns = self.list_all_columns_async().await?;
684 let cards = self.fetch_cards_with_filter("", &[]).await?;
685 let archived_cards = self.list_archived_cards_async().await?;
686 let sprints = self.list_all_sprints_async().await?;
687 let graph = self.get_graph_async().await?;
688 Ok(Snapshot::from_data(
689 boards,
690 columns,
691 cards,
692 archived_cards,
693 sprints,
694 graph,
695 ))
696 }
697
698 async fn apply_snapshot_async(&self, snapshot: Snapshot) -> KanbanResult<()> {
699 let mut tx = self.pool.begin().await.map_err(db_err)?;
700
701 sqlx::query("PRAGMA defer_foreign_keys = ON")
702 .execute(&mut *tx)
703 .await
704 .map_err(db_err)?;
705
706 sqlx::query("DELETE FROM card_edges")
707 .execute(&mut *tx)
708 .await
709 .map_err(db_err)?;
710 sqlx::query("DELETE FROM archived_cards")
711 .execute(&mut *tx)
712 .await
713 .map_err(db_err)?;
714 sqlx::query("DELETE FROM sprint_logs")
715 .execute(&mut *tx)
716 .await
717 .map_err(db_err)?;
718 sqlx::query("DELETE FROM cards")
719 .execute(&mut *tx)
720 .await
721 .map_err(db_err)?;
722 sqlx::query("DELETE FROM sprints")
723 .execute(&mut *tx)
724 .await
725 .map_err(db_err)?;
726 sqlx::query("DELETE FROM board_sprint_names")
727 .execute(&mut *tx)
728 .await
729 .map_err(db_err)?;
730 sqlx::query("DELETE FROM board_sprint_counters")
731 .execute(&mut *tx)
732 .await
733 .map_err(db_err)?;
734 sqlx::query("DELETE FROM columns")
735 .execute(&mut *tx)
736 .await
737 .map_err(db_err)?;
738 sqlx::query("DELETE FROM boards")
739 .execute(&mut *tx)
740 .await
741 .map_err(db_err)?;
742
743 for board in &snapshot.boards {
744 Self::write_board_with_conn(&mut tx, board).await?;
745 }
746 for column in &snapshot.columns {
747 Self::write_column_with_conn(&mut tx, column).await?;
748 }
749 for sprint in &snapshot.sprints {
750 Self::write_sprint_with_conn(&mut tx, sprint).await?;
751 }
752 for card in &snapshot.cards {
753 Self::write_card_with_conn(&mut tx, card).await?;
754 }
755 for ac in &snapshot.archived_cards {
756 Self::write_archived_card_with_conn(&mut tx, ac).await?;
757 }
758 Self::write_graph_with_conn(&mut tx, &snapshot.graph).await?;
759
760 tx.commit().await.map_err(db_err)?;
761 Ok(())
762 }
763
764 async fn fetch_all_board_aux(
765 &self,
766 ) -> KanbanResult<(
767 HashMap<String, Vec<String>>,
768 HashMap<String, HashMap<String, u32>>,
769 )> {
770 let name_rows = sqlx::query(
771 "SELECT board_id, name FROM board_sprint_names ORDER BY board_id, position",
772 )
773 .fetch_all(&self.pool)
774 .await
775 .map_err(db_err)?;
776 let mut names_map: HashMap<String, Vec<String>> = HashMap::new();
777 for row in &name_rows {
778 let board_id: String = row.try_get("board_id").map_err(db_err)?;
779 let name: String = row.try_get("name").map_err(db_err)?;
780 names_map.entry(board_id).or_default().push(name);
781 }
782
783 let counter_rows =
784 sqlx::query("SELECT board_id, prefix, counter FROM board_sprint_counters")
785 .fetch_all(&self.pool)
786 .await
787 .map_err(db_err)?;
788 let mut counters_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
789 for row in &counter_rows {
790 let board_id: String = row.try_get("board_id").map_err(db_err)?;
791 let prefix: String = row.try_get("prefix").map_err(db_err)?;
792 let counter: i32 = row.try_get("counter").map_err(db_err)?;
793 counters_map
794 .entry(board_id)
795 .or_default()
796 .insert(prefix, counter as u32);
797 }
798
799 Ok((names_map, counters_map))
800 }
801
802 async fn list_boards_async(&self) -> KanbanResult<Vec<Board>> {
803 let rows = sqlx::query(
804 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
805 task_sort_order, sprint_duration_days, sprint_name_used_count,
806 next_sprint_number, active_sprint_id, task_list_view,
807 COALESCE(card_counter, 1) as card_counter,
808 completion_column_id, position, created_at, updated_at
809 FROM boards ORDER BY position ASC",
810 )
811 .fetch_all(&self.pool)
812 .await
813 .map_err(db_err)?;
814
815 let (mut names_map, mut counters_map) = self.fetch_all_board_aux().await?;
816
817 let mut boards = Vec::with_capacity(rows.len());
818 for row in &rows {
819 let id_str: String = row.try_get("id").map_err(db_err)?;
820 let names = names_map.remove(&id_str).unwrap_or_default();
821 let counters = counters_map.remove(&id_str).unwrap_or_default();
822 boards.push(row_to_board(row, names, counters)?);
823 }
824 Ok(boards)
825 }
826
827 async fn list_all_columns_async(&self) -> KanbanResult<Vec<Column>> {
828 let rows = sqlx::query(
829 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
830 FROM columns ORDER BY position",
831 )
832 .fetch_all(&self.pool)
833 .await
834 .map_err(db_err)?;
835 rows.iter().map(row_to_column).collect()
836 }
837
838 async fn list_all_sprints_async(&self) -> KanbanResult<Vec<Sprint>> {
839 let rows = sqlx::query(
840 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
841 status, start_date, end_date, created_at, updated_at
842 FROM sprints ORDER BY sprint_number",
843 )
844 .fetch_all(&self.pool)
845 .await
846 .map_err(db_err)?;
847 rows.iter().map(row_to_sprint).collect()
848 }
849
850 async fn list_archived_cards_async(&self) -> KanbanResult<Vec<ArchivedCard>> {
851 let rows = sqlx::query(
852 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
853 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
854 c.created_at, c.updated_at, c.completed_at,
855 ac.archived_at, ac.original_column_id, ac.original_position
856 FROM archived_cards ac
857 JOIN cards c ON ac.card_id = c.id
858 ORDER BY ac.archived_at",
859 )
860 .fetch_all(&self.pool)
861 .await
862 .map_err(db_err)?;
863
864 let card_ids: Vec<String> = rows
865 .iter()
866 .map(|r| r.try_get("id").map_err(db_err))
867 .collect::<KanbanResult<_>>()?;
868 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
869
870 let mut result = Vec::with_capacity(rows.len());
871 for row in &rows {
872 let id_str: String = row.try_get("id").map_err(db_err)?;
873 let logs = logs_map.remove(&id_str).unwrap_or_default();
874 let card = row_to_card(row, logs)?;
875 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
876 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
877 result.push(ArchivedCard {
878 card,
879 archived_at: p_dt(&archived_at_str)?,
880 original_column_id: p_uuid(&orig_col_str)?,
881 original_position: row.try_get("original_position").map_err(db_err)?,
882 });
883 }
884 Ok(result)
885 }
886
887 async fn get_graph_async(&self) -> KanbanResult<DependencyGraph> {
888 let rows = sqlx::query(
889 "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
890 FROM card_edges",
891 )
892 .fetch_all(&self.pool)
893 .await
894 .map_err(db_err)?;
895 rows_to_graph(&rows)
896 }
897
898 async fn write_column_with_conn(
899 conn: &mut sqlx::SqliteConnection,
900 column: &Column,
901 ) -> KanbanResult<()> {
902 sqlx::query(
903 "INSERT INTO columns (id, board_id, name, position, wip_limit, created_at, updated_at)
904 VALUES (?, ?, ?, ?, ?, ?, ?)
905 ON CONFLICT(id) DO UPDATE SET
906 board_id=excluded.board_id, name=excluded.name,
907 position=excluded.position, wip_limit=excluded.wip_limit,
908 updated_at=excluded.updated_at",
909 )
910 .bind(column.id.to_string())
911 .bind(column.board_id.to_string())
912 .bind(&column.name)
913 .bind(column.position)
914 .bind(column.wip_limit)
915 .bind(fmt_dt(&column.created_at))
916 .bind(fmt_dt(&column.updated_at))
917 .execute(&mut *conn)
918 .await
919 .map_err(db_err)?;
920 Ok(())
921 }
922
923 async fn write_column_async(&self, column: &Column) -> KanbanResult<()> {
924 Self::write_column_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, column).await
925 }
926
927 async fn write_sprint_with_conn(
928 conn: &mut sqlx::SqliteConnection,
929 sprint: &Sprint,
930 ) -> KanbanResult<()> {
931 sqlx::query(
932 "INSERT INTO sprints (id, board_id, sprint_number, name_index, prefix, card_prefix,
933 status, start_date, end_date, created_at, updated_at)
934 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
935 ON CONFLICT(id) DO UPDATE SET
936 board_id=excluded.board_id, sprint_number=excluded.sprint_number,
937 name_index=excluded.name_index, prefix=excluded.prefix,
938 card_prefix=excluded.card_prefix, status=excluded.status,
939 start_date=excluded.start_date, end_date=excluded.end_date,
940 updated_at=excluded.updated_at",
941 )
942 .bind(sprint.id.to_string())
943 .bind(sprint.board_id.to_string())
944 .bind(sprint.sprint_number as i32)
945 .bind(sprint.name_index.map(|v| v as i32))
946 .bind(&sprint.prefix)
947 .bind(&sprint.card_prefix)
948 .bind(format!("{:?}", sprint.status))
949 .bind(opt_dt(&sprint.start_date))
950 .bind(opt_dt(&sprint.end_date))
951 .bind(fmt_dt(&sprint.created_at))
952 .bind(fmt_dt(&sprint.updated_at))
953 .execute(&mut *conn)
954 .await
955 .map_err(db_err)?;
956 Ok(())
957 }
958
959 async fn write_sprint_async(&self, sprint: &Sprint) -> KanbanResult<()> {
960 Self::write_sprint_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, sprint).await
961 }
962
963 async fn write_archived_card_with_conn(
964 conn: &mut sqlx::SqliteConnection,
965 ac: &ArchivedCard,
966 ) -> KanbanResult<()> {
967 Self::write_card_with_conn(conn, &ac.card).await?;
968 sqlx::query(
969 "INSERT INTO archived_cards (card_id, archived_at, original_column_id, original_position)
970 VALUES (?, ?, ?, ?)
971 ON CONFLICT(card_id) DO UPDATE SET
972 archived_at=excluded.archived_at,
973 original_column_id=excluded.original_column_id,
974 original_position=excluded.original_position",
975 )
976 .bind(ac.card.id.to_string())
977 .bind(fmt_dt(&ac.archived_at))
978 .bind(ac.original_column_id.to_string())
979 .bind(ac.original_position)
980 .execute(&mut *conn)
981 .await
982 .map_err(db_err)?;
983 Ok(())
984 }
985
986 async fn write_archived_card_async(&self, ac: &ArchivedCard) -> KanbanResult<()> {
987 let mut tx = self.pool.begin().await.map_err(db_err)?;
988 Self::write_archived_card_with_conn(&mut tx, ac).await?;
989 tx.commit().await.map_err(db_err)?;
990 Ok(())
991 }
992}
993
994impl DataStore for SqliteStore {
995 fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
998 run(async {
999 let id_str = id.to_string();
1000 let row = sqlx::query(
1001 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1002 task_sort_order, sprint_duration_days, sprint_name_used_count,
1003 next_sprint_number, active_sprint_id, task_list_view,
1004 COALESCE(card_counter, 1) as card_counter,
1005 completion_column_id, position, created_at, updated_at
1006 FROM boards WHERE id = ?",
1007 )
1008 .bind(&id_str)
1009 .fetch_optional(&self.pool)
1010 .await
1011 .map_err(db_err)?;
1012
1013 match row {
1014 Some(row) => {
1015 let (names, counters) = self.fetch_board_aux(&id_str).await?;
1016 Ok(Some(row_to_board(&row, names, counters)?))
1017 }
1018 None => Ok(None),
1019 }
1020 })
1021 }
1022
1023 fn list_boards(&self) -> KanbanResult<Vec<Board>> {
1024 run(self.list_boards_async())
1025 }
1026
1027 fn upsert_board(&self, board: Board) -> KanbanResult<()> {
1028 run(self.write_board_async(&board))
1029 }
1030
1031 fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
1032 run(async {
1033 sqlx::query("DELETE FROM boards WHERE id = ?")
1034 .bind(id.to_string())
1035 .execute(&self.pool)
1036 .await
1037 .map_err(db_err)?;
1038 Ok(())
1039 })
1040 }
1041
1042 fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
1045 run(async {
1046 let row = sqlx::query(
1047 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1048 FROM columns WHERE id = ?",
1049 )
1050 .bind(id.to_string())
1051 .fetch_optional(&self.pool)
1052 .await
1053 .map_err(db_err)?;
1054 row.as_ref().map(row_to_column).transpose()
1055 })
1056 }
1057
1058 fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
1059 run(async {
1060 let rows = sqlx::query(
1061 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1062 FROM columns WHERE board_id = ? ORDER BY position",
1063 )
1064 .bind(board_id.to_string())
1065 .fetch_all(&self.pool)
1066 .await
1067 .map_err(db_err)?;
1068 rows.iter().map(row_to_column).collect()
1069 })
1070 }
1071
1072 fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
1073 run(self.list_all_columns_async())
1074 }
1075
1076 fn upsert_column(&self, column: Column) -> KanbanResult<()> {
1077 run(self.write_column_async(&column))
1078 }
1079
1080 fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
1081 run(async {
1082 sqlx::query("DELETE FROM columns WHERE id = ?")
1083 .bind(id.to_string())
1084 .execute(&self.pool)
1085 .await
1086 .map_err(db_err)?;
1087 Ok(())
1088 })
1089 }
1090
1091 fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1092 run(async {
1093 sqlx::query("DELETE FROM columns WHERE board_id = ?")
1094 .bind(board_id.to_string())
1095 .execute(&self.pool)
1096 .await
1097 .map_err(db_err)?;
1098 Ok(())
1099 })
1100 }
1101
1102 fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
1105 run(async {
1106 let id_str = id.to_string();
1107 let row = sqlx::query(
1108 "SELECT id, column_id, title, description, priority, status, position,
1109 due_date, points, card_number, sprint_id, created_at, updated_at,
1110 completed_at
1111 FROM cards
1112 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1113 )
1114 .bind(&id_str)
1115 .fetch_optional(&self.pool)
1116 .await
1117 .map_err(db_err)?;
1118
1119 match row {
1120 Some(row) => {
1121 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1122 Ok(Some(row_to_card(&row, logs)?))
1123 }
1124 None => Ok(None),
1125 }
1126 })
1127 }
1128
1129 fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
1130 run(self.fetch_cards_with_filter("", &[]))
1131 }
1132
1133 fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
1134 run(self.fetch_cards_with_filter("AND column_id = ?", &[column_id.to_string()]))
1135 }
1136
1137 fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
1138 run(self.fetch_cards_with_filter("AND sprint_id = ?", &[sprint_id.to_string()]))
1139 }
1140
1141 fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
1142 run(async {
1143 let row = sqlx::query(
1144 "SELECT COUNT(*) as cnt FROM cards
1145 WHERE column_id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1146 )
1147 .bind(column_id.to_string())
1148 .fetch_one(&self.pool)
1149 .await
1150 .map_err(db_err)?;
1151 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1152 })
1153 }
1154
1155 fn count_cards_in_column_excluding(
1156 &self,
1157 column_id: Uuid,
1158 exclude: &[Uuid],
1159 ) -> KanbanResult<usize> {
1160 run(async {
1161 if exclude.is_empty() {
1162 return self.count_cards_in_column(column_id);
1163 }
1164 let placeholders = exclude.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1165 let sql = format!(
1166 "SELECT COUNT(*) as cnt FROM cards
1167 WHERE column_id = ?
1168 AND id NOT IN (SELECT card_id FROM archived_cards)
1169 AND id NOT IN ({placeholders})"
1170 );
1171 let mut query = sqlx::query(&sql).bind(column_id.to_string());
1172 for id in exclude {
1173 query = query.bind(id.to_string());
1174 }
1175 let row = query.fetch_one(&self.pool).await.map_err(db_err)?;
1176 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1177 })
1178 }
1179
1180 fn upsert_card(&self, card: Card) -> KanbanResult<()> {
1181 run(self.write_card_async(&card))
1182 }
1183
1184 fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
1185 run(async {
1186 sqlx::query(
1187 "DELETE FROM cards
1188 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1189 )
1190 .bind(id.to_string())
1191 .execute(&self.pool)
1192 .await
1193 .map_err(db_err)?;
1194 Ok(())
1195 })
1196 }
1197
1198 fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
1199 run(async {
1200 if column_ids.is_empty() {
1201 return Ok(());
1202 }
1203 let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1204 let sql = format!(
1205 "DELETE FROM cards
1206 WHERE column_id IN ({placeholders})
1207 AND id NOT IN (SELECT card_id FROM archived_cards)"
1208 );
1209 let mut query = sqlx::query(&sql);
1210 for id in column_ids {
1211 query = query.bind(id.to_string());
1212 }
1213 query.execute(&self.pool).await.map_err(db_err)?;
1214 Ok(())
1215 })
1216 }
1217
1218 fn clear_sprint_from_cards(
1219 &self,
1220 sprint_id: Uuid,
1221 timestamp: DateTime<Utc>,
1222 ) -> KanbanResult<()> {
1223 run(async {
1224 let now = fmt_dt(×tamp);
1225 sqlx::query(
1226 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1227 WHERE sprint_id = ?
1228 AND id NOT IN (SELECT card_id FROM archived_cards)",
1229 )
1230 .bind(&now)
1231 .bind(sprint_id.to_string())
1232 .execute(&self.pool)
1233 .await
1234 .map_err(db_err)?;
1235 Ok(())
1236 })
1237 }
1238
1239 fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
1242 run(async {
1243 let id_str = card_id.to_string();
1244 let row = sqlx::query(
1245 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1246 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1247 c.created_at, c.updated_at, c.completed_at,
1248 ac.archived_at, ac.original_column_id, ac.original_position
1249 FROM archived_cards ac
1250 JOIN cards c ON ac.card_id = c.id
1251 WHERE ac.card_id = ?",
1252 )
1253 .bind(&id_str)
1254 .fetch_optional(&self.pool)
1255 .await
1256 .map_err(db_err)?;
1257
1258 match row {
1259 Some(row) => {
1260 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1261 let card = row_to_card(&row, logs)?;
1262 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1263 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1264 Ok(Some(ArchivedCard {
1265 card,
1266 archived_at: p_dt(&archived_at_str)?,
1267 original_column_id: p_uuid(&orig_col_str)?,
1268 original_position: row.try_get("original_position").map_err(db_err)?,
1269 }))
1270 }
1271 None => Ok(None),
1272 }
1273 })
1274 }
1275
1276 fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
1277 run(self.list_archived_cards_async())
1278 }
1279
1280 fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
1281 run(self.write_archived_card_async(&ac))
1282 }
1283
1284 fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
1285 run(async {
1286 let mut tx = self.pool.begin().await.map_err(db_err)?;
1287 sqlx::query("DELETE FROM archived_cards WHERE card_id = ?")
1288 .bind(card_id.to_string())
1289 .execute(&mut *tx)
1290 .await
1291 .map_err(db_err)?;
1292 sqlx::query("DELETE FROM cards WHERE id = ?")
1293 .bind(card_id.to_string())
1294 .execute(&mut *tx)
1295 .await
1296 .map_err(db_err)?;
1297 tx.commit().await.map_err(db_err)
1298 })
1299 }
1300
1301 fn list_archived_cards_by_columns(
1302 &self,
1303 column_ids: &[Uuid],
1304 ) -> KanbanResult<Vec<ArchivedCard>> {
1305 if column_ids.is_empty() {
1306 return Ok(Vec::new());
1307 }
1308 run(async {
1309 let placeholders: Vec<&str> = column_ids.iter().map(|_| "?").collect();
1310 let sql = format!(
1311 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1312 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1313 c.created_at, c.updated_at, c.completed_at,
1314 ac.archived_at, ac.original_column_id, ac.original_position
1315 FROM archived_cards ac
1316 JOIN cards c ON ac.card_id = c.id
1317 WHERE ac.original_column_id IN ({})
1318 ORDER BY ac.archived_at",
1319 placeholders.join(", ")
1320 );
1321 let mut query = sqlx::query(&sql);
1322 for id in column_ids {
1323 query = query.bind(id.to_string());
1324 }
1325 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
1326
1327 let card_ids: Vec<String> = rows
1328 .iter()
1329 .map(|r| r.try_get("id").map_err(db_err))
1330 .collect::<KanbanResult<_>>()?;
1331 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1332
1333 let mut result = Vec::with_capacity(rows.len());
1334 for row in &rows {
1335 let id_str: String = row.try_get("id").map_err(db_err)?;
1336 let logs = logs_map.remove(&id_str).unwrap_or_default();
1337 let card = row_to_card(row, logs)?;
1338 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1339 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1340 result.push(ArchivedCard {
1341 card,
1342 archived_at: p_dt(&archived_at_str)?,
1343 original_column_id: p_uuid(&orig_col_str)?,
1344 original_position: row.try_get("original_position").map_err(db_err)?,
1345 });
1346 }
1347 Ok(result)
1348 })
1349 }
1350
1351 fn clear_sprint_from_archived_cards(
1352 &self,
1353 sprint_id: Uuid,
1354 timestamp: DateTime<Utc>,
1355 ) -> KanbanResult<()> {
1356 run(async {
1357 let now = fmt_dt(×tamp);
1358 sqlx::query(
1359 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1360 WHERE sprint_id = ?
1361 AND id IN (SELECT card_id FROM archived_cards)",
1362 )
1363 .bind(&now)
1364 .bind(sprint_id.to_string())
1365 .execute(&self.pool)
1366 .await
1367 .map_err(db_err)?;
1368 Ok(())
1369 })
1370 }
1371
1372 fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
1375 run(async {
1376 let row = sqlx::query(
1377 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1378 status, start_date, end_date, created_at, updated_at
1379 FROM sprints WHERE id = ?",
1380 )
1381 .bind(id.to_string())
1382 .fetch_optional(&self.pool)
1383 .await
1384 .map_err(db_err)?;
1385 row.as_ref().map(row_to_sprint).transpose()
1386 })
1387 }
1388
1389 fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
1390 run(async {
1391 let rows = sqlx::query(
1392 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1393 status, start_date, end_date, created_at, updated_at
1394 FROM sprints WHERE board_id = ? ORDER BY sprint_number",
1395 )
1396 .bind(board_id.to_string())
1397 .fetch_all(&self.pool)
1398 .await
1399 .map_err(db_err)?;
1400 rows.iter().map(row_to_sprint).collect()
1401 })
1402 }
1403
1404 fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
1405 run(self.list_all_sprints_async())
1406 }
1407
1408 fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
1409 run(self.write_sprint_async(&sprint))
1410 }
1411
1412 fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
1413 run(async {
1414 sqlx::query("DELETE FROM sprints WHERE id = ?")
1415 .bind(id.to_string())
1416 .execute(&self.pool)
1417 .await
1418 .map_err(db_err)?;
1419 Ok(())
1420 })
1421 }
1422
1423 fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1424 run(async {
1425 sqlx::query("DELETE FROM sprints WHERE board_id = ?")
1426 .bind(board_id.to_string())
1427 .execute(&self.pool)
1428 .await
1429 .map_err(db_err)?;
1430 Ok(())
1431 })
1432 }
1433
1434 fn get_graph(&self) -> KanbanResult<DependencyGraph> {
1437 run(self.get_graph_async())
1438 }
1439
1440 fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
1441 run(self.write_graph_async(&graph))
1442 }
1443
1444 fn modify_graph(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
1445 run(self.modify_graph_async(f))
1446 }
1447
1448 fn snapshot(&self) -> KanbanResult<Snapshot> {
1451 run(self.snapshot_async())
1452 }
1453
1454 fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
1455 run(self.apply_snapshot_async(snapshot))
1456 }
1457}
1458
1459impl CommandStore for SqliteStore {
1460 fn append_commands(&self, cmds: &[Command]) -> KanbanResult<u64> {
1461 let batch_json = serde_json::to_string(cmds).map_err(ser_err)?;
1462 let count: i64 = run(async {
1463 let mut tx = self.pool.begin().await.map_err(db_err)?;
1464 sqlx::query("INSERT INTO command_log (cmd_json) VALUES (?)")
1465 .bind(&batch_json)
1466 .execute(&mut *tx)
1467 .await
1468 .map_err(db_err)?;
1469 let c: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM command_log")
1470 .fetch_one(&mut *tx)
1471 .await
1472 .map_err(db_err)?;
1473 tx.commit().await.map_err(db_err)?;
1474 Ok::<i64, KanbanError>(c)
1475 })?;
1476 Ok(count as u64)
1477 }
1478
1479 fn command_count(&self) -> KanbanResult<u64> {
1480 let count: i64 = run(async {
1481 sqlx::query_scalar("SELECT COUNT(*) FROM command_log")
1482 .fetch_one(&self.pool)
1483 .await
1484 .map_err(db_err)
1485 })?;
1486 Ok(count as u64)
1487 }
1488
1489 fn load_commands(&self, from: u64, to: u64) -> KanbanResult<Vec<Vec<Command>>> {
1490 let rows: Vec<String> = run(async {
1491 sqlx::query_scalar(
1492 "SELECT cmd_json FROM command_log WHERE idx > ? AND idx <= ? ORDER BY idx",
1493 )
1494 .bind(from as i64)
1495 .bind(to as i64)
1496 .fetch_all(&self.pool)
1497 .await
1498 .map_err(db_err)
1499 })?;
1500 rows.iter()
1501 .map(|json| serde_json::from_str::<Vec<Command>>(json).map_err(ser_err))
1502 .collect()
1503 }
1504
1505 fn load_all_commands(&self) -> KanbanResult<(Vec<Vec<Command>>, u64)> {
1506 run(async {
1507 let mut tx = self.pool.begin().await.map_err(db_err)?;
1508 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM command_log")
1509 .fetch_one(&mut *tx)
1510 .await
1511 .map_err(db_err)?;
1512 let rows: Vec<String> =
1513 sqlx::query_scalar("SELECT cmd_json FROM command_log ORDER BY idx")
1514 .fetch_all(&mut *tx)
1515 .await
1516 .map_err(db_err)?;
1517 tx.commit().await.map_err(db_err)?;
1518 let batches = rows
1519 .iter()
1520 .map(|json| serde_json::from_str::<Vec<Command>>(json).map_err(ser_err))
1521 .collect::<KanbanResult<Vec<Vec<Command>>>>()?;
1522 Ok((batches, count as u64))
1523 })
1524 }
1525
1526 fn truncate_commands_after(&self, after: u64) -> KanbanResult<()> {
1527 run(async {
1528 sqlx::query("DELETE FROM command_log WHERE idx > ?")
1529 .bind(after as i64)
1530 .execute(&self.pool)
1531 .await
1532 .map_err(db_err)
1533 })?;
1534 Ok(())
1535 }
1536
1537 fn supports_indexed_snapshots(&self) -> bool {
1538 true
1539 }
1540
1541 fn store_snapshot_at(&self, idx: u64, snapshot: &Snapshot) -> KanbanResult<()> {
1542 use flate2::write::DeflateEncoder;
1543 use flate2::Compression;
1544 use std::io::Write;
1545
1546 let json = serde_json::to_vec(snapshot).map_err(ser_err)?;
1547 let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast());
1548 encoder
1549 .write_all(&json)
1550 .map_err(|e| KanbanError::Database(e.to_string()))?;
1551 let compressed = encoder
1552 .finish()
1553 .map_err(|e| KanbanError::Database(e.to_string()))?;
1554
1555 run(async {
1556 sqlx::query("UPDATE command_log SET snapshot_data = ? WHERE idx = ?")
1557 .bind(&compressed)
1558 .bind(idx as i64)
1559 .execute(&self.pool)
1560 .await
1561 .map_err(db_err)
1562 })?;
1563 Ok(())
1564 }
1565
1566 fn shift_commands(&self, drop_count: u64) -> KanbanResult<()> {
1567 run(async {
1568 let mut tx = self.pool.begin().await.map_err(db_err)?;
1569
1570 let kept_ids: Vec<i64> =
1572 sqlx::query_scalar("SELECT idx FROM command_log ORDER BY idx LIMIT -1 OFFSET ?")
1573 .bind(drop_count as i64)
1574 .fetch_all(&mut *tx)
1575 .await
1576 .map_err(db_err)?;
1577
1578 if kept_ids.is_empty() {
1579 sqlx::query("DELETE FROM command_log")
1580 .execute(&mut *tx)
1581 .await
1582 .map_err(db_err)?;
1583 } else {
1584 sqlx::query("DELETE FROM command_log WHERE idx < ?")
1586 .bind(kept_ids[0])
1587 .execute(&mut *tx)
1588 .await
1589 .map_err(db_err)?;
1590
1591 for (new_idx, &old_idx) in kept_ids.iter().enumerate() {
1593 let new_val = (new_idx + 1) as i64;
1594 if new_val != old_idx {
1595 sqlx::query("UPDATE command_log SET idx = -? WHERE idx = ?")
1596 .bind(new_val)
1597 .bind(old_idx)
1598 .execute(&mut *tx)
1599 .await
1600 .map_err(db_err)?;
1601 }
1602 }
1603 sqlx::query("UPDATE command_log SET idx = -idx WHERE idx < 0")
1605 .execute(&mut *tx)
1606 .await
1607 .map_err(db_err)?;
1608 }
1609
1610 tx.commit().await.map_err(db_err)?;
1611 Ok(())
1612 })
1613 }
1614
1615 fn load_snapshot_at(&self, idx: u64) -> KanbanResult<Option<Snapshot>> {
1616 use flate2::read::DeflateDecoder;
1617 use std::io::Read;
1618
1619 let blob: Option<Vec<u8>> = run(async {
1620 sqlx::query_scalar("SELECT snapshot_data FROM command_log WHERE idx = ?")
1621 .bind(idx as i64)
1622 .fetch_optional(&self.pool)
1623 .await
1624 .map_err(db_err)
1625 })?;
1626
1627 match blob {
1628 Some(compressed) => {
1629 let mut decoder = DeflateDecoder::new(&compressed[..]);
1630 let mut json = Vec::new();
1631 decoder
1632 .read_to_end(&mut json)
1633 .map_err(|e| KanbanError::Database(e.to_string()))?;
1634 let snapshot: Snapshot = serde_json::from_slice(&json).map_err(ser_err)?;
1635 Ok(Some(snapshot))
1636 }
1637 None => Ok(None),
1638 }
1639 }
1640}
1641
1642#[async_trait::async_trait]
1643impl PersistenceStore for SqliteStore {
1644 async fn save(&self, snapshot: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
1645 let domain_snapshot: Snapshot = serde_json::from_slice(&snapshot.data)
1646 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1647 self.apply_snapshot_async(domain_snapshot)
1648 .await
1649 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1650 self.checkpoint()
1651 .await
1652 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1653 Ok(PersistenceMetadata::new(self.instance_id))
1654 }
1655
1656 async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
1657 let domain_snapshot = self
1658 .snapshot_async()
1659 .await
1660 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1661 let data = serde_json::to_vec(&domain_snapshot)
1662 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1663 let meta = PersistenceMetadata::new(self.instance_id);
1664 Ok((
1665 StoreSnapshot {
1666 data,
1667 metadata: meta.clone(),
1668 },
1669 meta,
1670 ))
1671 }
1672
1673 async fn exists(&self) -> bool {
1674 self.path.exists()
1675 }
1676
1677 fn path(&self) -> &std::path::Path {
1678 &self.path
1679 }
1680
1681 fn instance_id(&self) -> Uuid {
1682 self.instance_id
1683 }
1684}
1685
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds the multi-threaded runtime that `SqliteStore` requires.
    fn make_rt() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();
        builder.build().unwrap()
    }

    /// Creates a fresh temporary directory and a database path inside it.
    /// The directory guard must be kept alive for as long as the path is used.
    fn temp_db(file_name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join(file_name);
        (dir, db_path)
    }

    /// Stores a board, a column and one card, then archives that card.
    /// Returns the id of the archived card.
    fn seed_archived_card(store: &SqliteStore) -> Uuid {
        use kanban_domain::data_store::DataStore;

        let mut board = kanban_domain::Board::new("B".to_string(), None);
        let column = kanban_domain::Column::new(board.id, "Col".to_string(), 0);
        let card = kanban_domain::Card::new(&mut board, column.id, "Task".to_string(), 0);
        let (card_id, column_id) = (card.id, column.id);

        store.upsert_board(board).unwrap();
        store.upsert_column(column).unwrap();
        store.upsert_card(card.clone()).unwrap();

        let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
        store.insert_archived_card(archived).unwrap();
        card_id
    }

    #[test]
    fn test_sqlitestore_path_is_preserved() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        let store = rt.block_on(SqliteStore::open(&path)).unwrap();
        assert_eq!(store.path(), path.as_path());
    }

    #[test]
    fn test_sqlitestore_instance_id_persists_across_reopen() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        let first = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
        let second = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
        assert_eq!(first, second, "instance_id must be stable across reopens");
    }

    #[test]
    fn test_sqlitestore_persistence_save_load_roundtrip() {
        use kanban_domain::{Board, DependencyGraph};
        use kanban_persistence::snapshot_to_json_bytes;

        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();

        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();

            // A snapshot holding a single board and nothing else.
            let snapshot = Snapshot::from_data(
                vec![Board::new("Test Board".to_string(), None)],
                vec![],
                vec![],
                vec![],
                vec![],
                DependencyGraph::new(),
            );
            let store_snap = StoreSnapshot {
                data: snapshot_to_json_bytes(&snapshot).unwrap(),
                metadata: PersistenceMetadata::new(store.instance_id()),
            };

            PersistenceStore::save(&store, store_snap).await.unwrap();

            let (loaded_snap, _meta) = PersistenceStore::load(&store).await.unwrap();
            let loaded: Snapshot = serde_json::from_slice(&loaded_snap.data).unwrap();
            assert_eq!(loaded.boards.len(), 1);
            assert_eq!(loaded.boards[0].name, "Test Board");
        });
    }

    #[test]
    fn test_sqlitestore_exists_returns_true_after_open() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            assert!(PersistenceStore::exists(&store).await);
        });
    }

    #[test]
    fn test_checkpoint_executes_without_error() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            store.checkpoint().await.unwrap();
        });
    }

    #[test]
    fn test_save_checkpoints_wal_file_stays_minimal() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let (snapshot, _) = PersistenceStore::load(&store).await.unwrap();
            PersistenceStore::save(&store, snapshot).await.unwrap();

            // The WAL file may be absent entirely; only its size is checked
            // when it is present.
            let wal_path = path.with_extension("sqlite3-wal");
            if wal_path.exists() {
                let wal_len = wal_path.metadata().unwrap().len();
                assert!(
                    wal_len < 32 * 1024,
                    "WAL file should be minimal after save+checkpoint"
                );
            }
        });
    }

    #[test]
    fn test_delete_archived_card_orphaned_cards_row_is_still_cleaned_up() {
        use kanban_domain::data_store::DataStore;
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let card_id = seed_archived_card(&store);

            store.delete_archived_card(card_id).unwrap();

            assert!(
                store.list_archived_cards().unwrap().is_empty(),
                "card should be gone from archived_cards"
            );
            assert!(
                store.list_all_cards().unwrap().is_empty(),
                "orphaned cards row should also be removed by delete_archived_card"
            );
        });
    }

    #[test]
    fn test_delete_archived_card_removes_from_cards_table() {
        use kanban_domain::data_store::DataStore;
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let card_id = seed_archived_card(&store);

            // `delete_card` is called on the archived card; the archive entry
            // is expected to remain afterwards.
            store.delete_card(card_id).unwrap();
            assert_eq!(store.list_archived_cards().unwrap().len(), 1);

            store.delete_archived_card(card_id).unwrap();

            assert!(
                store.list_archived_cards().unwrap().is_empty(),
                "card should be gone from archived_cards"
            );
            assert!(
                store.list_all_cards().unwrap().is_empty(),
                "card should also be gone from cards table, not restored as active"
            );
            assert!(
                store.get_card(card_id).unwrap().is_none(),
                "get_card should return None for permanently deleted card"
            );
        });
    }
}