do_memory_core/sync/
synchronizer.rs1use crate::{Error, MAX_QUERY_LIMIT, Result};
4use chrono::{DateTime, Utc};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8use tokio::time::timeout;
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12use super::types::{SyncState, SyncStats};
13
14const SYNC_EPISODE_TIMEOUT: Duration = Duration::from_secs(30);
20
21const SYNC_ALL_TIMEOUT: Duration = Duration::from_secs(60);
23
24pub struct StorageSynchronizer<T, R> {
26 pub turso: Arc<T>,
28 pub redb: Arc<R>,
30 sync_state: Arc<RwLock<SyncState>>,
31}
32
33impl<T, R> StorageSynchronizer<T, R> {
34 pub fn new(turso: Arc<T>, redb: Arc<R>) -> Self {
36 Self {
37 turso,
38 redb,
39 sync_state: Arc::new(RwLock::new(SyncState::default())),
40 }
41 }
42
43 pub async fn get_sync_state(&self) -> SyncState {
45 self.sync_state.read().await.clone()
46 }
47
48 async fn update_sync_state(&self, episodes_synced: usize, errors: usize) {
50 let mut state = self.sync_state.write().await;
51 state.last_sync = Some(chrono::Utc::now());
52 state.sync_count += 1;
53 if errors > 0 {
54 state.last_error = Some(format!(
55 "Synced {episodes_synced} episodes with {errors} errors"
56 ));
57 } else {
58 state.last_error = None;
59 }
60 }
61}
62
63impl<T, R> StorageSynchronizer<T, R>
66where
67 T: crate::storage::StorageBackend + 'static,
68 R: crate::storage::StorageBackend + 'static,
69{
70 pub async fn sync_episode_to_cache(&self, episode_id: Uuid) -> Result<()> {
82 let correlation_id = Uuid::new_v4();
83
84 info!(correlation_id = %correlation_id, "Syncing episode {} to cache", episode_id);
85
86 let episode = match timeout(SYNC_EPISODE_TIMEOUT, self.turso.get_episode(episode_id)).await
89 {
90 Ok(Ok(Some(episode))) => episode,
91 Ok(Ok(None)) => {
92 return Err(Error::Storage(format!(
93 "Episode {episode_id} not found in source storage"
94 )));
95 }
96 Ok(Err(e)) => return Err(Error::Storage(format!("Error fetching episode: {e}"))),
97 Err(_) => {
98 return Err(Error::Storage(format!(
99 "Timeout fetching episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
100 )));
101 }
102 };
103
104 match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
106 Ok(Ok(())) => {}
107 Ok(Err(e)) => return Err(Error::Storage(format!("Error storing episode: {e}"))),
108 Err(_) => {
109 return Err(Error::Storage(format!(
110 "Timeout storing episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
111 )));
112 }
113 }
114
115 info!(correlation_id = %correlation_id, "Successfully synced episode {} to cache", episode_id);
116 Ok(())
117 }
118
119 pub async fn sync_all_recent_episodes(&self, since: DateTime<Utc>) -> Result<SyncStats> {
135 let correlation_id = Uuid::new_v4();
136
137 info!(correlation_id = %correlation_id, "Syncing all episodes since {}", since);
138
139 let episodes = match timeout(
142 SYNC_ALL_TIMEOUT,
143 self.turso
144 .query_episodes_since(since, Some(MAX_QUERY_LIMIT)),
145 )
146 .await
147 {
148 Ok(Ok(episodes)) => episodes,
149 Ok(Err(e)) => return Err(Error::Storage(format!("Error querying episodes: {e}"))),
150 Err(_) => {
151 return Err(Error::Storage(format!(
152 "Timeout querying episodes after {SYNC_ALL_TIMEOUT:?}"
153 )));
154 }
155 };
156
157 let total = episodes.len();
158
159 let mut stats = SyncStats::default();
160
161 for episode in episodes {
163 let episode_id = episode.episode_id;
164 match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
165 Ok(Ok(())) => {
166 stats.episodes_synced += 1;
167 }
168 Ok(Err(e)) => {
169 error!(correlation_id = %correlation_id, "Failed to sync episode {}: {}", episode_id, e);
170 stats.errors += 1;
171 }
172 Err(_) => {
173 error!(
174 correlation_id = %correlation_id,
175 "Timeout syncing episode {} after {:?}",
176 episode_id, SYNC_EPISODE_TIMEOUT
177 );
178 stats.errors += 1;
179 }
180 }
181 }
182
183 self.update_sync_state(stats.episodes_synced, stats.errors)
185 .await;
186
187 info!(
188 correlation_id = %correlation_id,
189 "Sync complete: {}/{} episodes synced, {} errors",
190 stats.episodes_synced, total, stats.errors
191 );
192
193 Ok(stats)
194 }
195
196 pub fn start_periodic_sync(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
222 info!("Starting periodic sync with interval {:?}", interval);
223
224 tokio::spawn(async move {
225 let mut interval_timer = tokio::time::interval(interval);
226 loop {
227 interval_timer.tick().await;
228
229 let since = Utc::now() - chrono::Duration::hours(1);
230 let correlation_id = Uuid::new_v4();
231
232 match self.sync_all_recent_episodes(since).await {
233 Ok(stats) => {
234 debug!(
235 correlation_id = %correlation_id,
236 "Periodic sync successful: {} episodes synced",
237 stats.episodes_synced
238 );
239 }
240 Err(e) => {
241 error!(correlation_id = %correlation_id, "Periodic sync failed: {}", e);
242 }
243 }
244 }
245 })
246 }
247}