1use crate::config;
2use crate::AppConfig;
3use kanban_domain::{DataStore, KanbanError};
4use kanban_persistence::{
5 snapshot_from_json_bytes, PersistenceStore, StoreRegistry, StoreSnapshot,
6};
7use std::collections::HashSet;
8use std::sync::Arc;
9
/// Front door to the persistence layer: wraps a [`StoreRegistry`] and offers
/// helpers for detecting, creating, validating and migrating stores.
pub struct StoreManager {
    /// Registry of store factories, kept behind an `Arc` so that cloning the
    /// manager shares the same registry instead of copying it.
    registry: Arc<StoreRegistry>,
}
18
19impl StoreManager {
20 pub fn new(registry: StoreRegistry) -> Self {
23 Self {
24 registry: Arc::new(registry),
25 }
26 }
27
28 pub fn registry(&self) -> &StoreRegistry {
31 &self.registry
32 }
33
34 pub fn has_backends(&self) -> bool {
36 !self.registry.is_empty()
37 }
38
39 pub fn backend_names(&self) -> Vec<&str> {
41 self.registry.backend_names()
42 }
43
44 pub fn is_sqlite(&self, locator: &str) -> bool {
48 match self.detect_backend(locator).as_deref() {
49 Some("sqlite") => true,
50 None => {
51 locator.ends_with(".sqlite")
52 || locator.ends_with(".sqlite3")
53 || locator.ends_with(".db")
54 }
55 _ => false,
56 }
57 }
58
59 pub fn detect_backend(&self, locator: &str) -> Option<String> {
63 if let Some(name) = self.registry.detect_backend(locator) {
64 return Some(name.to_string());
65 }
66 #[cfg(feature = "sqlite")]
67 {
68 let path = std::path::Path::new(locator);
69 if path.exists() {
70 if let Ok(mut f) = std::fs::File::open(path) {
71 use std::io::Read;
72 let mut hdr = [0u8; 16];
73 let n = f.read(&mut hdr).unwrap_or(0);
74 if hdr[..n].starts_with(b"SQLite format 3\0") {
75 return Some("sqlite".to_string());
76 }
77 }
78 }
79 }
80 None
81 }
82
83 pub fn sync_backend_with_file(&self, locator: &str, config: &mut AppConfig) -> bool {
86 if let Some(detected) = self.detect_backend(locator) {
87 if detected != config.effective_storage_backend() {
88 config.storage_backend = Some(detected);
89 return true;
90 }
91 }
92 false
93 }
94
95 pub async fn make_backend(
98 &self,
99 locator: &str,
100 config: &AppConfig,
101 ) -> Result<std::sync::Arc<dyn crate::backend::KanbanBackend>, KanbanError> {
102 if self.is_sqlite(locator) {
103 #[cfg(feature = "sqlite")]
104 {
105 let backend = crate::sqlite_backend::SqliteBackend::open(locator).await?;
110 return Ok(std::sync::Arc::new(backend));
111 }
112 #[cfg(not(feature = "sqlite"))]
113 return Err(KanbanError::Internal(format!(
114 "path '{}' requires the sqlite feature which is not compiled in",
115 locator
116 )));
117 }
118 let store = self.make_store(config.effective_storage_backend(), locator)?;
119 #[cfg(feature = "json")]
120 return Ok(std::sync::Arc::new(
121 crate::json_backend::JsonDataStore::new(store),
122 ));
123 #[cfg(not(feature = "json"))]
124 Err(KanbanError::Internal(format!(
125 "path '{}' requires the json feature which is not compiled in",
126 locator
127 )))
128 }
129
130 pub fn make_store(
133 &self,
134 backend: &str,
135 locator: &str,
136 ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
137 Ok(self.registry.create_store(backend, locator)?)
138 }
139
140 pub fn make_store_with_config(
145 &self,
146 file: Option<&str>,
147 config: &AppConfig,
148 ) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
149 let locator = match file {
150 Some(path) => path.to_string(),
151 None => config::resolve_storage_location(config),
152 };
153 let backend = self
154 .detect_backend(&locator)
155 .unwrap_or_else(|| config.effective_storage_backend().to_string());
156 self.make_store(&backend, &locator)
157 }
158
159 pub async fn validate_and_load_store(
166 &self,
167 backend: &str,
168 path: &str,
169 ) -> Result<kanban_domain::Snapshot, KanbanError> {
170 if matches!(backend, "sqlite" | "sqlite3" | "db") {
171 #[cfg(feature = "sqlite")]
172 {
173 if !std::path::Path::new(path).exists() {
174 return Err(std::io::Error::new(
175 std::io::ErrorKind::NotFound,
176 format!("Storage file does not exist: {}", path),
177 )
178 .into());
179 }
180 let store = kanban_persistence_sqlite::SqliteStore::open(path).await?;
181 return store.snapshot();
182 }
183 #[cfg(not(feature = "sqlite"))]
184 return Err(KanbanError::validation("sqlite feature not compiled in"));
185 }
186 let store = self.make_store(backend, path)?;
187 if !store.exists().await {
188 return Err(std::io::Error::new(
189 std::io::ErrorKind::NotFound,
190 format!("Storage file does not exist: {}", path),
191 )
192 .into());
193 }
194 let (snapshot, _metadata) = store.load().await?;
195 let data = snapshot_from_json_bytes(&snapshot.data)?;
196 Ok(data)
197 }
198
199 pub async fn export_to_sqlite(
204 &self,
205 export: kanban_domain::export::AllBoardsExport,
206 filename: &str,
207 ) -> Result<(), KanbanError> {
208 #[cfg(feature = "sqlite")]
209 {
210 use kanban_domain::export::BoardImporter;
211 use kanban_domain::{DependencyGraph, Snapshot};
212
213 let entities = BoardImporter::extract_entities(export);
214 let snapshot = Snapshot {
215 boards: entities.boards,
216 columns: entities.columns,
217 cards: entities.cards,
218 archived_cards: entities.archived_cards,
219 sprints: entities.sprints,
220 graph: DependencyGraph::default(),
221 };
222 let store = kanban_persistence_sqlite::SqliteStore::open(filename).await?;
223 store.apply_snapshot(snapshot)?;
224 Ok(())
225 }
226 #[cfg(not(feature = "sqlite"))]
227 {
228 let _ = export;
229 let _ = filename;
230 Err(KanbanError::validation("sqlite feature not compiled in"))
231 }
232 }
233
234 pub async fn migrate_store(
241 &self,
242 from_backend: &str,
243 from_path: &str,
244 to_backend: &str,
245 to_path: &str,
246 ) -> Result<(), KanbanError> {
247 let from = std::path::Path::new(from_path);
248 let to = std::path::Path::new(to_path);
249 if !from.exists() {
250 return Err(std::io::Error::new(
251 std::io::ErrorKind::NotFound,
252 format!("Source file not found: {}", from.display()),
253 )
254 .into());
255 }
256 if to.exists() {
257 return Err(std::io::Error::new(
258 std::io::ErrorKind::AlreadyExists,
259 format!(
260 "Destination already exists: {}. Remove it first or use a different path.",
261 to.display()
262 ),
263 )
264 .into());
265 }
266
267 let mut store_snapshot: StoreSnapshot = match from_backend {
269 "sqlite" | "sqlite3" | "db" => {
270 #[cfg(feature = "sqlite")]
271 {
272 use kanban_persistence::PersistenceMetadata;
273 let store = kanban_persistence_sqlite::SqliteStore::open(from_path).await?;
274 let snapshot = store.snapshot()?;
275 let data = kanban_persistence::snapshot_to_json_bytes(&snapshot)?;
276 StoreSnapshot {
277 data,
278 metadata: PersistenceMetadata::new(uuid::Uuid::new_v4()),
279 }
280 }
281 #[cfg(not(feature = "sqlite"))]
282 return Err(KanbanError::validation("sqlite feature not compiled in"));
283 }
284 _ => {
285 let source = self.make_store(from_backend, from_path)?;
286 let (snap, _) = source.load().await?;
287 snap
288 }
289 };
290
291 repair_snapshot_fks(&mut store_snapshot)?;
292
293 match to_backend {
295 "sqlite" | "sqlite3" | "db" => {
296 #[cfg(feature = "sqlite")]
297 {
298 let repaired = snapshot_from_json_bytes(&store_snapshot.data)?;
299 let store = kanban_persistence_sqlite::SqliteStore::open(to_path).await?;
300 let outcome = store.apply_snapshot(repaired.clone());
301 store.close().await;
302 drop(store);
303 if let Err(e) = outcome {
304 cleanup_destination_files(to_path).await;
305 return Err(e);
306 }
307 }
308 #[cfg(not(feature = "sqlite"))]
309 return Err(KanbanError::validation("sqlite feature not compiled in"));
310 }
311 _ => {
312 let target = self.make_store(to_backend, to_path)?;
313 let outcome = target.save(store_snapshot).await;
314 target.close().await;
315 drop(target);
316 if let Err(e) = outcome {
317 cleanup_destination_files(to_path).await;
318 return Err(e.into());
319 }
320 }
321 }
322 Ok(())
323 }
324}
325
326impl Clone for StoreManager {
327 fn clone(&self) -> Self {
328 Self {
329 registry: Arc::clone(&self.registry),
330 }
331 }
332}
333
334async fn remove_file_with_windows_retry(path: &std::path::Path) {
339 for delay_ms in [0u64, 50, 100, 200, 400] {
340 if delay_ms > 0 {
341 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
342 }
343 if std::fs::remove_file(path).is_ok() {
344 return;
345 }
346 if !path.exists() {
347 return;
348 }
349 }
350 tracing::warn!(
351 path = %path.display(),
352 "failed to remove file after retry backoff; orphan may remain on disk"
353 );
354}
355
356async fn cleanup_destination_files(to_path: &str) {
361 remove_file_with_windows_retry(std::path::Path::new(to_path)).await;
362 let wal = format!("{}-wal", to_path);
363 let shm = format!("{}-shm", to_path);
364 remove_file_with_windows_retry(std::path::Path::new(&wal)).await;
365 remove_file_with_windows_retry(std::path::Path::new(&shm)).await;
366}
367
368fn repair_snapshot_fks(snapshot: &mut StoreSnapshot) -> Result<(), KanbanError> {
369 let mut data: serde_json::Value = serde_json::from_slice(&snapshot.data).map_err(|e| {
370 KanbanError::validation(format!("Failed to parse snapshot for FK repair: {e}"))
371 })?;
372
373 let valid_columns: HashSet<String> = data["columns"]
374 .as_array()
375 .map(|arr| {
376 arr.iter()
377 .filter_map(|c| c["id"].as_str().map(String::from))
378 .collect()
379 })
380 .unwrap_or_default();
381
382 let valid_sprints: HashSet<String> = data["sprints"]
383 .as_array()
384 .map(|arr| {
385 arr.iter()
386 .filter_map(|s| s["id"].as_str().map(String::from))
387 .collect()
388 })
389 .unwrap_or_default();
390
391 let fallback_column: Option<String> = data["columns"].as_array().and_then(|arr| {
392 arr.iter()
393 .min_by_key(|c| c["position"].as_i64().unwrap_or(i64::MAX))
394 .and_then(|c| c["id"].as_str())
395 .map(String::from)
396 });
397
398 if let Some(cards) = data["cards"].as_array_mut() {
399 for card in cards.iter_mut() {
400 fix_card_fks(
401 card,
402 &valid_columns,
403 &valid_sprints,
404 fallback_column.as_deref(),
405 );
406 }
407 }
408
409 if let Some(archived) = data["archived_cards"].as_array_mut() {
410 for entry in archived.iter_mut() {
411 if let Some(card) = entry.get_mut("card") {
412 fix_card_fks(
413 card,
414 &valid_columns,
415 &valid_sprints,
416 fallback_column.as_deref(),
417 );
418 }
419 }
420 }
421
422 snapshot.data = serde_json::to_vec(&data).map_err(|e| {
423 KanbanError::validation(format!("Failed to serialize repaired snapshot: {e}"))
424 })?;
425
426 Ok(())
427}
428
429fn fix_card_fks(
430 card: &mut serde_json::Value,
431 valid_columns: &HashSet<String>,
432 valid_sprints: &HashSet<String>,
433 fallback_column: Option<&str>,
434) {
435 if let Some(sprint_id) = card["sprint_id"].as_str() {
436 if !valid_sprints.contains(sprint_id) {
437 card["sprint_id"] = serde_json::Value::Null;
438 }
439 }
440 if let Some(col_id) = card["column_id"].as_str() {
441 if !valid_columns.contains(col_id) {
442 if let Some(fb) = fallback_column {
443 card["column_id"] = serde_json::Value::String(fb.to_string());
444 }
445 }
446 }
447}
448
// Every test needs at least one storage backend, so the module is compiled
// only when one is available; this also keeps the shared helper and imports
// from being reported as unused.
#[cfg(all(test, any(feature = "json", feature = "sqlite")))]
mod tests {
    use super::*;
    use kanban_persistence::StoreRegistry;
    use tempfile::tempdir;

    /// Builds a manager with every backend factory that is compiled in.
    fn make_sm() -> StoreManager {
        let mut registry = StoreRegistry::new();
        #[cfg(feature = "sqlite")]
        registry.register(Box::new(kanban_persistence_sqlite::SqliteStoreFactory));
        #[cfg(feature = "json")]
        registry.register(Box::new(kanban_persistence_json::JsonStoreFactory));
        StoreManager::new(registry)
    }

    /// A `.json` path yields a clean backend that needs the save worker.
    // Gated: without the `json` feature `make_backend` returns an error for
    // this path and the `unwrap` below would panic.
    #[cfg(feature = "json")]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_make_backend_json_path_returns_json_data_store() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.json");
        let sm = make_sm();
        let cfg = AppConfig::default();
        let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
        assert!(!backend.needs_flush(), "new JSON backend starts clean");
        assert!(
            backend.needs_save_worker(),
            "JSON backend requires a background flush worker"
        );
    }

    #[cfg(feature = "sqlite")]
    mod sqlite_backend_tests {
        use super::*;

        /// A `.sqlite` path yields a clean, write-through backend.
        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_sqlite_path_returns_sqlite_store() {
            let dir = tempdir().unwrap();
            let path = dir.path().join("test.sqlite");
            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(!backend.needs_flush(), "new SQLite backend starts clean");
            assert!(
                !backend.needs_save_worker(),
                "SQLite backend is write-through and does not need a save worker"
            );
        }

        /// An extension-less SQLite file is recognised by its header bytes.
        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_detects_sqlite_by_magic_bytes() {
            let dir = tempdir().unwrap();
            let path = dir.path().join("noext");

            // Opening the store creates a real SQLite file at the path.
            kanban_persistence_sqlite::SqliteStore::open(path.to_str().unwrap())
                .await
                .unwrap();

            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(
                !backend.needs_save_worker(),
                "magic-byte SQLite detection should yield a write-through backend"
            );
            let boards = backend.list_boards().unwrap();
            assert!(boards.is_empty());
        }

        /// An extension-less JSON file is recognised by its content.
        // Gated: the test uses `kanban_persistence_json`, which is only
        // available with the `json` feature.
        #[cfg(feature = "json")]
        #[tokio::test(flavor = "multi_thread")]
        async fn test_make_backend_detects_json_by_content() {
            use kanban_persistence::{PersistenceMetadata, PersistenceStore, StoreSnapshot};
            let dir = tempdir().unwrap();
            let path = dir.path().join("noext");

            {
                // Write an empty snapshot as JSON and drop the store before
                // the manager inspects the file.
                let jfs = kanban_persistence_json::JsonFileStore::new(&path);
                let snap = kanban_domain::Snapshot::new();
                let data = kanban_persistence::snapshot_to_json_bytes(&snap).unwrap();
                let meta = PersistenceMetadata::new(uuid::Uuid::new_v4());
                jfs.save(StoreSnapshot {
                    data,
                    metadata: meta,
                })
                .await
                .unwrap();
            }

            let sm = make_sm();
            let cfg = AppConfig::default();
            let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
            assert!(
                backend.needs_save_worker(),
                "content-sniffed JSON backend requires a save worker"
            );
            let boards = backend.list_boards().unwrap();
            assert!(boards.is_empty());
        }
    }
}