use crate::{Error, MAX_QUERY_LIMIT, Result};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{debug, error, info};
use uuid::Uuid;
use super::types::{SyncState, SyncStats};
/// Upper bound applied to each individual backend call that fetches or stores
/// a single episode during synchronization.
const SYNC_EPISODE_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound applied to the bulk "episodes since" query issued at the start
/// of a full sync pass.
const SYNC_ALL_TIMEOUT: Duration = Duration::from_secs(60);
/// Copies episodes from a source backend (`turso`) into a cache backend (`redb`)
/// and keeps track of the outcome of bulk sync passes.
///
/// `T` is the type of the source backend and `R` the type of the cache backend.
pub struct StorageSynchronizer<T, R> {
/// Source backend that episodes are read from.
pub turso: Arc<T>,
/// Cache backend that episodes are written to.
pub redb: Arc<R>,
/// Shared, lock-protected record of sync activity (last sync time, sync count,
/// last error summary).
sync_state: Arc<RwLock<SyncState>>,
}
impl<T, R> StorageSynchronizer<T, R> {
pub fn new(turso: Arc<T>, redb: Arc<R>) -> Self {
Self {
turso,
redb,
sync_state: Arc::new(RwLock::new(SyncState::default())),
}
}
pub async fn get_sync_state(&self) -> SyncState {
self.sync_state.read().await.clone()
}
async fn update_sync_state(&self, episodes_synced: usize, errors: usize) {
let mut state = self.sync_state.write().await;
state.last_sync = Some(chrono::Utc::now());
state.sync_count += 1;
if errors > 0 {
state.last_error = Some(format!(
"Synced {episodes_synced} episodes with {errors} errors"
));
} else {
state.last_error = None;
}
}
}
impl<T, R> StorageSynchronizer<T, R>
where
T: crate::storage::StorageBackend + 'static,
R: crate::storage::StorageBackend + 'static,
{
pub async fn sync_episode_to_cache(&self, episode_id: Uuid) -> Result<()> {
let correlation_id = Uuid::new_v4();
info!(correlation_id = %correlation_id, "Syncing episode {} to cache", episode_id);
let episode = match timeout(SYNC_EPISODE_TIMEOUT, self.turso.get_episode(episode_id)).await
{
Ok(Ok(Some(episode))) => episode,
Ok(Ok(None)) => {
return Err(Error::Storage(format!(
"Episode {episode_id} not found in source storage"
)));
}
Ok(Err(e)) => return Err(Error::Storage(format!("Error fetching episode: {e}"))),
Err(_) => {
return Err(Error::Storage(format!(
"Timeout fetching episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
)));
}
};
match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(Error::Storage(format!("Error storing episode: {e}"))),
Err(_) => {
return Err(Error::Storage(format!(
"Timeout storing episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
)));
}
}
info!(correlation_id = %correlation_id, "Successfully synced episode {} to cache", episode_id);
Ok(())
}
pub async fn sync_all_recent_episodes(&self, since: DateTime<Utc>) -> Result<SyncStats> {
let correlation_id = Uuid::new_v4();
info!(correlation_id = %correlation_id, "Syncing all episodes since {}", since);
let episodes = match timeout(
SYNC_ALL_TIMEOUT,
self.turso
.query_episodes_since(since, Some(MAX_QUERY_LIMIT)),
)
.await
{
Ok(Ok(episodes)) => episodes,
Ok(Err(e)) => return Err(Error::Storage(format!("Error querying episodes: {e}"))),
Err(_) => {
return Err(Error::Storage(format!(
"Timeout querying episodes after {SYNC_ALL_TIMEOUT:?}"
)));
}
};
let total = episodes.len();
let mut stats = SyncStats::default();
for episode in episodes {
let episode_id = episode.episode_id;
match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
Ok(Ok(())) => {
stats.episodes_synced += 1;
}
Ok(Err(e)) => {
error!(correlation_id = %correlation_id, "Failed to sync episode {}: {}", episode_id, e);
stats.errors += 1;
}
Err(_) => {
error!(
correlation_id = %correlation_id,
"Timeout syncing episode {} after {:?}",
episode_id, SYNC_EPISODE_TIMEOUT
);
stats.errors += 1;
}
}
}
self.update_sync_state(stats.episodes_synced, stats.errors)
.await;
info!(
correlation_id = %correlation_id,
"Sync complete: {}/{} episodes synced, {} errors",
stats.episodes_synced, total, stats.errors
);
Ok(stats)
}
pub fn start_periodic_sync(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
info!("Starting periodic sync with interval {:?}", interval);
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
loop {
interval_timer.tick().await;
let since = Utc::now() - chrono::Duration::hours(1);
let correlation_id = Uuid::new_v4();
match self.sync_all_recent_episodes(since).await {
Ok(stats) => {
debug!(
correlation_id = %correlation_id,
"Periodic sync successful: {} episodes synced",
stats.episodes_synced
);
}
Err(e) => {
error!(correlation_id = %correlation_id, "Periodic sync failed: {}", e);
}
}
}
})
}
}