use do_memory_core::sync::{ConflictResolution, StorageSynchronizer, SyncConfig};
use do_memory_core::{Episode, TaskContext, TaskType};
use do_memory_storage_redb::RedbStorage;
use do_memory_storage_turso::TursoStorage;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
async fn create_test_turso() -> anyhow::Result<(TursoStorage, TempDir)> {
let dir = TempDir::new()?;
let db_path = dir.path().join("test_turso.db");
let db = libsql::Builder::new_local(&db_path)
.build()
.await
.map_err(|e| anyhow::anyhow!("Failed to create test database: {e}"))?;
let storage = TursoStorage::from_database(db)?;
storage.initialize_schema().await?;
Ok((storage, dir))
}
async fn create_test_redb() -> anyhow::Result<(RedbStorage, TempDir)> {
let dir = TempDir::new()?;
let db_path = dir.path().join("test_redb.db");
let storage = RedbStorage::new(&db_path).await?;
Ok((storage, dir))
}
#[tokio::test]
async fn should_sync_single_episode_from_turso_to_redb_cache() {
let (turso, _turso_dir) = create_test_turso().await.unwrap();
let (redb, _redb_dir) = create_test_redb().await.unwrap();
let sync = StorageSynchronizer::new(Arc::new(turso), Arc::new(redb));
let context = TaskContext::default();
let episode = Episode::new("Test task".to_string(), context, TaskType::Testing);
let episode_id = episode.episode_id;
sync.turso.store_episode(&episode).await.unwrap();
let result = sync.sync_episode_to_cache(episode_id).await;
assert!(result.is_ok(), "Sync should succeed");
let cached = sync.redb.get_episode(episode_id).await.unwrap();
assert!(cached.is_some(), "Episode should be in cache");
assert_eq!(cached.unwrap().episode_id, episode_id);
}
#[tokio::test]
async fn should_sync_all_recent_episodes_in_batch() {
let (turso, _turso_dir) = create_test_turso().await.unwrap();
let (redb, _redb_dir) = create_test_redb().await.unwrap();
let sync = StorageSynchronizer::new(Arc::new(turso), Arc::new(redb));
let context = TaskContext::default();
let mut episode_ids = Vec::new();
for i in 0..5 {
let episode = Episode::new(format!("Test task {i}"), context.clone(), TaskType::Testing);
episode_ids.push(episode.episode_id);
sync.turso.store_episode(&episode).await.unwrap();
}
let since = chrono::Utc::now() - chrono::Duration::hours(1);
let stats = sync.sync_all_recent_episodes(since).await.unwrap();
assert_eq!(stats.episodes_synced, 5, "Should sync all 5 episodes");
assert_eq!(stats.errors, 0, "Should have no errors");
for episode_id in episode_ids {
let cached = sync.redb.get_episode(episode_id).await.unwrap();
assert!(cached.is_some(), "Episode {episode_id} should be in cache");
}
}
#[tokio::test]
async fn should_track_sync_state_and_statistics() {
let (turso, _turso_dir) = create_test_turso().await.unwrap();
let (redb, _redb_dir) = create_test_redb().await.unwrap();
let sync = StorageSynchronizer::new(Arc::new(turso), Arc::new(redb));
let state = sync.get_sync_state().await;
assert_eq!(state.sync_count, 0);
assert!(state.last_sync.is_none());
let context = TaskContext::default();
let episode = Episode::new("Test task".to_string(), context, TaskType::Testing);
sync.turso.store_episode(&episode).await.unwrap();
let since = chrono::Utc::now() - chrono::Duration::hours(1);
sync.sync_all_recent_episodes(since).await.unwrap();
let state = sync.get_sync_state().await;
assert_eq!(state.sync_count, 1);
assert!(state.last_sync.is_some());
assert!(state.last_error.is_none());
}
#[tokio::test]
async fn should_run_periodic_background_sync_automatically() {
let (turso, _turso_dir) = create_test_turso().await.unwrap();
let (redb, _redb_dir) = create_test_redb().await.unwrap();
let sync = Arc::new(StorageSynchronizer::new(Arc::new(turso), Arc::new(redb)));
let context = TaskContext::default();
let episode = Episode::new("Test task".to_string(), context, TaskType::Testing);
let episode_id = episode.episode_id;
sync.turso.store_episode(&episode).await.unwrap();
let handle = sync.clone().start_periodic_sync(Duration::from_millis(100));
let start = std::time::Instant::now();
let timeout = Duration::from_secs(10); let mut synced = false;
while start.elapsed() < timeout {
if let Ok(Some(_)) = sync.redb.get_episode(episode_id).await {
synced = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await; }
handle.abort();
assert!(
synced,
"Episode should be synced to cache within {}s (took {:?})",
timeout.as_secs(),
start.elapsed()
);
let state = sync.get_sync_state().await;
assert!(
state.sync_count > 0,
"Should have performed at least one sync"
);
}
#[tokio::test]
async fn should_handle_missing_episode_gracefully() {
let (turso, _turso_dir) = create_test_turso().await.unwrap();
let (redb, _redb_dir) = create_test_redb().await.unwrap();
let sync = StorageSynchronizer::new(Arc::new(turso), Arc::new(redb));
let fake_id = uuid::Uuid::new_v4();
let result = sync.sync_episode_to_cache(fake_id).await;
assert!(result.is_err(), "Should fail for missing episode");
}
#[test]
fn should_resolve_conflicts_with_turso_wins_strategy() {
let context = TaskContext::default();
let episode1 = Arc::new(Episode::new(
"Task from Turso".to_string(),
context.clone(),
TaskType::Testing,
));
let mut episode2 = Episode::new("Task from redb".to_string(), context, TaskType::Testing);
episode2.episode_id = episode1.episode_id;
let episode2 = Arc::new(episode2);
let resolved = do_memory_core::sync::resolve_episode_conflict(
&episode1,
&episode2,
ConflictResolution::TursoWins,
);
assert_eq!(resolved.task_description, "Task from Turso");
}
#[test]
fn should_resolve_conflicts_with_redb_wins_strategy() {
let context = TaskContext::default();
let episode1 = Arc::new(Episode::new(
"Task from Turso".to_string(),
context.clone(),
TaskType::Testing,
));
let mut episode2 = Episode::new("Task from redb".to_string(), context, TaskType::Testing);
episode2.episode_id = episode1.episode_id;
let episode2 = Arc::new(episode2);
let resolved = do_memory_core::sync::resolve_episode_conflict(
&episode1,
&episode2,
ConflictResolution::RedbWins,
);
assert_eq!(resolved.task_description, "Task from redb");
}
#[test]
fn should_resolve_conflicts_with_most_recent_strategy() {
let context = TaskContext::default();
let episode1 = Arc::new(Episode::new(
"Older task".to_string(),
context.clone(),
TaskType::Testing,
));
let mut episode2 = Episode::new("Newer task".to_string(), context, TaskType::Testing);
episode2.episode_id = episode1.episode_id;
episode2.end_time = Some(chrono::Utc::now());
let episode2 = Arc::new(episode2);
let resolved = do_memory_core::sync::resolve_episode_conflict(
&episode1,
&episode2,
ConflictResolution::MostRecent,
);
assert_eq!(
resolved.task_description, "Newer task",
"Should choose the newer episode"
);
}
#[test]
fn should_provide_sensible_default_sync_configuration() {
let config = SyncConfig::default();
assert_eq!(config.sync_interval, Duration::from_secs(300));
assert_eq!(config.batch_size, 100);
assert!(config.sync_patterns);
assert!(config.sync_heuristics);
}