1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use chrono::{DateTime, Utc};
5use kanban_domain::data_store::DataStore;
6use kanban_domain::{
7 ArchivedCard, Board, Card, Column, DependencyGraph, KanbanError, KanbanResult, Snapshot,
8 Sprint, SprintLog,
9};
10use kanban_persistence::{
11 PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore, StoreSnapshot,
12};
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
14use sqlx::{Pool, Row, Sqlite};
15use uuid::Uuid;
16
17const SCHEMA: &str = include_str!("schema.sql");
18
19pub const SUPPORTED_SCHEMA_VERSION: u32 = 2;
22
23type MetadataRow = (String, String, Option<String>, Option<String>, u32);
27
28pub struct SqliteStore {
30 pool: Pool<Sqlite>,
31 path: PathBuf,
32 instance_id: Uuid,
33}
34
35fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
36 let handle = tokio::runtime::Handle::current();
37 debug_assert!(
38 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
39 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
40 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
41 because synchronous DataStore methods need to block on async SQLite I/O."
42 );
43 tokio::task::block_in_place(|| handle.block_on(f))
44}
45
46fn db_err(e: sqlx::Error) -> KanbanError {
47 KanbanError::Database(e.to_string())
48}
49
50fn ser_err(msg: impl std::fmt::Display) -> KanbanError {
51 KanbanError::Serialization(msg.to_string())
52}
53
54fn p_uuid(s: &str) -> KanbanResult<Uuid> {
55 Uuid::parse_str(s).map_err(ser_err)
56}
57
58fn p_dt(s: &str) -> KanbanResult<DateTime<Utc>> {
59 DateTime::parse_from_rfc3339(s)
60 .map_err(ser_err)
61 .map(|dt| dt.with_timezone(&Utc))
62}
63
64fn p_enum<T: serde::de::DeserializeOwned>(s: &str, label: &str) -> KanbanResult<T> {
65 serde_json::from_value(serde_json::Value::String(s.to_owned()))
66 .map_err(|_| ser_err(format!("unknown {label} variant: {s}")))
67}
68
69fn ser_enum<T: serde::Serialize + std::fmt::Debug>(v: &T, label: &str) -> KanbanResult<String> {
74 match serde_json::to_value(v).map_err(ser_err)? {
75 serde_json::Value::String(s) => Ok(s),
76 other => Err(ser_err(format!(
77 "{label} did not serialise to a JSON string: {other}"
78 ))),
79 }
80}
81
82fn fmt_dt(dt: &DateTime<Utc>) -> String {
83 dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
84}
85
86fn required_str<'a>(value: &'a str, field: &str) -> KanbanResult<&'a str> {
87 if value.is_empty() {
88 Err(ser_err(format!(
89 "required field '{field}' must not be empty"
90 )))
91 } else {
92 Ok(value)
93 }
94}
95
96fn opt_dt(dt: &Option<DateTime<Utc>>) -> Option<String> {
97 dt.as_ref().map(fmt_dt)
98}
99
100fn row_to_board(
103 row: &SqliteRow,
104 sprint_names: Vec<String>,
105 sprint_counters: HashMap<String, u32>,
106) -> KanbanResult<Board> {
107 let id_str: String = row.try_get("id").map_err(db_err)?;
108 let active_sprint_id_str: Option<String> = row.try_get("active_sprint_id").map_err(db_err)?;
109 let completion_column_id_str: Option<String> =
110 row.try_get("completion_column_id").map_err(db_err)?;
111 let task_sort_field_str: String = row.try_get("task_sort_field").map_err(db_err)?;
112 let task_sort_order_str: String = row.try_get("task_sort_order").map_err(db_err)?;
113 let task_list_view_str: String = row.try_get("task_list_view").map_err(db_err)?;
114 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
115 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
116 let sprint_duration_days_raw: Option<i32> =
117 row.try_get("sprint_duration_days").map_err(db_err)?;
118
119 Ok(Board {
120 id: p_uuid(&id_str)?,
121 name: row.try_get("name").map_err(db_err)?,
122 description: row.try_get("description").map_err(db_err)?,
123 sprint_prefix: row.try_get("sprint_prefix").map_err(db_err)?,
124 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
125 task_sort_field: p_enum(&task_sort_field_str, "task_sort_field")?,
126 task_sort_order: p_enum(&task_sort_order_str, "task_sort_order")?,
127 sprint_duration_days: sprint_duration_days_raw.map(|v| v as u32),
128 sprint_names,
129 sprint_name_used_count: row
130 .try_get::<i32, _>("sprint_name_used_count")
131 .map_err(db_err)? as usize,
132 next_sprint_number: row
133 .try_get::<i32, _>("next_sprint_number")
134 .map_err(db_err)? as u32,
135 active_sprint_id: active_sprint_id_str.as_deref().map(p_uuid).transpose()?,
136 task_list_view: p_enum(&task_list_view_str, "task_list_view")?,
137 card_counter: row.try_get::<i32, _>("card_counter").map_err(db_err)? as u32,
138 sprint_counters,
139 completion_column_id: completion_column_id_str
140 .as_deref()
141 .map(p_uuid)
142 .transpose()?,
143 position: row.try_get::<i32, _>("position").map_err(db_err)?,
144 created_at: p_dt(&created_at_str)?,
145 updated_at: p_dt(&updated_at_str)?,
146 })
147}
148
149fn row_to_column(row: &SqliteRow) -> KanbanResult<Column> {
150 let id_str: String = row.try_get("id").map_err(db_err)?;
151 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
152 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
153 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
154
155 Ok(Column {
156 id: p_uuid(&id_str)?,
157 board_id: p_uuid(&board_id_str)?,
158 name: row.try_get("name").map_err(db_err)?,
159 position: row.try_get("position").map_err(db_err)?,
160 wip_limit: row.try_get("wip_limit").map_err(db_err)?,
161 created_at: p_dt(&created_at_str)?,
162 updated_at: p_dt(&updated_at_str)?,
163 })
164}
165
166fn row_to_card(row: &SqliteRow, sprint_logs: Vec<SprintLog>) -> KanbanResult<Card> {
167 let id_str: String = row.try_get("id").map_err(db_err)?;
168 let column_id_str: String = row.try_get("column_id").map_err(db_err)?;
169 let sprint_id_str: Option<String> = row.try_get("sprint_id").map_err(db_err)?;
170 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
171 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
172 let completed_at_str: Option<String> = row.try_get("completed_at").map_err(db_err)?;
173 let due_date_str: Option<String> = row.try_get("due_date").map_err(db_err)?;
174 let priority_str: String = row.try_get("priority").map_err(db_err)?;
175 let status_str: String = row.try_get("status").map_err(db_err)?;
176 let points_raw: Option<i32> = row.try_get("points").map_err(db_err)?;
177
178 Ok(Card {
179 id: p_uuid(&id_str)?,
180 column_id: p_uuid(&column_id_str)?,
181 title: row.try_get("title").map_err(db_err)?,
182 description: row.try_get("description").map_err(db_err)?,
183 priority: p_enum(&priority_str, "priority")?,
184 status: p_enum(&status_str, "status")?,
185 position: row.try_get("position").map_err(db_err)?,
186 due_date: due_date_str.as_deref().map(p_dt).transpose()?,
187 points: points_raw
188 .map(|v| u8::try_from(v).map_err(|_| ser_err(format!("points {v} out of range"))))
189 .transpose()?,
190 card_number: row.try_get::<i32, _>("card_number").map_err(db_err)? as u32,
191 sprint_id: sprint_id_str.as_deref().map(p_uuid).transpose()?,
192 created_at: p_dt(&created_at_str)?,
193 updated_at: p_dt(&updated_at_str)?,
194 completed_at: completed_at_str.as_deref().map(p_dt).transpose()?,
195 sprint_logs,
196 })
197}
198
199fn row_to_sprint(row: &SqliteRow) -> KanbanResult<Sprint> {
200 let id_str: String = row.try_get("id").map_err(db_err)?;
201 let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
202 let status_str: String = row.try_get("status").map_err(db_err)?;
203 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
204 let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
205 let start_date_str: Option<String> = row.try_get("start_date").map_err(db_err)?;
206 let end_date_str: Option<String> = row.try_get("end_date").map_err(db_err)?;
207 let name_index_raw: Option<i32> = row.try_get("name_index").map_err(db_err)?;
208
209 Ok(Sprint {
210 id: p_uuid(&id_str)?,
211 board_id: p_uuid(&board_id_str)?,
212 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
213 name_index: name_index_raw.map(|v| v as usize),
214 prefix: row.try_get("prefix").map_err(db_err)?,
215 card_prefix: row.try_get("card_prefix").map_err(db_err)?,
216 status: p_enum(&status_str, "sprint status")?,
217 start_date: start_date_str.as_deref().map(p_dt).transpose()?,
218 end_date: end_date_str.as_deref().map(p_dt).transpose()?,
219 created_at: p_dt(&created_at_str)?,
220 updated_at: p_dt(&updated_at_str)?,
221 })
222}
223
224fn row_to_edge_base(row: &SqliteRow) -> KanbanResult<kanban_core::EdgeBase> {
227 let source_str: String = row.try_get("source_id").map_err(db_err)?;
228 let target_str: String = row.try_get("target_id").map_err(db_err)?;
229 let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
230 let archived_at_str: Option<String> = row.try_get("archived_at").map_err(db_err)?;
231 Ok(kanban_core::EdgeBase {
232 source: p_uuid(&source_str)?,
233 target: p_uuid(&target_str)?,
234 created_at: p_dt(&created_at_str)?,
235 archived_at: archived_at_str.as_deref().map(p_dt).transpose()?,
236 })
237}
238
239fn row_to_sprint_log(row: &SqliteRow) -> KanbanResult<SprintLog> {
240 let sprint_id_str: String = row.try_get("sprint_id").map_err(db_err)?;
241 let started_at_str: String = row.try_get("started_at").map_err(db_err)?;
242 let ended_at_str: Option<String> = row.try_get("ended_at").map_err(db_err)?;
243
244 Ok(SprintLog {
245 sprint_id: p_uuid(&sprint_id_str)?,
246 sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
247 sprint_name: row.try_get("sprint_name").map_err(db_err)?,
248 started_at: p_dt(&started_at_str)?,
249 ended_at: ended_at_str.as_deref().map(p_dt).transpose()?,
250 status: row.try_get("status").map_err(db_err)?,
251 })
252}
253
254impl SqliteStore {
257 pub async fn open(path: impl AsRef<Path>) -> KanbanResult<Self> {
258 let handle = tokio::runtime::Handle::current();
259 if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
260 return Err(KanbanError::Database(
261 "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
262 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
263 because synchronous DataStore methods need to block on async SQLite I/O."
264 .to_string(),
265 ));
266 }
267
268 let path_buf = path.as_ref().to_path_buf();
269
270 let options = SqliteConnectOptions::new()
271 .filename(&path_buf)
272 .create_if_missing(true)
273 .foreign_keys(true)
274 .pragma("journal_mode", "wal");
275
276 let pool = SqlitePoolOptions::new()
277 .max_connections(2)
278 .connect_with(options)
279 .await
280 .map_err(|e| KanbanError::Database(e.to_string()))?;
281
282 let metadata_table_exists: bool = sqlx::query_scalar(
294 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='metadata'",
295 )
296 .fetch_one(&pool)
297 .await
298 .map_err(|e| KanbanError::Database(e.to_string()))?;
299 if metadata_table_exists {
300 let pre_migrate_version: Option<u32> =
301 sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
302 .fetch_optional(&pool)
303 .await
304 .map_err(|e| KanbanError::Database(e.to_string()))?;
305 if let Some(v) = pre_migrate_version {
306 if v > SUPPORTED_SCHEMA_VERSION {
307 return Err(KanbanError::UnsupportedFutureVersion {
308 file_version: v,
309 binary_max: SUPPORTED_SCHEMA_VERSION,
310 });
311 }
312 }
313 }
314
315 Self::drop_legacy_command_log_if_present(&pool).await?;
321
322 sqlx::raw_sql(SCHEMA)
323 .execute(&pool)
324 .await
325 .map_err(|e| KanbanError::Database(e.to_string()))?;
326
327 Self::migrate(&pool).await?;
328
329 let instance_id = Self::load_or_create_instance_id(&pool).await?;
330
331 Ok(Self {
332 pool,
333 path: path_buf,
334 instance_id,
335 })
336 }
337
338 async fn load_or_create_instance_id(pool: &Pool<Sqlite>) -> KanbanResult<Uuid> {
339 let row: Option<String> =
340 sqlx::query_scalar("SELECT instance_id FROM metadata WHERE id = 1")
341 .fetch_optional(pool)
342 .await
343 .map_err(db_err)?;
344 match row {
345 Some(s) => p_uuid(&s),
346 None => {
347 let id = Uuid::new_v4();
348 let now = Utc::now().to_rfc3339();
349 sqlx::query(
350 "INSERT INTO metadata (id, instance_id, saved_at, schema_version) VALUES (1, ?, ?, ?)",
351 )
352 .bind(id.to_string())
353 .bind(&now)
354 .bind(SUPPORTED_SCHEMA_VERSION)
355 .execute(pool)
356 .await
357 .map_err(db_err)?;
358 Ok(id)
359 }
360 }
361 }
362
363 async fn drop_legacy_command_log_if_present(pool: &Pool<Sqlite>) -> KanbanResult<()> {
367 let has_command_log: bool = sqlx::query_scalar(
368 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='command_log'",
369 )
370 .fetch_one(pool)
371 .await
372 .map_err(db_err)?;
373 if !has_command_log {
374 return Ok(());
375 }
376 let has_batch_index_col: bool = sqlx::query_scalar(
377 "SELECT COUNT(*) > 0 FROM pragma_table_info('command_log') WHERE name = 'batch_index'",
378 )
379 .fetch_one(pool)
380 .await
381 .map_err(db_err)?;
382 if !has_batch_index_col {
383 tracing::info!(
384 "dropping legacy command_log table (pre-KAN-405 schema) so the KAN-191 schema can be applied"
385 );
386 sqlx::raw_sql("DROP TABLE IF EXISTS command_log")
387 .execute(pool)
388 .await
389 .map_err(db_err)?;
390 }
391 Ok(())
392 }
393
394 async fn migrate(pool: &Pool<Sqlite>) -> KanbanResult<()> {
395 let has_undo_state: bool = sqlx::query_scalar(
401 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='undo_state'",
402 )
403 .fetch_one(pool)
404 .await
405 .map_err(db_err)?;
406
407 if has_undo_state {
408 tracing::info!(
409 "dropping legacy undo_state table: undo cursor stays in-session, only command_log persists"
410 );
411 sqlx::raw_sql("DROP TABLE IF EXISTS undo_state")
412 .execute(pool)
413 .await
414 .map_err(db_err)?;
415 }
416
417 let has_position_col: bool = sqlx::query_scalar(
418 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'position'",
419 )
420 .fetch_one(pool)
421 .await
422 .map_err(db_err)?;
423
424 if !has_position_col {
425 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN position INTEGER NOT NULL DEFAULT 0")
426 .execute(pool)
427 .await
428 .map_err(db_err)?;
429 }
430
431 let has_card_counter_col: bool = sqlx::query_scalar(
432 "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'card_counter'",
433 )
434 .fetch_one(pool)
435 .await
436 .map_err(db_err)?;
437
438 if !has_card_counter_col {
439 sqlx::raw_sql("ALTER TABLE boards ADD COLUMN card_counter INTEGER NOT NULL DEFAULT 1")
440 .execute(pool)
441 .await
442 .map_err(db_err)?;
443 }
444
445 Self::drop_legacy_card_edges_if_present(pool).await?;
446
447 for col in ["writer_version", "writer_commit"] {
449 let has_col: bool = sqlx::query_scalar(&format!(
450 "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
451 ))
452 .fetch_one(pool)
453 .await
454 .map_err(db_err)?;
455 if !has_col {
456 sqlx::raw_sql(&format!("ALTER TABLE metadata ADD COLUMN {col} TEXT"))
457 .execute(pool)
458 .await
459 .map_err(db_err)?;
460 }
461 }
462 sqlx::query("UPDATE metadata SET schema_version = ? WHERE id = 1 AND schema_version < ?")
466 .bind(SUPPORTED_SCHEMA_VERSION)
467 .bind(SUPPORTED_SCHEMA_VERSION)
468 .execute(pool)
469 .await
470 .map_err(db_err)?;
471
472 Ok(())
473 }
474
475 async fn drop_legacy_card_edges_if_present(pool: &Pool<Sqlite>) -> KanbanResult<()> {
482 let has_card_edges: bool = sqlx::query_scalar(
483 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='card_edges'",
484 )
485 .fetch_one(pool)
486 .await
487 .map_err(db_err)?;
488 if !has_card_edges {
489 return Ok(());
490 }
491 tracing::info!(
492 "dropping legacy card_edges table (pre per-kind schema); per-kind tables take over"
493 );
494 sqlx::raw_sql(
495 "DROP INDEX IF EXISTS idx_card_edges_source;
496 DROP INDEX IF EXISTS idx_card_edges_target;
497 DROP TABLE IF EXISTS card_edges;",
498 )
499 .execute(pool)
500 .await
501 .map_err(db_err)?;
502 Ok(())
503 }
504
505 pub fn pool(&self) -> &Pool<Sqlite> {
506 &self.pool
507 }
508
509 pub fn read_metadata_sync(&self) -> KanbanResult<Option<PersistenceMetadata>> {
513 run(async {
514 let row: Option<MetadataRow> = sqlx::query_as(
515 "SELECT instance_id, saved_at, writer_version, writer_commit, schema_version \
516 FROM metadata WHERE id = 1",
517 )
518 .fetch_optional(&self.pool)
519 .await
520 .map_err(db_err)?;
521 let Some(row) = row else {
522 return Ok(None);
523 };
524 let instance_id = p_uuid(&row.0)?;
525 let saved_at = p_dt(&row.1)?;
526 Ok(Some(PersistenceMetadata {
527 instance_id,
528 saved_at,
529 writer_version: row.2,
530 writer_commit: row.3,
531 format_version: Some(row.4),
532 }))
533 })
534 }
535
536 pub async fn stamp_writer(&self) -> KanbanResult<DateTime<Utc>> {
544 let now = Utc::now();
545 sqlx::query(
546 "UPDATE metadata SET saved_at = ?, writer_version = ?, writer_commit = ? WHERE id = 1",
547 )
548 .bind(now.to_rfc3339())
549 .bind(kanban_core::KANBAN_VERSION)
550 .bind(kanban_core::KANBAN_COMMIT)
551 .execute(&self.pool)
552 .await
553 .map_err(|e| KanbanError::Database(e.to_string()))?;
554 Ok(now)
555 }
556
557 pub async fn checkpoint(&self) -> KanbanResult<()> {
561 sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
562 .execute(&self.pool)
563 .await
564 .map_err(|e| KanbanError::Database(e.to_string()))?;
565 Ok(())
566 }
567
568 pub async fn append_command_batch(
573 &self,
574 batch_index: u64,
575 commands_json: &str,
576 ) -> KanbanResult<()> {
577 sqlx::query(
578 "INSERT INTO command_log (batch_index, commands_json, created_at) VALUES (?, ?, ?)",
579 )
580 .bind(batch_index as i64)
581 .bind(commands_json)
582 .bind(fmt_dt(&Utc::now()))
583 .execute(&self.pool)
584 .await
585 .map_err(db_err)?;
586 Ok(())
587 }
588
589 pub async fn load_all_command_batches(&self) -> KanbanResult<Vec<String>> {
592 let rows = sqlx::query("SELECT commands_json FROM command_log ORDER BY batch_index ASC")
593 .fetch_all(&self.pool)
594 .await
595 .map_err(db_err)?;
596 let mut out = Vec::with_capacity(rows.len());
597 for row in &rows {
598 out.push(row.try_get::<String, _>("commands_json").map_err(db_err)?);
599 }
600 Ok(out)
601 }
602
603 pub async fn truncate_command_log_after(&self, after: u64) -> KanbanResult<()> {
605 sqlx::query("DELETE FROM command_log WHERE batch_index >= ?")
606 .bind(after as i64)
607 .execute(&self.pool)
608 .await
609 .map_err(db_err)?;
610 Ok(())
611 }
612
613 pub async fn shift_command_log(&self, drop_count: u64) -> KanbanResult<()> {
616 if drop_count == 0 {
617 return Ok(());
618 }
619 let mut tx = self.pool.begin().await.map_err(db_err)?;
620 sqlx::query("DELETE FROM command_log WHERE batch_index < ?")
621 .bind(drop_count as i64)
622 .execute(&mut *tx)
623 .await
624 .map_err(db_err)?;
625 sqlx::query("UPDATE command_log SET batch_index = batch_index - ?")
626 .bind(drop_count as i64)
627 .execute(&mut *tx)
628 .await
629 .map_err(db_err)?;
630 tx.commit().await.map_err(db_err)?;
631 Ok(())
632 }
633
634 async fn fetch_board_aux(
635 &self,
636 board_id: &str,
637 ) -> KanbanResult<(Vec<String>, HashMap<String, u32>)> {
638 let name_rows =
639 sqlx::query("SELECT name FROM board_sprint_names WHERE board_id = ? ORDER BY position")
640 .bind(board_id)
641 .fetch_all(&self.pool)
642 .await
643 .map_err(db_err)?;
644 let sprint_names: Vec<String> = name_rows
645 .iter()
646 .map(|r| r.try_get("name").map_err(db_err))
647 .collect::<KanbanResult<_>>()?;
648
649 let counter_rows =
650 sqlx::query("SELECT prefix, counter FROM board_sprint_counters WHERE board_id = ?")
651 .bind(board_id)
652 .fetch_all(&self.pool)
653 .await
654 .map_err(db_err)?;
655 let mut sprint_counters = HashMap::new();
656 for row in &counter_rows {
657 let prefix: String = row.try_get("prefix").map_err(db_err)?;
658 let counter: i32 = row.try_get("counter").map_err(db_err)?;
659 sprint_counters.insert(prefix, counter as u32);
660 }
661
662 Ok((sprint_names, sprint_counters))
663 }
664
665 async fn write_board_with_conn(
666 conn: &mut sqlx::SqliteConnection,
667 board: &Board,
668 ) -> KanbanResult<()> {
669 let id = board.id.to_string();
670
671 sqlx::query(
672 "INSERT INTO boards (id, name, description, sprint_prefix, card_prefix,
673 task_sort_field, task_sort_order, sprint_duration_days,
674 sprint_name_used_count, next_sprint_number, active_sprint_id,
675 task_list_view, card_counter, completion_column_id, position,
676 created_at, updated_at)
677 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
678 ON CONFLICT(id) DO UPDATE SET
679 name=excluded.name, description=excluded.description,
680 sprint_prefix=excluded.sprint_prefix, card_prefix=excluded.card_prefix,
681 task_sort_field=excluded.task_sort_field, task_sort_order=excluded.task_sort_order,
682 sprint_duration_days=excluded.sprint_duration_days,
683 sprint_name_used_count=excluded.sprint_name_used_count,
684 next_sprint_number=excluded.next_sprint_number,
685 active_sprint_id=excluded.active_sprint_id,
686 task_list_view=excluded.task_list_view, card_counter=excluded.card_counter,
687 completion_column_id=excluded.completion_column_id,
688 position=excluded.position,
689 updated_at=excluded.updated_at",
690 )
691 .bind(&id)
692 .bind(required_str(&board.name, "board.name")?)
693 .bind(&board.description)
694 .bind(&board.sprint_prefix)
695 .bind(&board.card_prefix)
696 .bind(format!("{:?}", board.task_sort_field))
697 .bind(format!("{:?}", board.task_sort_order))
698 .bind(board.sprint_duration_days.map(|v| v as i32))
699 .bind(board.sprint_name_used_count as i32)
700 .bind(board.next_sprint_number as i32)
701 .bind(board.active_sprint_id.map(|id| id.to_string()))
702 .bind(format!("{:?}", board.task_list_view))
703 .bind(board.card_counter as i32)
704 .bind(board.completion_column_id.map(|id| id.to_string()))
705 .bind(board.position)
706 .bind(fmt_dt(&board.created_at))
707 .bind(fmt_dt(&board.updated_at))
708 .execute(&mut *conn)
709 .await
710 .map_err(db_err)?;
711
712 sqlx::query("DELETE FROM board_sprint_names WHERE board_id = ?")
713 .bind(&id)
714 .execute(&mut *conn)
715 .await
716 .map_err(db_err)?;
717 for (i, name) in board.sprint_names.iter().enumerate() {
718 sqlx::query(
719 "INSERT INTO board_sprint_names (board_id, position, name) VALUES (?, ?, ?)",
720 )
721 .bind(&id)
722 .bind(i as i32)
723 .bind(required_str(name, "board.sprint_names[*]")?)
724 .execute(&mut *conn)
725 .await
726 .map_err(db_err)?;
727 }
728
729 sqlx::query("DELETE FROM board_sprint_counters WHERE board_id = ?")
730 .bind(&id)
731 .execute(&mut *conn)
732 .await
733 .map_err(db_err)?;
734 for (prefix, counter) in &board.sprint_counters {
735 sqlx::query(
736 "INSERT INTO board_sprint_counters (board_id, prefix, counter) VALUES (?, ?, ?)",
737 )
738 .bind(&id)
739 .bind(prefix)
740 .bind(*counter as i32)
741 .execute(&mut *conn)
742 .await
743 .map_err(db_err)?;
744 }
745
746 Ok(())
747 }
748
749 async fn write_board_async(&self, board: &Board) -> KanbanResult<()> {
750 let mut tx = self.pool.begin().await.map_err(db_err)?;
751 Self::write_board_with_conn(&mut tx, board).await?;
752 tx.commit().await.map_err(db_err)?;
753 Ok(())
754 }
755
756 async fn fetch_sprint_logs_for_card(&self, card_id: &str) -> KanbanResult<Vec<SprintLog>> {
757 let rows = sqlx::query(
758 "SELECT sprint_id, sprint_number, sprint_name, started_at, ended_at, status
759 FROM sprint_logs WHERE card_id = ? ORDER BY id",
760 )
761 .bind(card_id)
762 .fetch_all(&self.pool)
763 .await
764 .map_err(db_err)?;
765 rows.iter().map(row_to_sprint_log).collect()
766 }
767
768 async fn write_card_with_conn(
769 conn: &mut sqlx::SqliteConnection,
770 card: &Card,
771 ) -> KanbanResult<()> {
772 let id = card.id.to_string();
773
774 sqlx::query(
775 "INSERT INTO cards (id, column_id, title, description, priority, status, position,
776 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at)
777 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
778 ON CONFLICT(id) DO UPDATE SET
779 column_id=excluded.column_id, title=excluded.title,
780 description=excluded.description, priority=excluded.priority,
781 status=excluded.status, position=excluded.position,
782 due_date=excluded.due_date, points=excluded.points,
783 card_number=excluded.card_number, sprint_id=excluded.sprint_id,
784 updated_at=excluded.updated_at, completed_at=excluded.completed_at",
785 )
786 .bind(&id)
787 .bind(card.column_id.to_string())
788 .bind(required_str(&card.title, "card.title")?)
789 .bind(&card.description)
790 .bind(format!("{:?}", card.priority))
791 .bind(format!("{:?}", card.status))
792 .bind(card.position)
793 .bind(opt_dt(&card.due_date))
794 .bind(card.points.map(|v| v as i32))
795 .bind(card.card_number as i32)
796 .bind(card.sprint_id.map(|id| id.to_string()))
797 .bind(fmt_dt(&card.created_at))
798 .bind(fmt_dt(&card.updated_at))
799 .bind(opt_dt(&card.completed_at))
800 .execute(&mut *conn)
801 .await
802 .map_err(db_err)?;
803
804 sqlx::query("DELETE FROM sprint_logs WHERE card_id = ?")
805 .bind(&id)
806 .execute(&mut *conn)
807 .await
808 .map_err(db_err)?;
809 for log in &card.sprint_logs {
810 sqlx::query(
811 "INSERT INTO sprint_logs (card_id, sprint_id, sprint_number, sprint_name,
812 started_at, ended_at, status)
813 VALUES (?, ?, ?, ?, ?, ?, ?)",
814 )
815 .bind(&id)
816 .bind(log.sprint_id.to_string())
817 .bind(log.sprint_number as i32)
818 .bind(&log.sprint_name)
819 .bind(fmt_dt(&log.started_at))
820 .bind(opt_dt(&log.ended_at))
821 .bind(required_str(&log.status, "sprint_log.status")?)
822 .execute(&mut *conn)
823 .await
824 .map_err(db_err)?;
825 }
826
827 Ok(())
828 }
829
830 async fn write_card_async(&self, card: &Card) -> KanbanResult<()> {
831 let mut tx = self.pool.begin().await.map_err(db_err)?;
832 Self::write_card_with_conn(&mut tx, card).await?;
833 tx.commit().await.map_err(db_err)?;
834 Ok(())
835 }
836
837 async fn fetch_sprint_logs_batch(
838 &self,
839 card_ids: &[String],
840 ) -> KanbanResult<HashMap<String, Vec<SprintLog>>> {
841 if card_ids.is_empty() {
842 return Ok(HashMap::new());
843 }
844 let placeholders = card_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
845 let sql = format!(
846 "SELECT card_id, sprint_id, sprint_number, sprint_name, started_at, ended_at, status
847 FROM sprint_logs WHERE card_id IN ({placeholders}) ORDER BY id"
848 );
849 let mut query = sqlx::query(&sql);
850 for id in card_ids {
851 query = query.bind(id);
852 }
853 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
854 let mut map: HashMap<String, Vec<SprintLog>> = HashMap::new();
855 for row in &rows {
856 let card_id: String = row.try_get("card_id").map_err(db_err)?;
857 let log = row_to_sprint_log(row)?;
858 map.entry(card_id).or_default().push(log);
859 }
860 Ok(map)
861 }
862
863 async fn fetch_cards_with_filter(
864 &self,
865 where_clause: &str,
866 binds: &[String],
867 ) -> KanbanResult<Vec<Card>> {
868 let sql = format!(
869 "SELECT id, column_id, title, description, priority, status, position,
870 due_date, points, card_number, sprint_id, created_at, updated_at, completed_at
871 FROM cards WHERE id NOT IN (SELECT card_id FROM archived_cards) {}
872 ORDER BY position ASC, created_at ASC",
873 where_clause
874 );
875 let mut query = sqlx::query(&sql);
876 for b in binds {
877 query = query.bind(b);
878 }
879 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
880
881 let card_ids: Vec<String> = rows
882 .iter()
883 .map(|r| r.try_get("id").map_err(db_err))
884 .collect::<KanbanResult<_>>()?;
885 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
886
887 let mut cards = Vec::with_capacity(rows.len());
888 for row in &rows {
889 let id_str: String = row.try_get("id").map_err(db_err)?;
890 let logs = logs_map.remove(&id_str).unwrap_or_default();
891 cards.push(row_to_card(row, logs)?);
892 }
893 Ok(cards)
894 }
895
896 async fn get_graph_with_conn(
897 conn: &mut sqlx::SqliteConnection,
898 ) -> KanbanResult<DependencyGraph> {
899 use kanban_core::EdgeBase;
900 use kanban_domain::{BlocksEdge, RelatesEdge, SpawnsEdge};
901
902 let mut spawns: Vec<SpawnsEdge> = Vec::new();
903 for row in
904 sqlx::query("SELECT source_id, target_id, created_at, archived_at FROM spawns_edges")
905 .fetch_all(&mut *conn)
906 .await
907 .map_err(db_err)?
908 {
909 spawns.push(SpawnsEdge {
910 base: row_to_edge_base(&row)?,
911 });
912 }
913
914 let mut blocks: Vec<BlocksEdge> = Vec::new();
915 for row in sqlx::query(
916 "SELECT source_id, target_id, severity, created_at, archived_at FROM blocks_edges",
917 )
918 .fetch_all(&mut *conn)
919 .await
920 .map_err(db_err)?
921 {
922 let severity_str: String = row.try_get("severity").map_err(db_err)?;
923 blocks.push(BlocksEdge {
924 base: row_to_edge_base(&row)?,
925 severity: p_enum(&severity_str, "severity")?,
926 });
927 }
928
929 let mut relates: Vec<RelatesEdge> = Vec::new();
930 for row in sqlx::query(
931 "SELECT source_id, target_id, kind, created_at, archived_at FROM relates_edges",
932 )
933 .fetch_all(&mut *conn)
934 .await
935 .map_err(db_err)?
936 {
937 let kind_str: String = row.try_get("kind").map_err(db_err)?;
938 relates.push(RelatesEdge {
939 base: row_to_edge_base(&row)?,
940 kind: p_enum(&kind_str, "relates kind")?,
941 });
942 }
943
944 let _ = EdgeBase::<uuid::Uuid>::new; DependencyGraph::from_validated_per_kind_edges(spawns, blocks, relates)
946 }
947
948 async fn modify_graph_async(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
949 let mut tx = self.pool.begin().await.map_err(db_err)?;
950 let mut graph = Self::get_graph_with_conn(&mut tx).await?;
951 f(&mut graph)?;
952 Self::write_graph_with_conn(&mut tx, &graph).await?;
953 tx.commit().await.map_err(db_err)?;
954 Ok(())
955 }
956
957 async fn write_graph_with_conn(
958 conn: &mut sqlx::SqliteConnection,
959 graph: &DependencyGraph,
960 ) -> KanbanResult<()> {
961 use kanban_core::Edge as _;
962
963 sqlx::query("DELETE FROM spawns_edges")
964 .execute(&mut *conn)
965 .await
966 .map_err(db_err)?;
967 sqlx::query("DELETE FROM blocks_edges")
968 .execute(&mut *conn)
969 .await
970 .map_err(db_err)?;
971 sqlx::query("DELETE FROM relates_edges")
972 .execute(&mut *conn)
973 .await
974 .map_err(db_err)?;
975
976 for e in graph.spawns_edges() {
977 sqlx::query(
978 "INSERT INTO spawns_edges
979 (source_id, target_id, created_at, archived_at)
980 VALUES (?, ?, ?, ?)",
981 )
982 .bind(e.source().to_string())
983 .bind(e.target().to_string())
984 .bind(fmt_dt(&e.created_at()))
985 .bind(opt_dt(&e.archived_at()))
986 .execute(&mut *conn)
987 .await
988 .map_err(db_err)?;
989 }
990 for e in graph.blocks_edges() {
991 sqlx::query(
992 "INSERT INTO blocks_edges
993 (source_id, target_id, severity, created_at, archived_at)
994 VALUES (?, ?, ?, ?, ?)",
995 )
996 .bind(e.source().to_string())
997 .bind(e.target().to_string())
998 .bind(ser_enum(&e.severity, "severity")?)
999 .bind(fmt_dt(&e.created_at()))
1000 .bind(opt_dt(&e.archived_at()))
1001 .execute(&mut *conn)
1002 .await
1003 .map_err(db_err)?;
1004 }
1005 for e in graph.relates_edges() {
1006 sqlx::query(
1007 "INSERT INTO relates_edges
1008 (source_id, target_id, kind, created_at, archived_at)
1009 VALUES (?, ?, ?, ?, ?)",
1010 )
1011 .bind(e.source().to_string())
1012 .bind(e.target().to_string())
1013 .bind(ser_enum(&e.kind, "relates kind")?)
1014 .bind(fmt_dt(&e.created_at()))
1015 .bind(opt_dt(&e.archived_at()))
1016 .execute(&mut *conn)
1017 .await
1018 .map_err(db_err)?;
1019 }
1020
1021 Ok(())
1022 }
1023
1024 async fn write_graph_async(&self, graph: &DependencyGraph) -> KanbanResult<()> {
1025 let mut tx = self.pool.begin().await.map_err(db_err)?;
1026 Self::write_graph_with_conn(&mut tx, graph).await?;
1027 tx.commit().await.map_err(db_err)?;
1028 Ok(())
1029 }
1030
1031 async fn snapshot_async(&self) -> KanbanResult<Snapshot> {
1032 let boards = self.list_boards_async().await?;
1033 let columns = self.list_all_columns_async().await?;
1034 let cards = self.fetch_cards_with_filter("", &[]).await?;
1035 let archived_cards = self.list_archived_cards_async().await?;
1036 let sprints = self.list_all_sprints_async().await?;
1037 let graph = self.get_graph_async().await?;
1038 Ok(Snapshot::from_data(
1039 boards,
1040 columns,
1041 cards,
1042 archived_cards,
1043 sprints,
1044 graph,
1045 ))
1046 }
1047
1048 async fn apply_snapshot_async(&self, snapshot: Snapshot) -> KanbanResult<()> {
1049 let mut tx = self.pool.begin().await.map_err(db_err)?;
1050
1051 sqlx::query("PRAGMA defer_foreign_keys = ON")
1052 .execute(&mut *tx)
1053 .await
1054 .map_err(db_err)?;
1055
1056 sqlx::query("DELETE FROM spawns_edges")
1057 .execute(&mut *tx)
1058 .await
1059 .map_err(db_err)?;
1060 sqlx::query("DELETE FROM blocks_edges")
1061 .execute(&mut *tx)
1062 .await
1063 .map_err(db_err)?;
1064 sqlx::query("DELETE FROM relates_edges")
1065 .execute(&mut *tx)
1066 .await
1067 .map_err(db_err)?;
1068 sqlx::query("DELETE FROM archived_cards")
1069 .execute(&mut *tx)
1070 .await
1071 .map_err(db_err)?;
1072 sqlx::query("DELETE FROM sprint_logs")
1073 .execute(&mut *tx)
1074 .await
1075 .map_err(db_err)?;
1076 sqlx::query("DELETE FROM cards")
1077 .execute(&mut *tx)
1078 .await
1079 .map_err(db_err)?;
1080 sqlx::query("DELETE FROM sprints")
1081 .execute(&mut *tx)
1082 .await
1083 .map_err(db_err)?;
1084 sqlx::query("DELETE FROM board_sprint_names")
1085 .execute(&mut *tx)
1086 .await
1087 .map_err(db_err)?;
1088 sqlx::query("DELETE FROM board_sprint_counters")
1089 .execute(&mut *tx)
1090 .await
1091 .map_err(db_err)?;
1092 sqlx::query("DELETE FROM columns")
1093 .execute(&mut *tx)
1094 .await
1095 .map_err(db_err)?;
1096 sqlx::query("DELETE FROM boards")
1097 .execute(&mut *tx)
1098 .await
1099 .map_err(db_err)?;
1100
1101 for board in &snapshot.boards {
1102 Self::write_board_with_conn(&mut tx, board).await?;
1103 }
1104 for column in &snapshot.columns {
1105 Self::write_column_with_conn(&mut tx, column).await?;
1106 }
1107 for sprint in &snapshot.sprints {
1108 Self::write_sprint_with_conn(&mut tx, sprint).await?;
1109 }
1110 for card in &snapshot.cards {
1111 Self::write_card_with_conn(&mut tx, card).await?;
1112 }
1113 for ac in &snapshot.archived_cards {
1114 Self::write_archived_card_with_conn(&mut tx, ac).await?;
1115 }
1116 Self::write_graph_with_conn(&mut tx, &snapshot.graph).await?;
1117
1118 tx.commit().await.map_err(db_err)?;
1119 Ok(())
1120 }
1121
1122 async fn fetch_all_board_aux(
1123 &self,
1124 ) -> KanbanResult<(
1125 HashMap<String, Vec<String>>,
1126 HashMap<String, HashMap<String, u32>>,
1127 )> {
1128 let name_rows = sqlx::query(
1129 "SELECT board_id, name FROM board_sprint_names ORDER BY board_id, position",
1130 )
1131 .fetch_all(&self.pool)
1132 .await
1133 .map_err(db_err)?;
1134 let mut names_map: HashMap<String, Vec<String>> = HashMap::new();
1135 for row in &name_rows {
1136 let board_id: String = row.try_get("board_id").map_err(db_err)?;
1137 let name: String = row.try_get("name").map_err(db_err)?;
1138 names_map.entry(board_id).or_default().push(name);
1139 }
1140
1141 let counter_rows =
1142 sqlx::query("SELECT board_id, prefix, counter FROM board_sprint_counters")
1143 .fetch_all(&self.pool)
1144 .await
1145 .map_err(db_err)?;
1146 let mut counters_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
1147 for row in &counter_rows {
1148 let board_id: String = row.try_get("board_id").map_err(db_err)?;
1149 let prefix: String = row.try_get("prefix").map_err(db_err)?;
1150 let counter: i32 = row.try_get("counter").map_err(db_err)?;
1151 counters_map
1152 .entry(board_id)
1153 .or_default()
1154 .insert(prefix, counter as u32);
1155 }
1156
1157 Ok((names_map, counters_map))
1158 }
1159
1160 async fn list_boards_async(&self) -> KanbanResult<Vec<Board>> {
1161 let rows = sqlx::query(
1162 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1163 task_sort_order, sprint_duration_days, sprint_name_used_count,
1164 next_sprint_number, active_sprint_id, task_list_view,
1165 COALESCE(card_counter, 1) as card_counter,
1166 completion_column_id, position, created_at, updated_at
1167 FROM boards ORDER BY position ASC",
1168 )
1169 .fetch_all(&self.pool)
1170 .await
1171 .map_err(db_err)?;
1172
1173 let (mut names_map, mut counters_map) = self.fetch_all_board_aux().await?;
1174
1175 let mut boards = Vec::with_capacity(rows.len());
1176 for row in &rows {
1177 let id_str: String = row.try_get("id").map_err(db_err)?;
1178 let names = names_map.remove(&id_str).unwrap_or_default();
1179 let counters = counters_map.remove(&id_str).unwrap_or_default();
1180 boards.push(row_to_board(row, names, counters)?);
1181 }
1182 Ok(boards)
1183 }
1184
1185 async fn list_all_columns_async(&self) -> KanbanResult<Vec<Column>> {
1186 let rows = sqlx::query(
1187 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1188 FROM columns ORDER BY position",
1189 )
1190 .fetch_all(&self.pool)
1191 .await
1192 .map_err(db_err)?;
1193 rows.iter().map(row_to_column).collect()
1194 }
1195
1196 async fn list_all_sprints_async(&self) -> KanbanResult<Vec<Sprint>> {
1197 let rows = sqlx::query(
1198 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1199 status, start_date, end_date, created_at, updated_at
1200 FROM sprints ORDER BY sprint_number",
1201 )
1202 .fetch_all(&self.pool)
1203 .await
1204 .map_err(db_err)?;
1205 rows.iter().map(row_to_sprint).collect()
1206 }
1207
1208 async fn list_archived_cards_async(&self) -> KanbanResult<Vec<ArchivedCard>> {
1209 let rows = sqlx::query(
1210 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1211 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1212 c.created_at, c.updated_at, c.completed_at,
1213 ac.archived_at, ac.original_column_id, ac.original_position
1214 FROM archived_cards ac
1215 JOIN cards c ON ac.card_id = c.id
1216 ORDER BY ac.archived_at",
1217 )
1218 .fetch_all(&self.pool)
1219 .await
1220 .map_err(db_err)?;
1221
1222 let card_ids: Vec<String> = rows
1223 .iter()
1224 .map(|r| r.try_get("id").map_err(db_err))
1225 .collect::<KanbanResult<_>>()?;
1226 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1227
1228 let mut result = Vec::with_capacity(rows.len());
1229 for row in &rows {
1230 let id_str: String = row.try_get("id").map_err(db_err)?;
1231 let logs = logs_map.remove(&id_str).unwrap_or_default();
1232 let card = row_to_card(row, logs)?;
1233 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1234 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1235 result.push(ArchivedCard {
1236 card,
1237 archived_at: p_dt(&archived_at_str)?,
1238 original_column_id: p_uuid(&orig_col_str)?,
1239 original_position: row.try_get("original_position").map_err(db_err)?,
1240 });
1241 }
1242 Ok(result)
1243 }
1244
1245 async fn get_graph_async(&self) -> KanbanResult<DependencyGraph> {
1246 let mut tx = self.pool.begin().await.map_err(db_err)?;
1252 let graph = Self::get_graph_with_conn(&mut tx).await?;
1253 tx.commit().await.map_err(db_err)?;
1254 Ok(graph)
1255 }
1256
1257 async fn write_column_with_conn(
1258 conn: &mut sqlx::SqliteConnection,
1259 column: &Column,
1260 ) -> KanbanResult<()> {
1261 sqlx::query(
1262 "INSERT INTO columns (id, board_id, name, position, wip_limit, created_at, updated_at)
1263 VALUES (?, ?, ?, ?, ?, ?, ?)
1264 ON CONFLICT(id) DO UPDATE SET
1265 board_id=excluded.board_id, name=excluded.name,
1266 position=excluded.position, wip_limit=excluded.wip_limit,
1267 updated_at=excluded.updated_at",
1268 )
1269 .bind(column.id.to_string())
1270 .bind(column.board_id.to_string())
1271 .bind(required_str(&column.name, "column.name")?)
1272 .bind(column.position)
1273 .bind(column.wip_limit)
1274 .bind(fmt_dt(&column.created_at))
1275 .bind(fmt_dt(&column.updated_at))
1276 .execute(&mut *conn)
1277 .await
1278 .map_err(db_err)?;
1279 Ok(())
1280 }
1281
1282 async fn write_column_async(&self, column: &Column) -> KanbanResult<()> {
1283 Self::write_column_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, column).await
1284 }
1285
1286 async fn write_sprint_with_conn(
1287 conn: &mut sqlx::SqliteConnection,
1288 sprint: &Sprint,
1289 ) -> KanbanResult<()> {
1290 sqlx::query(
1291 "INSERT INTO sprints (id, board_id, sprint_number, name_index, prefix, card_prefix,
1292 status, start_date, end_date, created_at, updated_at)
1293 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1294 ON CONFLICT(id) DO UPDATE SET
1295 board_id=excluded.board_id, sprint_number=excluded.sprint_number,
1296 name_index=excluded.name_index, prefix=excluded.prefix,
1297 card_prefix=excluded.card_prefix, status=excluded.status,
1298 start_date=excluded.start_date, end_date=excluded.end_date,
1299 updated_at=excluded.updated_at",
1300 )
1301 .bind(sprint.id.to_string())
1302 .bind(sprint.board_id.to_string())
1303 .bind(sprint.sprint_number as i32)
1304 .bind(sprint.name_index.map(|v| v as i32))
1305 .bind(&sprint.prefix)
1306 .bind(&sprint.card_prefix)
1307 .bind(format!("{:?}", sprint.status))
1308 .bind(opt_dt(&sprint.start_date))
1309 .bind(opt_dt(&sprint.end_date))
1310 .bind(fmt_dt(&sprint.created_at))
1311 .bind(fmt_dt(&sprint.updated_at))
1312 .execute(&mut *conn)
1313 .await
1314 .map_err(db_err)?;
1315 Ok(())
1316 }
1317
1318 async fn write_sprint_async(&self, sprint: &Sprint) -> KanbanResult<()> {
1319 Self::write_sprint_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, sprint).await
1320 }
1321
1322 async fn write_archived_card_with_conn(
1323 conn: &mut sqlx::SqliteConnection,
1324 ac: &ArchivedCard,
1325 ) -> KanbanResult<()> {
1326 Self::write_card_with_conn(conn, &ac.card).await?;
1327 sqlx::query(
1328 "INSERT INTO archived_cards (card_id, archived_at, original_column_id, original_position)
1329 VALUES (?, ?, ?, ?)
1330 ON CONFLICT(card_id) DO UPDATE SET
1331 archived_at=excluded.archived_at,
1332 original_column_id=excluded.original_column_id,
1333 original_position=excluded.original_position",
1334 )
1335 .bind(ac.card.id.to_string())
1336 .bind(fmt_dt(&ac.archived_at))
1337 .bind(ac.original_column_id.to_string())
1338 .bind(ac.original_position)
1339 .execute(&mut *conn)
1340 .await
1341 .map_err(db_err)?;
1342 Ok(())
1343 }
1344
1345 async fn write_archived_card_async(&self, ac: &ArchivedCard) -> KanbanResult<()> {
1346 let mut tx = self.pool.begin().await.map_err(db_err)?;
1347 Self::write_archived_card_with_conn(&mut tx, ac).await?;
1348 tx.commit().await.map_err(db_err)?;
1349 Ok(())
1350 }
1351}
1352
1353impl DataStore for SqliteStore {
1354 fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
1357 run(async {
1358 let id_str = id.to_string();
1359 let row = sqlx::query(
1360 "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1361 task_sort_order, sprint_duration_days, sprint_name_used_count,
1362 next_sprint_number, active_sprint_id, task_list_view,
1363 COALESCE(card_counter, 1) as card_counter,
1364 completion_column_id, position, created_at, updated_at
1365 FROM boards WHERE id = ?",
1366 )
1367 .bind(&id_str)
1368 .fetch_optional(&self.pool)
1369 .await
1370 .map_err(db_err)?;
1371
1372 match row {
1373 Some(row) => {
1374 let (names, counters) = self.fetch_board_aux(&id_str).await?;
1375 Ok(Some(row_to_board(&row, names, counters)?))
1376 }
1377 None => Ok(None),
1378 }
1379 })
1380 }
1381
1382 fn list_boards(&self) -> KanbanResult<Vec<Board>> {
1383 run(self.list_boards_async())
1384 }
1385
1386 fn upsert_board(&self, board: Board) -> KanbanResult<()> {
1387 run(self.write_board_async(&board))
1388 }
1389
1390 fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
1391 run(async {
1392 sqlx::query("DELETE FROM boards WHERE id = ?")
1393 .bind(id.to_string())
1394 .execute(&self.pool)
1395 .await
1396 .map_err(db_err)?;
1397 Ok(())
1398 })
1399 }
1400
1401 fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
1404 run(async {
1405 let row = sqlx::query(
1406 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1407 FROM columns WHERE id = ?",
1408 )
1409 .bind(id.to_string())
1410 .fetch_optional(&self.pool)
1411 .await
1412 .map_err(db_err)?;
1413 row.as_ref().map(row_to_column).transpose()
1414 })
1415 }
1416
1417 fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
1418 run(async {
1419 let rows = sqlx::query(
1420 "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1421 FROM columns WHERE board_id = ? ORDER BY position",
1422 )
1423 .bind(board_id.to_string())
1424 .fetch_all(&self.pool)
1425 .await
1426 .map_err(db_err)?;
1427 rows.iter().map(row_to_column).collect()
1428 })
1429 }
1430
1431 fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
1432 run(self.list_all_columns_async())
1433 }
1434
1435 fn upsert_column(&self, column: Column) -> KanbanResult<()> {
1436 run(self.write_column_async(&column))
1437 }
1438
1439 fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
1440 run(async {
1441 sqlx::query("DELETE FROM columns WHERE id = ?")
1442 .bind(id.to_string())
1443 .execute(&self.pool)
1444 .await
1445 .map_err(db_err)?;
1446 Ok(())
1447 })
1448 }
1449
1450 fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1451 run(async {
1452 sqlx::query("DELETE FROM columns WHERE board_id = ?")
1453 .bind(board_id.to_string())
1454 .execute(&self.pool)
1455 .await
1456 .map_err(db_err)?;
1457 Ok(())
1458 })
1459 }
1460
1461 fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
1464 run(async {
1465 let id_str = id.to_string();
1466 let row = sqlx::query(
1467 "SELECT id, column_id, title, description, priority, status, position,
1468 due_date, points, card_number, sprint_id, created_at, updated_at,
1469 completed_at
1470 FROM cards
1471 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1472 )
1473 .bind(&id_str)
1474 .fetch_optional(&self.pool)
1475 .await
1476 .map_err(db_err)?;
1477
1478 match row {
1479 Some(row) => {
1480 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1481 Ok(Some(row_to_card(&row, logs)?))
1482 }
1483 None => Ok(None),
1484 }
1485 })
1486 }
1487
1488 fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
1489 run(self.fetch_cards_with_filter("", &[]))
1490 }
1491
1492 fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
1493 run(self.fetch_cards_with_filter("AND column_id = ?", &[column_id.to_string()]))
1494 }
1495
1496 fn list_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<Vec<Card>> {
1497 if column_ids.is_empty() {
1498 return Ok(Vec::new());
1499 }
1500 let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1501 let where_clause = format!("AND column_id IN ({placeholders})");
1502 let binds: Vec<String> = column_ids.iter().map(|id| id.to_string()).collect();
1503 run(self.fetch_cards_with_filter(&where_clause, &binds))
1504 }
1505
1506 fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
1507 run(self.fetch_cards_with_filter("AND sprint_id = ?", &[sprint_id.to_string()]))
1508 }
1509
1510 fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
1511 run(async {
1512 let row = sqlx::query(
1513 "SELECT COUNT(*) as cnt FROM cards
1514 WHERE column_id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1515 )
1516 .bind(column_id.to_string())
1517 .fetch_one(&self.pool)
1518 .await
1519 .map_err(db_err)?;
1520 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1521 })
1522 }
1523
1524 fn count_cards_in_column_excluding(
1525 &self,
1526 column_id: Uuid,
1527 exclude: &[Uuid],
1528 ) -> KanbanResult<usize> {
1529 run(async {
1530 if exclude.is_empty() {
1531 return self.count_cards_in_column(column_id);
1532 }
1533 let placeholders = exclude.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1534 let sql = format!(
1535 "SELECT COUNT(*) as cnt FROM cards
1536 WHERE column_id = ?
1537 AND id NOT IN (SELECT card_id FROM archived_cards)
1538 AND id NOT IN ({placeholders})"
1539 );
1540 let mut query = sqlx::query(&sql).bind(column_id.to_string());
1541 for id in exclude {
1542 query = query.bind(id.to_string());
1543 }
1544 let row = query.fetch_one(&self.pool).await.map_err(db_err)?;
1545 Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1546 })
1547 }
1548
1549 fn upsert_card(&self, card: Card) -> KanbanResult<()> {
1550 run(self.write_card_async(&card))
1551 }
1552
1553 fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
1554 run(async {
1555 sqlx::query(
1556 "DELETE FROM cards
1557 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1558 )
1559 .bind(id.to_string())
1560 .execute(&self.pool)
1561 .await
1562 .map_err(db_err)?;
1563 Ok(())
1564 })
1565 }
1566
1567 fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
1568 run(async {
1569 if column_ids.is_empty() {
1570 return Ok(());
1571 }
1572 let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1573 let sql = format!(
1574 "DELETE FROM cards
1575 WHERE column_id IN ({placeholders})
1576 AND id NOT IN (SELECT card_id FROM archived_cards)"
1577 );
1578 let mut query = sqlx::query(&sql);
1579 for id in column_ids {
1580 query = query.bind(id.to_string());
1581 }
1582 query.execute(&self.pool).await.map_err(db_err)?;
1583 Ok(())
1584 })
1585 }
1586
1587 fn clear_sprint_from_cards(
1588 &self,
1589 sprint_id: Uuid,
1590 timestamp: DateTime<Utc>,
1591 ) -> KanbanResult<()> {
1592 run(async {
1593 let now = fmt_dt(×tamp);
1594 sqlx::query(
1595 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1596 WHERE sprint_id = ?
1597 AND id NOT IN (SELECT card_id FROM archived_cards)",
1598 )
1599 .bind(&now)
1600 .bind(sprint_id.to_string())
1601 .execute(&self.pool)
1602 .await
1603 .map_err(db_err)?;
1604 Ok(())
1605 })
1606 }
1607
1608 fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
1611 run(async {
1612 let id_str = card_id.to_string();
1613 let row = sqlx::query(
1614 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1615 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1616 c.created_at, c.updated_at, c.completed_at,
1617 ac.archived_at, ac.original_column_id, ac.original_position
1618 FROM archived_cards ac
1619 JOIN cards c ON ac.card_id = c.id
1620 WHERE ac.card_id = ?",
1621 )
1622 .bind(&id_str)
1623 .fetch_optional(&self.pool)
1624 .await
1625 .map_err(db_err)?;
1626
1627 match row {
1628 Some(row) => {
1629 let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1630 let card = row_to_card(&row, logs)?;
1631 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1632 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1633 Ok(Some(ArchivedCard {
1634 card,
1635 archived_at: p_dt(&archived_at_str)?,
1636 original_column_id: p_uuid(&orig_col_str)?,
1637 original_position: row.try_get("original_position").map_err(db_err)?,
1638 }))
1639 }
1640 None => Ok(None),
1641 }
1642 })
1643 }
1644
1645 fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
1646 run(self.list_archived_cards_async())
1647 }
1648
1649 fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
1650 run(self.write_archived_card_async(&ac))
1651 }
1652
1653 fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
1654 run(async {
1655 let mut tx = self.pool.begin().await.map_err(db_err)?;
1656 sqlx::query("DELETE FROM archived_cards WHERE card_id = ?")
1657 .bind(card_id.to_string())
1658 .execute(&mut *tx)
1659 .await
1660 .map_err(db_err)?;
1661 sqlx::query("DELETE FROM cards WHERE id = ?")
1662 .bind(card_id.to_string())
1663 .execute(&mut *tx)
1664 .await
1665 .map_err(db_err)?;
1666 tx.commit().await.map_err(db_err)
1667 })
1668 }
1669
1670 fn list_archived_cards_by_columns(
1671 &self,
1672 column_ids: &[Uuid],
1673 ) -> KanbanResult<Vec<ArchivedCard>> {
1674 if column_ids.is_empty() {
1675 return Ok(Vec::new());
1676 }
1677 run(async {
1678 let placeholders: Vec<&str> = column_ids.iter().map(|_| "?").collect();
1679 let sql = format!(
1680 "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1681 c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1682 c.created_at, c.updated_at, c.completed_at,
1683 ac.archived_at, ac.original_column_id, ac.original_position
1684 FROM archived_cards ac
1685 JOIN cards c ON ac.card_id = c.id
1686 WHERE ac.original_column_id IN ({})
1687 ORDER BY ac.archived_at",
1688 placeholders.join(", ")
1689 );
1690 let mut query = sqlx::query(&sql);
1691 for id in column_ids {
1692 query = query.bind(id.to_string());
1693 }
1694 let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
1695
1696 let card_ids: Vec<String> = rows
1697 .iter()
1698 .map(|r| r.try_get("id").map_err(db_err))
1699 .collect::<KanbanResult<_>>()?;
1700 let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1701
1702 let mut result = Vec::with_capacity(rows.len());
1703 for row in &rows {
1704 let id_str: String = row.try_get("id").map_err(db_err)?;
1705 let logs = logs_map.remove(&id_str).unwrap_or_default();
1706 let card = row_to_card(row, logs)?;
1707 let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1708 let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1709 result.push(ArchivedCard {
1710 card,
1711 archived_at: p_dt(&archived_at_str)?,
1712 original_column_id: p_uuid(&orig_col_str)?,
1713 original_position: row.try_get("original_position").map_err(db_err)?,
1714 });
1715 }
1716 Ok(result)
1717 })
1718 }
1719
1720 fn clear_sprint_from_archived_cards(
1721 &self,
1722 sprint_id: Uuid,
1723 timestamp: DateTime<Utc>,
1724 ) -> KanbanResult<()> {
1725 run(async {
1726 let now = fmt_dt(×tamp);
1727 sqlx::query(
1728 "UPDATE cards SET sprint_id = NULL, updated_at = ?
1729 WHERE sprint_id = ?
1730 AND id IN (SELECT card_id FROM archived_cards)",
1731 )
1732 .bind(&now)
1733 .bind(sprint_id.to_string())
1734 .execute(&self.pool)
1735 .await
1736 .map_err(db_err)?;
1737 Ok(())
1738 })
1739 }
1740
1741 fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
1744 run(async {
1745 let row = sqlx::query(
1746 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1747 status, start_date, end_date, created_at, updated_at
1748 FROM sprints WHERE id = ?",
1749 )
1750 .bind(id.to_string())
1751 .fetch_optional(&self.pool)
1752 .await
1753 .map_err(db_err)?;
1754 row.as_ref().map(row_to_sprint).transpose()
1755 })
1756 }
1757
1758 fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
1759 run(async {
1760 let rows = sqlx::query(
1761 "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1762 status, start_date, end_date, created_at, updated_at
1763 FROM sprints WHERE board_id = ? ORDER BY sprint_number",
1764 )
1765 .bind(board_id.to_string())
1766 .fetch_all(&self.pool)
1767 .await
1768 .map_err(db_err)?;
1769 rows.iter().map(row_to_sprint).collect()
1770 })
1771 }
1772
1773 fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
1774 run(self.list_all_sprints_async())
1775 }
1776
1777 fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
1778 run(self.write_sprint_async(&sprint))
1779 }
1780
1781 fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
1782 run(async {
1783 sqlx::query("DELETE FROM sprints WHERE id = ?")
1784 .bind(id.to_string())
1785 .execute(&self.pool)
1786 .await
1787 .map_err(db_err)?;
1788 Ok(())
1789 })
1790 }
1791
1792 fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1793 run(async {
1794 sqlx::query("DELETE FROM sprints WHERE board_id = ?")
1795 .bind(board_id.to_string())
1796 .execute(&self.pool)
1797 .await
1798 .map_err(db_err)?;
1799 Ok(())
1800 })
1801 }
1802
1803 fn get_graph(&self) -> KanbanResult<DependencyGraph> {
1806 run(self.get_graph_async())
1807 }
1808
1809 fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
1810 run(self.write_graph_async(&graph))
1811 }
1812
1813 fn modify_graph(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
1814 run(self.modify_graph_async(f))
1815 }
1816
1817 fn snapshot(&self) -> KanbanResult<Snapshot> {
1820 run(self.snapshot_async())
1821 }
1822
1823 fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
1824 run(self.apply_snapshot_async(snapshot))
1825 }
1826}
1827
1828#[async_trait::async_trait]
1829impl PersistenceStore for SqliteStore {
1830 async fn save(&self, snapshot: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
1831 let domain_snapshot: Snapshot = serde_json::from_slice(&snapshot.data)
1832 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1833 self.apply_snapshot_async(domain_snapshot)
1834 .await
1835 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1836
1837 let saved_at = self
1838 .stamp_writer()
1839 .await
1840 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1841 self.checkpoint()
1842 .await
1843 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1844
1845 Ok(PersistenceMetadata {
1852 instance_id: self.instance_id,
1853 saved_at,
1854 writer_version: Some(kanban_core::KANBAN_VERSION.to_string()),
1855 writer_commit: Some(kanban_core::KANBAN_COMMIT.to_string()),
1856 format_version: Some(SUPPORTED_SCHEMA_VERSION),
1857 })
1858 }
1859
1860 async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
1861 let domain_snapshot = self
1862 .snapshot_async()
1863 .await
1864 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1865 let data = serde_json::to_vec(&domain_snapshot)
1866 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1867
1868 let row: (String, Option<String>, Option<String>, u32) = sqlx::query_as(
1869 "SELECT saved_at, writer_version, writer_commit, schema_version \
1870 FROM metadata WHERE id = 1",
1871 )
1872 .fetch_one(&self.pool)
1873 .await
1874 .map_err(|e| PersistenceError::Database(e.to_string()))?;
1875 let saved_at = DateTime::parse_from_rfc3339(&row.0)
1876 .map_err(|e| PersistenceError::Serialization(e.to_string()))?
1877 .with_timezone(&Utc);
1878 let meta = PersistenceMetadata {
1879 instance_id: self.instance_id,
1880 saved_at,
1881 writer_version: row.1,
1882 writer_commit: row.2,
1883 format_version: Some(row.3),
1884 };
1885 Ok((
1886 StoreSnapshot {
1887 data,
1888 metadata: meta.clone(),
1889 },
1890 meta,
1891 ))
1892 }
1893
1894 async fn exists(&self) -> bool {
1895 self.path.exists()
1896 }
1897
1898 fn path(&self) -> &std::path::Path {
1899 &self.path
1900 }
1901
1902 fn instance_id(&self) -> Uuid {
1903 self.instance_id
1904 }
1905
1906 async fn close(&self) {
1907 self.pool.close().await;
1908 }
1909}
1910
1911#[cfg(test)]
1912mod tests {
1913 use super::*;
1914 use tempfile::TempDir;
1915
1916 fn make_rt() -> tokio::runtime::Runtime {
1917 tokio::runtime::Builder::new_multi_thread()
1918 .enable_all()
1919 .build()
1920 .unwrap()
1921 }
1922
1923 #[test]
1924 fn test_sqlitestore_path_is_preserved() {
1925 let dir = TempDir::new().unwrap();
1926 let path = dir.path().join("test.db");
1927 let rt = make_rt();
1928 let store = rt.block_on(SqliteStore::open(&path)).unwrap();
1929 assert_eq!(store.path(), path.as_path());
1930 }
1931
1932 #[test]
1933 fn test_sqlitestore_instance_id_persists_across_reopen() {
1934 let dir = TempDir::new().unwrap();
1935 let path = dir.path().join("test.db");
1936 let rt = make_rt();
1937 let id1 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1938 let id2 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1939 assert_eq!(id1, id2, "instance_id must be stable across reopens");
1940 }
1941
1942 #[test]
1943 fn test_sqlitestore_persistence_save_load_roundtrip() {
1944 use kanban_domain::{Board, DependencyGraph};
1945 use kanban_persistence::snapshot_to_json_bytes;
1946
1947 let dir = TempDir::new().unwrap();
1948 let path = dir.path().join("test.db");
1949 let rt = make_rt();
1950
1951 rt.block_on(async {
1952 let store = SqliteStore::open(&path).await.unwrap();
1953 let board = Board::new("Test Board", None::<String>);
1954 let snapshot = Snapshot::from_data(
1955 vec![board],
1956 vec![],
1957 vec![],
1958 vec![],
1959 vec![],
1960 DependencyGraph::new(),
1961 );
1962 let data = snapshot_to_json_bytes(&snapshot).unwrap();
1963 let meta = PersistenceMetadata::new(store.instance_id());
1964 let store_snap = StoreSnapshot {
1965 data,
1966 metadata: meta,
1967 };
1968
1969 PersistenceStore::save(&store, store_snap).await.unwrap();
1970
1971 let (loaded_snap, _meta) = PersistenceStore::load(&store).await.unwrap();
1972 let loaded: Snapshot = serde_json::from_slice(&loaded_snap.data).unwrap();
1973 assert_eq!(loaded.boards.len(), 1);
1974 assert_eq!(loaded.boards[0].name, "Test Board");
1975 });
1976 }
1977
1978 #[test]
1979 fn test_sqlitestore_exists_returns_true_after_open() {
1980 let dir = TempDir::new().unwrap();
1981 let path = dir.path().join("test.db");
1982 let rt = make_rt();
1983 rt.block_on(async {
1984 let store = SqliteStore::open(&path).await.unwrap();
1985 assert!(PersistenceStore::exists(&store).await);
1986 });
1987 }
1988
1989 #[test]
1990 fn test_checkpoint_executes_without_error() {
1991 let dir = TempDir::new().unwrap();
1992 let path = dir.path().join("test.sqlite3");
1993 let rt = make_rt();
1994 rt.block_on(async {
1995 let store = SqliteStore::open(&path).await.unwrap();
1996 store.checkpoint().await.unwrap();
1997 });
1998 }
1999
2000 #[test]
2001 fn test_save_checkpoints_wal_file_stays_minimal() {
2002 let dir = TempDir::new().unwrap();
2003 let path = dir.path().join("test.sqlite3");
2004 let rt = make_rt();
2005 rt.block_on(async {
2006 let store = SqliteStore::open(&path).await.unwrap();
2007 let (snapshot, _) = PersistenceStore::load(&store).await.unwrap();
2008 PersistenceStore::save(&store, snapshot).await.unwrap();
2009 let wal_path = path.with_extension("sqlite3-wal");
2010 if wal_path.exists() {
2011 assert!(
2012 wal_path.metadata().unwrap().len() < 32 * 1024,
2013 "WAL file should be minimal after save+checkpoint"
2014 );
2015 }
2016 });
2017 }
2018
2019 #[test]
2024 fn test_blocks_edges_rejects_unknown_severity() {
2025 let dir = TempDir::new().unwrap();
2026 let path = dir.path().join("check.sqlite3");
2027 let rt = make_rt();
2028 rt.block_on(async {
2029 let store = SqliteStore::open(&path).await.unwrap();
2030 let insert = sqlx::query(
2031 "INSERT INTO blocks_edges
2032 (source_id, target_id, severity, created_at, archived_at)
2033 VALUES (?, ?, 'Catastrophic', ?, NULL)",
2034 )
2035 .bind(uuid::Uuid::nil().to_string())
2036 .bind(uuid::Uuid::from_u128(0x42).to_string())
2037 .bind(chrono::Utc::now().to_rfc3339())
2038 .execute(store.pool())
2039 .await;
2040 assert!(
2041 insert.is_err(),
2042 "CHECK on severity must reject 'Catastrophic'; got {:?}",
2043 insert
2044 );
2045 });
2046 }
2047
2048 #[test]
2049 fn test_relates_edges_rejects_unknown_kind() {
2050 let dir = TempDir::new().unwrap();
2051 let path = dir.path().join("check_relates.sqlite3");
2052 let rt = make_rt();
2053 rt.block_on(async {
2054 let store = SqliteStore::open(&path).await.unwrap();
2055 let insert = sqlx::query(
2056 "INSERT INTO relates_edges
2057 (source_id, target_id, kind, created_at, archived_at)
2058 VALUES (?, ?, 'Unknown', ?, NULL)",
2059 )
2060 .bind(uuid::Uuid::nil().to_string())
2061 .bind(uuid::Uuid::from_u128(0x42).to_string())
2062 .bind(chrono::Utc::now().to_rfc3339())
2063 .execute(store.pool())
2064 .await;
2065 assert!(insert.is_err());
2066 });
2067 }
2068
2069 #[test]
2074 fn test_open_drops_legacy_card_edges_table() {
2075 let dir = TempDir::new().unwrap();
2076 let path = dir.path().join("legacy.sqlite3");
2077 let rt = make_rt();
2078 rt.block_on(async {
2079 let pool = sqlx::sqlite::SqlitePoolOptions::new()
2083 .max_connections(1)
2084 .connect_with(
2085 sqlx::sqlite::SqliteConnectOptions::new()
2086 .filename(&path)
2087 .create_if_missing(true),
2088 )
2089 .await
2090 .unwrap();
2091 sqlx::raw_sql(
2092 "CREATE TABLE card_edges (
2093 source_id TEXT NOT NULL,
2094 target_id TEXT NOT NULL,
2095 edge_type TEXT NOT NULL,
2096 direction TEXT NOT NULL,
2097 weight REAL,
2098 created_at TEXT NOT NULL,
2099 archived_at TEXT,
2100 PRIMARY KEY (source_id, target_id, edge_type)
2101 )",
2102 )
2103 .execute(&pool)
2104 .await
2105 .unwrap();
2106 drop(pool);
2107
2108 let store = SqliteStore::open(&path).await.unwrap();
2110 let has_card_edges: bool = sqlx::query_scalar(
2111 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='card_edges'",
2112 )
2113 .fetch_one(store.pool())
2114 .await
2115 .unwrap();
2116 assert!(!has_card_edges, "legacy card_edges table must be dropped");
2117
2118 for table in ["spawns_edges", "blocks_edges", "relates_edges"] {
2119 let has: bool = sqlx::query_scalar(&format!(
2120 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='{}'",
2121 table
2122 ))
2123 .fetch_one(store.pool())
2124 .await
2125 .unwrap();
2126 assert!(has, "{table} must exist after open");
2127 }
2128 });
2129 }
2130
2131 #[test]
2132 fn test_delete_archived_card_orphaned_cards_row_is_still_cleaned_up() {
2133 use kanban_domain::data_store::DataStore;
2134 let dir = TempDir::new().unwrap();
2135 let path = dir.path().join("test.sqlite3");
2136 let rt = make_rt();
2137 rt.block_on(async {
2138 let store = SqliteStore::open(&path).await.unwrap();
2139
2140 let mut board = kanban_domain::Board::new("B", None::<String>);
2141 let column = kanban_domain::Column::new(board.id, "Col", 0);
2142 let card = kanban_domain::Card::new(&mut board, column.id, "Task", 0);
2143 let card_id = card.id;
2144 let column_id = column.id;
2145 store.upsert_board(board).unwrap();
2146 store.upsert_column(column).unwrap();
2147 store.upsert_card(card.clone()).unwrap();
2148
2149 let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
2152 store.insert_archived_card(archived).unwrap();
2153
2154 store.delete_archived_card(card_id).unwrap();
2155
2156 assert!(
2157 store.list_archived_cards().unwrap().is_empty(),
2158 "card should be gone from archived_cards"
2159 );
2160 assert!(
2161 store.list_all_cards().unwrap().is_empty(),
2162 "orphaned cards row should also be removed by delete_archived_card"
2163 );
2164 });
2165 }
2166
2167 #[test]
2168 fn test_delete_archived_card_removes_from_cards_table() {
2169 use kanban_domain::data_store::DataStore;
2170 let dir = TempDir::new().unwrap();
2171 let path = dir.path().join("test.sqlite3");
2172 let rt = make_rt();
2173 rt.block_on(async {
2174 let store = SqliteStore::open(&path).await.unwrap();
2175
2176 let mut board = kanban_domain::Board::new("B", None::<String>);
2177 let column = kanban_domain::Column::new(board.id, "Col", 0);
2178 let card = kanban_domain::Card::new(&mut board, column.id, "Task", 0);
2179 let card_id = card.id;
2180 let column_id = column.id;
2181 store.upsert_board(board).unwrap();
2182 store.upsert_column(column).unwrap();
2183 store.upsert_card(card.clone()).unwrap();
2184
2185 let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
2186 store.insert_archived_card(archived).unwrap();
2187 store.delete_card(card_id).unwrap();
2188
2189 assert_eq!(store.list_archived_cards().unwrap().len(), 1);
2190
2191 store.delete_archived_card(card_id).unwrap();
2192
2193 assert!(
2194 store.list_archived_cards().unwrap().is_empty(),
2195 "card should be gone from archived_cards"
2196 );
2197 assert!(
2198 store.list_all_cards().unwrap().is_empty(),
2199 "card should also be gone from cards table, not restored as active"
2200 );
2201 assert!(
2202 store.get_card(card_id).unwrap().is_none(),
2203 "get_card should return None for permanently deleted card"
2204 );
2205 });
2206 }
2207
2208 #[test]
2209 fn test_read_metadata_sync_reflects_actual_schema_version_from_db_row() {
2210 let dir = TempDir::new().unwrap();
2216 let path = dir.path().join("drift.db");
2217 let rt = make_rt();
2218 rt.block_on(async {
2219 let store = SqliteStore::open(&path).await.unwrap();
2220 sqlx::query("UPDATE metadata SET schema_version = 1 WHERE id = 1")
2221 .execute(&store.pool)
2222 .await
2223 .unwrap();
2224
2225 let meta = store
2226 .read_metadata_sync()
2227 .unwrap()
2228 .expect("metadata row should exist");
2229 assert_eq!(
2230 meta.format_version,
2231 Some(1),
2232 "format_version must reflect the DB row (1), not the binary's SUPPORTED ({SUPPORTED_SCHEMA_VERSION})"
2233 );
2234 });
2235 }
2236
2237 #[test]
2238 fn test_load_reports_actual_schema_version_in_metadata() {
2239 use kanban_domain::DependencyGraph;
2240 use kanban_persistence::snapshot_to_json_bytes;
2241
2242 let dir = TempDir::new().unwrap();
2243 let path = dir.path().join("load_drift.db");
2244 let rt = make_rt();
2245 rt.block_on(async {
2246 let store = SqliteStore::open(&path).await.unwrap();
2247 let snapshot = Snapshot::from_data(
2249 vec![],
2250 vec![],
2251 vec![],
2252 vec![],
2253 vec![],
2254 DependencyGraph::new(),
2255 );
2256 let data = snapshot_to_json_bytes(&snapshot).unwrap();
2257 PersistenceStore::save(
2258 &store,
2259 StoreSnapshot {
2260 data,
2261 metadata: PersistenceMetadata::new(store.instance_id()),
2262 },
2263 )
2264 .await
2265 .unwrap();
2266 sqlx::query("UPDATE metadata SET schema_version = 1 WHERE id = 1")
2267 .execute(&store.pool)
2268 .await
2269 .unwrap();
2270
2271 let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2272 assert_eq!(
2273 meta.format_version,
2274 Some(1),
2275 "load() must report the DB's actual schema_version, not the const"
2276 );
2277 });
2278 }
2279
2280 #[test]
2281 fn test_stamp_writer_updates_metadata_row_and_returns_timestamp() {
2282 let dir = TempDir::new().unwrap();
2283 let path = dir.path().join("stamped.db");
2284 let rt = make_rt();
2285 rt.block_on(async {
2286 let store = SqliteStore::open(&path).await.unwrap();
2287 sqlx::query(
2289 "UPDATE metadata SET writer_version = NULL, writer_commit = NULL WHERE id = 1",
2290 )
2291 .execute(&store.pool)
2292 .await
2293 .unwrap();
2294
2295 let before = Utc::now();
2296 let stamped_at = store.stamp_writer().await.unwrap();
2297 let after = Utc::now();
2298
2299 assert!(
2300 stamped_at >= before && stamped_at <= after,
2301 "returned timestamp must be in [before, after]: {stamped_at:?}"
2302 );
2303
2304 let (saved_at_str, wv, wc): (String, Option<String>, Option<String>) = sqlx::query_as(
2305 "SELECT saved_at, writer_version, writer_commit FROM metadata WHERE id = 1",
2306 )
2307 .fetch_one(&store.pool)
2308 .await
2309 .unwrap();
2310 assert_eq!(wv.as_deref(), Some(kanban_core::KANBAN_VERSION));
2311 assert_eq!(wc.as_deref(), Some(kanban_core::KANBAN_COMMIT));
2312 assert_eq!(saved_at_str, stamped_at.to_rfc3339());
2313 });
2314 }
2315
2316 #[test]
2317 fn test_checkpoint_alone_does_not_stamp_writer_fields() {
2318 let dir = TempDir::new().unwrap();
2319 let path = dir.path().join("ckpt_only.db");
2320 let rt = make_rt();
2321 rt.block_on(async {
2322 let store = SqliteStore::open(&path).await.unwrap();
2323 sqlx::query(
2324 "UPDATE metadata SET writer_version = NULL, writer_commit = NULL WHERE id = 1",
2325 )
2326 .execute(&store.pool)
2327 .await
2328 .unwrap();
2329
2330 store.checkpoint().await.unwrap();
2331
2332 let (wv, wc): (Option<String>, Option<String>) =
2333 sqlx::query_as("SELECT writer_version, writer_commit FROM metadata WHERE id = 1")
2334 .fetch_one(&store.pool)
2335 .await
2336 .unwrap();
2337 assert!(
2338 wv.is_none() && wc.is_none(),
2339 "checkpoint() must be WAL-only post-split; got wv={wv:?} wc={wc:?}"
2340 );
2341 });
2342 }
2343
2344 #[test]
2345 fn test_fresh_db_records_schema_version_2() {
2346 let dir = TempDir::new().unwrap();
2347 let path = dir.path().join("fresh.db");
2348 let rt = make_rt();
2349 rt.block_on(async {
2350 let store = SqliteStore::open(&path).await.unwrap();
2351 let version: u32 =
2352 sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
2353 .fetch_one(&store.pool)
2354 .await
2355 .unwrap();
2356 assert_eq!(version, SUPPORTED_SCHEMA_VERSION);
2357 });
2358 }
2359
2360 #[test]
2361 fn test_fresh_db_has_writer_version_and_writer_commit_columns() {
2362 let dir = TempDir::new().unwrap();
2363 let path = dir.path().join("fresh.db");
2364 let rt = make_rt();
2365 rt.block_on(async {
2366 let store = SqliteStore::open(&path).await.unwrap();
2367 for col in ["writer_version", "writer_commit"] {
2368 let exists: bool = sqlx::query_scalar(&format!(
2369 "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
2370 ))
2371 .fetch_one(&store.pool)
2372 .await
2373 .unwrap();
2374 assert!(exists, "metadata.{col} column must exist on fresh DB");
2375 }
2376 });
2377 }
2378
2379 #[test]
2380 fn test_save_stamps_writer_version_and_commit_into_metadata_row() {
2381 use kanban_domain::DependencyGraph;
2382 use kanban_persistence::snapshot_to_json_bytes;
2383
2384 let dir = TempDir::new().unwrap();
2385 let path = dir.path().join("stamped.db");
2386 let rt = make_rt();
2387 rt.block_on(async {
2388 let store = SqliteStore::open(&path).await.unwrap();
2389 let snapshot = Snapshot::from_data(
2390 vec![],
2391 vec![],
2392 vec![],
2393 vec![],
2394 vec![],
2395 DependencyGraph::new(),
2396 );
2397 let data = snapshot_to_json_bytes(&snapshot).unwrap();
2398 let store_snap = StoreSnapshot {
2399 data,
2400 metadata: PersistenceMetadata::new(store.instance_id()),
2401 };
2402 let returned = PersistenceStore::save(&store, store_snap).await.unwrap();
2403
2404 assert_eq!(
2405 returned.writer_version.as_deref(),
2406 Some(kanban_core::KANBAN_VERSION),
2407 );
2408 assert_eq!(
2409 returned.writer_commit.as_deref(),
2410 Some(kanban_core::KANBAN_COMMIT),
2411 );
2412
2413 let row: (Option<String>, Option<String>) =
2414 sqlx::query_as("SELECT writer_version, writer_commit FROM metadata WHERE id = 1")
2415 .fetch_one(&store.pool)
2416 .await
2417 .unwrap();
2418 assert_eq!(row.0.as_deref(), Some(kanban_core::KANBAN_VERSION));
2419 assert_eq!(row.1.as_deref(), Some(kanban_core::KANBAN_COMMIT));
2420 });
2421 }
2422
2423 #[test]
2424 fn test_load_returns_writer_stamp_from_metadata_row() {
2425 use kanban_domain::DependencyGraph;
2426 use kanban_persistence::snapshot_to_json_bytes;
2427
2428 let dir = TempDir::new().unwrap();
2429 let path = dir.path().join("loaded.db");
2430 let rt = make_rt();
2431 rt.block_on(async {
2432 let store = SqliteStore::open(&path).await.unwrap();
2433 let snapshot = Snapshot::from_data(
2434 vec![],
2435 vec![],
2436 vec![],
2437 vec![],
2438 vec![],
2439 DependencyGraph::new(),
2440 );
2441 let data = snapshot_to_json_bytes(&snapshot).unwrap();
2442 PersistenceStore::save(
2443 &store,
2444 StoreSnapshot {
2445 data,
2446 metadata: PersistenceMetadata::new(store.instance_id()),
2447 },
2448 )
2449 .await
2450 .unwrap();
2451
2452 let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2453 assert_eq!(
2454 meta.writer_version.as_deref(),
2455 Some(kanban_core::KANBAN_VERSION),
2456 );
2457 assert_eq!(
2458 meta.writer_commit.as_deref(),
2459 Some(kanban_core::KANBAN_COMMIT),
2460 );
2461 });
2462 }
2463
2464 #[test]
2465 fn test_load_legacy_db_without_stamp_returns_none_writer_fields() {
2466 let dir = TempDir::new().unwrap();
2470 let path = dir.path().join("legacy_load.db");
2471 let rt = make_rt();
2472 rt.block_on(async {
2473 let pool = SqlitePoolOptions::new()
2474 .max_connections(1)
2475 .connect_with(
2476 SqliteConnectOptions::new()
2477 .filename(&path)
2478 .create_if_missing(true),
2479 )
2480 .await
2481 .unwrap();
2482 sqlx::raw_sql(
2483 "CREATE TABLE metadata (
2484 id INTEGER PRIMARY KEY CHECK (id = 1),
2485 instance_id TEXT NOT NULL,
2486 saved_at TEXT NOT NULL,
2487 schema_version INTEGER NOT NULL DEFAULT 1
2488 );
2489 INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2490 VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2024-01-01T00:00:00Z', 1);",
2491 )
2492 .execute(&pool)
2493 .await
2494 .unwrap();
2495 pool.close().await;
2496
2497 let store = SqliteStore::open(&path).await.unwrap();
2498 let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2499 assert!(meta.writer_version.is_none());
2500 assert!(meta.writer_commit.is_none());
2501 });
2502 }
2503
2504 #[test]
2505 fn test_open_rejects_future_schema_version() {
2506 let dir = TempDir::new().unwrap();
2507 let path = dir.path().join("future.db");
2508 let rt = make_rt();
2509 rt.block_on(async {
2510 let pool = SqlitePoolOptions::new()
2511 .max_connections(1)
2512 .connect_with(
2513 SqliteConnectOptions::new()
2514 .filename(&path)
2515 .create_if_missing(true),
2516 )
2517 .await
2518 .unwrap();
2519 sqlx::raw_sql(
2520 "CREATE TABLE metadata (
2521 id INTEGER PRIMARY KEY CHECK (id = 1),
2522 instance_id TEXT NOT NULL,
2523 saved_at TEXT NOT NULL,
2524 schema_version INTEGER NOT NULL DEFAULT 2,
2525 writer_version TEXT,
2526 writer_commit TEXT
2527 );
2528 INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2529 VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2030-01-01T00:00:00Z', 99);",
2530 )
2531 .execute(&pool)
2532 .await
2533 .unwrap();
2534 pool.close().await;
2535
2536 let err = SqliteStore::open(&path)
2537 .await
2538 .err()
2539 .expect("schema_version 99 must be refused");
2540 assert!(
2541 matches!(
2542 err,
2543 KanbanError::UnsupportedFutureVersion {
2544 file_version: 99,
2545 binary_max: SUPPORTED_SCHEMA_VERSION
2546 }
2547 ),
2548 "expected UnsupportedFutureVersion, got: {err:?}"
2549 );
2550 });
2551 }
2552
2553 #[test]
2554 fn test_open_alters_in_writer_columns_on_legacy_v1_db() {
2555 let dir = TempDir::new().unwrap();
2559 let path = dir.path().join("legacy.db");
2560 let rt = make_rt();
2561 rt.block_on(async {
2562 let pool = SqlitePoolOptions::new()
2563 .max_connections(1)
2564 .connect_with(
2565 SqliteConnectOptions::new()
2566 .filename(&path)
2567 .create_if_missing(true),
2568 )
2569 .await
2570 .unwrap();
2571 sqlx::raw_sql(
2572 "CREATE TABLE metadata (
2573 id INTEGER PRIMARY KEY CHECK (id = 1),
2574 instance_id TEXT NOT NULL,
2575 saved_at TEXT NOT NULL,
2576 schema_version INTEGER NOT NULL DEFAULT 1
2577 );
2578 INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2579 VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2024-01-01T00:00:00Z', 1);",
2580 )
2581 .execute(&pool)
2582 .await
2583 .unwrap();
2584 pool.close().await;
2585
2586 let store = SqliteStore::open(&path).await.unwrap();
2587
2588 for col in ["writer_version", "writer_commit"] {
2589 let exists: bool = sqlx::query_scalar(&format!(
2590 "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
2591 ))
2592 .fetch_one(&store.pool)
2593 .await
2594 .unwrap();
2595 assert!(exists, "metadata.{col} must be ALTERed in on legacy open");
2596 }
2597
2598 let bumped: u32 =
2599 sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
2600 .fetch_one(&store.pool)
2601 .await
2602 .unwrap();
2603 assert_eq!(
2604 bumped, SUPPORTED_SCHEMA_VERSION,
2605 "schema_version must be bumped to current on legacy open"
2606 );
2607 });
2608 }
2609
2610 #[test]
2611 fn test_empty_sprint_log_status_returns_error() {
2612 use kanban_domain::data_store::DataStore;
2613 use kanban_domain::{Board, Card, Column, SprintLog};
2614
2615 let dir = TempDir::new().unwrap();
2616 let path = dir.path().join("validation.sqlite3");
2617 let rt = make_rt();
2618 rt.block_on(async {
2619 let store = SqliteStore::open(&path).await.unwrap();
2620
2621 let mut board = Board::new("B", None::<String>);
2622 let column = Column::new(board.id, "Col", 0);
2623 let mut card = Card::new(&mut board, column.id, "Task", 0);
2624 store.upsert_board(board).unwrap();
2625 store.upsert_column(column).unwrap();
2626
2627 let log = SprintLog::new(uuid::Uuid::new_v4(), 1, None::<String>, "");
2628 card.sprint_logs.push(log);
2629
2630 let result = store.upsert_card(card);
2631 assert!(
2632 result.is_err(),
2633 "upsert_card must reject a SprintLog with empty status"
2634 );
2635 });
2636 }
2637
2638 #[test]
2639 fn test_empty_board_name_returns_error() {
2640 use kanban_domain::data_store::DataStore;
2641 use kanban_domain::Board;
2642 let dir = TempDir::new().unwrap();
2643 let path = dir.path().join("validation.sqlite3");
2644 let rt = make_rt();
2645 rt.block_on(async {
2646 let store = SqliteStore::open(&path).await.unwrap();
2647 let board = Board::new("", None::<String>);
2648 let result = store.upsert_board(board);
2649 assert!(
2650 result.is_err(),
2651 "upsert_board must reject a Board with empty name"
2652 );
2653 });
2654 }
2655
2656 #[test]
2657 fn test_empty_column_name_returns_error() {
2658 use kanban_domain::data_store::DataStore;
2659 use kanban_domain::{Board, Column};
2660 let dir = TempDir::new().unwrap();
2661 let path = dir.path().join("validation.sqlite3");
2662 let rt = make_rt();
2663 rt.block_on(async {
2664 let store = SqliteStore::open(&path).await.unwrap();
2665 let board = Board::new("B", None::<String>);
2666 let board_id = board.id;
2667 store.upsert_board(board).unwrap();
2668 let col = Column::new(board_id, "", 0);
2669 let result = store.upsert_column(col);
2670 assert!(
2671 result.is_err(),
2672 "upsert_column must reject a Column with empty name"
2673 );
2674 });
2675 }
2676
2677 #[test]
2678 fn test_empty_card_title_returns_error() {
2679 use kanban_domain::data_store::DataStore;
2680 use kanban_domain::{Board, Card, Column};
2681 let dir = TempDir::new().unwrap();
2682 let path = dir.path().join("validation.sqlite3");
2683 let rt = make_rt();
2684 rt.block_on(async {
2685 let store = SqliteStore::open(&path).await.unwrap();
2686 let mut board = Board::new("B", None::<String>);
2687 let col = Column::new(board.id, "Col", 0);
2688 let col_id = col.id;
2689 let card = Card::new(&mut board, col_id, "", 0);
2691 store.upsert_board(board).unwrap();
2692 store.upsert_column(col).unwrap();
2693 let result = store.upsert_card(card);
2694 assert!(
2695 result.is_err(),
2696 "upsert_card must reject a Card with empty title"
2697 );
2698 });
2699 }
2700}