Skip to main content

kanban_persistence_sqlite/
sqlite_store.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use chrono::{DateTime, Utc};
5use kanban_core::graph::{Edge, Graph};
6use kanban_domain::data_store::DataStore;
7use kanban_domain::{
8    ArchivedCard, Board, Card, CardEdgeType, Column, DependencyGraph, KanbanError, KanbanResult,
9    Snapshot, Sprint, SprintLog,
10};
11use kanban_persistence::{
12    PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore, StoreSnapshot,
13};
14use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow};
15use sqlx::{Pool, Row, Sqlite};
16use uuid::Uuid;
17
18const SCHEMA: &str = include_str!("schema.sql");
19
/// SQLite-backed persistence store using sqlx connection pool.
///
/// Instances are created with [`SqliteStore::open`].
pub struct SqliteStore {
    /// sqlx connection pool that the store's queries run against.
    pool: Pool<Sqlite>,
    /// Location of the database file, copied from the path given to `open`.
    path: PathBuf,
    /// Identifier read from (or, on first open, inserted into) the `metadata` row with `id = 1`.
    instance_id: Uuid,
}
26
27fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
28    let handle = tokio::runtime::Handle::current();
29    debug_assert!(
30        handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread,
31        "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
32         tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
33         because synchronous DataStore methods need to block on async SQLite I/O."
34    );
35    tokio::task::block_in_place(|| handle.block_on(f))
36}
37
38fn db_err(e: sqlx::Error) -> KanbanError {
39    KanbanError::Database(e.to_string())
40}
41
42fn ser_err(msg: impl std::fmt::Display) -> KanbanError {
43    KanbanError::Serialization(msg.to_string())
44}
45
46fn p_uuid(s: &str) -> KanbanResult<Uuid> {
47    Uuid::parse_str(s).map_err(ser_err)
48}
49
50fn p_dt(s: &str) -> KanbanResult<DateTime<Utc>> {
51    DateTime::parse_from_rfc3339(s)
52        .map_err(ser_err)
53        .map(|dt| dt.with_timezone(&Utc))
54}
55
56fn p_enum<T: serde::de::DeserializeOwned>(s: &str, label: &str) -> KanbanResult<T> {
57    serde_json::from_value(serde_json::Value::String(s.to_owned()))
58        .map_err(|_| ser_err(format!("unknown {label} variant: {s}")))
59}
60
61fn fmt_dt(dt: &DateTime<Utc>) -> String {
62    dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
63}
64
65fn opt_dt(dt: &Option<DateTime<Utc>>) -> Option<String> {
66    dt.as_ref().map(fmt_dt)
67}
68
69// --- Row parsers ---
70
71fn row_to_board(
72    row: &SqliteRow,
73    sprint_names: Vec<String>,
74    sprint_counters: HashMap<String, u32>,
75) -> KanbanResult<Board> {
76    let id_str: String = row.try_get("id").map_err(db_err)?;
77    let active_sprint_id_str: Option<String> = row.try_get("active_sprint_id").map_err(db_err)?;
78    let completion_column_id_str: Option<String> =
79        row.try_get("completion_column_id").map_err(db_err)?;
80    let task_sort_field_str: String = row.try_get("task_sort_field").map_err(db_err)?;
81    let task_sort_order_str: String = row.try_get("task_sort_order").map_err(db_err)?;
82    let task_list_view_str: String = row.try_get("task_list_view").map_err(db_err)?;
83    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
84    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
85    let sprint_duration_days_raw: Option<i32> =
86        row.try_get("sprint_duration_days").map_err(db_err)?;
87
88    Ok(Board {
89        id: p_uuid(&id_str)?,
90        name: row.try_get("name").map_err(db_err)?,
91        description: row.try_get("description").map_err(db_err)?,
92        sprint_prefix: row.try_get("sprint_prefix").map_err(db_err)?,
93        card_prefix: row.try_get("card_prefix").map_err(db_err)?,
94        task_sort_field: p_enum(&task_sort_field_str, "task_sort_field")?,
95        task_sort_order: p_enum(&task_sort_order_str, "task_sort_order")?,
96        sprint_duration_days: sprint_duration_days_raw.map(|v| v as u32),
97        sprint_names,
98        sprint_name_used_count: row
99            .try_get::<i32, _>("sprint_name_used_count")
100            .map_err(db_err)? as usize,
101        next_sprint_number: row
102            .try_get::<i32, _>("next_sprint_number")
103            .map_err(db_err)? as u32,
104        active_sprint_id: active_sprint_id_str.as_deref().map(p_uuid).transpose()?,
105        task_list_view: p_enum(&task_list_view_str, "task_list_view")?,
106        card_counter: row.try_get::<i32, _>("card_counter").map_err(db_err)? as u32,
107        sprint_counters,
108        completion_column_id: completion_column_id_str
109            .as_deref()
110            .map(p_uuid)
111            .transpose()?,
112        position: row.try_get::<i32, _>("position").map_err(db_err)?,
113        created_at: p_dt(&created_at_str)?,
114        updated_at: p_dt(&updated_at_str)?,
115    })
116}
117
118fn row_to_column(row: &SqliteRow) -> KanbanResult<Column> {
119    let id_str: String = row.try_get("id").map_err(db_err)?;
120    let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
121    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
122    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
123
124    Ok(Column {
125        id: p_uuid(&id_str)?,
126        board_id: p_uuid(&board_id_str)?,
127        name: row.try_get("name").map_err(db_err)?,
128        position: row.try_get("position").map_err(db_err)?,
129        wip_limit: row.try_get("wip_limit").map_err(db_err)?,
130        created_at: p_dt(&created_at_str)?,
131        updated_at: p_dt(&updated_at_str)?,
132    })
133}
134
135fn row_to_card(row: &SqliteRow, sprint_logs: Vec<SprintLog>) -> KanbanResult<Card> {
136    let id_str: String = row.try_get("id").map_err(db_err)?;
137    let column_id_str: String = row.try_get("column_id").map_err(db_err)?;
138    let sprint_id_str: Option<String> = row.try_get("sprint_id").map_err(db_err)?;
139    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
140    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
141    let completed_at_str: Option<String> = row.try_get("completed_at").map_err(db_err)?;
142    let due_date_str: Option<String> = row.try_get("due_date").map_err(db_err)?;
143    let priority_str: String = row.try_get("priority").map_err(db_err)?;
144    let status_str: String = row.try_get("status").map_err(db_err)?;
145    let points_raw: Option<i32> = row.try_get("points").map_err(db_err)?;
146
147    Ok(Card {
148        id: p_uuid(&id_str)?,
149        column_id: p_uuid(&column_id_str)?,
150        title: row.try_get("title").map_err(db_err)?,
151        description: row.try_get("description").map_err(db_err)?,
152        priority: p_enum(&priority_str, "priority")?,
153        status: p_enum(&status_str, "status")?,
154        position: row.try_get("position").map_err(db_err)?,
155        due_date: due_date_str.as_deref().map(p_dt).transpose()?,
156        points: points_raw
157            .map(|v| u8::try_from(v).map_err(|_| ser_err(format!("points {v} out of range"))))
158            .transpose()?,
159        card_number: row.try_get::<i32, _>("card_number").map_err(db_err)? as u32,
160        sprint_id: sprint_id_str.as_deref().map(p_uuid).transpose()?,
161        created_at: p_dt(&created_at_str)?,
162        updated_at: p_dt(&updated_at_str)?,
163        completed_at: completed_at_str.as_deref().map(p_dt).transpose()?,
164        sprint_logs,
165    })
166}
167
168fn row_to_sprint(row: &SqliteRow) -> KanbanResult<Sprint> {
169    let id_str: String = row.try_get("id").map_err(db_err)?;
170    let board_id_str: String = row.try_get("board_id").map_err(db_err)?;
171    let status_str: String = row.try_get("status").map_err(db_err)?;
172    let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
173    let updated_at_str: String = row.try_get("updated_at").map_err(db_err)?;
174    let start_date_str: Option<String> = row.try_get("start_date").map_err(db_err)?;
175    let end_date_str: Option<String> = row.try_get("end_date").map_err(db_err)?;
176    let name_index_raw: Option<i32> = row.try_get("name_index").map_err(db_err)?;
177
178    Ok(Sprint {
179        id: p_uuid(&id_str)?,
180        board_id: p_uuid(&board_id_str)?,
181        sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
182        name_index: name_index_raw.map(|v| v as usize),
183        prefix: row.try_get("prefix").map_err(db_err)?,
184        card_prefix: row.try_get("card_prefix").map_err(db_err)?,
185        status: p_enum(&status_str, "sprint status")?,
186        start_date: start_date_str.as_deref().map(p_dt).transpose()?,
187        end_date: end_date_str.as_deref().map(p_dt).transpose()?,
188        created_at: p_dt(&created_at_str)?,
189        updated_at: p_dt(&updated_at_str)?,
190    })
191}
192
193fn rows_to_graph(rows: &[SqliteRow]) -> KanbanResult<DependencyGraph> {
194    let mut graph: Graph<CardEdgeType> = Graph::new();
195    for row in rows {
196        let source_str: String = row.try_get("source_id").map_err(db_err)?;
197        let target_str: String = row.try_get("target_id").map_err(db_err)?;
198        let edge_type_str: String = row.try_get("edge_type").map_err(db_err)?;
199        let direction_str: String = row.try_get("direction").map_err(db_err)?;
200        let weight: Option<f64> = row.try_get("weight").map_err(db_err)?;
201        let created_at_str: String = row.try_get("created_at").map_err(db_err)?;
202        let archived_at_str: Option<String> = row.try_get("archived_at").map_err(db_err)?;
203
204        graph.add_edge(Edge {
205            source: p_uuid(&source_str)?,
206            target: p_uuid(&target_str)?,
207            edge_type: p_enum(&edge_type_str, "edge_type")?,
208            direction: p_enum(&direction_str, "edge direction")?,
209            weight: weight.map(|w| w as f32),
210            created_at: p_dt(&created_at_str)?,
211            archived_at: archived_at_str.as_deref().map(p_dt).transpose()?,
212        });
213    }
214    Ok(DependencyGraph { cards: graph })
215}
216
217fn row_to_sprint_log(row: &SqliteRow) -> KanbanResult<SprintLog> {
218    let sprint_id_str: String = row.try_get("sprint_id").map_err(db_err)?;
219    let started_at_str: String = row.try_get("started_at").map_err(db_err)?;
220    let ended_at_str: Option<String> = row.try_get("ended_at").map_err(db_err)?;
221
222    Ok(SprintLog {
223        sprint_id: p_uuid(&sprint_id_str)?,
224        sprint_number: row.try_get::<i32, _>("sprint_number").map_err(db_err)? as u32,
225        sprint_name: row.try_get("sprint_name").map_err(db_err)?,
226        started_at: p_dt(&started_at_str)?,
227        ended_at: ended_at_str.as_deref().map(p_dt).transpose()?,
228        status: row.try_get("status").map_err(db_err)?,
229    })
230}
231
232// --- SqliteStore ---
233
234impl SqliteStore {
235    pub async fn open(path: impl AsRef<Path>) -> KanbanResult<Self> {
236        let handle = tokio::runtime::Handle::current();
237        if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
238            return Err(KanbanError::Database(
239                "SqliteStore requires a multi-threaded Tokio runtime (e.g. #[tokio::main] or \
240                 tokio::runtime::Runtime::new()). The current_thread runtime is not supported \
241                 because synchronous DataStore methods need to block on async SQLite I/O."
242                    .to_string(),
243            ));
244        }
245
246        let path_buf = path.as_ref().to_path_buf();
247
248        let options = SqliteConnectOptions::new()
249            .filename(&path_buf)
250            .create_if_missing(true)
251            .foreign_keys(true)
252            .pragma("journal_mode", "wal");
253
254        let pool = SqlitePoolOptions::new()
255            .max_connections(2)
256            .connect_with(options)
257            .await
258            .map_err(|e| KanbanError::Database(e.to_string()))?;
259
260        sqlx::raw_sql(SCHEMA)
261            .execute(&pool)
262            .await
263            .map_err(|e| KanbanError::Database(e.to_string()))?;
264
265        Self::migrate(&pool).await?;
266
267        let instance_id = Self::load_or_create_instance_id(&pool).await?;
268
269        Ok(Self {
270            pool,
271            path: path_buf,
272            instance_id,
273        })
274    }
275
276    async fn load_or_create_instance_id(pool: &Pool<Sqlite>) -> KanbanResult<Uuid> {
277        let row: Option<String> =
278            sqlx::query_scalar("SELECT instance_id FROM metadata WHERE id = 1")
279                .fetch_optional(pool)
280                .await
281                .map_err(db_err)?;
282        match row {
283            Some(s) => p_uuid(&s),
284            None => {
285                let id = Uuid::new_v4();
286                let now = Utc::now().to_rfc3339();
287                sqlx::query(
288                    "INSERT INTO metadata (id, instance_id, saved_at, schema_version) VALUES (1, ?, ?, 1)",
289                )
290                .bind(id.to_string())
291                .bind(&now)
292                .execute(pool)
293                .await
294                .map_err(db_err)?;
295                Ok(id)
296            }
297        }
298    }
299
300    async fn migrate(pool: &Pool<Sqlite>) -> KanbanResult<()> {
301        // Drop command_log and undo_state tables if they exist (legacy persistence
302        // of undo history — commands are now in-session only).
303        let has_command_log: bool = sqlx::query_scalar(
304            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='command_log'",
305        )
306        .fetch_one(pool)
307        .await
308        .map_err(db_err)?;
309
310        if has_command_log {
311            tracing::info!(
312                "dropping legacy command_log table: undo history is now in-session only and cannot be carried back to pre-KAN-405 builds"
313            );
314            sqlx::raw_sql("DROP TABLE IF EXISTS command_log")
315                .execute(pool)
316                .await
317                .map_err(db_err)?;
318        }
319
320        let has_undo_state: bool = sqlx::query_scalar(
321            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='undo_state'",
322        )
323        .fetch_one(pool)
324        .await
325        .map_err(db_err)?;
326
327        if has_undo_state {
328            tracing::info!(
329                "dropping legacy undo_state table: undo cursor is now in-session only and cannot be carried back to pre-KAN-405 builds"
330            );
331            sqlx::raw_sql("DROP TABLE IF EXISTS undo_state")
332                .execute(pool)
333                .await
334                .map_err(db_err)?;
335        }
336
337        let has_position_col: bool = sqlx::query_scalar(
338            "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'position'",
339        )
340        .fetch_one(pool)
341        .await
342        .map_err(db_err)?;
343
344        if !has_position_col {
345            sqlx::raw_sql("ALTER TABLE boards ADD COLUMN position INTEGER NOT NULL DEFAULT 0")
346                .execute(pool)
347                .await
348                .map_err(db_err)?;
349        }
350
351        let has_card_counter_col: bool = sqlx::query_scalar(
352            "SELECT COUNT(*) > 0 FROM pragma_table_info('boards') WHERE name = 'card_counter'",
353        )
354        .fetch_one(pool)
355        .await
356        .map_err(db_err)?;
357
358        if !has_card_counter_col {
359            sqlx::raw_sql("ALTER TABLE boards ADD COLUMN card_counter INTEGER NOT NULL DEFAULT 1")
360                .execute(pool)
361                .await
362                .map_err(db_err)?;
363        }
364
365        Ok(())
366    }
367
368    pub fn pool(&self) -> &Pool<Sqlite> {
369        &self.pool
370    }
371
372    pub async fn checkpoint(&self) -> KanbanResult<()> {
373        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
374            .execute(&self.pool)
375            .await
376            .map_err(|e| KanbanError::Database(e.to_string()))?;
377        Ok(())
378    }
379
380    async fn fetch_board_aux(
381        &self,
382        board_id: &str,
383    ) -> KanbanResult<(Vec<String>, HashMap<String, u32>)> {
384        let name_rows =
385            sqlx::query("SELECT name FROM board_sprint_names WHERE board_id = ? ORDER BY position")
386                .bind(board_id)
387                .fetch_all(&self.pool)
388                .await
389                .map_err(db_err)?;
390        let sprint_names: Vec<String> = name_rows
391            .iter()
392            .map(|r| r.try_get("name").map_err(db_err))
393            .collect::<KanbanResult<_>>()?;
394
395        let counter_rows =
396            sqlx::query("SELECT prefix, counter FROM board_sprint_counters WHERE board_id = ?")
397                .bind(board_id)
398                .fetch_all(&self.pool)
399                .await
400                .map_err(db_err)?;
401        let mut sprint_counters = HashMap::new();
402        for row in &counter_rows {
403            let prefix: String = row.try_get("prefix").map_err(db_err)?;
404            let counter: i32 = row.try_get("counter").map_err(db_err)?;
405            sprint_counters.insert(prefix, counter as u32);
406        }
407
408        Ok((sprint_names, sprint_counters))
409    }
410
411    async fn write_board_with_conn(
412        conn: &mut sqlx::SqliteConnection,
413        board: &Board,
414    ) -> KanbanResult<()> {
415        let id = board.id.to_string();
416
417        sqlx::query(
418            "INSERT INTO boards (id, name, description, sprint_prefix, card_prefix,
419                task_sort_field, task_sort_order, sprint_duration_days,
420                sprint_name_used_count, next_sprint_number, active_sprint_id,
421                task_list_view, card_counter, completion_column_id, position,
422                created_at, updated_at)
423             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
424             ON CONFLICT(id) DO UPDATE SET
425                name=excluded.name, description=excluded.description,
426                sprint_prefix=excluded.sprint_prefix, card_prefix=excluded.card_prefix,
427                task_sort_field=excluded.task_sort_field, task_sort_order=excluded.task_sort_order,
428                sprint_duration_days=excluded.sprint_duration_days,
429                sprint_name_used_count=excluded.sprint_name_used_count,
430                next_sprint_number=excluded.next_sprint_number,
431                active_sprint_id=excluded.active_sprint_id,
432                task_list_view=excluded.task_list_view, card_counter=excluded.card_counter,
433                completion_column_id=excluded.completion_column_id,
434                position=excluded.position,
435                updated_at=excluded.updated_at",
436        )
437        .bind(&id)
438        .bind(&board.name)
439        .bind(&board.description)
440        .bind(&board.sprint_prefix)
441        .bind(&board.card_prefix)
442        .bind(format!("{:?}", board.task_sort_field))
443        .bind(format!("{:?}", board.task_sort_order))
444        .bind(board.sprint_duration_days.map(|v| v as i32))
445        .bind(board.sprint_name_used_count as i32)
446        .bind(board.next_sprint_number as i32)
447        .bind(board.active_sprint_id.map(|id| id.to_string()))
448        .bind(format!("{:?}", board.task_list_view))
449        .bind(board.card_counter as i32)
450        .bind(board.completion_column_id.map(|id| id.to_string()))
451        .bind(board.position)
452        .bind(fmt_dt(&board.created_at))
453        .bind(fmt_dt(&board.updated_at))
454        .execute(&mut *conn)
455        .await
456        .map_err(db_err)?;
457
458        sqlx::query("DELETE FROM board_sprint_names WHERE board_id = ?")
459            .bind(&id)
460            .execute(&mut *conn)
461            .await
462            .map_err(db_err)?;
463        for (i, name) in board.sprint_names.iter().enumerate() {
464            sqlx::query(
465                "INSERT INTO board_sprint_names (board_id, position, name) VALUES (?, ?, ?)",
466            )
467            .bind(&id)
468            .bind(i as i32)
469            .bind(name)
470            .execute(&mut *conn)
471            .await
472            .map_err(db_err)?;
473        }
474
475        sqlx::query("DELETE FROM board_sprint_counters WHERE board_id = ?")
476            .bind(&id)
477            .execute(&mut *conn)
478            .await
479            .map_err(db_err)?;
480        for (prefix, counter) in &board.sprint_counters {
481            sqlx::query(
482                "INSERT INTO board_sprint_counters (board_id, prefix, counter) VALUES (?, ?, ?)",
483            )
484            .bind(&id)
485            .bind(prefix)
486            .bind(*counter as i32)
487            .execute(&mut *conn)
488            .await
489            .map_err(db_err)?;
490        }
491
492        Ok(())
493    }
494
495    async fn write_board_async(&self, board: &Board) -> KanbanResult<()> {
496        let mut tx = self.pool.begin().await.map_err(db_err)?;
497        Self::write_board_with_conn(&mut tx, board).await?;
498        tx.commit().await.map_err(db_err)?;
499        Ok(())
500    }
501
502    async fn fetch_sprint_logs_for_card(&self, card_id: &str) -> KanbanResult<Vec<SprintLog>> {
503        let rows = sqlx::query(
504            "SELECT sprint_id, sprint_number, sprint_name, started_at, ended_at, status
505             FROM sprint_logs WHERE card_id = ? ORDER BY id",
506        )
507        .bind(card_id)
508        .fetch_all(&self.pool)
509        .await
510        .map_err(db_err)?;
511        rows.iter().map(row_to_sprint_log).collect()
512    }
513
514    async fn write_card_with_conn(
515        conn: &mut sqlx::SqliteConnection,
516        card: &Card,
517    ) -> KanbanResult<()> {
518        let id = card.id.to_string();
519
520        sqlx::query(
521            "INSERT INTO cards (id, column_id, title, description, priority, status, position,
522                due_date, points, card_number, sprint_id, created_at, updated_at, completed_at)
523             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
524             ON CONFLICT(id) DO UPDATE SET
525                column_id=excluded.column_id, title=excluded.title,
526                description=excluded.description, priority=excluded.priority,
527                status=excluded.status, position=excluded.position,
528                due_date=excluded.due_date, points=excluded.points,
529                card_number=excluded.card_number, sprint_id=excluded.sprint_id,
530                updated_at=excluded.updated_at, completed_at=excluded.completed_at",
531        )
532        .bind(&id)
533        .bind(card.column_id.to_string())
534        .bind(&card.title)
535        .bind(&card.description)
536        .bind(format!("{:?}", card.priority))
537        .bind(format!("{:?}", card.status))
538        .bind(card.position)
539        .bind(opt_dt(&card.due_date))
540        .bind(card.points.map(|v| v as i32))
541        .bind(card.card_number as i32)
542        .bind(card.sprint_id.map(|id| id.to_string()))
543        .bind(fmt_dt(&card.created_at))
544        .bind(fmt_dt(&card.updated_at))
545        .bind(opt_dt(&card.completed_at))
546        .execute(&mut *conn)
547        .await
548        .map_err(db_err)?;
549
550        sqlx::query("DELETE FROM sprint_logs WHERE card_id = ?")
551            .bind(&id)
552            .execute(&mut *conn)
553            .await
554            .map_err(db_err)?;
555        for log in &card.sprint_logs {
556            sqlx::query(
557                "INSERT INTO sprint_logs (card_id, sprint_id, sprint_number, sprint_name,
558                    started_at, ended_at, status)
559                 VALUES (?, ?, ?, ?, ?, ?, ?)",
560            )
561            .bind(&id)
562            .bind(log.sprint_id.to_string())
563            .bind(log.sprint_number as i32)
564            .bind(&log.sprint_name)
565            .bind(fmt_dt(&log.started_at))
566            .bind(opt_dt(&log.ended_at))
567            .bind(&log.status)
568            .execute(&mut *conn)
569            .await
570            .map_err(db_err)?;
571        }
572
573        Ok(())
574    }
575
576    async fn write_card_async(&self, card: &Card) -> KanbanResult<()> {
577        let mut tx = self.pool.begin().await.map_err(db_err)?;
578        Self::write_card_with_conn(&mut tx, card).await?;
579        tx.commit().await.map_err(db_err)?;
580        Ok(())
581    }
582
583    async fn fetch_sprint_logs_batch(
584        &self,
585        card_ids: &[String],
586    ) -> KanbanResult<HashMap<String, Vec<SprintLog>>> {
587        if card_ids.is_empty() {
588            return Ok(HashMap::new());
589        }
590        let placeholders = card_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
591        let sql = format!(
592            "SELECT card_id, sprint_id, sprint_number, sprint_name, started_at, ended_at, status
593             FROM sprint_logs WHERE card_id IN ({placeholders}) ORDER BY id"
594        );
595        let mut query = sqlx::query(&sql);
596        for id in card_ids {
597            query = query.bind(id);
598        }
599        let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
600        let mut map: HashMap<String, Vec<SprintLog>> = HashMap::new();
601        for row in &rows {
602            let card_id: String = row.try_get("card_id").map_err(db_err)?;
603            let log = row_to_sprint_log(row)?;
604            map.entry(card_id).or_default().push(log);
605        }
606        Ok(map)
607    }
608
609    async fn fetch_cards_with_filter(
610        &self,
611        where_clause: &str,
612        binds: &[String],
613    ) -> KanbanResult<Vec<Card>> {
614        let sql = format!(
615            "SELECT id, column_id, title, description, priority, status, position,
616                    due_date, points, card_number, sprint_id, created_at, updated_at, completed_at
617             FROM cards WHERE id NOT IN (SELECT card_id FROM archived_cards) {}
618             ORDER BY position ASC, created_at ASC",
619            where_clause
620        );
621        let mut query = sqlx::query(&sql);
622        for b in binds {
623            query = query.bind(b);
624        }
625        let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
626
627        let card_ids: Vec<String> = rows
628            .iter()
629            .map(|r| r.try_get("id").map_err(db_err))
630            .collect::<KanbanResult<_>>()?;
631        let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
632
633        let mut cards = Vec::with_capacity(rows.len());
634        for row in &rows {
635            let id_str: String = row.try_get("id").map_err(db_err)?;
636            let logs = logs_map.remove(&id_str).unwrap_or_default();
637            cards.push(row_to_card(row, logs)?);
638        }
639        Ok(cards)
640    }
641
642    async fn get_graph_with_conn(
643        conn: &mut sqlx::SqliteConnection,
644    ) -> KanbanResult<DependencyGraph> {
645        let rows = sqlx::query(
646            "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
647             FROM card_edges",
648        )
649        .fetch_all(&mut *conn)
650        .await
651        .map_err(db_err)?;
652        rows_to_graph(&rows)
653    }
654
655    async fn modify_graph_async(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
656        let mut tx = self.pool.begin().await.map_err(db_err)?;
657        let mut graph = Self::get_graph_with_conn(&mut tx).await?;
658        f(&mut graph)?;
659        Self::write_graph_with_conn(&mut tx, &graph).await?;
660        tx.commit().await.map_err(db_err)?;
661        Ok(())
662    }
663
664    async fn write_graph_with_conn(
665        conn: &mut sqlx::SqliteConnection,
666        graph: &DependencyGraph,
667    ) -> KanbanResult<()> {
668        sqlx::query("DELETE FROM card_edges")
669            .execute(&mut *conn)
670            .await
671            .map_err(db_err)?;
672
673        for edge in graph.cards.edges() {
674            sqlx::query(
675                "INSERT INTO card_edges
676                    (source_id, target_id, edge_type, direction, weight, created_at, archived_at)
677                 VALUES (?, ?, ?, ?, ?, ?, ?)",
678            )
679            .bind(edge.source.to_string())
680            .bind(edge.target.to_string())
681            .bind(format!("{:?}", edge.edge_type))
682            .bind(format!("{:?}", edge.direction))
683            .bind(edge.weight.map(|w| w as f64))
684            .bind(fmt_dt(&edge.created_at))
685            .bind(opt_dt(&edge.archived_at))
686            .execute(&mut *conn)
687            .await
688            .map_err(db_err)?;
689        }
690
691        Ok(())
692    }
693
694    async fn write_graph_async(&self, graph: &DependencyGraph) -> KanbanResult<()> {
695        let mut tx = self.pool.begin().await.map_err(db_err)?;
696        Self::write_graph_with_conn(&mut tx, graph).await?;
697        tx.commit().await.map_err(db_err)?;
698        Ok(())
699    }
700
701    async fn snapshot_async(&self) -> KanbanResult<Snapshot> {
702        let boards = self.list_boards_async().await?;
703        let columns = self.list_all_columns_async().await?;
704        let cards = self.fetch_cards_with_filter("", &[]).await?;
705        let archived_cards = self.list_archived_cards_async().await?;
706        let sprints = self.list_all_sprints_async().await?;
707        let graph = self.get_graph_async().await?;
708        Ok(Snapshot::from_data(
709            boards,
710            columns,
711            cards,
712            archived_cards,
713            sprints,
714            graph,
715        ))
716    }
717
718    async fn apply_snapshot_async(&self, snapshot: Snapshot) -> KanbanResult<()> {
719        let mut tx = self.pool.begin().await.map_err(db_err)?;
720
721        sqlx::query("PRAGMA defer_foreign_keys = ON")
722            .execute(&mut *tx)
723            .await
724            .map_err(db_err)?;
725
726        sqlx::query("DELETE FROM card_edges")
727            .execute(&mut *tx)
728            .await
729            .map_err(db_err)?;
730        sqlx::query("DELETE FROM archived_cards")
731            .execute(&mut *tx)
732            .await
733            .map_err(db_err)?;
734        sqlx::query("DELETE FROM sprint_logs")
735            .execute(&mut *tx)
736            .await
737            .map_err(db_err)?;
738        sqlx::query("DELETE FROM cards")
739            .execute(&mut *tx)
740            .await
741            .map_err(db_err)?;
742        sqlx::query("DELETE FROM sprints")
743            .execute(&mut *tx)
744            .await
745            .map_err(db_err)?;
746        sqlx::query("DELETE FROM board_sprint_names")
747            .execute(&mut *tx)
748            .await
749            .map_err(db_err)?;
750        sqlx::query("DELETE FROM board_sprint_counters")
751            .execute(&mut *tx)
752            .await
753            .map_err(db_err)?;
754        sqlx::query("DELETE FROM columns")
755            .execute(&mut *tx)
756            .await
757            .map_err(db_err)?;
758        sqlx::query("DELETE FROM boards")
759            .execute(&mut *tx)
760            .await
761            .map_err(db_err)?;
762
763        for board in &snapshot.boards {
764            Self::write_board_with_conn(&mut tx, board).await?;
765        }
766        for column in &snapshot.columns {
767            Self::write_column_with_conn(&mut tx, column).await?;
768        }
769        for sprint in &snapshot.sprints {
770            Self::write_sprint_with_conn(&mut tx, sprint).await?;
771        }
772        for card in &snapshot.cards {
773            Self::write_card_with_conn(&mut tx, card).await?;
774        }
775        for ac in &snapshot.archived_cards {
776            Self::write_archived_card_with_conn(&mut tx, ac).await?;
777        }
778        Self::write_graph_with_conn(&mut tx, &snapshot.graph).await?;
779
780        tx.commit().await.map_err(db_err)?;
781        Ok(())
782    }
783
784    async fn fetch_all_board_aux(
785        &self,
786    ) -> KanbanResult<(
787        HashMap<String, Vec<String>>,
788        HashMap<String, HashMap<String, u32>>,
789    )> {
790        let name_rows = sqlx::query(
791            "SELECT board_id, name FROM board_sprint_names ORDER BY board_id, position",
792        )
793        .fetch_all(&self.pool)
794        .await
795        .map_err(db_err)?;
796        let mut names_map: HashMap<String, Vec<String>> = HashMap::new();
797        for row in &name_rows {
798            let board_id: String = row.try_get("board_id").map_err(db_err)?;
799            let name: String = row.try_get("name").map_err(db_err)?;
800            names_map.entry(board_id).or_default().push(name);
801        }
802
803        let counter_rows =
804            sqlx::query("SELECT board_id, prefix, counter FROM board_sprint_counters")
805                .fetch_all(&self.pool)
806                .await
807                .map_err(db_err)?;
808        let mut counters_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
809        for row in &counter_rows {
810            let board_id: String = row.try_get("board_id").map_err(db_err)?;
811            let prefix: String = row.try_get("prefix").map_err(db_err)?;
812            let counter: i32 = row.try_get("counter").map_err(db_err)?;
813            counters_map
814                .entry(board_id)
815                .or_default()
816                .insert(prefix, counter as u32);
817        }
818
819        Ok((names_map, counters_map))
820    }
821
822    async fn list_boards_async(&self) -> KanbanResult<Vec<Board>> {
823        let rows = sqlx::query(
824            "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
825                    task_sort_order, sprint_duration_days, sprint_name_used_count,
826                    next_sprint_number, active_sprint_id, task_list_view,
827                    COALESCE(card_counter, 1) as card_counter,
828                    completion_column_id, position, created_at, updated_at
829             FROM boards ORDER BY position ASC",
830        )
831        .fetch_all(&self.pool)
832        .await
833        .map_err(db_err)?;
834
835        let (mut names_map, mut counters_map) = self.fetch_all_board_aux().await?;
836
837        let mut boards = Vec::with_capacity(rows.len());
838        for row in &rows {
839            let id_str: String = row.try_get("id").map_err(db_err)?;
840            let names = names_map.remove(&id_str).unwrap_or_default();
841            let counters = counters_map.remove(&id_str).unwrap_or_default();
842            boards.push(row_to_board(row, names, counters)?);
843        }
844        Ok(boards)
845    }
846
847    async fn list_all_columns_async(&self) -> KanbanResult<Vec<Column>> {
848        let rows = sqlx::query(
849            "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
850             FROM columns ORDER BY position",
851        )
852        .fetch_all(&self.pool)
853        .await
854        .map_err(db_err)?;
855        rows.iter().map(row_to_column).collect()
856    }
857
858    async fn list_all_sprints_async(&self) -> KanbanResult<Vec<Sprint>> {
859        let rows = sqlx::query(
860            "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
861                    status, start_date, end_date, created_at, updated_at
862             FROM sprints ORDER BY sprint_number",
863        )
864        .fetch_all(&self.pool)
865        .await
866        .map_err(db_err)?;
867        rows.iter().map(row_to_sprint).collect()
868    }
869
870    async fn list_archived_cards_async(&self) -> KanbanResult<Vec<ArchivedCard>> {
871        let rows = sqlx::query(
872            "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
873                    c.position, c.due_date, c.points, c.card_number, c.sprint_id,
874                    c.created_at, c.updated_at, c.completed_at,
875                    ac.archived_at, ac.original_column_id, ac.original_position
876             FROM archived_cards ac
877             JOIN cards c ON ac.card_id = c.id
878             ORDER BY ac.archived_at",
879        )
880        .fetch_all(&self.pool)
881        .await
882        .map_err(db_err)?;
883
884        let card_ids: Vec<String> = rows
885            .iter()
886            .map(|r| r.try_get("id").map_err(db_err))
887            .collect::<KanbanResult<_>>()?;
888        let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
889
890        let mut result = Vec::with_capacity(rows.len());
891        for row in &rows {
892            let id_str: String = row.try_get("id").map_err(db_err)?;
893            let logs = logs_map.remove(&id_str).unwrap_or_default();
894            let card = row_to_card(row, logs)?;
895            let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
896            let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
897            result.push(ArchivedCard {
898                card,
899                archived_at: p_dt(&archived_at_str)?,
900                original_column_id: p_uuid(&orig_col_str)?,
901                original_position: row.try_get("original_position").map_err(db_err)?,
902            });
903        }
904        Ok(result)
905    }
906
907    async fn get_graph_async(&self) -> KanbanResult<DependencyGraph> {
908        let rows = sqlx::query(
909            "SELECT source_id, target_id, edge_type, direction, weight, created_at, archived_at
910             FROM card_edges",
911        )
912        .fetch_all(&self.pool)
913        .await
914        .map_err(db_err)?;
915        rows_to_graph(&rows)
916    }
917
918    async fn write_column_with_conn(
919        conn: &mut sqlx::SqliteConnection,
920        column: &Column,
921    ) -> KanbanResult<()> {
922        sqlx::query(
923            "INSERT INTO columns (id, board_id, name, position, wip_limit, created_at, updated_at)
924             VALUES (?, ?, ?, ?, ?, ?, ?)
925             ON CONFLICT(id) DO UPDATE SET
926                board_id=excluded.board_id, name=excluded.name,
927                position=excluded.position, wip_limit=excluded.wip_limit,
928                updated_at=excluded.updated_at",
929        )
930        .bind(column.id.to_string())
931        .bind(column.board_id.to_string())
932        .bind(&column.name)
933        .bind(column.position)
934        .bind(column.wip_limit)
935        .bind(fmt_dt(&column.created_at))
936        .bind(fmt_dt(&column.updated_at))
937        .execute(&mut *conn)
938        .await
939        .map_err(db_err)?;
940        Ok(())
941    }
942
943    async fn write_column_async(&self, column: &Column) -> KanbanResult<()> {
944        Self::write_column_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, column).await
945    }
946
947    async fn write_sprint_with_conn(
948        conn: &mut sqlx::SqliteConnection,
949        sprint: &Sprint,
950    ) -> KanbanResult<()> {
951        sqlx::query(
952            "INSERT INTO sprints (id, board_id, sprint_number, name_index, prefix, card_prefix,
953                status, start_date, end_date, created_at, updated_at)
954             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
955             ON CONFLICT(id) DO UPDATE SET
956                board_id=excluded.board_id, sprint_number=excluded.sprint_number,
957                name_index=excluded.name_index, prefix=excluded.prefix,
958                card_prefix=excluded.card_prefix, status=excluded.status,
959                start_date=excluded.start_date, end_date=excluded.end_date,
960                updated_at=excluded.updated_at",
961        )
962        .bind(sprint.id.to_string())
963        .bind(sprint.board_id.to_string())
964        .bind(sprint.sprint_number as i32)
965        .bind(sprint.name_index.map(|v| v as i32))
966        .bind(&sprint.prefix)
967        .bind(&sprint.card_prefix)
968        .bind(format!("{:?}", sprint.status))
969        .bind(opt_dt(&sprint.start_date))
970        .bind(opt_dt(&sprint.end_date))
971        .bind(fmt_dt(&sprint.created_at))
972        .bind(fmt_dt(&sprint.updated_at))
973        .execute(&mut *conn)
974        .await
975        .map_err(db_err)?;
976        Ok(())
977    }
978
979    async fn write_sprint_async(&self, sprint: &Sprint) -> KanbanResult<()> {
980        Self::write_sprint_with_conn(&mut *self.pool.acquire().await.map_err(db_err)?, sprint).await
981    }
982
983    async fn write_archived_card_with_conn(
984        conn: &mut sqlx::SqliteConnection,
985        ac: &ArchivedCard,
986    ) -> KanbanResult<()> {
987        Self::write_card_with_conn(conn, &ac.card).await?;
988        sqlx::query(
989            "INSERT INTO archived_cards (card_id, archived_at, original_column_id, original_position)
990             VALUES (?, ?, ?, ?)
991             ON CONFLICT(card_id) DO UPDATE SET
992                archived_at=excluded.archived_at,
993                original_column_id=excluded.original_column_id,
994                original_position=excluded.original_position",
995        )
996        .bind(ac.card.id.to_string())
997        .bind(fmt_dt(&ac.archived_at))
998        .bind(ac.original_column_id.to_string())
999        .bind(ac.original_position)
1000        .execute(&mut *conn)
1001        .await
1002        .map_err(db_err)?;
1003        Ok(())
1004    }
1005
1006    async fn write_archived_card_async(&self, ac: &ArchivedCard) -> KanbanResult<()> {
1007        let mut tx = self.pool.begin().await.map_err(db_err)?;
1008        Self::write_archived_card_with_conn(&mut tx, ac).await?;
1009        tx.commit().await.map_err(db_err)?;
1010        Ok(())
1011    }
1012}
1013
1014impl DataStore for SqliteStore {
1015    // Board
1016
1017    fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
1018        run(async {
1019            let id_str = id.to_string();
1020            let row = sqlx::query(
1021                "SELECT id, name, description, sprint_prefix, card_prefix, task_sort_field,
1022                        task_sort_order, sprint_duration_days, sprint_name_used_count,
1023                        next_sprint_number, active_sprint_id, task_list_view,
1024                        COALESCE(card_counter, 1) as card_counter,
1025                        completion_column_id, position, created_at, updated_at
1026                 FROM boards WHERE id = ?",
1027            )
1028            .bind(&id_str)
1029            .fetch_optional(&self.pool)
1030            .await
1031            .map_err(db_err)?;
1032
1033            match row {
1034                Some(row) => {
1035                    let (names, counters) = self.fetch_board_aux(&id_str).await?;
1036                    Ok(Some(row_to_board(&row, names, counters)?))
1037                }
1038                None => Ok(None),
1039            }
1040        })
1041    }
1042
1043    fn list_boards(&self) -> KanbanResult<Vec<Board>> {
1044        run(self.list_boards_async())
1045    }
1046
1047    fn upsert_board(&self, board: Board) -> KanbanResult<()> {
1048        run(self.write_board_async(&board))
1049    }
1050
1051    fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
1052        run(async {
1053            sqlx::query("DELETE FROM boards WHERE id = ?")
1054                .bind(id.to_string())
1055                .execute(&self.pool)
1056                .await
1057                .map_err(db_err)?;
1058            Ok(())
1059        })
1060    }
1061
1062    // Column
1063
1064    fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
1065        run(async {
1066            let row = sqlx::query(
1067                "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1068                 FROM columns WHERE id = ?",
1069            )
1070            .bind(id.to_string())
1071            .fetch_optional(&self.pool)
1072            .await
1073            .map_err(db_err)?;
1074            row.as_ref().map(row_to_column).transpose()
1075        })
1076    }
1077
1078    fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
1079        run(async {
1080            let rows = sqlx::query(
1081                "SELECT id, board_id, name, position, wip_limit, created_at, updated_at
1082                 FROM columns WHERE board_id = ? ORDER BY position",
1083            )
1084            .bind(board_id.to_string())
1085            .fetch_all(&self.pool)
1086            .await
1087            .map_err(db_err)?;
1088            rows.iter().map(row_to_column).collect()
1089        })
1090    }
1091
1092    fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
1093        run(self.list_all_columns_async())
1094    }
1095
1096    fn upsert_column(&self, column: Column) -> KanbanResult<()> {
1097        run(self.write_column_async(&column))
1098    }
1099
1100    fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
1101        run(async {
1102            sqlx::query("DELETE FROM columns WHERE id = ?")
1103                .bind(id.to_string())
1104                .execute(&self.pool)
1105                .await
1106                .map_err(db_err)?;
1107            Ok(())
1108        })
1109    }
1110
1111    fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1112        run(async {
1113            sqlx::query("DELETE FROM columns WHERE board_id = ?")
1114                .bind(board_id.to_string())
1115                .execute(&self.pool)
1116                .await
1117                .map_err(db_err)?;
1118            Ok(())
1119        })
1120    }
1121
1122    // Card
1123
1124    fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
1125        run(async {
1126            let id_str = id.to_string();
1127            let row = sqlx::query(
1128                "SELECT id, column_id, title, description, priority, status, position,
1129                        due_date, points, card_number, sprint_id, created_at, updated_at,
1130                        completed_at
1131                 FROM cards
1132                 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1133            )
1134            .bind(&id_str)
1135            .fetch_optional(&self.pool)
1136            .await
1137            .map_err(db_err)?;
1138
1139            match row {
1140                Some(row) => {
1141                    let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1142                    Ok(Some(row_to_card(&row, logs)?))
1143                }
1144                None => Ok(None),
1145            }
1146        })
1147    }
1148
1149    fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
1150        run(self.fetch_cards_with_filter("", &[]))
1151    }
1152
1153    fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
1154        run(self.fetch_cards_with_filter("AND column_id = ?", &[column_id.to_string()]))
1155    }
1156
1157    fn list_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<Vec<Card>> {
1158        if column_ids.is_empty() {
1159            return Ok(Vec::new());
1160        }
1161        let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1162        let where_clause = format!("AND column_id IN ({placeholders})");
1163        let binds: Vec<String> = column_ids.iter().map(|id| id.to_string()).collect();
1164        run(self.fetch_cards_with_filter(&where_clause, &binds))
1165    }
1166
1167    fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
1168        run(self.fetch_cards_with_filter("AND sprint_id = ?", &[sprint_id.to_string()]))
1169    }
1170
1171    fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
1172        run(async {
1173            let row = sqlx::query(
1174                "SELECT COUNT(*) as cnt FROM cards
1175                 WHERE column_id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1176            )
1177            .bind(column_id.to_string())
1178            .fetch_one(&self.pool)
1179            .await
1180            .map_err(db_err)?;
1181            Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1182        })
1183    }
1184
1185    fn count_cards_in_column_excluding(
1186        &self,
1187        column_id: Uuid,
1188        exclude: &[Uuid],
1189    ) -> KanbanResult<usize> {
1190        run(async {
1191            if exclude.is_empty() {
1192                return self.count_cards_in_column(column_id);
1193            }
1194            let placeholders = exclude.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1195            let sql = format!(
1196                "SELECT COUNT(*) as cnt FROM cards
1197                 WHERE column_id = ?
1198                   AND id NOT IN (SELECT card_id FROM archived_cards)
1199                   AND id NOT IN ({placeholders})"
1200            );
1201            let mut query = sqlx::query(&sql).bind(column_id.to_string());
1202            for id in exclude {
1203                query = query.bind(id.to_string());
1204            }
1205            let row = query.fetch_one(&self.pool).await.map_err(db_err)?;
1206            Ok(row.try_get::<i32, _>("cnt").map_err(db_err)? as usize)
1207        })
1208    }
1209
1210    fn upsert_card(&self, card: Card) -> KanbanResult<()> {
1211        run(self.write_card_async(&card))
1212    }
1213
1214    fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
1215        run(async {
1216            sqlx::query(
1217                "DELETE FROM cards
1218                 WHERE id = ? AND id NOT IN (SELECT card_id FROM archived_cards)",
1219            )
1220            .bind(id.to_string())
1221            .execute(&self.pool)
1222            .await
1223            .map_err(db_err)?;
1224            Ok(())
1225        })
1226    }
1227
1228    fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
1229        run(async {
1230            if column_ids.is_empty() {
1231                return Ok(());
1232            }
1233            let placeholders = column_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1234            let sql = format!(
1235                "DELETE FROM cards
1236                 WHERE column_id IN ({placeholders})
1237                   AND id NOT IN (SELECT card_id FROM archived_cards)"
1238            );
1239            let mut query = sqlx::query(&sql);
1240            for id in column_ids {
1241                query = query.bind(id.to_string());
1242            }
1243            query.execute(&self.pool).await.map_err(db_err)?;
1244            Ok(())
1245        })
1246    }
1247
1248    fn clear_sprint_from_cards(
1249        &self,
1250        sprint_id: Uuid,
1251        timestamp: DateTime<Utc>,
1252    ) -> KanbanResult<()> {
1253        run(async {
1254            let now = fmt_dt(&timestamp);
1255            sqlx::query(
1256                "UPDATE cards SET sprint_id = NULL, updated_at = ?
1257                 WHERE sprint_id = ?
1258                   AND id NOT IN (SELECT card_id FROM archived_cards)",
1259            )
1260            .bind(&now)
1261            .bind(sprint_id.to_string())
1262            .execute(&self.pool)
1263            .await
1264            .map_err(db_err)?;
1265            Ok(())
1266        })
1267    }
1268
1269    // Archived card
1270
1271    fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
1272        run(async {
1273            let id_str = card_id.to_string();
1274            let row = sqlx::query(
1275                "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1276                        c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1277                        c.created_at, c.updated_at, c.completed_at,
1278                        ac.archived_at, ac.original_column_id, ac.original_position
1279                 FROM archived_cards ac
1280                 JOIN cards c ON ac.card_id = c.id
1281                 WHERE ac.card_id = ?",
1282            )
1283            .bind(&id_str)
1284            .fetch_optional(&self.pool)
1285            .await
1286            .map_err(db_err)?;
1287
1288            match row {
1289                Some(row) => {
1290                    let logs = self.fetch_sprint_logs_for_card(&id_str).await?;
1291                    let card = row_to_card(&row, logs)?;
1292                    let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1293                    let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1294                    Ok(Some(ArchivedCard {
1295                        card,
1296                        archived_at: p_dt(&archived_at_str)?,
1297                        original_column_id: p_uuid(&orig_col_str)?,
1298                        original_position: row.try_get("original_position").map_err(db_err)?,
1299                    }))
1300                }
1301                None => Ok(None),
1302            }
1303        })
1304    }
1305
1306    fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
1307        run(self.list_archived_cards_async())
1308    }
1309
1310    fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
1311        run(self.write_archived_card_async(&ac))
1312    }
1313
1314    fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
1315        run(async {
1316            let mut tx = self.pool.begin().await.map_err(db_err)?;
1317            sqlx::query("DELETE FROM archived_cards WHERE card_id = ?")
1318                .bind(card_id.to_string())
1319                .execute(&mut *tx)
1320                .await
1321                .map_err(db_err)?;
1322            sqlx::query("DELETE FROM cards WHERE id = ?")
1323                .bind(card_id.to_string())
1324                .execute(&mut *tx)
1325                .await
1326                .map_err(db_err)?;
1327            tx.commit().await.map_err(db_err)
1328        })
1329    }
1330
1331    fn list_archived_cards_by_columns(
1332        &self,
1333        column_ids: &[Uuid],
1334    ) -> KanbanResult<Vec<ArchivedCard>> {
1335        if column_ids.is_empty() {
1336            return Ok(Vec::new());
1337        }
1338        run(async {
1339            let placeholders: Vec<&str> = column_ids.iter().map(|_| "?").collect();
1340            let sql = format!(
1341                "SELECT c.id, c.column_id, c.title, c.description, c.priority, c.status,
1342                        c.position, c.due_date, c.points, c.card_number, c.sprint_id,
1343                        c.created_at, c.updated_at, c.completed_at,
1344                        ac.archived_at, ac.original_column_id, ac.original_position
1345                 FROM archived_cards ac
1346                 JOIN cards c ON ac.card_id = c.id
1347                 WHERE ac.original_column_id IN ({})
1348                 ORDER BY ac.archived_at",
1349                placeholders.join(", ")
1350            );
1351            let mut query = sqlx::query(&sql);
1352            for id in column_ids {
1353                query = query.bind(id.to_string());
1354            }
1355            let rows = query.fetch_all(&self.pool).await.map_err(db_err)?;
1356
1357            let card_ids: Vec<String> = rows
1358                .iter()
1359                .map(|r| r.try_get("id").map_err(db_err))
1360                .collect::<KanbanResult<_>>()?;
1361            let mut logs_map = self.fetch_sprint_logs_batch(&card_ids).await?;
1362
1363            let mut result = Vec::with_capacity(rows.len());
1364            for row in &rows {
1365                let id_str: String = row.try_get("id").map_err(db_err)?;
1366                let logs = logs_map.remove(&id_str).unwrap_or_default();
1367                let card = row_to_card(row, logs)?;
1368                let archived_at_str: String = row.try_get("archived_at").map_err(db_err)?;
1369                let orig_col_str: String = row.try_get("original_column_id").map_err(db_err)?;
1370                result.push(ArchivedCard {
1371                    card,
1372                    archived_at: p_dt(&archived_at_str)?,
1373                    original_column_id: p_uuid(&orig_col_str)?,
1374                    original_position: row.try_get("original_position").map_err(db_err)?,
1375                });
1376            }
1377            Ok(result)
1378        })
1379    }
1380
1381    fn clear_sprint_from_archived_cards(
1382        &self,
1383        sprint_id: Uuid,
1384        timestamp: DateTime<Utc>,
1385    ) -> KanbanResult<()> {
1386        run(async {
1387            let now = fmt_dt(&timestamp);
1388            sqlx::query(
1389                "UPDATE cards SET sprint_id = NULL, updated_at = ?
1390                 WHERE sprint_id = ?
1391                   AND id IN (SELECT card_id FROM archived_cards)",
1392            )
1393            .bind(&now)
1394            .bind(sprint_id.to_string())
1395            .execute(&self.pool)
1396            .await
1397            .map_err(db_err)?;
1398            Ok(())
1399        })
1400    }
1401
1402    // Sprint
1403
1404    fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
1405        run(async {
1406            let row = sqlx::query(
1407                "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1408                        status, start_date, end_date, created_at, updated_at
1409                 FROM sprints WHERE id = ?",
1410            )
1411            .bind(id.to_string())
1412            .fetch_optional(&self.pool)
1413            .await
1414            .map_err(db_err)?;
1415            row.as_ref().map(row_to_sprint).transpose()
1416        })
1417    }
1418
1419    fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
1420        run(async {
1421            let rows = sqlx::query(
1422                "SELECT id, board_id, sprint_number, name_index, prefix, card_prefix,
1423                        status, start_date, end_date, created_at, updated_at
1424                 FROM sprints WHERE board_id = ? ORDER BY sprint_number",
1425            )
1426            .bind(board_id.to_string())
1427            .fetch_all(&self.pool)
1428            .await
1429            .map_err(db_err)?;
1430            rows.iter().map(row_to_sprint).collect()
1431        })
1432    }
1433
1434    fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
1435        run(self.list_all_sprints_async())
1436    }
1437
1438    fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
1439        run(self.write_sprint_async(&sprint))
1440    }
1441
1442    fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
1443        run(async {
1444            sqlx::query("DELETE FROM sprints WHERE id = ?")
1445                .bind(id.to_string())
1446                .execute(&self.pool)
1447                .await
1448                .map_err(db_err)?;
1449            Ok(())
1450        })
1451    }
1452
1453    fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
1454        run(async {
1455            sqlx::query("DELETE FROM sprints WHERE board_id = ?")
1456                .bind(board_id.to_string())
1457                .execute(&self.pool)
1458                .await
1459                .map_err(db_err)?;
1460            Ok(())
1461        })
1462    }
1463
1464    // Graph
1465
1466    fn get_graph(&self) -> KanbanResult<DependencyGraph> {
1467        run(self.get_graph_async())
1468    }
1469
1470    fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
1471        run(self.write_graph_async(&graph))
1472    }
1473
1474    fn modify_graph(&self, f: kanban_domain::GraphMutFn) -> KanbanResult<()> {
1475        run(self.modify_graph_async(f))
1476    }
1477
1478    // Snapshot
1479
1480    fn snapshot(&self) -> KanbanResult<Snapshot> {
1481        run(self.snapshot_async())
1482    }
1483
1484    fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
1485        run(self.apply_snapshot_async(snapshot))
1486    }
1487}
1488
1489#[async_trait::async_trait]
1490impl PersistenceStore for SqliteStore {
1491    async fn save(&self, snapshot: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
1492        let domain_snapshot: Snapshot = serde_json::from_slice(&snapshot.data)
1493            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1494        self.apply_snapshot_async(domain_snapshot)
1495            .await
1496            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1497        self.checkpoint()
1498            .await
1499            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1500        Ok(PersistenceMetadata::new(self.instance_id))
1501    }
1502
1503    async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
1504        let domain_snapshot = self
1505            .snapshot_async()
1506            .await
1507            .map_err(|e| PersistenceError::Database(e.to_string()))?;
1508        let data = serde_json::to_vec(&domain_snapshot)
1509            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
1510        let meta = PersistenceMetadata::new(self.instance_id);
1511        Ok((
1512            StoreSnapshot {
1513                data,
1514                metadata: meta.clone(),
1515            },
1516            meta,
1517        ))
1518    }
1519
1520    async fn exists(&self) -> bool {
1521        self.path.exists()
1522    }
1523
1524    fn path(&self) -> &std::path::Path {
1525        &self.path
1526    }
1527
1528    fn instance_id(&self) -> Uuid {
1529        self.instance_id
1530    }
1531
1532    async fn close(&self) {
1533        self.pool.close().await;
1534    }
1535}
1536
#[cfg(test)]
mod tests {
    //! Integration-style tests for `SqliteStore`.
    //!
    //! Every test drives the store through an explicitly built multi-threaded
    //! Tokio runtime: the synchronous `DataStore` methods block on the current
    //! runtime handle, so they must be invoked from inside `rt.block_on(..)`.

    use super::*;
    use tempfile::TempDir;

    /// Largest WAL file size, in bytes, accepted after a save has checkpointed.
    const MAX_WAL_BYTES: u64 = 32 * 1024;

    /// Builds the multi-threaded runtime that every test in this module uses.
    fn make_rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build multi-threaded Tokio runtime")
    }

    /// Creates a fresh temporary directory and the path of a database file
    /// named `file_name` inside it.
    ///
    /// The returned `TempDir` must be kept alive by the caller for as long as
    /// the database is in use; dropping it removes the directory.
    fn temp_db(file_name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join(file_name);
        (dir, path)
    }

    /// Returns the path of the write-ahead log that accompanies `db_path`.
    ///
    /// SQLite names the WAL by appending `-wal` to the complete database file
    /// name, so the suffix is appended to the raw path instead of swapping the
    /// extension (which would only be correct for one specific extension).
    fn wal_path_for(db_path: &Path) -> PathBuf {
        let mut raw = db_path.as_os_str().to_os_string();
        raw.push("-wal");
        PathBuf::from(raw)
    }

    /// Persists one board, one column and one card into `store` and returns
    /// the card together with an `ArchivedCard` built from it.
    ///
    /// The archived entry is only constructed, not inserted, so each test
    /// decides the order of the archive/delete calls it wants to exercise.
    /// Must be called from inside the runtime (see the module docs).
    fn seed_card_with_archive_entry(store: &SqliteStore) -> (Card, ArchivedCard) {
        let mut board = Board::new("B".to_string(), None);
        let column = Column::new(board.id, "Col".to_string(), 0);
        let card = Card::new(&mut board, column.id, "Task".to_string(), 0);
        let column_id = column.id;

        store.upsert_board(board).expect("failed to upsert board fixture");
        store
            .upsert_column(column)
            .expect("failed to upsert column fixture");
        store
            .upsert_card(card.clone())
            .expect("failed to upsert card fixture");

        let archived = ArchivedCard::new(card.clone(), column_id, 0);
        (card, archived)
    }

    /// The store reports back the exact path it was opened with.
    #[test]
    fn test_sqlitestore_path_is_preserved() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        let store = rt.block_on(SqliteStore::open(&path)).unwrap();
        assert_eq!(store.path(), path.as_path());
    }

    /// Opening the same database file twice yields the same instance id.
    #[test]
    fn test_sqlitestore_instance_id_persists_across_reopen() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        let id1 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
        let id2 = rt.block_on(SqliteStore::open(&path)).unwrap().instance_id();
        assert_eq!(id1, id2, "instance_id must be stable across reopens");
    }

    /// A snapshot written through `PersistenceStore::save` is returned intact
    /// by `PersistenceStore::load`.
    #[test]
    fn test_sqlitestore_persistence_save_load_roundtrip() {
        use kanban_persistence::snapshot_to_json_bytes;

        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();

        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();

            // A snapshot holding a single board and nothing else.
            let board = Board::new("Test Board".to_string(), None);
            let snapshot = Snapshot::from_data(
                vec![board],
                vec![],
                vec![],
                vec![],
                vec![],
                DependencyGraph::new(),
            );
            let store_snap = StoreSnapshot {
                data: snapshot_to_json_bytes(&snapshot).unwrap(),
                metadata: PersistenceMetadata::new(store.instance_id()),
            };

            PersistenceStore::save(&store, store_snap).await.unwrap();

            let (loaded_snap, _meta) = PersistenceStore::load(&store).await.unwrap();
            let loaded: Snapshot = serde_json::from_slice(&loaded_snap.data).unwrap();
            assert_eq!(loaded.boards.len(), 1);
            assert_eq!(loaded.boards[0].name, "Test Board");
        });
    }

    /// `exists` is true as soon as the store has been opened.
    #[test]
    fn test_sqlitestore_exists_returns_true_after_open() {
        let (_dir, path) = temp_db("test.db");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            assert!(PersistenceStore::exists(&store).await);
        });
    }

    /// `checkpoint` succeeds on a freshly opened database.
    #[test]
    fn test_checkpoint_executes_without_error() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            store.checkpoint().await.unwrap();
        });
    }

    /// After a load + save cycle the WAL file, if one exists, stays small.
    #[test]
    fn test_save_checkpoints_wal_file_stays_minimal() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let (snapshot, _) = PersistenceStore::load(&store).await.unwrap();
            PersistenceStore::save(&store, snapshot).await.unwrap();

            // The WAL file may legitimately be absent; only its size is
            // checked when it is present.
            let wal_path = wal_path_for(&path);
            if wal_path.exists() {
                assert!(
                    wal_path.metadata().unwrap().len() < MAX_WAL_BYTES,
                    "WAL file should be minimal after save+checkpoint"
                );
            }
        });
    }

    /// Deleting an archived card also removes a leftover row in the cards
    /// table when the card was archived without `delete_card` being called.
    #[test]
    fn test_delete_archived_card_orphaned_cards_row_is_still_cleaned_up() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let (card, archived) = seed_card_with_archive_entry(&store);
            let card_id = card.id;

            // Insert into archived_cards WITHOUT calling delete_card first,
            // leaving an orphaned row in the cards table.
            store.insert_archived_card(archived).unwrap();

            store.delete_archived_card(card_id).unwrap();

            assert!(
                store.list_archived_cards().unwrap().is_empty(),
                "card should be gone from archived_cards"
            );
            assert!(
                store.list_all_cards().unwrap().is_empty(),
                "orphaned cards row should also be removed by delete_archived_card"
            );
        });
    }

    /// Deleting an archived card removes it permanently: it must not appear
    /// again as an active card.
    #[test]
    fn test_delete_archived_card_removes_from_cards_table() {
        let (_dir, path) = temp_db("test.sqlite3");
        let rt = make_rt();
        rt.block_on(async {
            let store = SqliteStore::open(&path).await.unwrap();
            let (card, archived) = seed_card_with_archive_entry(&store);
            let card_id = card.id;

            // Archive first, then remove the active row.
            store.insert_archived_card(archived).unwrap();
            store.delete_card(card_id).unwrap();

            assert_eq!(store.list_archived_cards().unwrap().len(), 1);

            store.delete_archived_card(card_id).unwrap();

            assert!(
                store.list_archived_cards().unwrap().is_empty(),
                "card should be gone from archived_cards"
            );
            assert!(
                store.list_all_cards().unwrap().is_empty(),
                "card should also be gone from cards table, not restored as active"
            );
            assert!(
                store.get_card(card_id).unwrap().is_none(),
                "get_card should return None for permanently deleted card"
            );
        });
    }
}