Skip to main content

kanban_persistence_sqlite/
sqlite_store.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use chrono::{DateTime, Utc};
5use kanban_domain::data_store::DataStore;
6use kanban_domain::{
7    ArchivedCard, Board, Card, Column, DependencyGraph, KanbanError, KanbanResult, Snapshot,
8    Sprint, SprintLog,
9};
10use kanban_persistence::{
11    PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore, StoreSnapshot,
12};
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
14use sqlx::{Pool, Row, Sqlite};
15use uuid::Uuid;
16
17const SCHEMA: &str = include_str!("schema.sql");
18
19/// The highest schema_version this binary understands. Used both to
20/// stamp fresh databases and to refuse files written by a future binary.
21pub const SUPPORTED_SCHEMA_VERSION: u32 = 2;
22
23/// (instance_id, saved_at, writer_version, writer_commit, schema_version).
24/// Tuple shape returned by the metadata-singleton SELECT — extracted to a
25/// type alias to keep clippy's type-complexity lint happy.
26type MetadataRow = (String, String, Option<String>, Option<String>, u32);
27
28/// SQLite-backed persistence store using sqlx connection pool.
29pub struct SqliteStore {
30    pool: Pool<Sqlite>,
31    path: PathBuf,
32    instance_id: Uuid,
33}
34
35fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
36    let handle = tokio::runtime::Handle::current();
37    debug_assert!(
38        handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
39        "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
40         tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
41         because synchronous DataStore methods need to block on async SQLite I/O."
42    );
43    tokio::task::block_in_place(|| handle.block_on(f))
44}
45
46fn db_err(e: sqlx::Error) -> KanbanError {
47    KanbanError::Database(e.to_string())
48}
49
50fn ser_err(msg: impl std::fmt::Display) -> KanbanError {
51    KanbanError::Serialization(msg.to_string())
52}
53
54fn p_uuid(s: &str) -> KanbanResult<Uuid> {
55    Uuid::parse_str(s).map_err(ser_err)
56}
57
58fn p_dt(s: &str) -> KanbanResult<DateTime<Utc>> {
59    DateTime::parse_from_rfc3339(s)
60        .map_err(ser_err)
61        .map(|dt| dt.with_timezone(&Utc))
62}
63
64fn p_enum<T: serde::de::DeserializeOwned>(s: &str, label: &str) -> KanbanResult<T> {
65    serde_json::from_value(serde_json::Value::String(s.to_owned()))
66        .map_err(|_| ser_err(format!("unknown {label} variant: {s}")))
67}
68
69/// Render a unit-variant enum to its serde wire name (e.g.
70/// `CardEdgeType::Spawns -> "ParentOf"`). Symmetric with [`p_enum`].
71/// Avoids coupling the on-disk format to `#[derive(Debug)]`, which is
72/// easy to customize and would break the read/write round-trip silently.
73fn ser_enum<T: serde::Serialize + std::fmt::Debug>(v: &T, label: &str) -> KanbanResult<String> {
74    match serde_json::to_value(v).map_err(ser_err)? {
75        serde_json::Value::String(s) => Ok(s),
76        other => Err(ser_err(format!(
77            "{label} did not serialise to a JSON string: {other}"
78        ))),
79    }
80}
81
82fn fmt_dt(dt: &DateTime<Utc>) -> String {
83    dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
84}
85
86fn required_str<'a>(value: &'a str, field: &str) -> KanbanResult<&'a str> {
87    if value.is_empty() {
88        Err(ser_err(format!(
89            "required field '{field}' must not be empty"
90        )))
91    } else {
92        Ok(value)
93    }
94}
95
96fn opt_dt(dt: &Option<DateTime<Utc>>) -> Option<String> {
97    dt.as_ref().map(fmt_dt)
98}
99
100// --- Row parsers ---
101
102fn row_to_board(
103    row: &SqliteRow,
104    sprint_names: Vec<String>,
105    sprint_counters: HashMap<String, u32>,
106) -> KanbanResult<Board> {
107    let id_str: String = row.try_get("id").map_err(db_err)?;
108    let active_sprint_id_str: Option<String> = row.try_get("active_sprint_id").map_err(db_err)?;
109    let completion_column_id_str: Option<String> =
110        row.try_get("completion_column_id").map_err(db_err)?;
111    let task_sort_field_str: String = row.try_get("task_sort_field").map_err(db_err)?;
112    let task_sort_order_str: String = row.try_get("task_sort_order").map_err(db_err)?;
113    let task_list_view_str: String = row.try_get("task_list_view").map_err(db_err)?;
114    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
115    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
116    let sprint_duration_days_raw: Option<i32> =
117        row.try_get("sprint_duration_days").map_err(db_err)?;
118
119    Ok(Board {
120        id: p_uuid(&id_str)?,
121        name: row.try_get("name").map_err(db_err)?,
122        description: row.try_get("description").map_err(db_err)?,
123        sprint_prefix: row.try_get("sprint_prefix").map_err(db_err)?,
124        card_prefix: row.try_get("card_prefix").map_err(db_err)?,
125        task_sort_field: p_enum(&task_sort_field_str, "task_sort_field")?,
126        task_sort_order: p_enum(&task_sort_order_str, "task_sort_order")?,
127        sprint_duration_days: sprint_duration_days_raw.map(|v| v as u32),
128        sprint_names,
129        sprint_name_used_count: row
130            .try_get::<i32, _>("sprint_name_used_count")
131            .map_err(db_err)? as usize,
132        next_sprint_number: row
133            .try_get::<i32, _>("next_sprint_number")
134            .map_err(db_err)? as u32,
135        active_sprint_id: active_sprint_id_str.as_deref().map(p_uuid).transpose()?,
136        task_list_view: p_enum(&task_list_view_str, "task_list_view")?,
137        card_counter: row.try_get::<i32, _>("card_counter").map_err(db_err)? as u32,
138        sprint_counters,
139        completion_column_id: completion_column_id_str
140            .as_deref()
141            .map(p_uuid)
142            .transpose()?,
143        position: row.try_get::<i32, _>("position").map_err(db_err)?,
144        created_at: p_dt(&created_at_str)?,
145        updated_at: p_dt(&updated_at_str)?,
146    })
147}
148
149fn row_to_column(row: &SqliteRow) -> KanbanResult<Column> {
150    let id_str: String = row.try_get("id").map_err(db_err)?;
151    let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
152    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
153    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
154
155    Ok(Column {
156        id: p_uuid(&id_str)?,
157        board_id: p_uuid(&board_id_str)?,
158        name: row.try_get("name").map_err(db_err)?,
159        position: row.try_get("position").map_err(db_err)?,
160        wip_limit: row.try_get("wip_limit").map_err(db_err)?,
161        created_at: p_dt(&created_at_str)?,
162        updated_at: p_dt(&updated_at_str)?,
163    })
164}
165
166fn row_to_card(row: &SqliteRow, sprint_logs: Vec<SprintLog>) -> KanbanResult<Card> {
167    let id_str: String = row.try_get("id").map_err(db_err)?;
168    let column_id_str: String = row.try_get("column_id").map_err(db_err)?;
169    let sprint_id_str: Option<String> = row.try_get("sprint_id").map_err(db_err)?;
170    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
171    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
172    let completed_at_str: Option<String> = row.try_get("completed_at").map_err(db_err)?;
173    let due_date_str: Option<String> = row.try_get("due_date").map_err(db_err)?;
174    let priority_str: String = row.try_get("priority").map_err(db_err)?;
175    let status_str: String = row.try_get("status").map_err(db_err)?;
176    let points_raw: Option<i32> = row.try_get("points").map_err(db_err)?;
177
178    Ok(Card {
179        id: p_uuid(&id_str)?,
180        column_id: p_uuid(&column_id_str)?,
181        title: row.try_get("title").map_err(db_err)?,
182        description: row.try_get("description").map_err(db_err)?,
183        priority: p_enum(&priority_str, "priority")?,
184        status: p_enum(&status_str, "status")?,
185        position: row.try_get("position").map_err(db_err)?,
186        due_date: due_date_str.as_deref().map(p_dt).transpose()?,
187        points: points_raw
188            .map(|v| u8::try_from(v).map_err(|_| ser_err(format!("points {v} out of range"))))
189            .transpose()?,
190        card_number: row.try_get::<i32, _>("card_number").map_err(db_err)? as u32,
191        sprint_id: sprint_id_str.as_deref().map(p_uuid).transpose()?,
192        created_at: p_dt(&created_at_str)?,
193        updated_at: p_dt(&updated_at_str)?,
194        completed_at: completed_at_str.as_deref().map(p_dt).transpose()?,
195        sprint_logs,
196    })
197}
198
199fn row_to_sprint(row: &SqliteRow) -> KanbanResult<Sprint> {
200    let id_str: String = row.try_get("id").map_err(db_err)?;
201    let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
202    let status_str: String = row.try_get("status").map_err(db_err)?;
203    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
204    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
205    let start_date_str: Option<String> = row.try_get("start_date").map_err(db_err)?;
206    let end_date_str: Option<String> = row.try_get("end_date").map_err(db_err)?;
207    let name_index_raw: Option<i32> = row.try_get("name_index").map_err(db_err)?;
208
209    Ok(Sprint {
210        id: p_uuid(&id_str)?,
211        board_id: p_uuid(&board_id_str)?,
212        sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
213        name_index: name_index_raw.map(|v| v as usize),
214        prefix: row.try_get("prefix").map_err(db_err)?,
215        card_prefix: row.try_get("card_prefix").map_err(db_err)?,
216        status: p_enum(&status_str, "sprint status")?,
217        start_date: start_date_str.as_deref().map(p_dt).transpose()?,
218        end_date: end_date_str.as_deref().map(p_dt).transpose()?,
219        created_at: p_dt(&created_at_str)?,
220        updated_at: p_dt(&updated_at_str)?,
221    })
222}
223
224/// Parse the four common edge columns (source / target / timestamps)
225/// shared by `spawns_edges`, `blocks_edges`, and `relates_edges`.
226fn row_to_edge_base(row: &SqliteRow) -> KanbanResult<kanban_core::EdgeBase> {
227    let source_str: String = row.try_get("source_id").map_err(db_err)?;
228    let target_str: String = row.try_get("target_id").map_err(db_err)?;
229    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
230    let archived_at_str: Option<String> = row.try_get("archived_at").map_err(db_err)?;
231    Ok(kanban_core::EdgeBase {
232        source: p_uuid(&source_str)?,
233        target: p_uuid(&target_str)?,
234        created_at: p_dt(&created_at_str)?,
235        archived_at: archived_at_str.as_deref().map(p_dt).transpose()?,
236    })
237}
238
239fn row_to_sprint_log(row: &SqliteRow) -> KanbanResult<SprintLog> {
240    let sprint_id_str: String = row.try_get("sprint_id").map_err(db_err)?;
241    let started_at_str: String = row.try_get("started_at").map_err(db_err)?;
242    let ended_at_str: Option<String> = row.try_get("ended_at").map_err(db_err)?;
243
244    Ok(SprintLog {
245        sprint_id: p_uuid(&sprint_id_str)?,
246        sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
247        sprint_name: row.try_get("sprint_name").map_err(db_err)?,
248        started_at: p_dt(&started_at_str)?,
249        ended_at: ended_at_str.as_deref().map(p_dt).transpose()?,
250        status: row.try_get("status").map_err(db_err)?,
251    })
252}
253
254// --- SqliteStore ---
255
256impl SqliteStore {
257    pub async fn open(path: impl AsRef<Path>) -> KanbanResult<Self> {
258        let handle = tokio::runtime::Handle::current();
259        if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
260            return Err(KanbanError::Database(
261                "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
262                 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
263                 because synchronous DataStore methods need to block on async SQLite I/O."
264                    .to_string(),
265            ));
266        }
267
268        let path_buf = path.as_ref().to_path_buf();
269
270        let options = SqliteConnectOptions::new()
271            .filename(&path_buf)
272            .create_if_missing(true)
273            .foreign_keys(true)
274            .pragma("journal_mode", "wal");
275
276        let pool = SqlitePoolOptions::new()
277            .max_connections(2)
278            .connect_with(options)
279            .await
280            .map_err(|e| KanbanError::Database(e.to_string()))?;
281
282        // KAN-522: refuse a future-version DB BEFORE any schema-modifying
283        // step runs. Otherwise the legacy-table drops, the SCHEMA's
284        // CREATE TABLE IF NOT EXISTS for tables the file lacks, and
285        // migrate()'s ALTERs would all mutate a file we're about to refuse.
286        //
287        // Why two round-trips rather than one correlated subquery: SQLite
288        // parses the inner SELECT eagerly at prepare time and errors with
289        // "no such table: metadata" on a fresh DB, even when an outer
290        // `WHERE EXISTS` would short-circuit it at runtime. So we split:
291        // first probe `sqlite_master`, then read `schema_version` only if
292        // the table is there. ~µs overhead, executes once per `open()`.
293        let metadata_table_exists: bool = sqlx::query_scalar(
294            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='metadata'",
295        )
296        .fetch_one(&pool)
297        .await
298        .map_err(|e| KanbanError::Database(e.to_string()))?;
299        if metadata_table_exists {
300            let pre_migrate_version: Option<u32> =
301                sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
302                    .fetch_optional(&pool)
303                    .await
304                    .map_err(|e| KanbanError::Database(e.to_string()))?;
305            if let Some(v) = pre_migrate_version {
306                if v > SUPPORTED_SCHEMA_VERSION {
307                    return Err(KanbanError::UnsupportedFutureVersion {
308                        file_version: v,
309                        binary_max: SUPPORTED_SCHEMA_VERSION,
310                    });
311                }
312            }
313        }
314
315        // Drop the legacy command_log table (pre-KAN-405 schema with
316        // columns `idx` / `cmd_json`) before SCHEMA runs, so the new
317        // command_log schema (`batch_index` / `commands_json` / `created_at`)
318        // can be created cleanly. Detected by absence of the new
319        // `batch_index` column on an existing command_log table.
320        Self::drop_legacy_command_log_if_present(&pool).await?;
321
322        sqlx::raw_sql(SCHEMA)
323            .execute(&pool)
324            .await
325            .map_err(|e| KanbanError::Database(e.to_string()))?;
326
327        Self::migrate(&pool).await?;
328
329        let instance_id = Self::load_or_create_instance_id(&pool).await?;
330
331        Ok(Self {
332            pool,
333            path: path_buf,
334            instance_id,
335        })
336    }
337
338    async fn load_or_create_instance_id(pool: &Pool<Sqlite>) -> KanbanResult<Uuid> {
339        let row: Option<String> =
340            sqlx::query_scalar("SELECT instance_id FROM metadata WHERE id = 1")
341                .fetch_optional(pool)
342                .await
343                .map_err(db_err)?;
344        match row {
345            Some(s) => p_uuid(&s),
346            None => {
347                let id = Uuid::new_v4();
348                let now = Utc::now().to_rfc3339();
349                sqlx::query(
350                    "INSERT INTO metadata (id, instance_id, saved_at, schema_version) VALUES (1, ?, ?, ?)",
351                )
352                .bind(id.to_string())
353                .bind(&now)
354                .bind(SUPPORTED_SCHEMA_VERSION)
355                .execute(pool)
356                .await
357                .map_err(db_err)?;
358                Ok(id)
359            }
360        }
361    }
362
363    /// Drops the legacy command_log table if it exists and lacks the new
364    /// `batch_index` column. Called before SCHEMA so the new table can be
365    /// created cleanly via `CREATE TABLE IF NOT EXISTS`.
366    async fn drop_legacy_command_log_if_present(pool: &Pool<Sqlite>) -> KanbanResult<()> {
367        let has_command_log: bool = sqlx::query_scalar(
368            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='command_log'",
369        )
370        .fetch_one(pool)
371        .await
372        .map_err(db_err)?;
373        if !has_command_log {
374            return Ok(());
375        }
376        let has_batch_index_col: bool = sqlx::query_scalar(
377            "SELECT COUNT(*) > 0 FROM pragma_table_info('command_log') WHERE name = 'batch_index'",
378        )
379        .fetch_one(pool)
380        .await
381        .map_err(db_err)?;
382        if !has_batch_index_col {
383            tracing::info!(
384                "dropping legacy command_log table (pre-KAN-405 schema) so the KAN-191 schema can be applied"
385            );
386            sqlx::raw_sql("DROP TABLE IF EXISTS command_log")
387                .execute(pool)
388                .await
389                .map_err(db_err)?;
390        }
391        Ok(())
392    }
393
394    async fn migrate(pool: &Pool<Sqlite>) -> KanbanResult<()> {
395        // KAN-191 reintroduces command_log persistence (KAN-405 had dropped it).
396        // The dense batch_index → JSON mapping is created by SCHEMA at open
397        // time; no migration of the legacy column-set is needed because the
398        // schema is owned by this crate.
399
400        let has_undo_state: bool = sqlx::query_scalar(
401            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='undo_state'",
402        )
403        .fetch_one(pool)
404        .await
405        .map_err(db_err)?;
406
407        if has_undo_state {
408            tracing::info!(
409                "dropping legacy undo_state table: undo cursor stays in-session, only command_log persists"
410            );
411            sqlx::raw_sql("DROP TABLE IF EXISTS undo_state")
412                .execute(pool)
413                .await
414                .map_err(db_err)?;
415        }
416
417        let has_position_col: bool = sqlx::query_scalar(
418            "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'position'",
419        )
420        .fetch_one(pool)
421        .await
422        .map_err(db_err)?;
423
424        if !has_position_col {
425            sqlx::raw_sql("ALTER TABLE boards ADD COLUMN position INTEGER NOT NULL DEFAULT 0")
426                .execute(pool)
427                .await
428                .map_err(db_err)?;
429        }
430
431        let has_card_counter_col: bool = sqlx::query_scalar(
432            "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'card_counter'",
433        )
434        .fetch_one(pool)
435        .await
436        .map_err(db_err)?;
437
438        if !has_card_counter_col {
439            sqlx::raw_sql("ALTER TABLE boards ADD COLUMN card_counter INTEGER NOT NULL DEFAULT 1")
440                .execute(pool)
441                .await
442                .map_err(db_err)?;
443        }
444
445        Self::drop_legacy_card_edges_if_present(pool).await?;
446
447        // KAN-522: ALTER in writer-stamp columns on pre-v2 metadata tables.
448        for col in ["writer_version", "writer_commit"] {
449            let has_col: bool = sqlx::query_scalar(&format!(
450                "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
451            ))
452            .fetch_one(pool)
453            .await
454            .map_err(db_err)?;
455            if !has_col {
456                sqlx::raw_sql(&format!("ALTER TABLE metadata ADD COLUMN {col} TEXT"))
457                    .execute(pool)
458                    .await
459                    .map_err(db_err)?;
460            }
461        }
462        // Once the ALTERs above have caught the schema up, normalise
463        // schema_version. Doing it unconditionally is idempotent and
464        // also self-heals any DBs where the field drifted.
465        sqlx::query("UPDATE metadata SET schema_version = ? WHERE id = 1 AND schema_version < ?")
466            .bind(SUPPORTED_SCHEMA_VERSION)
467            .bind(SUPPORTED_SCHEMA_VERSION)
468            .execute(pool)
469            .await
470            .map_err(db_err)?;
471
472        Ok(())
473    }
474
475    /// Drop the pre-KAN-504 `card_edges` table (single table with an
476    /// `edge_type` column) if present. The per-kind `spawns_edges` /
477    /// `blocks_edges` / `relates_edges` tables created by SCHEMA
478    /// replace it; nothing of KAN-504's graph work is live so we
479    /// don't need to copy data forward — any rows in the legacy
480    /// table belong to a development-only database.
481    async fn drop_legacy_card_edges_if_present(pool: &Pool<Sqlite>) -> KanbanResult<()> {
482        let has_card_edges: bool = sqlx::query_scalar(
483            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='card_edges'",
484        )
485        .fetch_one(pool)
486        .await
487        .map_err(db_err)?;
488        if !has_card_edges {
489            return Ok(());
490        }
491        tracing::info!(
492            "dropping legacy card_edges table (pre per-kind schema); per-kind tables take over"
493        );
494        sqlx::raw_sql(
495            "DROP INDEX IF EXISTS idx_card_edges_source;
496             DROP INDEX IF EXISTS idx_card_edges_target;
497             DROP TABLE IF EXISTS card_edges;",
498        )
499        .execute(pool)
500        .await
501        .map_err(db_err)?;
502        Ok(())
503    }
504
505    pub fn pool(&self) -> &Pool<Sqlite> {
506        &self.pool
507    }
508
509    /// Read the metadata singleton row from the DB. Cheap (single row, indexed by primary key).
510    /// Returns `Ok(None)` if the row is absent — only possible on a brand-new DB
511    /// before `load_or_create_instance_id` has run, which the public API doesn't expose.
512    pub fn read_metadata_sync(&self) -> KanbanResult<Option<PersistenceMetadata>> {
513        run(async {
514            let row: Option<MetadataRow> = sqlx::query_as(
515                "SELECT instance_id, saved_at, writer_version, writer_commit, schema_version \
516                 FROM metadata WHERE id = 1",
517            )
518            .fetch_optional(&self.pool)
519            .await
520            .map_err(db_err)?;
521            let Some(row) = row else {
522                return Ok(None);
523            };
524            let instance_id = p_uuid(&row.0)?;
525            let saved_at = p_dt(&row.1)?;
526            Ok(Some(PersistenceMetadata {
527                instance_id,
528                saved_at,
529                writer_version: row.2,
530                writer_commit: row.3,
531                format_version: Some(row.4),
532            }))
533        })
534    }
535
536    /// Record the current binary as the most-recent writer of this DB by
537    /// stamping `saved_at`, `writer_version`, and `writer_commit` into the
538    /// metadata singleton row. Returns the timestamp it wrote so callers can
539    /// echo it back into a `PersistenceMetadata` without re-reading the row.
540    ///
541    /// Separated from [`checkpoint`] so each function does one thing — see
542    /// the post-PR-288 review for the SRP rationale.
543    pub async fn stamp_writer(&self) -> KanbanResult<DateTime<Utc>> {
544        let now = Utc::now();
545        sqlx::query(
546            "UPDATE metadata SET saved_at = ?, writer_version = ?, writer_commit = ? WHERE id = 1",
547        )
548        .bind(now.to_rfc3339())
549        .bind(kanban_core::KANBAN_VERSION)
550        .bind(kanban_core::KANBAN_COMMIT)
551        .execute(&self.pool)
552        .await
553        .map_err(|e| KanbanError::Database(e.to_string()))?;
554        Ok(now)
555    }
556
557    /// Truncate the WAL. Pure I/O step; does not touch the writer-stamp
558    /// columns. Callers that want a durable save with attribution should
559    /// invoke [`stamp_writer`] alongside this.
560    pub async fn checkpoint(&self) -> KanbanResult<()> {
561        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
562            .execute(&self.pool)
563            .await
564            .map_err(|e| KanbanError::Database(e.to_string()))?;
565        Ok(())
566    }
567
568    // ── Command log (audit foundation; not yet wired through SqliteBackend) ──
569
570    /// Append a single command batch at logical index `batch_index`.
571    /// `commands_json` is the serde-JSON encoding of the `Vec<Command>` batch.
572    pub async fn append_command_batch(
573        &self,
574        batch_index: u64,
575        commands_json: &str,
576    ) -> KanbanResult<()> {
577        sqlx::query(
578            "INSERT INTO command_log (batch_index, commands_json, created_at) VALUES (?, ?, ?)",
579        )
580        .bind(batch_index as i64)
581        .bind(commands_json)
582        .bind(fmt_dt(&Utc::now()))
583        .execute(&self.pool)
584        .await
585        .map_err(db_err)?;
586        Ok(())
587    }
588
589    /// Load all persisted command batches in order. Returns the JSON strings
590    /// so callers can deserialise inside the domain layer.
591    pub async fn load_all_command_batches(&self) -> KanbanResult<Vec<String>> {
592        let rows = sqlx::query("SELECT commands_json FROM command_log ORDER BY batch_index ASC")
593            .fetch_all(&self.pool)
594            .await
595            .map_err(db_err)?;
596        let mut out = Vec::with_capacity(rows.len());
597        for row in &rows {
598            out.push(row.try_get::<String, _>("commands_json").map_err(db_err)?);
599        }
600        Ok(out)
601    }
602
603    /// Remove batches with logical index >= `after`. Retains [0, after).
604    pub async fn truncate_command_log_after(&self, after: u64) -> KanbanResult<()> {
605        sqlx::query("DELETE FROM command_log WHERE batch_index >= ?")
606            .bind(after as i64)
607            .execute(&self.pool)
608            .await
609            .map_err(db_err)?;
610        Ok(())
611    }
612
613    /// Remove the oldest `drop_count` batches and renumber the rest so the
614    /// surviving log starts at index 0.
615    pub async fn shift_command_log(&self, drop_count: u64) -> KanbanResult<()> {
616        if drop_count == 0 {
617            return Ok(());
618        }
619        let mut tx = self.pool.begin().await.map_err(db_err)?;
620        sqlx::query("DELETE FROM command_log WHERE batch_index < ?")
621            .bind(drop_count as i64)
622            .execute(&mut *tx)
623            .await
624            .map_err(db_err)?;
625        sqlx::query("UPDATE command_log SET batch_index = batch_index - ?")
626            .bind(drop_count as i64)
627            .execute(&mut *tx)
628            .await
629            .map_err(db_err)?;
630        tx.commit().await.map_err(db_err)?;
631        Ok(())
632    }
633
634    async fn fetch_board_aux(
635        &self,
636        board_id: &str,
637    ) -> KanbanResult<(Vec<String>, HashMap<String, u32>)> {
638        let name_rows =
639            sqlx::query("SELECT name FROM board_sprint_names WHERE board_id = ? ORDER BY position")
640                .bind(board_id)
641                .fetch_all(&self.pool)
642                .await
643                .map_err(db_err)?;
644        let sprint_names: Vec<String> = name_rows
645            .iter()
646            .map(|r| r.try_get("name").map_err(db_err))
647            .collect::<KanbanResult<_>>()?;
648
649        let counter_rows =
650            sqlx::query("SELECT prefix, counter FROM board_sprint_counters WHERE board_id = ?")
651                .bind(board_id)
652                .fetch_all(&self.pool)
653                .await
654                .map_err(db_err)?;
655        let mut sprint_counters = HashMap::new();
656        for row in &counter_rows {
657            let prefix: String = row.try_get("prefix").map_err(db_err)?;
658            let counter: i32 = row.try_get("counter").map_err(db_err)?;
659            sprint_counters.insert(prefix, counter as u32);
660        }
661
662        Ok((sprint_names, sprint_counters))
663    }
664
665    async fn write_board_with_conn(
666        conn: &mut sqlx::SqliteConnection,
667        board: &Board,
668    ) -> KanbanResult<()> {
669        let id = board.id.to_string();
670
671        sqlx::query(
672            "INSERT INTO boards (id, name, description, sprint_prefix, card_prefix,
673                task_sort_field, task_sort_order, sprint_duration_days,
674                sprint_name_used_count, next_sprint_number, active_sprint_id,
675                task_list_view, card_counter, completion_column_id, position,
676                created_at, updated_at)
677             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
678             ON CONFLICT(id) DO UPDATE SET
679                name=excluded.name, description=excluded.description,
680                sprint_prefix=excluded.sprint_prefix, card_prefix=excluded.card_prefix,
681                task_sort_field=excluded.task_sort_field, task_sort_order=excluded.task_sort_order,
682                sprint_duration_days=excluded.sprint_duration_days,
683                sprint_name_used_count=excluded.sprint_name_used_count,
684                next_sprint_number=excluded.next_sprint_number,
685                active_sprint_id=excluded.active_sprint_id,
686                task_list_view=excluded.task_list_view, card_counter=excluded.card_counter,
687                completion_column_id=excluded.completion_column_id,
688                position=excluded.position,
689                updated_at=excluded.updated_at",
690        )
691        .bind(&id)
692        .bind(required_str(&board.name, "board.name")?)
693        .bind(&board.description)
694        .bind(&board.sprint_prefix)
695        .bind(&board.card_prefix)
696        .bind(format!("{:?}", board.task_sort_field))
697        .bind(format!("{:?}", board.task_sort_order))
698        .bind(board.sprint_duration_days.map(|v| v as i32))
699        .bind(board.sprint_name_used_count as i32)
700        .bind(board.next_sprint_number as i32)
701        .bind(board.active_sprint_id.map(|id| id.to_string()))
702        .bind(format!("{:?}", board.task_list_view))
703        .bind(board.card_counter as i32)
704        .bind(board.completion_column_id.map(|id| id.to_string()))
705        .bind(board.position)
706        .bind(fmt_dt(&board.created_at))
707        .bind(fmt_dt(&board.updated_at))
708        .execute(&mut *conn)
709        .await
710        .map_err(db_err)?;
711
712        sqlx::query("DELETE FROM board_sprint_names WHERE board_id = ?")
713            .bind(&id)
714            .execute(&mut *conn)
715            .await
716            .map_err(db_err)?;
717        for (i, name) in board.sprint_names.iter().enumerate() {
718            sqlx::query(
719                "INSERT INTO board_sprint_names (board_id, position, name) VALUES (?, ?, ?)",
720            )
721            .bind(&id)
722            .bind(i as i32)
723            .bind(required_str(name, "board.sprint_names[*]")?)
724            .execute(&mut *conn)
725            .await
726            .map_err(db_err)?;
727        }
728
729        sqlx::query("DELETE FROM board_sprint_counters WHERE board_id = ?")
730            .bind(&id)
731            .execute(&mut *conn)
732            .await
733            .map_err(db_err)?;
734        for (prefix, counter) in &board.sprint_counters {
735            sqlx::query(
736                "INSERT INTO board_sprint_counters (board_id, prefix, counter) VALUES (?, ?, ?)",
737            )
738            .bind(&id)
739            .bind(prefix)
740            .bind(*counter as i32)
741            .execute(&mut *conn)
742            .await
743            .map_err(db_err)?;
744        }
745
746        Ok(())
747    }
748
749    async fn write_board_async(&self, board: &Board) -> KanbanResult<()> {
750        let mut tx = self.pool.begin().await.map_err(db_err)?;
751        Self::write_board_with_conn(&mut tx, board).await?;
752        tx.commit().await.map_err(db_err)?;
753        Ok(())
754    }
755
756    async fn fetch_sprint_logs_for_card(&self, card_id: &str) -> KanbanResult<Vec<SprintLog>> {
757        let rows = sqlx::query(
758            "SELECT sprint_id, sprint_number, sprint_name, started_at, ended_at, status
759             FROM sprint_logs WHERE card_id = ? ORDER BY id",
760        )
761        .bind(card_id)
762        .fetch_all(&self.pool)
763        .await
764        .map_err(db_err)?;
765        rows.iter().map(row_to_sprint_log).collect()
766    }
767
768    async fn write_card_with_conn(
769        conn: &mut sqlx::SqliteConnection,
770        card: &Card,
771    ) -> KanbanResult<()> {
772        let id = card.id.to_string();
773
774        sqlx::query(
775            "INSERT INTO cards (id, column_id, title, description, priority, status, position,
776                due_date, points, card_number, sprint_id, created_at, updated_at, completed_at)
777             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
778             ON CONFLICT(id) DO UPDATE SET
779                column_id=excluded.column_id, title=excluded.title,
780                description=excluded.description, priority=excluded.priority,
781                status=excluded.status, position=excluded.position,
782                due_date=excluded.due_date, points=excluded.points,
783                card_number=excluded.card_number, sprint_id=excluded.sprint_id,
784                updated_at=excluded.updated_at, completed_at=excluded.completed_at",
785        )
786        .bind(&id)
787        .bind(card.column_id.to_string())
788        .bind(required_str(&card.title, "card.title")?)
789        .bind(&card.description)
790        .bind(format!("{:?}", card.priority))
791        .bind(format!("{:?}", card.status))
792        .bind(card.position)
793        .bind(opt_dt(&card.due_date))
794        .bind(card.points.map(|v| v as i32))
795        .bind(card.card_number as i32)
796        .bind(card.sprint_id.map(|id| id.to_string()))
797        .bind(fmt_dt(&card.created_at))
798        .bind(fmt_dt(&card.updated_at))
799        .bind(opt_dt(&card.completed_at))
800        .execute(&mut *conn)
801        .await
802        .map_err(db_err)?;
803
804        sqlx::query("DELETE FROM sprint_logs WHERE card_id = ?")
805            .bind(&id)
806            .execute(&mut *conn)
807            .await
808            .map_err(db_err)?;
809        for log in &card.sprint_logs {
810            sqlx::query(
811                "INSERT INTO sprint_logs (card_id, sprint_id, sprint_number, sprint_name,
812                    started_at, ended_at, status)
813                 VALUES (?, ?, ?, ?, ?, ?, ?)",
814            )
815            .bind(&id)
816            .bind(log.sprint_id.to_string())
817            .bind(log.sprint_number as i32)
818            .bind(&log.sprint_name)
819            .bind(fmt_dt(&log.started_at))
820            .bind(opt_dt(&log.ended_at))
821            .bind(required_str(&log.status, "sprint_log.status")?)
822            .execute(&mut *conn)
823            .await
824            .map_err(db_err)?;
825        }
826
827        Ok(())
828    }
829
830    async fn write_card_async(&self, card: &Card) -> KanbanResult<()> {
831        let mut tx = self.pool.begin().await.map_err(db_err)?;
832        Self::write_card_with_conn(&mut tx, card).await?;
833        tx.commit().await.map_err(db_err)?;
834        Ok(())
835    }
836
837    async fn fetch_sprint_logs_batch(
838        &self,
839        card_ids: &[String],
840    ) -> KanbanResult<HashMap<String, Vec<SprintLog>>> {
841        if card_ids.is_empty() {
842            return Ok(HashMap::new());
843        }
844        let placeholders = card_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
845        let sql = format!(
846            "SELECT card_id, sprint_id, sprint_number, sprint_name, started_at, ended_at, status
847             FROM sprint_logs WHERE card_id IN ({placeholders}) ORDER BY id"
848        );
849        let mut query = sqlx::query(&sql);
850        for id in card_ids {
851            query = query.bind(id);
852        }
853        let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
854        let mut map: HashMap<String, Vec<SprintLog>> = HashMap::new();
855        for row in &rows {
856            let card_id: String = row.try_get("card_id").map_err(db_err)?;
857            let log = row_to_sprint_log(row)?;
858            map.entry(card_id).or_default().push(log);
859        }
860        Ok(map)
861    }
862
863    async fn fetch_cards_with_filter(
864        &self,
865        where_clause: &str,
866        binds: &[String],
867    ) -> KanbanResult<Vec<Card>> {
868        let sql = format!(
869            "SELECT id, column_id, title, description, priority, status, position,
870                    due_date, points, card_number, sprint_id, created_at, updated_at, completed_at
871             FROM cards WHERE id NOT IN (SELECT card_id FROM archived_cards) {}
872             ORDER BY position ASC, created_at ASC",
873            where_clause
874        );
875        let mut query = sqlx::query(&sql);
876        for b in binds {
877            query = query.bind(b);
878        }
879        let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
880
881        let card_ids: Vec<String> = rows
882            .iter()
883            .map(|r| r.try_get("id").map_err(db_err))
884            .collect::<KanbanResult<_>>()?;
885        let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
886
887        let mut cards = Vec::with_capacity(rows.len());
888        for row in &rows {
889            let id_str: String = row.try_get("id").map_err(db_err)?;
890            let logs = logs_map.remove(&id_str).unwrap_or_default();
891            cards.push(row_to_card(row, logs)?);
892        }
893        Ok(cards)
894    }
895
896    async fn get_graph_with_conn(
897        conn: &mut sqlx::SqliteConnection,
898    ) -> KanbanResult<DependencyGraph> {
899        use kanban_core::EdgeBase;
900        use kanban_domain::{BlocksEdge, RelatesEdge, SpawnsEdge};
901
902        let mut spawns: Vec<SpawnsEdge> = Vec::new();
903        for row in
904            sqlx::query("SELECT source_id, target_id, created_at, archived_at FROM spawns_edges")
905                .fetch_all(&mut *conn)
906                .await
907                .map_err(db_err)?
908        {
909            spawns.push(SpawnsEdge {
910                base: row_to_edge_base(&row)?,
911            });
912        }
913
914        let mut blocks: Vec<BlocksEdge> = Vec::new();
915        for row in sqlx::query(
916            "SELECT source_id, target_id, severity, created_at, archived_at FROM blocks_edges",
917        )
918        .fetch_all(&mut *conn)
919        .await
920        .map_err(db_err)?
921        {
922            let severity_str: String = row.try_get("severity").map_err(db_err)?;
923            blocks.push(BlocksEdge {
924                base: row_to_edge_base(&row)?,
925                severity: p_enum(&severity_str, "severity")?,
926            });
927        }
928
929        let mut relates: Vec<RelatesEdge> = Vec::new();
930        for row in sqlx::query(
931            "SELECT source_id, target_id, kind, created_at, archived_at FROM relates_edges",
932        )
933        .fetch_all(&mut *conn)
934        .await
935        .map_err(db_err)?
936        {
937            let kind_str: String = row.try_get("kind").map_err(db_err)?;
938            relates.push(RelatesEdge {
939                base: row_to_edge_base(&row)?,
940                kind: p_enum(&kind_str, "relates kind")?,
941            });
942        }
943
944        let _ = EdgeBase::<uuid::Uuid>::new; // keep import in scope for symmetry; suppress unused
945        DependencyGraph::from_validated_per_kind_edges(spawns, blocks, relates)
946    }
947
948    async fn modify_graph_async(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
949        let mut tx = self.pool.begin().await.map_err(db_err)?;
950        let mut graph = Self::get_graph_with_conn(&mut tx).await?;
951        f(&mut graph)?;
952        Self::write_graph_with_conn(&mut tx, &graph).await?;
953        tx.commit().await.map_err(db_err)?;
954        Ok(())
955    }
956
957    async fn write_graph_with_conn(
958        conn: &mut sqlx::SqliteConnection,
959        graph: &DependencyGraph,
960    ) -> KanbanResult<()> {
961        use kanban_core::Edge as _;
962
963        sqlx::query("DELETE FROM spawns_edges")
964            .execute(&mut *conn)
965            .await
966            .map_err(db_err)?;
967        sqlx::query("DELETE FROM blocks_edges")
968            .execute(&mut *conn)
969            .await
970            .map_err(db_err)?;
971        sqlx::query("DELETE FROM relates_edges")
972            .execute(&mut *conn)
973            .await
974            .map_err(db_err)?;
975
976        for e in graph.spawns_edges() {
977            sqlx::query(
978                "INSERT INTO spawns_edges
979                    (source_id, target_id, created_at, archived_at)
980                 VALUES (?, ?, ?, ?)",
981            )
982            .bind(e.source().to_string())
983            .bind(e.target().to_string())
984            .bind(fmt_dt(&e.created_at()))
985            .bind(opt_dt(&e.archived_at()))
986            .execute(&mut *conn)
987            .await
988            .map_err(db_err)?;
989        }
990        for e in graph.blocks_edges() {
991            sqlx::query(
992                "INSERT INTO blocks_edges
993                    (source_id, target_id, severity, created_at, archived_at)
994                 VALUES (?, ?, ?, ?, ?)",
995            )
996            .bind(e.source().to_string())
997            .bind(e.target().to_string())
998            .bind(ser_enum(&e.severity, "severity")?)
999            .bind(fmt_dt(&e.created_at()))
1000            .bind(opt_dt(&e.archived_at()))
1001            .execute(&mut *conn)
1002            .await
1003            .map_err(db_err)?;
1004        }
1005        for e in graph.relates_edges() {
1006            sqlx::query(
1007                "INSERT INTO relates_edges
1008                    (source_id, target_id, kind, created_at, archived_at)
1009                 VALUES (?, ?, ?, ?, ?)",
1010            )
1011            .bind(e.source().to_string())
1012            .bind(e.target().to_string())
1013            .bind(ser_enum(&e.kind, "relates kind")?)
1014            .bind(fmt_dt(&e.created_at()))
1015            .bind(opt_dt(&e.archived_at()))
1016            .execute(&mut *conn)
1017            .await
1018            .map_err(db_err)?;
1019        }
1020
1021        Ok(())
1022    }
1023
1024    async fn write_graph_async(&self, graph: &DependencyGraph) -> KanbanResult<()> {
1025        let mut tx = self.pool.begin().await.map_err(db_err)?;
1026        Self::write_graph_with_conn(&mut tx, graph).await?;
1027        tx.commit().await.map_err(db_err)?;
1028        Ok(())
1029    }
1030
1031    async fn snapshot_async(&self) -> KanbanResult<Snapshot> {
1032        let boards = self.list_boards_async().await?;
1033        let columns = self.list_all_columns_async().await?;
1034        let cards = self.fetch_cards_with_filter("", &[]).await?;
1035        let archived_cards = self.list_archived_cards_async().await?;
1036        let sprints = self.list_all_sprints_async().await?;
1037        let graph = self.get_graph_async().await?;
1038        Ok(Snapshot::from_data(
1039            boards,
1040            columns,
1041            cards,
1042            archived_cards,
1043            sprints,
1044            graph,
1045        ))
1046    }
1047
1048    async fn apply_snapshot_async(&self, snapshot: Snapshot) -> KanbanResult<()> {
1049        let mut tx = self.pool.begin().await.map_err(db_err)?;
1050
1051        sqlx::query("PRAGMA defer_foreign_keys = ON")
1052            .execute(&mut *tx)
1053            .await
1054            .map_err(db_err)?;
1055
1056        sqlx::query("DELETE FROM spawns_edges")
1057            .execute(&mut *tx)
1058            .await
1059            .map_err(db_err)?;
1060        sqlx::query("DELETE FROM blocks_edges")
1061            .execute(&mut *tx)
1062            .await
1063            .map_err(db_err)?;
1064        sqlx::query("DELETE FROM relates_edges")
1065            .execute(&mut *tx)
1066            .await
1067            .map_err(db_err)?;
1068        sqlx::query("DELETE FROM archived_cards")
1069            .execute(&mut *tx)
1070            .await
1071            .map_err(db_err)?;
1072        sqlx::query("DELETE FROM sprint_logs")
1073            .execute(&mut *tx)
1074            .await
1075            .map_err(db_err)?;
1076        sqlx::query("DELETE FROM cards")
1077            .execute(&mut *tx)
1078            .await
1079            .map_err(db_err)?;
1080        sqlx::query("DELETE FROM sprints")
1081            .execute(&mut *tx)
1082            .await
1083            .map_err(db_err)?;
1084        sqlx::query("DELETE FROM board_sprint_names")
1085            .execute(&mut *tx)
1086            .await
1087            .map_err(db_err)?;
1088        sqlx::query("DELETE FROM board_sprint_counters")
1089            .execute(&mut *tx)
1090            .await
1091            .map_err(db_err)?;
1092        sqlx::query("DELETE FROM columns")
1093            .execute(&mut *tx)
1094            .await
1095            .map_err(db_err)?;
1096        sqlx::query("DELETE FROM boards")
1097            .execute(&mut *tx)
1098            .await
1099            .map_err(db_err)?;
1100
1101        for board in &snapshot.boards {
1102            Self::write_board_with_conn(&mut tx, board).await?;
1103        }
1104        for column in &snapshot.columns {
1105            Self::write_column_with_conn(&mut tx, column).await?;
1106        }
1107        for sprint in &snapshot.sprints {
1108            Self::write_sprint_with_conn(&mut tx, sprint).await?;
1109        }
1110        for card in &snapshot.cards {
1111            Self::write_card_with_conn(&mut tx, card).await?;
1112        }
1113        for ac in &snapshot.archived_cards {
1114            Self::write_archived_card_with_conn(&mut tx, ac).await?;
1115        }
1116        Self::write_graph_with_conn(&mut tx, &snapshot.graph).await?;
1117
1118        tx.commit().await.map_err(db_err)?;
1119        Ok(())
1120    }
1121
1122    async fn fetch_all_board_aux(
1123        &self,
1124    ) -> KanbanResult<(
1125        HashMap<String, Vec<String>>,
1126        HashMap<String, HashMap<String, u32>>,
1127    )> {
1128        let name_rows = sqlx::query(
1129            "SELECT board_id, name FROM board_sprint_names ORDER BY board_id, position",
1130        )
1131        .fetch_all(&self.pool)
1132        .await
1133        .map_err(db_err)?;
1134        let mut names_map: HashMap<String, Vec<String>> = HashMap::new();
1135        for row in &name_rows {
1136            let board_id: String = row.try_get("board_id").map_err(db_err)?;
1137            let name: String = row.try_get("name").map_err(db_err)?;
1138            names_map.entry(board_id).or_default().push(name);
1139        }
1140
1141        let counter_rows =
1142            sqlx::query("SELECT board_id, prefix, counter FROM board_sprint_counters")
1143                .fetch_all(&self.pool)
1144                .await
1145                .map_err(db_err)?;
1146        let mut counters_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
1147        for row in &counter_rows {
1148            let board_id: String = row.try_get("board_id").map_err(db_err)?;
1149            let prefix: String = row.try_get("prefix").map_err(db_err)?;
1150            let counter: i32 = row.try_get("counter").map_err(db_err)?;
1151            counters_map
1152                .entry(board_id)
1153                .or_default()
1154                .insert(prefix, counter as u32);
1155        }
1156
1157        Ok((names_map, counters_map))
1158    }
1159
1160    async fn list_boards_async(&self) -> KanbanResult<Vec<Board>> {
1161        let rows = sqlx::query(
1162            "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1163                    task_sort_order, sprint_duration_days, sprint_name_used_count,
1164                    next_sprint_number, active_sprint_id, task_list_view,
1165                    COALESCE(card_counter, 1) as card_counter,
1166                    completion_column_id, position, created_at, updated_at
1167             FROM boards ORDER BY position ASC",
1168        )
1169        .fetch_all(&self.pool)
1170        .await
1171        .map_err(db_err)?;
1172
1173        let (mut names_map, mut counters_map) = self.fetch_all_board_aux().await?;
1174
1175        let mut boards = Vec::with_capacity(rows.len());
1176        for row in &rows {
1177            let id_str: String = row.try_get("id").map_err(db_err)?;
1178            let names = names_map.remove(&id_str).unwrap_or_default();
1179            let counters = counters_map.remove(&id_str).unwrap_or_default();
1180            boards.push(row_to_board(row, names, counters)?);
1181        }
1182        Ok(boards)
1183    }
1184
1185    async fn list_all_columns_async(&self) -> KanbanResult<Vec<Column>> {
1186        let rows = sqlx::query(
1187            "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1188             FROM columns ORDER BY position",
1189        )
1190        .fetch_all(&self.pool)
1191        .await
1192        .map_err(db_err)?;
1193        rows.iter().map(row_to_column).collect()
1194    }
1195
1196    async fn list_all_sprints_async(&self) -> KanbanResult<Vec<Sprint>> {
1197        let rows = sqlx::query(
1198            "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1199                    status, start_date, end_date, created_at, updated_at
1200             FROM sprints ORDER BY sprint_number",
1201        )
1202        .fetch_all(&self.pool)
1203        .await
1204        .map_err(db_err)?;
1205        rows.iter().map(row_to_sprint).collect()
1206    }
1207
1208    async fn list_archived_cards_async(&self) -> KanbanResult<Vec<ArchivedCard>> {
1209        let rows = sqlx::query(
1210            "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1211                    c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1212                    c.created_at, c.updated_at, c.completed_at,
1213                    ac.archived_at, ac.original_column_id, ac.original_position
1214             FROM archived_cards ac
1215             JOIN cards c ON ac.card_id = c.id
1216             ORDER BY ac.archived_at",
1217        )
1218        .fetch_all(&self.pool)
1219        .await
1220        .map_err(db_err)?;
1221
1222        let card_ids: Vec<String> = rows
1223            .iter()
1224            .map(|r| r.try_get("id").map_err(db_err))
1225            .collect::<KanbanResult<_>>()?;
1226        let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1227
1228        let mut result = Vec::with_capacity(rows.len());
1229        for row in &rows {
1230            let id_str: String = row.try_get("id").map_err(db_err)?;
1231            let logs = logs_map.remove(&id_str).unwrap_or_default();
1232            let card = row_to_card(row, logs)?;
1233            let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1234            let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1235            result.push(ArchivedCard {
1236                card,
1237                archived_at: p_dt(&archived_at_str)?,
1238                original_column_id: p_uuid(&orig_col_str)?,
1239                original_position: row.try_get("original_position").map_err(db_err)?,
1240            });
1241        }
1242        Ok(result)
1243    }
1244
1245    async fn get_graph_async(&self) -> KanbanResult<DependencyGraph> {
1246        // Wrap the three per-kind edge reads in a single transaction so
1247        // a concurrent writer between query 1 (spawns) and query 3
1248        // (relates) cannot yield an inconsistent in-memory snapshot.
1249        // SQLite under WAL gives the transaction a stable read view; the
1250        // tx is read-only and is committed without writes.
1251        let mut tx = self.pool.begin().await.map_err(db_err)?;
1252        let graph = Self::get_graph_with_conn(&mut tx).await?;
1253        tx.commit().await.map_err(db_err)?;
1254        Ok(graph)
1255    }
1256
1257    async fn write_column_with_conn(
1258        conn: &mut sqlx::SqliteConnection,
1259        column: &Column,
1260    ) -> KanbanResult<()> {
1261        sqlx::query(
1262            "INSERT INTO columns (id, board_id, name, position, wip_limit, created_at, updated_at)
1263             VALUES (?, ?, ?, ?, ?, ?, ?)
1264             ON CONFLICT(id) DO UPDATE SET
1265                board_id=excluded.board_id, name=excluded.name,
1266                position=excluded.position, wip_limit=excluded.wip_limit,
1267                updated_at=excluded.updated_at",
1268        )
1269        .bind(column.id.to_string())
1270        .bind(column.board_id.to_string())
1271        .bind(required_str(&column.name, "column.name")?)
1272        .bind(column.position)
1273        .bind(column.wip_limit)
1274        .bind(fmt_dt(&column.created_at))
1275        .bind(fmt_dt(&column.updated_at))
1276        .execute(&mut *conn)
1277        .await
1278        .map_err(db_err)?;
1279        Ok(())
1280    }
1281
1282    async fn write_column_async(&self, column: &Column) -> KanbanResult<()> {
1283        Self::write_column_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, column).await
1284    }
1285
1286    async fn write_sprint_with_conn(
1287        conn: &mut sqlx::SqliteConnection,
1288        sprint: &Sprint,
1289    ) -> KanbanResult<()> {
1290        sqlx::query(
1291            "INSERT INTO sprints (id, board_id, sprint_number, name_index, prefix, card_prefix,
1292                status, start_date, end_date, created_at, updated_at)
1293             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1294             ON CONFLICT(id) DO UPDATE SET
1295                board_id=excluded.board_id, sprint_number=excluded.sprint_number,
1296                name_index=excluded.name_index, prefix=excluded.prefix,
1297                card_prefix=excluded.card_prefix, status=excluded.status,
1298                start_date=excluded.start_date, end_date=excluded.end_date,
1299                updated_at=excluded.updated_at",
1300        )
1301        .bind(sprint.id.to_string())
1302        .bind(sprint.board_id.to_string())
1303        .bind(sprint.sprint_number as i32)
1304        .bind(sprint.name_index.map(|v| v as i32))
1305        .bind(&sprint.prefix)
1306        .bind(&sprint.card_prefix)
1307        .bind(format!("{:?}", sprint.status))
1308        .bind(opt_dt(&sprint.start_date))
1309        .bind(opt_dt(&sprint.end_date))
1310        .bind(fmt_dt(&sprint.created_at))
1311        .bind(fmt_dt(&sprint.updated_at))
1312        .execute(&mut *conn)
1313        .await
1314        .map_err(db_err)?;
1315        Ok(())
1316    }
1317
1318    async fn write_sprint_async(&self, sprint: &Sprint) -> KanbanResult<()> {
1319        Self::write_sprint_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, sprint).await
1320    }
1321
1322    async fn write_archived_card_with_conn(
1323        conn: &mut sqlx::SqliteConnection,
1324        ac: &ArchivedCard,
1325    ) -> KanbanResult<()> {
1326        Self::write_card_with_conn(conn, &ac.card).await?;
1327        sqlx::query(
1328            "INSERT INTO archived_cards (card_id, archived_at, original_column_id, original_position)
1329             VALUES (?, ?, ?, ?)
1330             ON CONFLICT(card_id) DO UPDATE SET
1331                archived_at=excluded.archived_at,
1332                original_column_id=excluded.original_column_id,
1333                original_position=excluded.original_position",
1334        )
1335        .bind(ac.card.id.to_string())
1336        .bind(fmt_dt(&ac.archived_at))
1337        .bind(ac.original_column_id.to_string())
1338        .bind(ac.original_position)
1339        .execute(&mut *conn)
1340        .await
1341        .map_err(db_err)?;
1342        Ok(())
1343    }
1344
1345    async fn write_archived_card_async(&self, ac: &ArchivedCard) -> KanbanResult<()> {
1346        let mut tx = self.pool.begin().await.map_err(db_err)?;
1347        Self::write_archived_card_with_conn(&mut tx, ac).await?;
1348        tx.commit().await.map_err(db_err)?;
1349        Ok(())
1350    }
1351}
1352
1353impl DataStore for SqliteStore {
1354    // Board
1355
1356    fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
1357        run(async {
1358            let id_str = id.to_string();
1359            let row = sqlx::query(
1360                "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1361                        task_sort_order, sprint_duration_days, sprint_name_used_count,
1362                        next_sprint_number, active_sprint_id, task_list_view,
1363                        COALESCE(card_counter, 1) as card_counter,
1364                        completion_column_id, position, created_at, updated_at
1365                 FROM boards WHERE id = ?",
1366            )
1367            .bind(&id_str)
1368            .fetch_optional(&self.pool)
1369            .await
1370            .map_err(db_err)?;
1371
1372            match row {
1373                Some(row) => {
1374                    let (names, counters) = self.fetch_board_aux(&id_str).await?;
1375                    Ok(Some(row_to_board(&row, names, counters)?))
1376                }
1377                None => Ok(None),
1378            }
1379        })
1380    }
1381
1382    fn list_boards(&self) -> KanbanResult<Vec<Board>> {
1383        run(self.list_boards_async())
1384    }
1385
1386    fn upsert_board(&self, board: Board) -> KanbanResult<()> {
1387        run(self.write_board_async(&board))
1388    }
1389
1390    fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
1391        run(async {
1392            sqlx::query("DELETE FROM boards WHERE id = ?")
1393                .bind(id.to_string())
1394                .execute(&self.pool)
1395                .await
1396                .map_err(db_err)?;
1397            Ok(())
1398        })
1399    }
1400
1401    // Column
1402
1403    fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
1404        run(async {
1405            let row = sqlx::query(
1406                "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1407                 FROM columns WHERE id = ?",
1408            )
1409            .bind(id.to_string())
1410            .fetch_optional(&self.pool)
1411            .await
1412            .map_err(db_err)?;
1413            row.as_ref().map(row_to_column).transpose()
1414        })
1415    }
1416
1417    fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
1418        run(async {
1419            let rows = sqlx::query(
1420                "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1421                 FROM columns WHERE board_id = ? ORDER BY position",
1422            )
1423            .bind(board_id.to_string())
1424            .fetch_all(&self.pool)
1425            .await
1426            .map_err(db_err)?;
1427            rows.iter().map(row_to_column).collect()
1428        })
1429    }
1430
1431    fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
1432        run(self.list_all_columns_async())
1433    }
1434
1435    fn upsert_column(&self, column: Column) -> KanbanResult<()> {
1436        run(self.write_column_async(&column))
1437    }
1438
1439    fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
1440        run(async {
1441            sqlx::query("DELETE FROM columns WHERE id = ?")
1442                .bind(id.to_string())
1443                .execute(&self.pool)
1444                .await
1445                .map_err(db_err)?;
1446            Ok(())
1447        })
1448    }
1449
1450    fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1451        run(async {
1452            sqlx::query("DELETE FROM columns WHERE board_id = ?")
1453                .bind(board_id.to_string())
1454                .execute(&self.pool)
1455                .await
1456                .map_err(db_err)?;
1457            Ok(())
1458        })
1459    }
1460
1461    // Card
1462
1463    fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
1464        run(async {
1465            let id_str = id.to_string();
1466            let row = sqlx::query(
1467                "SELECT id, column_id, title, description, priority, status, position,
1468                        due_date, points, card_number, sprint_id, created_at, updated_at,
1469                        completed_at
1470                 FROM cards
1471                 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1472            )
1473            .bind(&id_str)
1474            .fetch_optional(&self.pool)
1475            .await
1476            .map_err(db_err)?;
1477
1478            match row {
1479                Some(row) => {
1480                    let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1481                    Ok(Some(row_to_card(&row, logs)?))
1482                }
1483                None => Ok(None),
1484            }
1485        })
1486    }
1487
1488    fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
1489        run(self.fetch_cards_with_filter("", &[]))
1490    }
1491
1492    fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
1493        run(self.fetch_cards_with_filter("AND column_id = ?", &[column_id.to_string()]))
1494    }
1495
1496    fn list_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<Vec<Card>> {
1497        if column_ids.is_empty() {
1498            return Ok(Vec::new());
1499        }
1500        let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1501        let where_clause = format!("AND column_id IN ({placeholders})");
1502        let binds: Vec<String> = column_ids.iter().map(|id| id.to_string()).collect();
1503        run(self.fetch_cards_with_filter(&where_clause, &binds))
1504    }
1505
1506    fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
1507        run(self.fetch_cards_with_filter("AND sprint_id = ?", &[sprint_id.to_string()]))
1508    }
1509
1510    fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
1511        run(async {
1512            let row = sqlx::query(
1513                "SELECT COUNT(*) as cnt FROM cards
1514                 WHERE column_id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1515            )
1516            .bind(column_id.to_string())
1517            .fetch_one(&self.pool)
1518            .await
1519            .map_err(db_err)?;
1520            Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1521        })
1522    }
1523
1524    fn count_cards_in_column_excluding(
1525        &self,
1526        column_id: Uuid,
1527        exclude: &[Uuid],
1528    ) -> KanbanResult<usize> {
1529        run(async {
1530            if exclude.is_empty() {
1531                return self.count_cards_in_column(column_id);
1532            }
1533            let placeholders = exclude.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1534            let sql = format!(
1535                "SELECT COUNT(*) as cnt FROM cards
1536                 WHERE column_id = ?
1537                   AND id NOT IN (SELECT card_id FROM archived_cards)
1538                   AND id NOT IN ({placeholders})"
1539            );
1540            let mut query = sqlx::query(&sql).bind(column_id.to_string());
1541            for id in exclude {
1542                query = query.bind(id.to_string());
1543            }
1544            let row = query.fetch_one(&self.pool).await.map_err(db_err)?;
1545            Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1546        })
1547    }
1548
1549    fn upsert_card(&self, card: Card) -> KanbanResult<()> {
1550        run(self.write_card_async(&card))
1551    }
1552
1553    fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
1554        run(async {
1555            sqlx::query(
1556                "DELETE FROM cards
1557                 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1558            )
1559            .bind(id.to_string())
1560            .execute(&self.pool)
1561            .await
1562            .map_err(db_err)?;
1563            Ok(())
1564        })
1565    }
1566
1567    fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
1568        run(async {
1569            if column_ids.is_empty() {
1570                return Ok(());
1571            }
1572            let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1573            let sql = format!(
1574                "DELETE FROM cards
1575                 WHERE column_id IN ({placeholders})
1576                   AND id NOT IN (SELECT card_id FROM archived_cards)"
1577            );
1578            let mut query = sqlx::query(&sql);
1579            for id in column_ids {
1580                query = query.bind(id.to_string());
1581            }
1582            query.execute(&self.pool).await.map_err(db_err)?;
1583            Ok(())
1584        })
1585    }
1586
1587    fn clear_sprint_from_cards(
1588        &self,
1589        sprint_id: Uuid,
1590        timestamp: DateTime<Utc>,
1591    ) -> KanbanResult<()> {
1592        run(async {
1593            let now = fmt_dt(&timestamp);
1594            sqlx::query(
1595                "UPDATE cards SET sprint_id = NULL, updated_at = ?
1596                 WHERE sprint_id = ?
1597                   AND id NOT IN (SELECT card_id FROM archived_cards)",
1598            )
1599            .bind(&now)
1600            .bind(sprint_id.to_string())
1601            .execute(&self.pool)
1602            .await
1603            .map_err(db_err)?;
1604            Ok(())
1605        })
1606    }
1607
1608    // Archived card
1609
1610    fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
1611        run(async {
1612            let id_str = card_id.to_string();
1613            let row = sqlx::query(
1614                "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1615                        c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1616                        c.created_at, c.updated_at, c.completed_at,
1617                        ac.archived_at, ac.original_column_id, ac.original_position
1618                 FROM archived_cards ac
1619                 JOIN cards c ON ac.card_id = c.id
1620                 WHERE ac.card_id = ?",
1621            )
1622            .bind(&id_str)
1623            .fetch_optional(&self.pool)
1624            .await
1625            .map_err(db_err)?;
1626
1627            match row {
1628                Some(row) => {
1629                    let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1630                    let card = row_to_card(&row, logs)?;
1631                    let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1632                    let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1633                    Ok(Some(ArchivedCard {
1634                        card,
1635                        archived_at: p_dt(&archived_at_str)?,
1636                        original_column_id: p_uuid(&orig_col_str)?,
1637                        original_position: row.try_get("original_position").map_err(db_err)?,
1638                    }))
1639                }
1640                None => Ok(None),
1641            }
1642        })
1643    }
1644
1645    fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
1646        run(self.list_archived_cards_async())
1647    }
1648
1649    fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
1650        run(self.write_archived_card_async(&ac))
1651    }
1652
1653    fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
1654        run(async {
1655            let mut tx = self.pool.begin().await.map_err(db_err)?;
1656            sqlx::query("DELETE FROM archived_cards WHERE card_id = ?")
1657                .bind(card_id.to_string())
1658                .execute(&mut *tx)
1659                .await
1660                .map_err(db_err)?;
1661            sqlx::query("DELETE FROM cards WHERE id = ?")
1662                .bind(card_id.to_string())
1663                .execute(&mut *tx)
1664                .await
1665                .map_err(db_err)?;
1666            tx.commit().await.map_err(db_err)
1667        })
1668    }
1669
1670    fn list_archived_cards_by_columns(
1671        &self,
1672        column_ids: &[Uuid],
1673    ) -> KanbanResult<Vec<ArchivedCard>> {
1674        if column_ids.is_empty() {
1675            return Ok(Vec::new());
1676        }
1677        run(async {
1678            let placeholders: Vec<&str> = column_ids.iter().map(|_| "?").collect();
1679            let sql = format!(
1680                "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1681                        c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1682                        c.created_at, c.updated_at, c.completed_at,
1683                        ac.archived_at, ac.original_column_id, ac.original_position
1684                 FROM archived_cards ac
1685                 JOIN cards c ON ac.card_id = c.id
1686                 WHERE ac.original_column_id IN ({})
1687                 ORDER BY ac.archived_at",
1688                placeholders.join(", ")
1689            );
1690            let mut query = sqlx::query(&sql);
1691            for id in column_ids {
1692                query = query.bind(id.to_string());
1693            }
1694            let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
1695
1696            let card_ids: Vec<String> = rows
1697                .iter()
1698                .map(|r| r.try_get("id").map_err(db_err))
1699                .collect::<KanbanResult<_>>()?;
1700            let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1701
1702            let mut result = Vec::with_capacity(rows.len());
1703            for row in &rows {
1704                let id_str: String = row.try_get("id").map_err(db_err)?;
1705                let logs = logs_map.remove(&id_str).unwrap_or_default();
1706                let card = row_to_card(row, logs)?;
1707                let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1708                let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1709                result.push(ArchivedCard {
1710                    card,
1711                    archived_at: p_dt(&archived_at_str)?,
1712                    original_column_id: p_uuid(&orig_col_str)?,
1713                    original_position: row.try_get("original_position").map_err(db_err)?,
1714                });
1715            }
1716            Ok(result)
1717        })
1718    }
1719
1720    fn clear_sprint_from_archived_cards(
1721        &self,
1722        sprint_id: Uuid,
1723        timestamp: DateTime<Utc>,
1724    ) -> KanbanResult<()> {
1725        run(async {
1726            let now = fmt_dt(&timestamp);
1727            sqlx::query(
1728                "UPDATE cards SET sprint_id = NULL, updated_at = ?
1729                 WHERE sprint_id = ?
1730                   AND id IN (SELECT card_id FROM archived_cards)",
1731            )
1732            .bind(&now)
1733            .bind(sprint_id.to_string())
1734            .execute(&self.pool)
1735            .await
1736            .map_err(db_err)?;
1737            Ok(())
1738        })
1739    }
1740
1741    // Sprint
1742
1743    fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
1744        run(async {
1745            let row = sqlx::query(
1746                "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1747                        status, start_date, end_date, created_at, updated_at
1748                 FROM sprints WHERE id = ?",
1749            )
1750            .bind(id.to_string())
1751            .fetch_optional(&self.pool)
1752            .await
1753            .map_err(db_err)?;
1754            row.as_ref().map(row_to_sprint).transpose()
1755        })
1756    }
1757
1758    fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
1759        run(async {
1760            let rows = sqlx::query(
1761                "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1762                        status, start_date, end_date, created_at, updated_at
1763                 FROM sprints WHERE board_id = ? ORDER BY sprint_number",
1764            )
1765            .bind(board_id.to_string())
1766            .fetch_all(&self.pool)
1767            .await
1768            .map_err(db_err)?;
1769            rows.iter().map(row_to_sprint).collect()
1770        })
1771    }
1772
1773    fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
1774        run(self.list_all_sprints_async())
1775    }
1776
1777    fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
1778        run(self.write_sprint_async(&sprint))
1779    }
1780
1781    fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
1782        run(async {
1783            sqlx::query("DELETE FROM sprints WHERE id = ?")
1784                .bind(id.to_string())
1785                .execute(&self.pool)
1786                .await
1787                .map_err(db_err)?;
1788            Ok(())
1789        })
1790    }
1791
1792    fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1793        run(async {
1794            sqlx::query("DELETE FROM sprints WHERE board_id = ?")
1795                .bind(board_id.to_string())
1796                .execute(&self.pool)
1797                .await
1798                .map_err(db_err)?;
1799            Ok(())
1800        })
1801    }
1802
1803    // Graph
1804
1805    fn get_graph(&self) -> KanbanResult<DependencyGraph> {
1806        run(self.get_graph_async())
1807    }
1808
1809    fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
1810        run(self.write_graph_async(&graph))
1811    }
1812
1813    fn modify_graph(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
1814        run(self.modify_graph_async(f))
1815    }
1816
1817    // Snapshot
1818
1819    fn snapshot(&self) -> KanbanResult<Snapshot> {
1820        run(self.snapshot_async())
1821    }
1822
1823    fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
1824        run(self.apply_snapshot_async(snapshot))
1825    }
1826}
1827
1828#[async_trait::async_trait]
1829impl PersistenceStore for SqliteStore {
1830    async fn save(&self, snapshot: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
1831        let domain_snapshot: Snapshot = serde_json::from_slice(&snapshot.data)
1832            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1833        self.apply_snapshot_async(domain_snapshot)
1834            .await
1835            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1836
1837        let saved_at = self
1838            .stamp_writer()
1839            .await
1840            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1841        self.checkpoint()
1842            .await
1843            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1844
1845        // Values are all knowable inline — stamp_writer just wrote them and
1846        // returned the timestamp, the writer/commit are compile-time consts.
1847        // format_version is SUPPORTED because migrate() on open() normalised
1848        // schema_version to SUPPORTED and nothing in the save path touches
1849        // it. The read paths (load, read_metadata_sync) re-read from the row
1850        // to honour the "DB is the source of truth" contract.
1851        Ok(PersistenceMetadata {
1852            instance_id: self.instance_id,
1853            saved_at,
1854            writer_version: Some(kanban_core::KANBAN_VERSION.to_string()),
1855            writer_commit: Some(kanban_core::KANBAN_COMMIT.to_string()),
1856            format_version: Some(SUPPORTED_SCHEMA_VERSION),
1857        })
1858    }
1859
1860    async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
1861        let domain_snapshot = self
1862            .snapshot_async()
1863            .await
1864            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1865        let data = serde_json::to_vec(&domain_snapshot)
1866            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1867
1868        let row: (String, Option<String>, Option<String>, u32) = sqlx::query_as(
1869            "SELECT saved_at, writer_version, writer_commit, schema_version \
1870             FROM metadata WHERE id = 1",
1871        )
1872        .fetch_one(&self.pool)
1873        .await
1874        .map_err(|e| PersistenceError::Database(e.to_string()))?;
1875        let saved_at = DateTime::parse_from_rfc3339(&row.0)
1876            .map_err(|e| PersistenceError::Serialization(e.to_string()))?
1877            .with_timezone(&Utc);
1878        let meta = PersistenceMetadata {
1879            instance_id: self.instance_id,
1880            saved_at,
1881            writer_version: row.1,
1882            writer_commit: row.2,
1883            format_version: Some(row.3),
1884        };
1885        Ok((
1886            StoreSnapshot {
1887                data,
1888                metadata: meta.clone(),
1889            },
1890            meta,
1891        ))
1892    }
1893
1894    async fn exists(&self) -> bool {
1895        self.path.exists()
1896    }
1897
1898    fn path(&self) -> &std::path::Path {
1899        &self.path
1900    }
1901
1902    fn instance_id(&self) -> Uuid {
1903        self.instance_id
1904    }
1905
1906    async fn close(&self) {
1907        self.pool.close().await;
1908    }
1909}
1910
1911#[cfg(test)]
1912mod tests {
1913    use super::*;
1914    use tempfile::TempDir;
1915
1916    fn make_rt() -> tokio::runtime::Runtime {
1917        tokio::runtime::Builder::new_multi_thread()
1918            .enable_all()
1919            .build()
1920            .unwrap()
1921    }
1922
1923    #[test]
1924    fn test_sqlitestore_path_is_preserved() {
1925        let dir = TempDir::new().unwrap();
1926        let path = dir.path().join("test.db");
1927        let rt = make_rt();
1928        let store = rt.block_on(SqliteStore::open(&path)).unwrap();
1929        assert_eq!(store.path(), path.as_path());
1930    }
1931
1932    #[test]
1933    fn test_sqlitestore_instance_id_persists_across_reopen() {
1934        let dir = TempDir::new().unwrap();
1935        let path = dir.path().join("test.db");
1936        let rt = make_rt();
1937        let id1 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1938        let id2 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
1939        assert_eq!(id1, id2, "instance_id must be stable across reopens");
1940    }
1941
1942    #[test]
1943    fn test_sqlitestore_persistence_save_load_roundtrip() {
1944        use kanban_domain::{Board, DependencyGraph};
1945        use kanban_persistence::snapshot_to_json_bytes;
1946
1947        let dir = TempDir::new().unwrap();
1948        let path = dir.path().join("test.db");
1949        let rt = make_rt();
1950
1951        rt.block_on(async {
1952            let store = SqliteStore::open(&path).await.unwrap();
1953            let board = Board::new("Test Board", None::<String>);
1954            let snapshot = Snapshot::from_data(
1955                vec![board],
1956                vec![],
1957                vec![],
1958                vec![],
1959                vec![],
1960                DependencyGraph::new(),
1961            );
1962            let data = snapshot_to_json_bytes(&snapshot).unwrap();
1963            let meta = PersistenceMetadata::new(store.instance_id());
1964            let store_snap = StoreSnapshot {
1965                data,
1966                metadata: meta,
1967            };
1968
1969            PersistenceStore::save(&store, store_snap).await.unwrap();
1970
1971            let (loaded_snap, _meta) = PersistenceStore::load(&store).await.unwrap();
1972            let loaded: Snapshot = serde_json::from_slice(&loaded_snap.data).unwrap();
1973            assert_eq!(loaded.boards.len(), 1);
1974            assert_eq!(loaded.boards[0].name, "Test Board");
1975        });
1976    }
1977
1978    #[test]
1979    fn test_sqlitestore_exists_returns_true_after_open() {
1980        let dir = TempDir::new().unwrap();
1981        let path = dir.path().join("test.db");
1982        let rt = make_rt();
1983        rt.block_on(async {
1984            let store = SqliteStore::open(&path).await.unwrap();
1985            assert!(PersistenceStore::exists(&store).await);
1986        });
1987    }
1988
1989    #[test]
1990    fn test_checkpoint_executes_without_error() {
1991        let dir = TempDir::new().unwrap();
1992        let path = dir.path().join("test.sqlite3");
1993        let rt = make_rt();
1994        rt.block_on(async {
1995            let store = SqliteStore::open(&path).await.unwrap();
1996            store.checkpoint().await.unwrap();
1997        });
1998    }
1999
2000    #[test]
2001    fn test_save_checkpoints_wal_file_stays_minimal() {
2002        let dir = TempDir::new().unwrap();
2003        let path = dir.path().join("test.sqlite3");
2004        let rt = make_rt();
2005        rt.block_on(async {
2006            let store = SqliteStore::open(&path).await.unwrap();
2007            let (snapshot, _) = PersistenceStore::load(&store).await.unwrap();
2008            PersistenceStore::save(&store, snapshot).await.unwrap();
2009            let wal_path = path.with_extension("sqlite3-wal");
2010            if wal_path.exists() {
2011                assert!(
2012                    wal_path.metadata().unwrap().len() < 32 * 1024,
2013                    "WAL file should be minimal after save+checkpoint"
2014                );
2015            }
2016        });
2017    }
2018
2019    /// Per-kind tables hard-reject metadata outside their respective
2020    /// CHECK constraints. Pin the constraint via a direct insert
2021    /// attempt so any future schema relaxation has to choose
2022    /// whether to drop or update this test.
2023    #[test]
2024    fn test_blocks_edges_rejects_unknown_severity() {
2025        let dir = TempDir::new().unwrap();
2026        let path = dir.path().join("check.sqlite3");
2027        let rt = make_rt();
2028        rt.block_on(async {
2029            let store = SqliteStore::open(&path).await.unwrap();
2030            let insert = sqlx::query(
2031                "INSERT INTO blocks_edges
2032                    (source_id, target_id, severity, created_at, archived_at)
2033                 VALUES (?, ?, 'Catastrophic', ?, NULL)",
2034            )
2035            .bind(uuid::Uuid::nil().to_string())
2036            .bind(uuid::Uuid::from_u128(0x42).to_string())
2037            .bind(chrono::Utc::now().to_rfc3339())
2038            .execute(store.pool())
2039            .await;
2040            assert!(
2041                insert.is_err(),
2042                "CHECK on severity must reject 'Catastrophic'; got {:?}",
2043                insert
2044            );
2045        });
2046    }
2047
2048    #[test]
2049    fn test_relates_edges_rejects_unknown_kind() {
2050        let dir = TempDir::new().unwrap();
2051        let path = dir.path().join("check_relates.sqlite3");
2052        let rt = make_rt();
2053        rt.block_on(async {
2054            let store = SqliteStore::open(&path).await.unwrap();
2055            let insert = sqlx::query(
2056                "INSERT INTO relates_edges
2057                    (source_id, target_id, kind, created_at, archived_at)
2058                 VALUES (?, ?, 'Unknown', ?, NULL)",
2059            )
2060            .bind(uuid::Uuid::nil().to_string())
2061            .bind(uuid::Uuid::from_u128(0x42).to_string())
2062            .bind(chrono::Utc::now().to_rfc3339())
2063            .execute(store.pool())
2064            .await;
2065            assert!(insert.is_err());
2066        });
2067    }
2068
2069    /// SqliteStore::open drops the pre-KAN-504 `card_edges` table on
2070    /// first encounter so the per-kind tables can take over.
2071    /// Pre-KAN-504 graph work is not live anywhere, so the data on
2072    /// such a table is dev-only and the drop is safe.
2073    #[test]
2074    fn test_open_drops_legacy_card_edges_table() {
2075        let dir = TempDir::new().unwrap();
2076        let path = dir.path().join("legacy.sqlite3");
2077        let rt = make_rt();
2078        rt.block_on(async {
2079            // Pre-seed the legacy table by direct sqlx access without
2080            // going through SqliteStore::open (the open path would
2081            // drop it before we could inspect).
2082            let pool = sqlx::sqlite::SqlitePoolOptions::new()
2083                .max_connections(1)
2084                .connect_with(
2085                    sqlx::sqlite::SqliteConnectOptions::new()
2086                        .filename(&path)
2087                        .create_if_missing(true),
2088                )
2089                .await
2090                .unwrap();
2091            sqlx::raw_sql(
2092                "CREATE TABLE card_edges (
2093                    source_id TEXT NOT NULL,
2094                    target_id TEXT NOT NULL,
2095                    edge_type TEXT NOT NULL,
2096                    direction TEXT NOT NULL,
2097                    weight REAL,
2098                    created_at TEXT NOT NULL,
2099                    archived_at TEXT,
2100                    PRIMARY KEY (source_id, target_id, edge_type)
2101                )",
2102            )
2103            .execute(&pool)
2104            .await
2105            .unwrap();
2106            drop(pool);
2107
2108            // Opening triggers the drop + per-kind table creation.
2109            let store = SqliteStore::open(&path).await.unwrap();
2110            let has_card_edges: bool = sqlx::query_scalar(
2111                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='card_edges'",
2112            )
2113            .fetch_one(store.pool())
2114            .await
2115            .unwrap();
2116            assert!(!has_card_edges, "legacy card_edges table must be dropped");
2117
2118            for table in ["spawns_edges", "blocks_edges", "relates_edges"] {
2119                let has: bool = sqlx::query_scalar(&format!(
2120                    "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='{}'",
2121                    table
2122                ))
2123                .fetch_one(store.pool())
2124                .await
2125                .unwrap();
2126                assert!(has, "{table} must exist after open");
2127            }
2128        });
2129    }
2130
2131    #[test]
2132    fn test_delete_archived_card_orphaned_cards_row_is_still_cleaned_up() {
2133        use kanban_domain::data_store::DataStore;
2134        let dir = TempDir::new().unwrap();
2135        let path = dir.path().join("test.sqlite3");
2136        let rt = make_rt();
2137        rt.block_on(async {
2138            let store = SqliteStore::open(&path).await.unwrap();
2139
2140            let mut board = kanban_domain::Board::new("B", None::<String>);
2141            let column = kanban_domain::Column::new(board.id, "Col", 0);
2142            let card = kanban_domain::Card::new(&mut board, column.id, "Task", 0);
2143            let card_id = card.id;
2144            let column_id = column.id;
2145            store.upsert_board(board).unwrap();
2146            store.upsert_column(column).unwrap();
2147            store.upsert_card(card.clone()).unwrap();
2148
2149            // Insert into archived_cards WITHOUT calling delete_card first,
2150            // leaving an orphaned row in the cards table.
2151            let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
2152            store.insert_archived_card(archived).unwrap();
2153
2154            store.delete_archived_card(card_id).unwrap();
2155
2156            assert!(
2157                store.list_archived_cards().unwrap().is_empty(),
2158                "card should be gone from archived_cards"
2159            );
2160            assert!(
2161                store.list_all_cards().unwrap().is_empty(),
2162                "orphaned cards row should also be removed by delete_archived_card"
2163            );
2164        });
2165    }
2166
2167    #[test]
2168    fn test_delete_archived_card_removes_from_cards_table() {
2169        use kanban_domain::data_store::DataStore;
2170        let dir = TempDir::new().unwrap();
2171        let path = dir.path().join("test.sqlite3");
2172        let rt = make_rt();
2173        rt.block_on(async {
2174            let store = SqliteStore::open(&path).await.unwrap();
2175
2176            let mut board = kanban_domain::Board::new("B", None::<String>);
2177            let column = kanban_domain::Column::new(board.id, "Col", 0);
2178            let card = kanban_domain::Card::new(&mut board, column.id, "Task", 0);
2179            let card_id = card.id;
2180            let column_id = column.id;
2181            store.upsert_board(board).unwrap();
2182            store.upsert_column(column).unwrap();
2183            store.upsert_card(card.clone()).unwrap();
2184
2185            let archived = kanban_domain::ArchivedCard::new(card, column_id, 0);
2186            store.insert_archived_card(archived).unwrap();
2187            store.delete_card(card_id).unwrap();
2188
2189            assert_eq!(store.list_archived_cards().unwrap().len(), 1);
2190
2191            store.delete_archived_card(card_id).unwrap();
2192
2193            assert!(
2194                store.list_archived_cards().unwrap().is_empty(),
2195                "card should be gone from archived_cards"
2196            );
2197            assert!(
2198                store.list_all_cards().unwrap().is_empty(),
2199                "card should also be gone from cards table, not restored as active"
2200            );
2201            assert!(
2202                store.get_card(card_id).unwrap().is_none(),
2203                "get_card should return None for permanently deleted card"
2204            );
2205        });
2206    }
2207
2208    #[test]
2209    fn test_read_metadata_sync_reflects_actual_schema_version_from_db_row() {
2210        // Contract: PersistenceMetadata.format_version is the format the DB
2211        // is currently at, not whatever the binary's SUPPORTED happens to be.
2212        // After open(), the two coincide (migrate normalises). We manually
2213        // UPDATE schema_version below to a value that differs from SUPPORTED
2214        // and confirm the read reflects the DB, not the const.
2215        let dir = TempDir::new().unwrap();
2216        let path = dir.path().join("drift.db");
2217        let rt = make_rt();
2218        rt.block_on(async {
2219            let store = SqliteStore::open(&path).await.unwrap();
2220            sqlx::query("UPDATE metadata SET schema_version = 1 WHERE id = 1")
2221                .execute(&store.pool)
2222                .await
2223                .unwrap();
2224
2225            let meta = store
2226                .read_metadata_sync()
2227                .unwrap()
2228                .expect("metadata row should exist");
2229            assert_eq!(
2230                meta.format_version,
2231                Some(1),
2232                "format_version must reflect the DB row (1), not the binary's SUPPORTED ({SUPPORTED_SCHEMA_VERSION})"
2233            );
2234        });
2235    }
2236
2237    #[test]
2238    fn test_load_reports_actual_schema_version_in_metadata() {
2239        use kanban_domain::DependencyGraph;
2240        use kanban_persistence::snapshot_to_json_bytes;
2241
2242        let dir = TempDir::new().unwrap();
2243        let path = dir.path().join("load_drift.db");
2244        let rt = make_rt();
2245        rt.block_on(async {
2246            let store = SqliteStore::open(&path).await.unwrap();
2247            // Seed the row so load() has something to read.
2248            let snapshot = Snapshot::from_data(
2249                vec![],
2250                vec![],
2251                vec![],
2252                vec![],
2253                vec![],
2254                DependencyGraph::new(),
2255            );
2256            let data = snapshot_to_json_bytes(&snapshot).unwrap();
2257            PersistenceStore::save(
2258                &store,
2259                StoreSnapshot {
2260                    data,
2261                    metadata: PersistenceMetadata::new(store.instance_id()),
2262                },
2263            )
2264            .await
2265            .unwrap();
2266            sqlx::query("UPDATE metadata SET schema_version = 1 WHERE id = 1")
2267                .execute(&store.pool)
2268                .await
2269                .unwrap();
2270
2271            let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2272            assert_eq!(
2273                meta.format_version,
2274                Some(1),
2275                "load() must report the DB's actual schema_version, not the const"
2276            );
2277        });
2278    }
2279
2280    #[test]
2281    fn test_stamp_writer_updates_metadata_row_and_returns_timestamp() {
2282        let dir = TempDir::new().unwrap();
2283        let path = dir.path().join("stamped.db");
2284        let rt = make_rt();
2285        rt.block_on(async {
2286            let store = SqliteStore::open(&path).await.unwrap();
2287            // Wipe what migrate-on-open already wrote so the assertion is clean.
2288            sqlx::query(
2289                "UPDATE metadata SET writer_version = NULL, writer_commit = NULL WHERE id = 1",
2290            )
2291            .execute(&store.pool)
2292            .await
2293            .unwrap();
2294
2295            let before = Utc::now();
2296            let stamped_at = store.stamp_writer().await.unwrap();
2297            let after = Utc::now();
2298
2299            assert!(
2300                stamped_at >= before && stamped_at <= after,
2301                "returned timestamp must be in [before, after]: {stamped_at:?}"
2302            );
2303
2304            let (saved_at_str, wv, wc): (String, Option<String>, Option<String>) = sqlx::query_as(
2305                "SELECT saved_at, writer_version, writer_commit FROM metadata WHERE id = 1",
2306            )
2307            .fetch_one(&store.pool)
2308            .await
2309            .unwrap();
2310            assert_eq!(wv.as_deref(), Some(kanban_core::KANBAN_VERSION));
2311            assert_eq!(wc.as_deref(), Some(kanban_core::KANBAN_COMMIT));
2312            assert_eq!(saved_at_str, stamped_at.to_rfc3339());
2313        });
2314    }
2315
2316    #[test]
2317    fn test_checkpoint_alone_does_not_stamp_writer_fields() {
2318        let dir = TempDir::new().unwrap();
2319        let path = dir.path().join("ckpt_only.db");
2320        let rt = make_rt();
2321        rt.block_on(async {
2322            let store = SqliteStore::open(&path).await.unwrap();
2323            sqlx::query(
2324                "UPDATE metadata SET writer_version = NULL, writer_commit = NULL WHERE id = 1",
2325            )
2326            .execute(&store.pool)
2327            .await
2328            .unwrap();
2329
2330            store.checkpoint().await.unwrap();
2331
2332            let (wv, wc): (Option<String>, Option<String>) =
2333                sqlx::query_as("SELECT writer_version, writer_commit FROM metadata WHERE id = 1")
2334                    .fetch_one(&store.pool)
2335                    .await
2336                    .unwrap();
2337            assert!(
2338                wv.is_none() && wc.is_none(),
2339                "checkpoint() must be WAL-only post-split; got wv={wv:?} wc={wc:?}"
2340            );
2341        });
2342    }
2343
2344    #[test]
2345    fn test_fresh_db_records_schema_version_2() {
2346        let dir = TempDir::new().unwrap();
2347        let path = dir.path().join("fresh.db");
2348        let rt = make_rt();
2349        rt.block_on(async {
2350            let store = SqliteStore::open(&path).await.unwrap();
2351            let version: u32 =
2352                sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
2353                    .fetch_one(&store.pool)
2354                    .await
2355                    .unwrap();
2356            assert_eq!(version, SUPPORTED_SCHEMA_VERSION);
2357        });
2358    }
2359
2360    #[test]
2361    fn test_fresh_db_has_writer_version_and_writer_commit_columns() {
2362        let dir = TempDir::new().unwrap();
2363        let path = dir.path().join("fresh.db");
2364        let rt = make_rt();
2365        rt.block_on(async {
2366            let store = SqliteStore::open(&path).await.unwrap();
2367            for col in ["writer_version", "writer_commit"] {
2368                let exists: bool = sqlx::query_scalar(&format!(
2369                    "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
2370                ))
2371                .fetch_one(&store.pool)
2372                .await
2373                .unwrap();
2374                assert!(exists, "metadata.{col} column must exist on fresh DB");
2375            }
2376        });
2377    }
2378
2379    #[test]
2380    fn test_save_stamps_writer_version_and_commit_into_metadata_row() {
2381        use kanban_domain::DependencyGraph;
2382        use kanban_persistence::snapshot_to_json_bytes;
2383
2384        let dir = TempDir::new().unwrap();
2385        let path = dir.path().join("stamped.db");
2386        let rt = make_rt();
2387        rt.block_on(async {
2388            let store = SqliteStore::open(&path).await.unwrap();
2389            let snapshot = Snapshot::from_data(
2390                vec![],
2391                vec![],
2392                vec![],
2393                vec![],
2394                vec![],
2395                DependencyGraph::new(),
2396            );
2397            let data = snapshot_to_json_bytes(&snapshot).unwrap();
2398            let store_snap = StoreSnapshot {
2399                data,
2400                metadata: PersistenceMetadata::new(store.instance_id()),
2401            };
2402            let returned = PersistenceStore::save(&store, store_snap).await.unwrap();
2403
2404            assert_eq!(
2405                returned.writer_version.as_deref(),
2406                Some(kanban_core::KANBAN_VERSION),
2407            );
2408            assert_eq!(
2409                returned.writer_commit.as_deref(),
2410                Some(kanban_core::KANBAN_COMMIT),
2411            );
2412
2413            let row: (Option<String>, Option<String>) =
2414                sqlx::query_as("SELECT writer_version, writer_commit FROM metadata WHERE id = 1")
2415                    .fetch_one(&store.pool)
2416                    .await
2417                    .unwrap();
2418            assert_eq!(row.0.as_deref(), Some(kanban_core::KANBAN_VERSION));
2419            assert_eq!(row.1.as_deref(), Some(kanban_core::KANBAN_COMMIT));
2420        });
2421    }
2422
2423    #[test]
2424    fn test_load_returns_writer_stamp_from_metadata_row() {
2425        use kanban_domain::DependencyGraph;
2426        use kanban_persistence::snapshot_to_json_bytes;
2427
2428        let dir = TempDir::new().unwrap();
2429        let path = dir.path().join("loaded.db");
2430        let rt = make_rt();
2431        rt.block_on(async {
2432            let store = SqliteStore::open(&path).await.unwrap();
2433            let snapshot = Snapshot::from_data(
2434                vec![],
2435                vec![],
2436                vec![],
2437                vec![],
2438                vec![],
2439                DependencyGraph::new(),
2440            );
2441            let data = snapshot_to_json_bytes(&snapshot).unwrap();
2442            PersistenceStore::save(
2443                &store,
2444                StoreSnapshot {
2445                    data,
2446                    metadata: PersistenceMetadata::new(store.instance_id()),
2447                },
2448            )
2449            .await
2450            .unwrap();
2451
2452            let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2453            assert_eq!(
2454                meta.writer_version.as_deref(),
2455                Some(kanban_core::KANBAN_VERSION),
2456            );
2457            assert_eq!(
2458                meta.writer_commit.as_deref(),
2459                Some(kanban_core::KANBAN_COMMIT),
2460            );
2461        });
2462    }
2463
2464    #[test]
2465    fn test_load_legacy_db_without_stamp_returns_none_writer_fields() {
2466        // Pre-KAN-522 DB: metadata row exists, schema_version was bumped to
2467        // current by migrate(), but writer_version/writer_commit are still
2468        // NULL because no save has happened since the ALTER.
2469        let dir = TempDir::new().unwrap();
2470        let path = dir.path().join("legacy_load.db");
2471        let rt = make_rt();
2472        rt.block_on(async {
2473            let pool = SqlitePoolOptions::new()
2474                .max_connections(1)
2475                .connect_with(
2476                    SqliteConnectOptions::new()
2477                        .filename(&path)
2478                        .create_if_missing(true),
2479                )
2480                .await
2481                .unwrap();
2482            sqlx::raw_sql(
2483                "CREATE TABLE metadata (
2484                    id INTEGER PRIMARY KEY CHECK (id = 1),
2485                    instance_id TEXT NOT NULL,
2486                    saved_at TEXT NOT NULL,
2487                    schema_version INTEGER NOT NULL DEFAULT 1
2488                );
2489                INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2490                VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2024-01-01T00:00:00Z', 1);",
2491            )
2492            .execute(&pool)
2493            .await
2494            .unwrap();
2495            pool.close().await;
2496
2497            let store = SqliteStore::open(&path).await.unwrap();
2498            let (_, meta) = PersistenceStore::load(&store).await.unwrap();
2499            assert!(meta.writer_version.is_none());
2500            assert!(meta.writer_commit.is_none());
2501        });
2502    }
2503
2504    #[test]
2505    fn test_open_rejects_future_schema_version() {
2506        let dir = TempDir::new().unwrap();
2507        let path = dir.path().join("future.db");
2508        let rt = make_rt();
2509        rt.block_on(async {
2510            let pool = SqlitePoolOptions::new()
2511                .max_connections(1)
2512                .connect_with(
2513                    SqliteConnectOptions::new()
2514                        .filename(&path)
2515                        .create_if_missing(true),
2516                )
2517                .await
2518                .unwrap();
2519            sqlx::raw_sql(
2520                "CREATE TABLE metadata (
2521                    id INTEGER PRIMARY KEY CHECK (id = 1),
2522                    instance_id TEXT NOT NULL,
2523                    saved_at TEXT NOT NULL,
2524                    schema_version INTEGER NOT NULL DEFAULT 2,
2525                    writer_version TEXT,
2526                    writer_commit TEXT
2527                );
2528                INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2529                VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2030-01-01T00:00:00Z', 99);",
2530            )
2531            .execute(&pool)
2532            .await
2533            .unwrap();
2534            pool.close().await;
2535
2536            let err = SqliteStore::open(&path)
2537                .await
2538                .err()
2539                .expect("schema_version 99 must be refused");
2540            assert!(
2541                matches!(
2542                    err,
2543                    KanbanError::UnsupportedFutureVersion {
2544                        file_version: 99,
2545                        binary_max: SUPPORTED_SCHEMA_VERSION
2546                    }
2547                ),
2548                "expected UnsupportedFutureVersion, got: {err:?}"
2549            );
2550        });
2551    }
2552
2553    #[test]
2554    fn test_open_alters_in_writer_columns_on_legacy_v1_db() {
2555        // Simulate a pre-KAN-522 SQLite file: metadata table without the
2556        // writer_* columns and schema_version = 1. SqliteStore::open must
2557        // ALTER in the new columns and bump schema_version to 2.
2558        let dir = TempDir::new().unwrap();
2559        let path = dir.path().join("legacy.db");
2560        let rt = make_rt();
2561        rt.block_on(async {
2562            let pool = SqlitePoolOptions::new()
2563                .max_connections(1)
2564                .connect_with(
2565                    SqliteConnectOptions::new()
2566                        .filename(&path)
2567                        .create_if_missing(true),
2568                )
2569                .await
2570                .unwrap();
2571            sqlx::raw_sql(
2572                "CREATE TABLE metadata (
2573                    id INTEGER PRIMARY KEY CHECK (id = 1),
2574                    instance_id TEXT NOT NULL,
2575                    saved_at TEXT NOT NULL,
2576                    schema_version INTEGER NOT NULL DEFAULT 1
2577                );
2578                INSERT INTO metadata (id, instance_id, saved_at, schema_version)
2579                VALUES (1, '550e8400-e29b-41d4-a716-446655440000', '2024-01-01T00:00:00Z', 1);",
2580            )
2581            .execute(&pool)
2582            .await
2583            .unwrap();
2584            pool.close().await;
2585
2586            let store = SqliteStore::open(&path).await.unwrap();
2587
2588            for col in ["writer_version", "writer_commit"] {
2589                let exists: bool = sqlx::query_scalar(&format!(
2590                    "SELECT COUNT(*) > 0 FROM pragma_table_info('metadata') WHERE name = '{col}'"
2591                ))
2592                .fetch_one(&store.pool)
2593                .await
2594                .unwrap();
2595                assert!(exists, "metadata.{col} must be ALTERed in on legacy open");
2596            }
2597
2598            let bumped: u32 =
2599                sqlx::query_scalar("SELECT schema_version FROM metadata WHERE id = 1")
2600                    .fetch_one(&store.pool)
2601                    .await
2602                    .unwrap();
2603            assert_eq!(
2604                bumped, SUPPORTED_SCHEMA_VERSION,
2605                "schema_version must be bumped to current on legacy open"
2606            );
2607        });
2608    }
2609
2610    #[test]
2611    fn test_empty_sprint_log_status_returns_error() {
2612        use kanban_domain::data_store::DataStore;
2613        use kanban_domain::{Board, Card, Column, SprintLog};
2614
2615        let dir = TempDir::new().unwrap();
2616        let path = dir.path().join("validation.sqlite3");
2617        let rt = make_rt();
2618        rt.block_on(async {
2619            let store = SqliteStore::open(&path).await.unwrap();
2620
2621            let mut board = Board::new("B", None::<String>);
2622            let column = Column::new(board.id, "Col", 0);
2623            let mut card = Card::new(&mut board, column.id, "Task", 0);
2624            store.upsert_board(board).unwrap();
2625            store.upsert_column(column).unwrap();
2626
2627            let log = SprintLog::new(uuid::Uuid::new_v4(), 1, None::<String>, "");
2628            card.sprint_logs.push(log);
2629
2630            let result = store.upsert_card(card);
2631            assert!(
2632                result.is_err(),
2633                "upsert_card must reject a SprintLog with empty status"
2634            );
2635        });
2636    }
2637
2638    #[test]
2639    fn test_empty_board_name_returns_error() {
2640        use kanban_domain::data_store::DataStore;
2641        use kanban_domain::Board;
2642        let dir = TempDir::new().unwrap();
2643        let path = dir.path().join("validation.sqlite3");
2644        let rt = make_rt();
2645        rt.block_on(async {
2646            let store = SqliteStore::open(&path).await.unwrap();
2647            let board = Board::new("", None::<String>);
2648            let result = store.upsert_board(board);
2649            assert!(
2650                result.is_err(),
2651                "upsert_board must reject a Board with empty name"
2652            );
2653        });
2654    }
2655
2656    #[test]
2657    fn test_empty_column_name_returns_error() {
2658        use kanban_domain::data_store::DataStore;
2659        use kanban_domain::{Board, Column};
2660        let dir = TempDir::new().unwrap();
2661        let path = dir.path().join("validation.sqlite3");
2662        let rt = make_rt();
2663        rt.block_on(async {
2664            let store = SqliteStore::open(&path).await.unwrap();
2665            let board = Board::new("B", None::<String>);
2666            let board_id = board.id;
2667            store.upsert_board(board).unwrap();
2668            let col = Column::new(board_id, "", 0);
2669            let result = store.upsert_column(col);
2670            assert!(
2671                result.is_err(),
2672                "upsert_column must reject a Column with empty name"
2673            );
2674        });
2675    }
2676
2677    #[test]
2678    fn test_empty_card_title_returns_error() {
2679        use kanban_domain::data_store::DataStore;
2680        use kanban_domain::{Board, Card, Column};
2681        let dir = TempDir::new().unwrap();
2682        let path = dir.path().join("validation.sqlite3");
2683        let rt = make_rt();
2684        rt.block_on(async {
2685            let store = SqliteStore::open(&path).await.unwrap();
2686            let mut board = Board::new("B", None::<String>);
2687            let col = Column::new(board.id, "Col", 0);
2688            let col_id = col.id;
2689            // Card::new borrows &mut board -- call it before upsert_board moves board
2690            let card = Card::new(&mut board, col_id, "", 0);
2691            store.upsert_board(board).unwrap();
2692            store.upsert_column(col).unwrap();
2693            let result = store.upsert_card(card);
2694            assert!(
2695                result.is_err(),
2696                "upsert_card must reject a Card with empty title"
2697            );
2698        });
2699    }
2700}