use crate::config;
use crate::AppConfig;
use kanban_domain::{DataStore, KanbanError};
use kanban_persistence::{
snapshot_from_json_bytes, PersistenceStore, StoreRegistry, StoreSnapshot,
};
use std::collections::HashSet;
use std::sync::Arc;
pub struct StoreManager {
registry: Arc<StoreRegistry>,
}
impl StoreManager {
pub fn new(registry: StoreRegistry) -> Self {
Self {
registry: Arc::new(registry),
}
}
pub fn registry(&self) -> &StoreRegistry {
&self.registry
}
pub fn has_backends(&self) -> bool {
!self.registry.is_empty()
}
pub fn backend_names(&self) -> Vec<&str> {
self.registry.backend_names()
}
pub fn is_sqlite(&self, locator: &str) -> bool {
match self.detect_backend(locator).as_deref() {
Some("sqlite") => true,
None => {
locator.ends_with(".sqlite")
|| locator.ends_with(".sqlite3")
|| locator.ends_with(".db")
}
_ => false,
}
}
pub fn detect_backend(&self, locator: &str) -> Option<String> {
if let Some(name) = self.registry.detect_backend(locator) {
return Some(name.to_string());
}
#[cfg(feature = "sqlite")]
{
let path = std::path::Path::new(locator);
if path.exists() {
if let Ok(mut f) = std::fs::File::open(path) {
use std::io::Read;
let mut hdr = [0u8; 16];
let n = f.read(&mut hdr).unwrap_or(0);
if hdr[..n].starts_with(b"SQLite format 3\0") {
return Some("sqlite".to_string());
}
}
}
}
None
}
pub fn sync_backend_with_file(&self, locator: &str, config: &mut AppConfig) -> bool {
if let Some(detected) = self.detect_backend(locator) {
if detected != config.effective_storage_backend() {
config.storage_backend = Some(detected);
return true;
}
}
false
}
pub async fn make_backend(
&self,
locator: &str,
config: &AppConfig,
) -> Result<std::sync::Arc<dyn crate::backend::KanbanBackend>, KanbanError> {
if self.is_sqlite(locator) {
#[cfg(feature = "sqlite")]
{
let backend = crate::sqlite_backend::SqliteBackend::open(locator).await?;
return Ok(std::sync::Arc::new(backend));
}
#[cfg(not(feature = "sqlite"))]
return Err(KanbanError::Internal(format!(
"path '{}' requires the sqlite feature which is not compiled in",
locator
)));
}
let store = self.make_store(config.effective_storage_backend(), locator)?;
#[cfg(feature = "json")]
return Ok(std::sync::Arc::new(
crate::json_backend::JsonDataStore::new(store),
));
#[cfg(not(feature = "json"))]
Err(KanbanError::Internal(format!(
"path '{}' requires the json feature which is not compiled in",
locator
)))
}
pub fn make_store(
&self,
backend: &str,
locator: &str,
) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
Ok(self.registry.create_store(backend, locator)?)
}
pub fn make_store_with_config(
&self,
file: Option<&str>,
config: &AppConfig,
) -> Result<Arc<dyn PersistenceStore + Send + Sync>, KanbanError> {
let locator = match file {
Some(path) => path.to_string(),
None => config::resolve_storage_location(config),
};
let backend = self
.detect_backend(&locator)
.unwrap_or_else(|| config.effective_storage_backend().to_string());
self.make_store(&backend, &locator)
}
pub async fn validate_and_load_store(
&self,
backend: &str,
path: &str,
) -> Result<kanban_domain::Snapshot, KanbanError> {
if matches!(backend, "sqlite" | "sqlite3" | "db") {
#[cfg(feature = "sqlite")]
{
if !std::path::Path::new(path).exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Storage file does not exist: {}", path),
)
.into());
}
let store = kanban_persistence_sqlite::SqliteStore::open(path).await?;
return store.snapshot();
}
#[cfg(not(feature = "sqlite"))]
return Err(KanbanError::validation("sqlite feature not compiled in"));
}
let store = self.make_store(backend, path)?;
if !store.exists().await {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Storage file does not exist: {}", path),
)
.into());
}
let (snapshot, _metadata) = store.load().await?;
let data = snapshot_from_json_bytes(&snapshot.data)?;
Ok(data)
}
pub async fn export_to_sqlite(
&self,
export: kanban_domain::export::AllBoardsExport,
filename: &str,
) -> Result<(), KanbanError> {
#[cfg(feature = "sqlite")]
{
use kanban_domain::export::BoardImporter;
use kanban_domain::{DependencyGraph, Snapshot};
let entities = BoardImporter::extract_entities(export);
let snapshot = Snapshot {
boards: entities.boards,
columns: entities.columns,
cards: entities.cards,
archived_cards: entities.archived_cards,
sprints: entities.sprints,
graph: DependencyGraph::default(),
};
let store = kanban_persistence_sqlite::SqliteStore::open(filename).await?;
store.apply_snapshot(snapshot)?;
Ok(())
}
#[cfg(not(feature = "sqlite"))]
{
let _ = export;
let _ = filename;
Err(KanbanError::validation("sqlite feature not compiled in"))
}
}
pub async fn migrate_store(
&self,
from_backend: &str,
from_path: &str,
to_backend: &str,
to_path: &str,
) -> Result<(), KanbanError> {
let from = std::path::Path::new(from_path);
let to = std::path::Path::new(to_path);
if !from.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Source file not found: {}", from.display()),
)
.into());
}
if to.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
format!(
"Destination already exists: {}. Remove it first or use a different path.",
to.display()
),
)
.into());
}
let mut store_snapshot: StoreSnapshot = match from_backend {
"sqlite" | "sqlite3" | "db" => {
#[cfg(feature = "sqlite")]
{
use kanban_persistence::PersistenceMetadata;
let store = kanban_persistence_sqlite::SqliteStore::open(from_path).await?;
let snapshot = store.snapshot()?;
let data = kanban_persistence::snapshot_to_json_bytes(&snapshot)?;
StoreSnapshot {
data,
metadata: PersistenceMetadata::new(uuid::Uuid::new_v4()),
}
}
#[cfg(not(feature = "sqlite"))]
return Err(KanbanError::validation("sqlite feature not compiled in"));
}
_ => {
let source = self.make_store(from_backend, from_path)?;
let (snap, _) = source.load().await?;
snap
}
};
repair_snapshot_fks(&mut store_snapshot)?;
match to_backend {
"sqlite" | "sqlite3" | "db" => {
#[cfg(feature = "sqlite")]
{
let repaired = snapshot_from_json_bytes(&store_snapshot.data)?;
let store = kanban_persistence_sqlite::SqliteStore::open(to_path).await?;
let outcome = store.apply_snapshot(repaired.clone());
store.close().await;
drop(store);
if let Err(e) = outcome {
cleanup_destination_files(to_path).await;
return Err(e);
}
}
#[cfg(not(feature = "sqlite"))]
return Err(KanbanError::validation("sqlite feature not compiled in"));
}
_ => {
let target = self.make_store(to_backend, to_path)?;
let outcome = target.save(store_snapshot).await;
target.close().await;
drop(target);
if let Err(e) = outcome {
cleanup_destination_files(to_path).await;
return Err(e.into());
}
}
}
Ok(())
}
}
impl Clone for StoreManager {
fn clone(&self) -> Self {
Self {
registry: Arc::clone(&self.registry),
}
}
}
async fn remove_file_with_windows_retry(path: &std::path::Path) {
for delay_ms in [0u64, 50, 100, 200, 400] {
if delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
if std::fs::remove_file(path).is_ok() {
return;
}
if !path.exists() {
return;
}
}
tracing::warn!(
path = %path.display(),
"failed to remove file after retry backoff; orphan may remain on disk"
);
}
async fn cleanup_destination_files(to_path: &str) {
remove_file_with_windows_retry(std::path::Path::new(to_path)).await;
let wal = format!("{}-wal", to_path);
let shm = format!("{}-shm", to_path);
remove_file_with_windows_retry(std::path::Path::new(&wal)).await;
remove_file_with_windows_retry(std::path::Path::new(&shm)).await;
}
fn repair_snapshot_fks(snapshot: &mut StoreSnapshot) -> Result<(), KanbanError> {
let mut data: serde_json::Value = serde_json::from_slice(&snapshot.data).map_err(|e| {
KanbanError::validation(format!("Failed to parse snapshot for FK repair: {e}"))
})?;
let valid_columns: HashSet<String> = data["columns"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|c| c["id"].as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let valid_sprints: HashSet<String> = data["sprints"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|s| s["id"].as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let fallback_column: Option<String> = data["columns"].as_array().and_then(|arr| {
arr.iter()
.min_by_key(|c| c["position"].as_i64().unwrap_or(i64::MAX))
.and_then(|c| c["id"].as_str())
.map(String::from)
});
if let Some(cards) = data["cards"].as_array_mut() {
for card in cards.iter_mut() {
fix_card_fks(
card,
&valid_columns,
&valid_sprints,
fallback_column.as_deref(),
);
}
}
if let Some(archived) = data["archived_cards"].as_array_mut() {
for entry in archived.iter_mut() {
if let Some(card) = entry.get_mut("card") {
fix_card_fks(
card,
&valid_columns,
&valid_sprints,
fallback_column.as_deref(),
);
}
}
}
snapshot.data = serde_json::to_vec(&data).map_err(|e| {
KanbanError::validation(format!("Failed to serialize repaired snapshot: {e}"))
})?;
Ok(())
}
fn fix_card_fks(
card: &mut serde_json::Value,
valid_columns: &HashSet<String>,
valid_sprints: &HashSet<String>,
fallback_column: Option<&str>,
) {
if let Some(sprint_id) = card["sprint_id"].as_str() {
if !valid_sprints.contains(sprint_id) {
card["sprint_id"] = serde_json::Value::Null;
}
}
if let Some(col_id) = card["column_id"].as_str() {
if !valid_columns.contains(col_id) {
if let Some(fb) = fallback_column {
card["column_id"] = serde_json::Value::String(fb.to_string());
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use kanban_persistence::StoreRegistry;
use tempfile::tempdir;
fn make_sm() -> StoreManager {
let mut registry = StoreRegistry::new();
#[cfg(feature = "sqlite")]
registry.register(Box::new(kanban_persistence_sqlite::SqliteStoreFactory));
#[cfg(feature = "json")]
registry.register(Box::new(kanban_persistence_json::JsonStoreFactory));
StoreManager::new(registry)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_make_backend_json_path_returns_json_data_store() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.json");
let sm = make_sm();
let cfg = AppConfig::default();
let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
assert!(!backend.needs_flush(), "new JSON backend starts clean");
assert!(
backend.needs_save_worker(),
"JSON backend requires a background flush worker"
);
}
#[cfg(feature = "sqlite")]
mod sqlite_backend_tests {
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_make_backend_sqlite_path_returns_sqlite_store() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.sqlite");
let sm = make_sm();
let cfg = AppConfig::default();
let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
assert!(!backend.needs_flush(), "new SQLite backend starts clean");
assert!(
!backend.needs_save_worker(),
"SQLite backend is write-through and does not need a save worker"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_make_backend_detects_sqlite_by_magic_bytes() {
let dir = tempdir().unwrap();
let path = dir.path().join("noext");
kanban_persistence_sqlite::SqliteStore::open(path.to_str().unwrap())
.await
.unwrap();
let sm = make_sm();
let cfg = AppConfig::default();
let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
assert!(
!backend.needs_save_worker(),
"magic-byte SQLite detection should yield a write-through backend"
);
let boards = backend.list_boards().unwrap();
assert!(boards.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_make_backend_detects_json_by_content() {
use kanban_persistence::{PersistenceMetadata, PersistenceStore, StoreSnapshot};
let dir = tempdir().unwrap();
let path = dir.path().join("noext");
{
let jfs = kanban_persistence_json::JsonFileStore::new(&path);
let snap = kanban_domain::Snapshot::new();
let data = kanban_persistence::snapshot_to_json_bytes(&snap).unwrap();
let meta = PersistenceMetadata::new(uuid::Uuid::new_v4());
jfs.save(StoreSnapshot {
data,
metadata: meta,
})
.await
.unwrap();
}
let sm = make_sm();
let cfg = AppConfig::default();
let backend = sm.make_backend(path.to_str().unwrap(), &cfg).await.unwrap();
assert!(
backend.needs_save_worker(),
"content-sniffed JSON backend requires a save worker"
);
let boards = backend.list_boards().unwrap();
assert!(boards.is_empty());
}
}
}