use crate::backend::KanbanBackend;
use async_trait::async_trait;
use kanban_domain::commands::Command;
use kanban_domain::data_store::GraphMutFn;
use kanban_domain::{
ArchivedCard, Board, Card, Column, CommandStore, DataStore, DependencyGraph, InMemoryStore,
KanbanError, KanbanResult, Snapshot, Sprint,
};
use kanban_persistence::{
snapshot_from_json_bytes, snapshot_to_json_bytes, PersistenceMetadata, PersistenceStore,
StoreSnapshot,
};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
};
use uuid::Uuid;
/// A `DataStore` / `CommandStore` / `KanbanBackend` implementation that keeps
/// its working state in an `InMemoryStore` and persists it as a JSON snapshot
/// through a `PersistenceStore`.
///
/// The in-memory state is loaded on first access rather than at construction,
/// and is written back only when `flush` is called while the dirty flag is set.
pub struct JsonDataStore {
/// Persistence backend used for `load_sync`, `save` and `instance_id`.
file_store: Arc<dyn PersistenceStore + Send + Sync>,
/// Cached in-memory state; `None` until the first load and again after `reload`.
inner: RwLock<Option<InMemoryStore>>,
/// Metadata from the most recent load that found stored data, or from the
/// most recent successful save; `None` until one of those has happened.
last_metadata: RwLock<Option<PersistenceMetadata>>,
/// Set after a successful mutation, cleared by `flush` and `reload`.
dirty: AtomicBool,
}
impl JsonDataStore {
    /// Creates a store backed by `file_store`.
    ///
    /// No I/O happens here: the cache starts out empty and is filled by the
    /// first read or mutation.
    pub fn new(file_store: Arc<dyn PersistenceStore + Send + Sync>) -> Self {
        Self {
            file_store,
            inner: RwLock::new(None),
            last_metadata: RwLock::new(None),
            dirty: AtomicBool::new(false),
        }
    }

    /// Acquires the read lock on the cache, mapping lock poisoning to
    /// `KanbanError::Internal`.
    fn read_inner(
        &self,
    ) -> KanbanResult<std::sync::RwLockReadGuard<'_, Option<InMemoryStore>>> {
        self.inner
            .read()
            .map_err(|_| KanbanError::Internal("json_backend: inner RwLock poisoned".into()))
    }

    /// Fills the cache from `file_store` if it is currently empty.
    ///
    /// The load itself runs without holding any lock, so several threads may
    /// load concurrently; only the first one to take the write lock installs
    /// its result, and only that thread publishes the loaded metadata.
    ///
    /// # Errors
    ///
    /// Returns an error if a lock is poisoned, if loading or decoding the
    /// stored snapshot fails, or if applying it to the fresh store fails.
    fn ensure_loaded(&self) -> KanbanResult<()> {
        // Fast path: already loaded, nothing to do.
        if self.read_inner()?.is_some() {
            return Ok(());
        }

        // Build the candidate store outside of any lock.
        let store = InMemoryStore::new();
        let loaded_meta = match self.file_store.load_sync().map_err(KanbanError::from)? {
            Some((ss, meta)) => {
                let snapshot = snapshot_from_json_bytes(&ss.data).map_err(KanbanError::from)?;
                store.apply_snapshot(snapshot)?;
                Some(meta)
            }
            None => None,
        };

        let mut guard = self
            .inner
            .write()
            .map_err(|_| KanbanError::Internal("json_backend: inner RwLock poisoned".into()))?;
        if guard.is_some() {
            // Another thread installed its store first; discard ours and leave
            // the metadata it published untouched.
            return Ok(());
        }

        // Publish metadata only for the store that actually gets installed.
        // `last_metadata` is never held while waiting for `inner` anywhere in
        // this type, so nesting the locks in this order cannot deadlock.
        if let Some(meta) = loaded_meta {
            let mut meta_guard = self.last_metadata.write().map_err(|_| {
                KanbanError::Internal("json_backend: last_metadata RwLock poisoned".into())
            })?;
            *meta_guard = Some(meta);
        }
        *guard = Some(store);
        Ok(())
    }

    /// Runs `f` against the loaded in-memory store while holding the read lock.
    ///
    /// If a concurrent `reload` empties the cache between loading and taking
    /// the read lock, the load is simply repeated instead of panicking.
    ///
    /// # Errors
    ///
    /// Propagates errors from `ensure_loaded`, lock poisoning, and `f` itself.
    fn with_read<T>(&self, f: impl FnOnce(&InMemoryStore) -> KanbanResult<T>) -> KanbanResult<T> {
        loop {
            self.ensure_loaded()?;
            let guard = self.read_inner()?;
            if let Some(store) = guard.as_ref() {
                return f(store);
            }
            // Cache was cleared after `ensure_loaded` returned; release the
            // guard (end of iteration) and load again.
        }
    }

    /// Takes a snapshot of the cached store and saves it through `file_store`,
    /// recording the metadata returned by the save.
    ///
    /// Does nothing when the cache is empty. The read guard is released before
    /// the save is awaited.
    ///
    /// # Errors
    ///
    /// Returns an error if a lock is poisoned, or if snapshotting, encoding or
    /// saving fails.
    async fn do_flush(&self) -> KanbanResult<()> {
        let snapshot = {
            let guard = self.read_inner()?;
            let Some(store) = guard.as_ref() else {
                return Ok(());
            };
            store.snapshot()?
        };

        let data = snapshot_to_json_bytes(&snapshot).map_err(KanbanError::from)?;
        let metadata = PersistenceMetadata::new(self.file_store.instance_id());
        let returned = self
            .file_store
            .save(StoreSnapshot { data, metadata })
            .await
            .map_err(KanbanError::from)?;

        let mut meta_guard = self.last_metadata.write().map_err(|_| {
            KanbanError::Internal("json_backend: last_metadata RwLock poisoned".into())
        })?;
        *meta_guard = Some(returned);
        Ok(())
    }

    /// Runs the mutating operation `f` against the loaded store and marks the
    /// backend dirty when `f` succeeds.
    ///
    /// The dirty flag is set while the read guard is still held, so it cannot
    /// be attributed to a cache that `reload` has already discarded.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading, lock poisoning, and `f`; the dirty flag
    /// is left untouched when `f` fails.
    fn with_mutate<T>(&self, f: impl FnOnce(&InMemoryStore) -> KanbanResult<T>) -> KanbanResult<T> {
        self.with_read(|store| {
            let result = f(store)?;
            self.dirty.store(true, Ordering::Release);
            Ok(result)
        })
    }
}
/// `DataStore` implementation that forwards every call to the cached
/// `InMemoryStore`.
///
/// Queries go through `with_read`; anything that changes state goes through
/// `with_mutate`, which also marks the backend dirty on success.
impl DataStore for JsonDataStore {
    // ----- boards -----

    /// Looks up a board by id.
    fn get_board(&self, id: Uuid) -> KanbanResult<Option<Board>> {
        self.with_read(|mem| mem.get_board(id))
    }

    /// Lists all boards.
    fn list_boards(&self) -> KanbanResult<Vec<Board>> {
        self.with_read(|mem| mem.list_boards())
    }

    /// Inserts or replaces a board.
    fn upsert_board(&self, board: Board) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.upsert_board(board))
    }

    /// Deletes a board by id.
    fn delete_board(&self, id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_board(id))
    }

    // ----- columns -----

    /// Looks up a column by id.
    fn get_column(&self, id: Uuid) -> KanbanResult<Option<Column>> {
        self.with_read(|mem| mem.get_column(id))
    }

    /// Lists the columns of one board.
    fn list_columns_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Column>> {
        self.with_read(|mem| mem.list_columns_by_board(board_id))
    }

    /// Lists every column.
    fn list_all_columns(&self) -> KanbanResult<Vec<Column>> {
        self.with_read(|mem| mem.list_all_columns())
    }

    /// Inserts or replaces a column.
    fn upsert_column(&self, column: Column) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.upsert_column(column))
    }

    /// Deletes a column by id.
    fn delete_column(&self, id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_column(id))
    }

    /// Deletes all columns of one board.
    fn delete_columns_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_columns_by_board(board_id))
    }

    // ----- cards -----

    /// Looks up a card by id.
    fn get_card(&self, id: Uuid) -> KanbanResult<Option<Card>> {
        self.with_read(|mem| mem.get_card(id))
    }

    /// Lists every card.
    fn list_all_cards(&self) -> KanbanResult<Vec<Card>> {
        self.with_read(|mem| mem.list_all_cards())
    }

    /// Lists the cards of one column.
    fn list_cards_by_column(&self, column_id: Uuid) -> KanbanResult<Vec<Card>> {
        self.with_read(|mem| mem.list_cards_by_column(column_id))
    }

    /// Lists the cards assigned to one sprint.
    fn list_cards_by_sprint(&self, sprint_id: Uuid) -> KanbanResult<Vec<Card>> {
        self.with_read(|mem| mem.list_cards_by_sprint(sprint_id))
    }

    /// Counts the cards in one column.
    fn count_cards_in_column(&self, column_id: Uuid) -> KanbanResult<usize> {
        self.with_read(|mem| mem.count_cards_in_column(column_id))
    }

    /// Counts the cards in one column, forwarding `exclude` to the in-memory store.
    fn count_cards_in_column_excluding(
        &self,
        column_id: Uuid,
        exclude: &[Uuid],
    ) -> KanbanResult<usize> {
        self.with_read(|mem| mem.count_cards_in_column_excluding(column_id, exclude))
    }

    /// Inserts or replaces a card.
    fn upsert_card(&self, card: Card) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.upsert_card(card))
    }

    /// Deletes a card by id.
    fn delete_card(&self, id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_card(id))
    }

    /// Deletes the cards belonging to the given columns.
    fn delete_cards_by_columns(&self, column_ids: &[Uuid]) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_cards_by_columns(column_ids))
    }

    /// Clears the given sprint from cards, forwarding `timestamp` unchanged.
    fn clear_sprint_from_cards(
        &self,
        sprint_id: Uuid,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.clear_sprint_from_cards(sprint_id, timestamp))
    }

    // ----- archived cards -----

    /// Looks up an archived card by the id of the card.
    fn get_archived_card(&self, card_id: Uuid) -> KanbanResult<Option<ArchivedCard>> {
        self.with_read(|mem| mem.get_archived_card(card_id))
    }

    /// Lists every archived card.
    fn list_archived_cards(&self) -> KanbanResult<Vec<ArchivedCard>> {
        self.with_read(|mem| mem.list_archived_cards())
    }

    /// Adds an archived card.
    fn insert_archived_card(&self, ac: ArchivedCard) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.insert_archived_card(ac))
    }

    /// Deletes an archived card by the id of the card.
    fn delete_archived_card(&self, card_id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_archived_card(card_id))
    }

    /// Clears the given sprint from archived cards, forwarding `timestamp` unchanged.
    fn clear_sprint_from_archived_cards(
        &self,
        sprint_id: Uuid,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.clear_sprint_from_archived_cards(sprint_id, timestamp))
    }

    // ----- sprints -----

    /// Looks up a sprint by id.
    fn get_sprint(&self, id: Uuid) -> KanbanResult<Option<Sprint>> {
        self.with_read(|mem| mem.get_sprint(id))
    }

    /// Lists the sprints of one board.
    fn list_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<Vec<Sprint>> {
        self.with_read(|mem| mem.list_sprints_by_board(board_id))
    }

    /// Lists every sprint.
    fn list_all_sprints(&self) -> KanbanResult<Vec<Sprint>> {
        self.with_read(|mem| mem.list_all_sprints())
    }

    /// Inserts or replaces a sprint.
    fn upsert_sprint(&self, sprint: Sprint) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.upsert_sprint(sprint))
    }

    /// Deletes a sprint by id.
    fn delete_sprint(&self, id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_sprint(id))
    }

    /// Deletes all sprints of one board.
    fn delete_sprints_by_board(&self, board_id: Uuid) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.delete_sprints_by_board(board_id))
    }

    // ----- dependency graph -----

    /// Returns the dependency graph.
    fn get_graph(&self) -> KanbanResult<DependencyGraph> {
        self.with_read(|mem| mem.get_graph())
    }

    /// Replaces the dependency graph.
    fn set_graph(&self, graph: DependencyGraph) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.set_graph(graph))
    }

    /// Passes `f` to the in-memory store's `modify_graph`.
    fn modify_graph(&self, f: GraphMutFn) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.modify_graph(f))
    }

    // ----- whole-store snapshots -----

    /// Takes a snapshot of the in-memory store.
    fn snapshot(&self) -> KanbanResult<Snapshot> {
        self.with_read(|mem| mem.snapshot())
    }

    /// Applies `snapshot` to the in-memory store.
    fn apply_snapshot(&self, snapshot: Snapshot) -> KanbanResult<()> {
        self.with_mutate(|mem| mem.apply_snapshot(snapshot))
    }
}
/// `CommandStore` implementation that forwards to the cached `InMemoryStore`.
///
/// Appending goes through `with_mutate` (and therefore marks the backend
/// dirty); the remaining calls are plain reads.
impl CommandStore for JsonDataStore {
    /// Appends `cmds` and returns the value reported by the in-memory store.
    fn append_commands(&self, cmds: &[Command]) -> KanbanResult<u64> {
        self.with_mutate(|mem| mem.append_commands(cmds))
    }

    /// Returns the command count reported by the in-memory store.
    fn command_count(&self) -> KanbanResult<u64> {
        self.with_read(|mem| mem.command_count())
    }

    /// Loads commands for the `from`/`to` bounds, passed through unchanged.
    fn load_commands(&self, from: u64, to: u64) -> KanbanResult<Vec<Vec<Command>>> {
        self.with_read(|mem| mem.load_commands(from, to))
    }

    /// Loads everything the in-memory store returns from `load_all_commands`.
    fn load_all_commands(&self) -> KanbanResult<(Vec<Vec<Command>>, u64)> {
        self.with_read(|mem| mem.load_all_commands())
    }
}
#[async_trait]
impl KanbanBackend for JsonDataStore {
    /// Exposes this backend through its `DataStore` interface.
    fn as_data_store(&self) -> &dyn DataStore {
        self
    }

    /// Persists the cached state if it has been marked dirty.
    ///
    /// The dirty flag is claimed up front with an atomic swap. If the save
    /// fails, the flag is set again so that a later flush retries.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying save path produced.
    async fn flush(&self) -> KanbanResult<()> {
        if !self.dirty.swap(false, Ordering::AcqRel) {
            return Ok(());
        }
        let result = self.do_flush().await;
        if result.is_err() {
            self.dirty.store(true, Ordering::Release);
        }
        result
    }

    /// Discards the cached state so that the next access loads it again.
    ///
    /// The dirty flag is cleared while the write lock is still held. Mutations
    /// set the flag under the read lock, so a mutation that starts after the
    /// cache is discarded cannot have its flag wiped by this call.
    ///
    /// # Errors
    ///
    /// Returns `KanbanError::Internal` if the cache lock is poisoned.
    async fn reload(&self) -> KanbanResult<()> {
        {
            let mut guard = self
                .inner
                .write()
                .map_err(|_| KanbanError::Internal("json_backend: inner RwLock poisoned".into()))?;
            *guard = None;
            // Must happen before the guard is released; see the doc comment.
            self.dirty.store(false, Ordering::Release);
        }
        Ok(())
    }

    /// Reports whether there are mutations that have not been flushed yet.
    fn needs_flush(&self) -> bool {
        self.dirty.load(Ordering::Acquire)
    }

    /// Always `true` for this backend.
    fn needs_save_worker(&self) -> bool {
        true
    }

    /// Returns the instance id of the underlying persistence store.
    fn instance_id(&self) -> Uuid {
        self.file_store.instance_id()
    }

    /// Returns a copy of the most recently recorded persistence metadata.
    ///
    /// Yields `None` when nothing has been recorded yet, and also when the
    /// metadata lock is poisoned.
    fn persistence_metadata(&self) -> Option<PersistenceMetadata> {
        self.last_metadata.read().ok().and_then(|g| g.clone())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use kanban_domain::Board;
    use kanban_persistence_json::JsonFileStore;
    use tempfile::tempdir;

    /// Builds a `JsonDataStore` on top of a `JsonFileStore` located at `path`.
    fn make_store(path: &std::path::Path) -> JsonDataStore {
        let file_store = JsonFileStore::new(path);
        JsonDataStore::new(Arc::new(file_store))
    }

    /// After a successful flush the backend reports writer version and commit.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_backend_exposes_metadata_after_flush() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("md.json");
        let backend = make_store(&file);

        backend
            .upsert_board(Board::new("B", None::<String>))
            .unwrap();
        backend.flush().await.unwrap();

        let metadata = backend
            .persistence_metadata()
            .expect("flushed JSON backend must expose metadata");
        assert_eq!(
            metadata.writer_version.as_deref(),
            Some(kanban_core::KANBAN_VERSION),
        );
        assert_eq!(
            metadata.writer_commit.as_deref(),
            Some(kanban_core::KANBAN_COMMIT),
        );
    }

    /// A backend that has neither loaded nor saved has no metadata.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_json_backend_metadata_is_none_before_any_load_or_save() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("untouched.json");
        let backend = make_store(&file);

        assert!(backend.persistence_metadata().is_none());
    }

    /// The first read works under the default (current-thread) runtime.
    #[tokio::test]
    async fn test_ensure_loaded_works_in_single_thread_runtime() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("single_thread.json");
        let backend = make_store(&file);

        let listed = backend.list_boards().unwrap();
        assert!(listed.is_empty());
    }

    /// A failed save must leave the backend dirty so the flush can be retried.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_flush_restores_dirty_flag_on_io_failure() {
        use async_trait::async_trait;
        use kanban_persistence::{
            PersistenceError, PersistenceMetadata, PersistenceResult, PersistenceStore,
            StoreSnapshot,
        };

        /// Persistence store whose `save` always fails and which has no data.
        struct FailingStore;

        #[async_trait]
        impl PersistenceStore for FailingStore {
            async fn save(&self, _: StoreSnapshot) -> PersistenceResult<PersistenceMetadata> {
                let cause = std::io::Error::other("injected save failure");
                Err(PersistenceError::Io(cause))
            }

            async fn load(&self) -> PersistenceResult<(StoreSnapshot, PersistenceMetadata)> {
                Err(PersistenceError::Serialization("not implemented".into()))
            }

            async fn exists(&self) -> bool {
                false
            }

            fn path(&self) -> &std::path::Path {
                std::path::Path::new("")
            }

            fn instance_id(&self) -> uuid::Uuid {
                uuid::Uuid::nil()
            }

            fn load_sync(&self) -> PersistenceResult<Option<(StoreSnapshot, PersistenceMetadata)>> {
                Ok(None)
            }
        }

        let backend = JsonDataStore::new(Arc::new(FailingStore));
        backend
            .upsert_board(Board::new("B", None::<String>))
            .unwrap();
        assert!(backend.needs_flush(), "must be dirty before flush attempt");

        let outcome = backend.flush().await;
        assert!(outcome.is_err(), "flush should propagate the I/O failure");
        assert!(
            backend.needs_flush(),
            "dirty flag must be restored after a failed flush"
        );
    }

    /// Constructing the backend must not create the backing file.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_construction_does_no_io() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("nonexistent.json");
        let _backend = make_store(&file);

        assert!(!file.exists());
    }

    /// The cache stays empty until the first read, which loads the file.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_list_boards_triggers_load_on_existing_file() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("test.json");

        // Seed the file directly through the file store.
        let seeded_boards = {
            let seed_store = JsonFileStore::new(&file);
            let seed_snapshot = Snapshot {
                boards: vec![Board::new("Alpha", None::<String>)],
                ..Snapshot::new()
            };
            let bytes = snapshot_to_json_bytes(&seed_snapshot).unwrap();
            let seed_meta = PersistenceMetadata::new(Uuid::new_v4());
            seed_store
                .save(StoreSnapshot {
                    data: bytes,
                    metadata: seed_meta,
                })
                .await
                .unwrap();
            seed_snapshot.boards
        };

        let backend = make_store(&file);
        {
            let cache = backend.inner.read().unwrap();
            assert!(cache.is_none(), "inner should be None before first read");
        }

        let listed = backend.list_boards().unwrap();
        assert_eq!(listed.len(), seeded_boards.len());
        assert_eq!(listed[0].name, "Alpha");
        {
            let cache = backend.inner.read().unwrap();
            assert!(cache.is_some(), "inner should be Some after first read");
        }
    }

    /// A freshly constructed backend is clean.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_needs_flush_false_when_clean() {
        let tmp = tempdir().unwrap();
        let backend = make_store(&tmp.path().join("t.json"));

        assert!(!backend.needs_flush());
    }

    /// A successful mutation marks the backend dirty.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_needs_flush_true_after_upsert() {
        let tmp = tempdir().unwrap();
        let backend = make_store(&tmp.path().join("t.json"));

        backend
            .upsert_board(Board::new("B", None::<String>))
            .unwrap();
        assert!(backend.needs_flush());
    }

    /// Flushed data can be read back by a second backend on the same path.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_flush_writes_to_disk() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("flush.json");

        let first = make_store(&file);
        first
            .upsert_board(Board::new("Flushed", None::<String>))
            .unwrap();
        first.flush().await.unwrap();
        assert!(!first.needs_flush(), "dirty flag cleared after flush");

        let second = make_store(&file);
        let listed = second.list_boards().unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "Flushed");
    }

    /// A reader keeps serving its cached state until `reload` is called.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_reload_clears_cache() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("reload.json");

        let writer = make_store(&file);
        writer
            .upsert_board(Board::new("Initial", None::<String>))
            .unwrap();
        writer.flush().await.unwrap();

        let reader = make_store(&file);
        let seen = reader.list_boards().unwrap();
        assert_eq!(seen[0].name, "Initial");

        writer
            .upsert_board(Board::new("Updated", None::<String>))
            .unwrap();
        writer.flush().await.unwrap();

        let seen = reader.list_boards().unwrap();
        assert_eq!(seen.len(), 1, "stale before reload");

        reader.reload().await.unwrap();
        let seen = reader.list_boards().unwrap();
        assert_eq!(seen.len(), 2, "should see both boards after reload");
    }

    /// The JSON backend asks for a save worker.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_needs_save_worker_returns_true() {
        let tmp = tempdir().unwrap();
        let backend = make_store(&tmp.path().join("t.json"));

        assert!(backend.needs_save_worker());
    }

    /// Applying a snapshot counts as a mutation.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_apply_snapshot_sets_dirty_flag() {
        let tmp = tempdir().unwrap();
        let backend = make_store(&tmp.path().join("t.json"));

        backend.apply_snapshot(Snapshot::new()).unwrap();
        assert!(
            backend.needs_flush(),
            "apply_snapshot must mark backend dirty"
        );
    }

    /// Two threads triggering the first load at once both see the same data.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ensure_loaded_is_idempotent_under_concurrent_access() {
        use kanban_persistence::{PersistenceMetadata, PersistenceStore, StoreSnapshot};

        let tmp = tempdir().unwrap();
        let file = tmp.path().join("concurrent.json");

        // Seed the file with a single board.
        {
            let seed_store = Arc::new(JsonFileStore::new(&file));
            let seed_snapshot = kanban_domain::Snapshot {
                boards: vec![Board::new("ConcurrentBoard", None::<String>)],
                ..kanban_domain::Snapshot::new()
            };
            let bytes = snapshot_to_json_bytes(&seed_snapshot).unwrap();
            seed_store
                .save(StoreSnapshot {
                    data: bytes,
                    metadata: PersistenceMetadata::new(uuid::Uuid::new_v4()),
                })
                .await
                .unwrap();
        }

        let shared = Arc::new(make_store(&file));
        let shared_clone = Arc::clone(&shared);
        let first_task = tokio::task::spawn_blocking(move || shared.list_boards());
        let second_task = tokio::task::spawn_blocking(move || shared_clone.list_boards());
        let (first_join, second_join) = tokio::join!(first_task, second_task);

        let first_boards = first_join.unwrap().unwrap();
        let second_boards = second_join.unwrap().unwrap();
        assert_eq!(first_boards.len(), 1);
        assert_eq!(second_boards.len(), 1);
        assert_eq!(first_boards[0].name, "ConcurrentBoard");
        assert_eq!(second_boards[0].name, "ConcurrentBoard");
    }
}