Skip to main content

kanban_service/
store_manager.rs

1use crate::config;
2use crate::AppConfig;
3use kanban_domain::commands::Command;
4#[cfg(feature = "sqlite")]
5use kanban_domain::commands::CommandContext;
6use kanban_domain::KanbanError;
7#[cfg(feature = "sqlite")]
8use kanban_domain::{DataStore, InMemoryStore, Snapshot};
9#[cfg(feature = "sqlite")]
10use kanban_persistence::snapshot_to_json_bytes;
11use kanban_persistence::{
12    snapshot_from_json_bytes, PersistenceStore, StoreRegistry, StoreSnapshot,
13};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17type CommandLog = Option<(Vec<Vec<Command>>, u64, Option<Vec<u8>>)>;
18
19/// Owns the `StoreRegistry` and exposes the high-level operations that used
20/// to live as free functions in `kanban_service`. Callers (the CLI, TUI, MCP)
21/// construct a `StoreManager` with whichever factories they want available,
22/// then thread it through request handlers — inverting the old model where
23/// `kanban-service` hard-coded `default_registry()`.
24pub struct StoreManager {
25    registry: Arc<StoreRegistry>,
26}
27
28impl StoreManager {
29    /// Wraps `registry` in an `Arc`. Cloning a `StoreManager` is cheap —
30    /// all clones share the same underlying registry.
31    pub fn new(registry: StoreRegistry) -> Self {
32        Self {
33            registry: Arc::new(registry),
34        }
35    }
36
37    /// Returns a reference to the underlying `StoreRegistry`.
38    /// Useful for introspection and testing.
39    pub fn registry(&self) -> &StoreRegistry {
40        &self.registry
41    }
42
43    /// Returns `true` if at least one backend factory is registered.
44    pub fn has_backends(&self) -> bool {
45        !self.registry.is_empty()
46    }
47
48    /// Returns the names of all registered factories in registration order.
49    pub fn backend_names(&self) -> Vec<&str> {
50        self.registry.backend_names()
51    }
52
53    /// Returns `true` if `locator` points to a SQLite database — either
54    /// because `detect_backend` recognised it as `"sqlite"`, or because the
55    /// file extension matches one of the conventional SQLite extensions.
56    pub fn is_sqlite(&self, locator: &str) -> bool {
57        match self.detect_backend(locator).as_deref() {
58            Some("sqlite") => true,
59            None => {
60                locator.ends_with(".sqlite")
61                    || locator.ends_with(".sqlite3")
62                    || locator.ends_with(".db")
63            }
64            _ => false,
65        }
66    }
67
68    /// Pattern-matches `locator` against all registered factories and returns
69    /// the name of the first match. For existing SQLite files, detects by
70    /// magic bytes even when no SQLite factory is in the registry.
71    pub fn detect_backend(&self, locator: &str) -> Option<String> {
72        if let Some(name) = self.registry.detect_backend(locator) {
73            return Some(name.to_string());
74        }
75        #[cfg(feature = "sqlite")]
76        {
77            let path = std::path::Path::new(locator);
78            if path.exists() {
79                if let Ok(mut f) = std::fs::File::open(path) {
80                    use std::io::Read;
81                    let mut hdr = [0u8; 16];
82                    let n = f.read(&mut hdr).unwrap_or(0);
83                    if hdr[..n].starts_with(b"SQLite format 3\0") {
84                        return Some("sqlite".to_string());
85                    }
86                }
87            }
88        }
89        None
90    }
91
92    /// Updates `config.storage_backend` to match the backend inferred from
93    /// `locator`. Returns `true` if the config value changed.
94    pub fn sync_backend_with_file(&self, locator: &str, config: &mut AppConfig) -> bool {
95        if let Some(detected) = self.detect_backend(locator) {
96            if detected != config.effective_storage_backend() {
97                config.storage_backend = Some(detected);
98                return true;
99            }
100        }
101        false
102    }
103
104    /// Creates a [`KanbanBackend`] for `locator`, selecting SQLite or JSON
105    /// automatically from the file content / extension.
106    pub async fn make_backend(
107        &self,
108        locator: &str,
109        config: &AppConfig,
110    ) -> Result<std::sync::Arc<dyn crate::backend::KanbanBackend>, KanbanError> {
111        if self.is_sqlite(locator) {
112            #[cfg(feature = "sqlite")]
113            {
114                let store = kanban_persistence_sqlite::SqliteStore::open(locator)
115                    .await
116                    .map_err(|e| KanbanError::Database(e.to_string()))?;
117                return Ok(std::sync::Arc::new(store));
118            }
119            #[cfg(not(feature = "sqlite"))]
120            return Err(KanbanError::Internal(format!(
121                "path '{}' requires the sqlite feature which is not compiled in",
122                locator
123            )));
124        }
125        let store = self.make_store(config.effective_storage_backend(), locator)?;
126        #[cfg(feature = "json")]
127        return Ok(std::sync::Arc::new(
128            crate::json_backend::JsonDataStore::new(store),
129        ));
130        #[cfg(not(feature = "json"))]
131        Err(KanbanError::Internal(format!(
132            "path '{}' requires the json feature which is not compiled in",
133            locator
134        )))
135    }
136
137    /// Creates a `PersistenceStore` for the named `backend` at `locator`.
138    /// Returns an error if `backend` is not registered in this manager.
139    pub fn make_store(
140        &self,
141        backend: &str,
142        locator: &str,
143    ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
144        Ok(self.registry.create_store(backend, locator)?)
145    }
146
147    /// Creates a store from an explicit file locator, or falls back to the
148    /// storage location in `config` when `file` is `None`. The backend is
149    /// inferred from the locator; if no factory matches, `config`'s backend
150    /// is used as a fallback.
151    pub fn make_store_with_config(
152        &self,
153        file: Option<&str>,
154        config: &AppConfig,
155    ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
156        let locator = match file {
157            Some(path) => path.to_string(),
158            None => config::resolve_storage_location(config),
159        };
160        let backend = self
161            .detect_backend(&locator)
162            .unwrap_or_else(|| config.effective_storage_backend().to_string());
163        self.make_store(&backend, &locator)
164    }
165
166    /// Creates a store for `path`, verifies the file exists, then loads and
167    /// deserializes the snapshot. Returns an error if the file is missing or
168    /// the data cannot be parsed.
169    ///
170    /// For `.sqlite`/`.db` files, bypasses the registry and uses `SqliteStore`
171    /// directly.
172    pub async fn validate_and_load_store(
173        &self,
174        backend: &str,
175        path: &str,
176    ) -> Result<kanban_domain::Snapshot, KanbanError> {
177        if matches!(backend, "sqlite" | "sqlite3" | "db") {
178            #[cfg(feature = "sqlite")]
179            {
180                if !std::path::Path::new(path).exists() {
181                    return Err(std::io::Error::new(
182                        std::io::ErrorKind::NotFound,
183                        format!("Storage file does not exist: {}", path),
184                    )
185                    .into());
186                }
187                let store = kanban_persistence_sqlite::SqliteStore::open(path).await?;
188                return store.snapshot();
189            }
190            #[cfg(not(feature = "sqlite"))]
191            return Err(KanbanError::validation("sqlite feature not compiled in"));
192        }
193        let store = self.make_store(backend, path)?;
194        if !store.exists().await {
195            return Err(std::io::Error::new(
196                std::io::ErrorKind::NotFound,
197                format!("Storage file does not exist: {}", path),
198            )
199            .into());
200        }
201        let (snapshot, _metadata) = store.load().await?;
202        let data = snapshot_from_json_bytes(&snapshot.data)?;
203        Ok(data)
204    }
205
206    /// Exports a board selection to a new SQLite file via `SqliteStore`.
207    ///
208    /// **Note:** The dependency graph is not part of the `AllBoardsExport` format
209    /// and will not be present in the exported file.
210    pub async fn export_to_sqlite(
211        &self,
212        export: kanban_domain::export::AllBoardsExport,
213        filename: &str,
214    ) -> Result<(), KanbanError> {
215        #[cfg(feature = "sqlite")]
216        {
217            use kanban_domain::export::BoardImporter;
218            use kanban_domain::{DependencyGraph, Snapshot};
219
220            let entities = BoardImporter::extract_entities(export);
221            let snapshot = Snapshot {
222                boards: entities.boards,
223                columns: entities.columns,
224                cards: entities.cards,
225                archived_cards: entities.archived_cards,
226                sprints: entities.sprints,
227                graph: DependencyGraph::default(),
228            };
229            let store = kanban_persistence_sqlite::SqliteStore::open(filename).await?;
230            store.apply_snapshot(snapshot)?;
231            Ok(())
232        }
233        #[cfg(not(feature = "sqlite"))]
234        {
235            let _ = export;
236            let _ = filename;
237            Err(KanbanError::validation("sqlite feature not compiled in"))
238        }
239    }
240
241    /// Copies a snapshot from one backend/path pair to another, repairing
242    /// any dangling foreign keys in the process. Rolls back (deletes the
243    /// partial destination file) on failure.
244    ///
245    /// SQLite source/destination are handled directly via `SqliteStore`;
246    /// JSON and other registry-backed backends go through the `StoreRegistry`.
247    pub async fn migrate_store(
248        &self,
249        from_backend: &str,
250        from_path: &str,
251        to_backend: &str,
252        to_path: &str,
253    ) -> Result<(), KanbanError> {
254        let from = std::path::Path::new(from_path);
255        let to = std::path::Path::new(to_path);
256        if !from.exists() {
257            return Err(std::io::Error::new(
258                std::io::ErrorKind::NotFound,
259                format!("Source file not found: {}", from.display()),
260            )
261            .into());
262        }
263        if to.exists() {
264            return Err(std::io::Error::new(
265                std::io::ErrorKind::AlreadyExists,
266                format!(
267                    "Destination already exists: {}. Remove it first or use a different path.",
268                    to.display()
269                ),
270            )
271            .into());
272        }
273
274        // Load snapshot from source into a StoreSnapshot (JSON bytes) for FK repair,
275        // and extract the command log for undo history preservation.
276        let (mut store_snapshot, command_log): (StoreSnapshot, CommandLog) = match from_backend {
277            "sqlite" | "sqlite3" | "db" => {
278                #[cfg(feature = "sqlite")]
279                {
280                    use kanban_domain::CommandStore;
281                    use kanban_persistence::PersistenceMetadata;
282                    let store = kanban_persistence_sqlite::SqliteStore::open(from_path).await?;
283                    let snapshot = store.snapshot()?;
284                    let data = snapshot_to_json_bytes(&snapshot)?;
285                    let snap = StoreSnapshot {
286                        data,
287                        metadata: PersistenceMetadata::new(uuid::Uuid::new_v4()),
288                    };
289                    let cmd_log = match store.load_all_commands() {
290                        Ok((batches, count)) if !batches.is_empty() => {
291                            let baseline_bytes = snapshot_to_json_bytes(&Snapshot::new()).ok();
292                            Some((batches, count, baseline_bytes))
293                        }
294                        _ => None,
295                    };
296                    (snap, cmd_log)
297                }
298                #[cfg(not(feature = "sqlite"))]
299                return Err(KanbanError::validation("sqlite feature not compiled in"));
300            }
301            _ => {
302                let source = self.make_store(from_backend, from_path)?;
303                let (snap, _) = source.load().await?;
304                let cmd_log = match source.get_command_log() {
305                    Ok((batches, cursor, baseline_bytes)) if !batches.is_empty() => {
306                        Some((batches, cursor, baseline_bytes))
307                    }
308                    _ => None,
309                };
310                (snap, cmd_log)
311            }
312        };
313
314        repair_snapshot_fks(&mut store_snapshot)?;
315
316        // Save to destination
317        match to_backend {
318            "sqlite" | "sqlite3" | "db" => {
319                #[cfg(feature = "sqlite")]
320                {
321                    let repaired = snapshot_from_json_bytes(&store_snapshot.data)?;
322                    let store = kanban_persistence_sqlite::SqliteStore::open(to_path).await?;
323                    if let Err(e) = store.apply_snapshot(repaired.clone()) {
324                        let _ = std::fs::remove_file(to_path);
325                        let _ = std::fs::remove_file(format!("{}-wal", to_path));
326                        let _ = std::fs::remove_file(format!("{}-shm", to_path));
327                        return Err(e);
328                    }
329                    if let Some((batches, _cursor, baseline_bytes)) = command_log {
330                        if let Err(e) =
331                            transfer_commands_to_sqlite(&store, &batches, baseline_bytes.as_deref())
332                        {
333                            tracing::warn!("Command log transfer failed (undo history lost): {e}");
334                        }
335                    }
336                }
337                #[cfg(not(feature = "sqlite"))]
338                return Err(KanbanError::validation("sqlite feature not compiled in"));
339            }
340            _ => {
341                let target = self.make_store(to_backend, to_path)?;
342                // Sync command log BEFORE save, since save() reads from in-memory state
343                if let Some((batches, cursor, baseline_bytes)) = command_log {
344                    if let Err(e) = target
345                        .sync_command_log(&batches, cursor, baseline_bytes.as_deref())
346                        .await
347                    {
348                        tracing::warn!("Command log transfer failed (undo history lost): {e}");
349                    }
350                }
351                if let Err(e) = target.save(store_snapshot).await {
352                    let _ = std::fs::remove_file(to_path);
353                    let _ = std::fs::remove_file(format!("{}-wal", to_path));
354                    let _ = std::fs::remove_file(format!("{}-shm", to_path));
355                    return Err(e.into());
356                }
357            }
358        }
359        Ok(())
360    }
361}
362
363#[cfg(feature = "sqlite")]
364fn transfer_commands_to_sqlite(
365    store: &kanban_persistence_sqlite::SqliteStore,
366    batches: &[Vec<Command>],
367    baseline_bytes: Option<&[u8]>,
368) -> Result<(), KanbanError> {
369    use kanban_domain::CommandStore;
370
371    let baseline: Snapshot = if let Some(bytes) = baseline_bytes {
372        serde_json::from_slice(bytes)
373            .map_err(|e| KanbanError::validation(format!("Failed to parse baseline: {e}")))?
374    } else {
375        Snapshot::new()
376    };
377
378    let temp = InMemoryStore::new();
379    temp.apply_snapshot(baseline)?;
380
381    for (i, batch) in batches.iter().enumerate() {
382        let ctx = CommandContext {
383            store: &temp as &dyn DataStore,
384        };
385        for cmd in batch {
386            cmd.execute(&ctx)?;
387        }
388        store.append_commands(batch)?;
389        let snap = temp.snapshot()?;
390        store.store_snapshot_at((i + 1) as u64, &snap)?;
391    }
392
393    Ok(())
394}
395
396impl Clone for StoreManager {
397    fn clone(&self) -> Self {
398        Self {
399            registry: Arc::clone(&self.registry),
400        }
401    }
402}
403
404fn repair_snapshot_fks(snapshot: &mut StoreSnapshot) -> Result<(), KanbanError> {
405    let mut data: serde_json::Value = serde_json::from_slice(&snapshot.data).map_err(|e| {
406        KanbanError::validation(format!("Failed to parse snapshot for FK repair: {e}"))
407    })?;
408
409    let valid_columns: HashSet<String> = data["columns"]
410        .as_array()
411        .map(|arr| {
412            arr.iter()
413                .filter_map(|c| c["id"].as_str().map(String::from))
414                .collect()
415        })
416        .unwrap_or_default();
417
418    let valid_sprints: HashSet<String> = data["sprints"]
419        .as_array()
420        .map(|arr| {
421            arr.iter()
422                .filter_map(|s| s["id"].as_str().map(String::from))
423                .collect()
424        })
425        .unwrap_or_default();
426
427    let fallback_column: Option<String> = data["columns"].as_array().and_then(|arr| {
428        arr.iter()
429            .min_by_key(|c| c["position"].as_i64().unwrap_or(i64::MAX))
430            .and_then(|c| c["id"].as_str())
431            .map(String::from)
432    });
433
434    if let Some(cards) = data["cards"].as_array_mut() {
435        for card in cards.iter_mut() {
436            fix_card_fks(
437                card,
438                &valid_columns,
439                &valid_sprints,
440                fallback_column.as_deref(),
441            );
442        }
443    }
444
445    if let Some(archived) = data["archived_cards"].as_array_mut() {
446        for entry in archived.iter_mut() {
447            if let Some(card) = entry.get_mut("card") {
448                fix_card_fks(
449                    card,
450                    &valid_columns,
451                    &valid_sprints,
452                    fallback_column.as_deref(),
453                );
454            }
455        }
456    }
457
458    snapshot.data = serde_json::to_vec(&data).map_err(|e| {
459        KanbanError::validation(format!("Failed to serialize repaired snapshot: {e}"))
460    })?;
461
462    Ok(())
463}
464
465fn fix_card_fks(
466    card: &mut serde_json::Value,
467    valid_columns: &HashSet<String>,
468    valid_sprints: &HashSet<String>,
469    fallback_column: Option<&str>,
470) {
471    if let Some(sprint_id) = card["sprint_id"].as_str() {
472        if !valid_sprints.contains(sprint_id) {
473            card["sprint_id"] = serde_json::Value::Null;
474        }
475    }
476    if let Some(col_id) = card["column_id"].as_str() {
477        if !valid_columns.contains(col_id) {
478            if let Some(fb) = fallback_column {
479                card["column_id"] = serde_json::Value::String(fb.to_string());
480            }
481        }
482    }
483}
484
485#[cfg(test)]
486mod tests {
487    use super::*;
488    use kanban_persistence::StoreRegistry;
489    use tempfile::tempdir;
490
491    fn make_sm() -> StoreManager {
492        let mut registry = StoreRegistry::new();
493        #[cfg(feature = "sqlite")]
494        registry.register(Box::new(kanban_persistence_sqlite::SqliteStoreFactory));
495        #[cfg(feature = "json")]
496        registry.register(Box::new(kanban_persistence_json::JsonStoreFactory));
497        StoreManager::new(registry)
498    }
499
500    #[tokio::test(flavor = "multi_thread")]
501    async fn test_make_backend_json_path_returns_json_data_store() {
502        let dir = tempdir().unwrap();
503        let path = dir.path().join("test.json");
504        let sm = make_sm();
505        let cfg = AppConfig::default();
506        let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
507        assert!(!backend.needs_flush(), "new JSON backend starts clean");
508        assert!(
509            backend.needs_save_worker(),
510            "JSON backend requires a background flush worker"
511        );
512    }
513
514    #[cfg(feature = "sqlite")]
515    mod sqlite_backend_tests {
516        use super::*;
517
518        #[tokio::test(flavor = "multi_thread")]
519        async fn test_make_backend_sqlite_path_returns_sqlite_store() {
520            let dir = tempdir().unwrap();
521            let path = dir.path().join("test.sqlite");
522            let sm = make_sm();
523            let cfg = AppConfig::default();
524            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
525            assert!(!backend.needs_flush(), "new SQLite backend starts clean");
526            assert!(
527                !backend.needs_save_worker(),
528                "SQLite backend is write-through and does not need a save worker"
529            );
530        }
531
532        #[tokio::test(flavor = "multi_thread")]
533        async fn test_make_backend_detects_sqlite_by_magic_bytes() {
534            let dir = tempdir().unwrap();
535            let path = dir.path().join("noext");
536
537            // Create a real SQLite file with no extension so the registry can
538            // detect it via magic bytes.
539            kanban_persistence_sqlite::SqliteStore::open(path.to_str().unwrap())
540                .await
541                .unwrap();
542
543            let sm = make_sm();
544            let cfg = AppConfig::default();
545            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
546            assert!(
547                !backend.needs_save_worker(),
548                "magic-byte SQLite detection should yield a write-through backend"
549            );
550            let boards = backend.list_boards().unwrap();
551            assert!(boards.is_empty());
552        }
553
554        #[tokio::test(flavor = "multi_thread")]
555        async fn test_make_backend_detects_json_by_content() {
556            use kanban_persistence::{PersistenceMetadata, PersistenceStore, StoreSnapshot};
557            let dir = tempdir().unwrap();
558            let path = dir.path().join("noext");
559
560            // Write a valid JSON envelope file with no extension so the registry
561            // detects it as JSON via content sniffing (first byte is '{').
562            {
563                let jfs = kanban_persistence_json::JsonFileStore::new(&path);
564                let snap = kanban_domain::Snapshot::new();
565                let data = kanban_persistence::snapshot_to_json_bytes(&snap).unwrap();
566                let meta = PersistenceMetadata::new(uuid::Uuid::new_v4());
567                jfs.save(StoreSnapshot {
568                    data,
569                    metadata: meta,
570                })
571                .await
572                .unwrap();
573            }
574
575            let sm = make_sm();
576            let cfg = AppConfig::default();
577            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
578            assert!(
579                backend.needs_save_worker(),
580                "content-sniffed JSON backend requires a save worker"
581            );
582            let boards = backend.list_boards().unwrap();
583            assert!(boards.is_empty());
584        }
585    }
586}