1use crate::config;
2use crate::AppConfig;
3use kanban_domain::commands::Command;
4#[cfg(feature = "sqlite")]
5use kanban_domain::commands::CommandContext;
6use kanban_domain::KanbanError;
7#[cfg(feature = "sqlite")]
8use kanban_domain::{DataStore, InMemoryStore, Snapshot};
9#[cfg(feature = "sqlite")]
10use kanban_persistence::snapshot_to_json_bytes;
11use kanban_persistence::{
12 snapshot_from_json_bytes, PersistenceStore, StoreRegistry, StoreSnapshot,
13};
14use std::collections::HashSet;
15use std::sync::Arc;
16
/// Command history captured from a migration source, or `None` when nothing
/// usable was read.
///
/// When present, the tuple holds, in order:
/// 1. the command batches,
/// 2. a `u64` reported together with the batches (bound as `count` when read
///    from SQLite and as `cursor` when read from other stores),
/// 3. optional serialized baseline snapshot bytes.
type CommandLog = Option<(Vec<Vec<Command>>, u64, Option<Vec<u8>>)>;
18
/// Front door for storage backends: detects which backend a locator belongs
/// to, builds stores/backends for it, and validates, exports and migrates data.
pub struct StoreManager {
    /// Registry consulted for backend detection and store creation; kept
    /// behind an `Arc` so the manager can be cloned without copying it.
    registry: Arc<StoreRegistry>,
}
27
28impl StoreManager {
29 pub fn new(registry: StoreRegistry) -> Self {
32 Self {
33 registry: Arc::new(registry),
34 }
35 }
36
37 pub fn registry(&self) -> &StoreRegistry {
40 &self.registry
41 }
42
43 pub fn has_backends(&self) -> bool {
45 !self.registry.is_empty()
46 }
47
48 pub fn backend_names(&self) -> Vec<&str> {
50 self.registry.backend_names()
51 }
52
53 pub fn is_sqlite(&self, locator: &str) -> bool {
57 match self.detect_backend(locator).as_deref() {
58 Some("sqlite") => true,
59 None => {
60 locator.ends_with(".sqlite")
61 || locator.ends_with(".sqlite3")
62 || locator.ends_with(".db")
63 }
64 _ => false,
65 }
66 }
67
68 pub fn detect_backend(&self, locator: &str) -> Option<String> {
72 if let Some(name) = self.registry.detect_backend(locator) {
73 return Some(name.to_string());
74 }
75 #[cfg(feature = "sqlite")]
76 {
77 let path = std::path::Path::new(locator);
78 if path.exists() {
79 if let Ok(mut f) = std::fs::File::open(path) {
80 use std::io::Read;
81 let mut hdr = [0u8; 16];
82 let n = f.read(&mut hdr).unwrap_or(0);
83 if hdr[..n].starts_with(b"SQLite format 3\0") {
84 return Some("sqlite".to_string());
85 }
86 }
87 }
88 }
89 None
90 }
91
92 pub fn sync_backend_with_file(&self, locator: &str, config: &mut AppConfig) -> bool {
95 if let Some(detected) = self.detect_backend(locator) {
96 if detected != config.effective_storage_backend() {
97 config.storage_backend = Some(detected);
98 return true;
99 }
100 }
101 false
102 }
103
104 pub async fn make_backend(
107 &self,
108 locator: &str,
109 config: &AppConfig,
110 ) -> Result<std::sync::Arc<dyn crate::backend::KanbanBackend>, KanbanError> {
111 if self.is_sqlite(locator) {
112 #[cfg(feature = "sqlite")]
113 {
114 let store = kanban_persistence_sqlite::SqliteStore::open(locator)
115 .await
116 .map_err(|e| KanbanError::Database(e.to_string()))?;
117 return Ok(std::sync::Arc::new(store));
118 }
119 #[cfg(not(feature = "sqlite"))]
120 return Err(KanbanError::Internal(format!(
121 "path '{}' requires the sqlite feature which is not compiled in",
122 locator
123 )));
124 }
125 let store = self.make_store(config.effective_storage_backend(), locator)?;
126 #[cfg(feature = "json")]
127 return Ok(std::sync::Arc::new(
128 crate::json_backend::JsonDataStore::new(store),
129 ));
130 #[cfg(not(feature = "json"))]
131 Err(KanbanError::Internal(format!(
132 "path '{}' requires the json feature which is not compiled in",
133 locator
134 )))
135 }
136
137 pub fn make_store(
140 &self,
141 backend: &str,
142 locator: &str,
143 ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
144 Ok(self.registry.create_store(backend, locator)?)
145 }
146
147 pub fn make_store_with_config(
152 &self,
153 file: Option<&str>,
154 config: &AppConfig,
155 ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
156 let locator = match file {
157 Some(path) => path.to_string(),
158 None => config::resolve_storage_location(config),
159 };
160 let backend = self
161 .detect_backend(&locator)
162 .unwrap_or_else(|| config.effective_storage_backend().to_string());
163 self.make_store(&backend, &locator)
164 }
165
166 pub async fn validate_and_load_store(
173 &self,
174 backend: &str,
175 path: &str,
176 ) -> Result<kanban_domain::Snapshot, KanbanError> {
177 if matches!(backend, "sqlite" | "sqlite3" | "db") {
178 #[cfg(feature = "sqlite")]
179 {
180 if !std::path::Path::new(path).exists() {
181 return Err(std::io::Error::new(
182 std::io::ErrorKind::NotFound,
183 format!("Storage file does not exist: {}", path),
184 )
185 .into());
186 }
187 let store = kanban_persistence_sqlite::SqliteStore::open(path).await?;
188 return store.snapshot();
189 }
190 #[cfg(not(feature = "sqlite"))]
191 return Err(KanbanError::validation("sqlite feature not compiled in"));
192 }
193 let store = self.make_store(backend, path)?;
194 if !store.exists().await {
195 return Err(std::io::Error::new(
196 std::io::ErrorKind::NotFound,
197 format!("Storage file does not exist: {}", path),
198 )
199 .into());
200 }
201 let (snapshot, _metadata) = store.load().await?;
202 let data = snapshot_from_json_bytes(&snapshot.data)?;
203 Ok(data)
204 }
205
206 pub async fn export_to_sqlite(
211 &self,
212 export: kanban_domain::export::AllBoardsExport,
213 filename: &str,
214 ) -> Result<(), KanbanError> {
215 #[cfg(feature = "sqlite")]
216 {
217 use kanban_domain::export::BoardImporter;
218 use kanban_domain::{DependencyGraph, Snapshot};
219
220 let entities = BoardImporter::extract_entities(export);
221 let snapshot = Snapshot {
222 boards: entities.boards,
223 columns: entities.columns,
224 cards: entities.cards,
225 archived_cards: entities.archived_cards,
226 sprints: entities.sprints,
227 graph: DependencyGraph::default(),
228 };
229 let store = kanban_persistence_sqlite::SqliteStore::open(filename).await?;
230 store.apply_snapshot(snapshot)?;
231 Ok(())
232 }
233 #[cfg(not(feature = "sqlite"))]
234 {
235 let _ = export;
236 let _ = filename;
237 Err(KanbanError::validation("sqlite feature not compiled in"))
238 }
239 }
240
241 pub async fn migrate_store(
248 &self,
249 from_backend: &str,
250 from_path: &str,
251 to_backend: &str,
252 to_path: &str,
253 ) -> Result<(), KanbanError> {
254 let from = std::path::Path::new(from_path);
255 let to = std::path::Path::new(to_path);
256 if !from.exists() {
257 return Err(std::io::Error::new(
258 std::io::ErrorKind::NotFound,
259 format!("Source file not found: {}", from.display()),
260 )
261 .into());
262 }
263 if to.exists() {
264 return Err(std::io::Error::new(
265 std::io::ErrorKind::AlreadyExists,
266 format!(
267 "Destination already exists: {}. Remove it first or use a different path.",
268 to.display()
269 ),
270 )
271 .into());
272 }
273
274 let (mut store_snapshot, command_log): (StoreSnapshot, CommandLog) = match from_backend {
277 "sqlite" | "sqlite3" | "db" => {
278 #[cfg(feature = "sqlite")]
279 {
280 use kanban_domain::CommandStore;
281 use kanban_persistence::PersistenceMetadata;
282 let store = kanban_persistence_sqlite::SqliteStore::open(from_path).await?;
283 let snapshot = store.snapshot()?;
284 let data = snapshot_to_json_bytes(&snapshot)?;
285 let snap = StoreSnapshot {
286 data,
287 metadata: PersistenceMetadata::new(uuid::Uuid::new_v4()),
288 };
289 let cmd_log = match store.load_all_commands() {
290 Ok((batches, count)) if !batches.is_empty() => {
291 let baseline_bytes = snapshot_to_json_bytes(&Snapshot::new()).ok();
292 Some((batches, count, baseline_bytes))
293 }
294 _ => None,
295 };
296 (snap, cmd_log)
297 }
298 #[cfg(not(feature = "sqlite"))]
299 return Err(KanbanError::validation("sqlite feature not compiled in"));
300 }
301 _ => {
302 let source = self.make_store(from_backend, from_path)?;
303 let (snap, _) = source.load().await?;
304 let cmd_log = match source.get_command_log() {
305 Ok((batches, cursor, baseline_bytes)) if !batches.is_empty() => {
306 Some((batches, cursor, baseline_bytes))
307 }
308 _ => None,
309 };
310 (snap, cmd_log)
311 }
312 };
313
314 repair_snapshot_fks(&mut store_snapshot)?;
315
316 match to_backend {
318 "sqlite" | "sqlite3" | "db" => {
319 #[cfg(feature = "sqlite")]
320 {
321 let repaired = snapshot_from_json_bytes(&store_snapshot.data)?;
322 let store = kanban_persistence_sqlite::SqliteStore::open(to_path).await?;
323 if let Err(e) = store.apply_snapshot(repaired.clone()) {
324 let _ = std::fs::remove_file(to_path);
325 let _ = std::fs::remove_file(format!("{}-wal", to_path));
326 let _ = std::fs::remove_file(format!("{}-shm", to_path));
327 return Err(e);
328 }
329 if let Some((batches, _cursor, baseline_bytes)) = command_log {
330 if let Err(e) =
331 transfer_commands_to_sqlite(&store, &batches, baseline_bytes.as_deref())
332 {
333 tracing::warn!("Command log transfer failed (undo history lost): {e}");
334 }
335 }
336 }
337 #[cfg(not(feature = "sqlite"))]
338 return Err(KanbanError::validation("sqlite feature not compiled in"));
339 }
340 _ => {
341 let target = self.make_store(to_backend, to_path)?;
342 if let Some((batches, cursor, baseline_bytes)) = command_log {
344 if let Err(e) = target
345 .sync_command_log(&batches, cursor, baseline_bytes.as_deref())
346 .await
347 {
348 tracing::warn!("Command log transfer failed (undo history lost): {e}");
349 }
350 }
351 if let Err(e) = target.save(store_snapshot).await {
352 let _ = std::fs::remove_file(to_path);
353 let _ = std::fs::remove_file(format!("{}-wal", to_path));
354 let _ = std::fs::remove_file(format!("{}-shm", to_path));
355 return Err(e.into());
356 }
357 }
358 }
359 Ok(())
360 }
361}
362
/// Replays `batches` on top of a baseline snapshot and records them in `store`.
///
/// The baseline is decoded from `baseline_bytes` when given, otherwise an
/// empty snapshot is used. Each batch is executed against a scratch in-memory
/// store, appended to `store`, and followed by a snapshot of the scratch state
/// stored at the batch's 1-based position.
///
/// # Errors
///
/// Returns a validation error when the baseline cannot be parsed, or the first
/// error raised while executing, appending or snapshotting.
#[cfg(feature = "sqlite")]
fn transfer_commands_to_sqlite(
    store: &kanban_persistence_sqlite::SqliteStore,
    batches: &[Vec<Command>],
    baseline_bytes: Option<&[u8]>,
) -> Result<(), KanbanError> {
    use kanban_domain::CommandStore;

    let baseline: Snapshot = match baseline_bytes {
        Some(raw) => serde_json::from_slice(raw)
            .map_err(|e| KanbanError::validation(format!("Failed to parse baseline: {e}")))?,
        None => Snapshot::new(),
    };

    // Scratch store used only to compute the state after each batch.
    let replay = InMemoryStore::new();
    replay.apply_snapshot(baseline)?;

    // Positions are 1-based: the snapshot after the first batch is stored at 1.
    for (position, batch) in (1u64..).zip(batches) {
        let ctx = CommandContext {
            store: &replay as &dyn DataStore,
        };
        for command in batch {
            command.execute(&ctx)?;
        }
        store.append_commands(batch)?;
        let state_after_batch = replay.snapshot()?;
        store.store_snapshot_at(position, &state_after_batch)?;
    }

    Ok(())
}
395
396impl Clone for StoreManager {
397 fn clone(&self) -> Self {
398 Self {
399 registry: Arc::clone(&self.registry),
400 }
401 }
402}
403
404fn repair_snapshot_fks(snapshot: &mut StoreSnapshot) -> Result<(), KanbanError> {
405 let mut data: serde_json::Value = serde_json::from_slice(&snapshot.data).map_err(|e| {
406 KanbanError::validation(format!("Failed to parse snapshot for FK repair: {e}"))
407 })?;
408
409 let valid_columns: HashSet<String> = data["columns"]
410 .as_array()
411 .map(|arr| {
412 arr.iter()
413 .filter_map(|c| c["id"].as_str().map(String::from))
414 .collect()
415 })
416 .unwrap_or_default();
417
418 let valid_sprints: HashSet<String> = data["sprints"]
419 .as_array()
420 .map(|arr| {
421 arr.iter()
422 .filter_map(|s| s["id"].as_str().map(String::from))
423 .collect()
424 })
425 .unwrap_or_default();
426
427 let fallback_column: Option<String> = data["columns"].as_array().and_then(|arr| {
428 arr.iter()
429 .min_by_key(|c| c["position"].as_i64().unwrap_or(i64::MAX))
430 .and_then(|c| c["id"].as_str())
431 .map(String::from)
432 });
433
434 if let Some(cards) = data["cards"].as_array_mut() {
435 for card in cards.iter_mut() {
436 fix_card_fks(
437 card,
438 &valid_columns,
439 &valid_sprints,
440 fallback_column.as_deref(),
441 );
442 }
443 }
444
445 if let Some(archived) = data["archived_cards"].as_array_mut() {
446 for entry in archived.iter_mut() {
447 if let Some(card) = entry.get_mut("card") {
448 fix_card_fks(
449 card,
450 &valid_columns,
451 &valid_sprints,
452 fallback_column.as_deref(),
453 );
454 }
455 }
456 }
457
458 snapshot.data = serde_json::to_vec(&data).map_err(|e| {
459 KanbanError::validation(format!("Failed to serialize repaired snapshot: {e}"))
460 })?;
461
462 Ok(())
463}
464
465fn fix_card_fks(
466 card: &mut serde_json::Value,
467 valid_columns: &HashSet<String>,
468 valid_sprints: &HashSet<String>,
469 fallback_column: Option<&str>,
470) {
471 if let Some(sprint_id) = card["sprint_id"].as_str() {
472 if !valid_sprints.contains(sprint_id) {
473 card["sprint_id"] = serde_json::Value::Null;
474 }
475 }
476 if let Some(col_id) = card["column_id"].as_str() {
477 if !valid_columns.contains(col_id) {
478 if let Some(fb) = fallback_column {
479 card["column_id"] = serde_json::Value::String(fb.to_string());
480 }
481 }
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use kanban_persistence::StoreRegistry;
    use tempfile::tempdir;

    /// Builds a manager whose registry contains every backend factory that is
    /// compiled in.
    fn make_sm() -> StoreManager {
        let mut registry = StoreRegistry::new();
        #[cfg(feature = "sqlite")]
        registry.register(Box::new(kanban_persistence_sqlite::SqliteStoreFactory));
        #[cfg(feature = "json")]
        registry.register(Box::new(kanban_persistence_json::JsonStoreFactory));
        StoreManager::new(registry)
    }

    // Needs the `json` feature: without it `make_backend` returns an error for
    // non-SQLite locators and the `unwrap` below would panic.
    #[cfg(feature = "json")]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_make_backend_json_path_returns_json_data_store() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");
        let sm = make_sm();
        let cfg = AppConfig::default();
        let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
        assert!(!backend.needs_flush(), "new JSON backend starts clean");
        assert!(
            backend.needs_save_worker(),
            "JSON backend requires a background flush worker"
        );
    }

    #[cfg(feature = "sqlite")]
    mod sqlite_backend_tests {
        use super::*;

        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_sqlite_path_returns_sqlite_store() {
            let dir = tempdir().unwrap();
            let path = dir.path().join("test.sqlite");
            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(!backend.needs_flush(), "new SQLite backend starts clean");
            assert!(
                !backend.needs_save_worker(),
                "SQLite backend is write-through and does not need a save worker"
            );
        }

        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_detects_sqlite_by_magic_bytes() {
            let dir = tempdir().unwrap();
            // No extension, so only the file header can identify the backend.
            let path = dir.path().join("noext");

            // Opening the store creates the database file on disk.
            kanban_persistence_sqlite::SqliteStore::open(path.to_str().unwrap())
                .await
                .unwrap();

            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(
                !backend.needs_save_worker(),
                "magic-byte SQLite detection should yield a write-through backend"
            );
            let boards = backend.list_boards().unwrap();
            assert!(boards.is_empty());
        }

        // Also needs the `json` feature: the fixture is written through
        // `kanban_persistence_json` and the result must be a JSON backend.
        #[cfg(feature = "json")]
        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_detects_json_by_content() {
            use kanban_persistence::{PersistenceMetadata, PersistenceStore, StoreSnapshot};
            let dir = tempdir().unwrap();
            // No extension, so only the file content can identify the backend.
            let path = dir.path().join("noext");

            {
                // Write an empty snapshot through the JSON file store.
                let jfs = kanban_persistence_json::JsonFileStore::new(&path);
                let snap = kanban_domain::Snapshot::new();
                let data = kanban_persistence::snapshot_to_json_bytes(&snap).unwrap();
                let meta = PersistenceMetadata::new(uuid::Uuid::new_v4());
                jfs.save(StoreSnapshot {
                    data,
                    metadata: meta,
                })
                .await
                .unwrap();
            }

            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(
                backend.needs_save_worker(),
                "content-sniffed JSON backend requires a save worker"
            );
            let boards = backend.list_boards().unwrap();
            assert!(boards.is_empty());
        }
    }
}