Skip to main content

do_memory_core/sync/
synchronizer.rs

1//! Storage synchronizer for coordinating Turso and redb
2
3use crate::{Error, MAX_QUERY_LIMIT, Result};
4use chrono::{DateTime, Utc};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8use tokio::time::timeout;
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12use super::types::{SyncState, SyncStats};
13
14// ============================================================================
15// Timeout Constants
16// ============================================================================
17
/// Timeout for a single-episode storage operation (30 seconds).
///
/// Applied separately to each step that touches one episode: the fetch from
/// the source storage and every individual store into the cache.
const SYNC_EPISODE_TIMEOUT: Duration = Duration::from_secs(30);
20
/// Timeout for the bulk "episodes since" query against the source storage
/// (60 seconds).
///
/// Only the query itself is bounded by this value; the per-episode cache
/// writes that follow use their own timeout.
const SYNC_ALL_TIMEOUT: Duration = Duration::from_secs(60);
23
/// Storage synchronizer for coordinating Turso and redb.
///
/// Holds shared handles to a source store and a cache store together with
/// bookkeeping about past sync passes. The type parameters carry no bounds on
/// the struct itself; the sync methods are only available when both `T` and
/// `R` implement `crate::storage::StorageBackend`.
pub struct StorageSynchronizer<T, R> {
    /// Source storage (typically Turso - durable)
    pub turso: Arc<T>,
    /// Cache storage (typically redb - fast)
    pub redb: Arc<R>,
    /// Sync bookkeeping (last sync time, pass counter, last error summary),
    /// guarded by an async read/write lock so it can be read while the
    /// synchronizer is shared.
    sync_state: Arc<RwLock<SyncState>>,
}
32
33impl<T, R> StorageSynchronizer<T, R> {
34    /// Create a new storage synchronizer
35    pub fn new(turso: Arc<T>, redb: Arc<R>) -> Self {
36        Self {
37            turso,
38            redb,
39            sync_state: Arc::new(RwLock::new(SyncState::default())),
40        }
41    }
42
43    /// Get the current synchronization state
44    pub async fn get_sync_state(&self) -> SyncState {
45        self.sync_state.read().await.clone()
46    }
47
48    /// Update sync state after a successful sync
49    async fn update_sync_state(&self, episodes_synced: usize, errors: usize) {
50        let mut state = self.sync_state.write().await;
51        state.last_sync = Some(chrono::Utc::now());
52        state.sync_count += 1;
53        if errors > 0 {
54            state.last_error = Some(format!(
55                "Synced {episodes_synced} episodes with {errors} errors"
56            ));
57        } else {
58            state.last_error = None;
59        }
60    }
61}
62
63// Concrete implementations using the StorageBackend trait
64
65impl<T, R> StorageSynchronizer<T, R>
66where
67    T: crate::storage::StorageBackend + 'static,
68    R: crate::storage::StorageBackend + 'static,
69{
70    /// Sync a single episode from Turso (source) to redb (cache)
71    ///
72    /// Fetches the episode from the source storage and stores it in the cache storage.
73    ///
74    /// # Arguments
75    ///
76    /// * `episode_id` - UUID of the episode to sync
77    ///
78    /// # Errors
79    ///
80    /// Returns error if episode not found or storage operation fails
81    pub async fn sync_episode_to_cache(&self, episode_id: Uuid) -> Result<()> {
82        let correlation_id = Uuid::new_v4();
83
84        info!(correlation_id = %correlation_id, "Syncing episode {} to cache", episode_id);
85
86        // Fetch from Turso (source of truth) with timeout
87        // timeout returns Result<Result<Option<Episode>, Error>, Elapsed>
88        let episode = match timeout(SYNC_EPISODE_TIMEOUT, self.turso.get_episode(episode_id)).await
89        {
90            Ok(Ok(Some(episode))) => episode,
91            Ok(Ok(None)) => {
92                return Err(Error::Storage(format!(
93                    "Episode {episode_id} not found in source storage"
94                )));
95            }
96            Ok(Err(e)) => return Err(Error::Storage(format!("Error fetching episode: {e}"))),
97            Err(_) => {
98                return Err(Error::Storage(format!(
99                    "Timeout fetching episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
100                )));
101            }
102        };
103
104        // Store in redb cache with timeout
105        match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
106            Ok(Ok(())) => {}
107            Ok(Err(e)) => return Err(Error::Storage(format!("Error storing episode: {e}"))),
108            Err(_) => {
109                return Err(Error::Storage(format!(
110                    "Timeout storing episode {episode_id} after {SYNC_EPISODE_TIMEOUT:?}"
111                )));
112            }
113        }
114
115        info!(correlation_id = %correlation_id, "Successfully synced episode {} to cache", episode_id);
116        Ok(())
117    }
118
119    /// Sync all episodes modified since a given timestamp
120    ///
121    /// Queries the source storage for recent episodes and syncs them to the cache.
122    ///
123    /// # Arguments
124    ///
125    /// * `since` - Only sync episodes with `start_time` >= this timestamp
126    ///
127    /// # Returns
128    ///
129    /// Statistics about the sync operation (episodes synced, errors)
130    ///
131    /// # Errors
132    ///
133    /// Returns error if query fails, but continues syncing other episodes if individual stores fail
134    pub async fn sync_all_recent_episodes(&self, since: DateTime<Utc>) -> Result<SyncStats> {
135        let correlation_id = Uuid::new_v4();
136
137        info!(correlation_id = %correlation_id, "Syncing all episodes since {}", since);
138
139        // Query source storage for recent episodes with high limit for sync operations and with timeout
140        // timeout returns Result<Result<Vec<Episode>, Error>, Elapsed>
141        let episodes = match timeout(
142            SYNC_ALL_TIMEOUT,
143            self.turso
144                .query_episodes_since(since, Some(MAX_QUERY_LIMIT)),
145        )
146        .await
147        {
148            Ok(Ok(episodes)) => episodes,
149            Ok(Err(e)) => return Err(Error::Storage(format!("Error querying episodes: {e}"))),
150            Err(_) => {
151                return Err(Error::Storage(format!(
152                    "Timeout querying episodes after {SYNC_ALL_TIMEOUT:?}"
153                )));
154            }
155        };
156
157        let total = episodes.len();
158
159        let mut stats = SyncStats::default();
160
161        // Batch update cache with individual timeouts for each store operation
162        for episode in episodes {
163            let episode_id = episode.episode_id;
164            match timeout(SYNC_EPISODE_TIMEOUT, self.redb.store_episode(&episode)).await {
165                Ok(Ok(())) => {
166                    stats.episodes_synced += 1;
167                }
168                Ok(Err(e)) => {
169                    error!(correlation_id = %correlation_id, "Failed to sync episode {}: {}", episode_id, e);
170                    stats.errors += 1;
171                }
172                Err(_) => {
173                    error!(
174                        correlation_id = %correlation_id,
175                        "Timeout syncing episode {} after {:?}",
176                        episode_id, SYNC_EPISODE_TIMEOUT
177                    );
178                    stats.errors += 1;
179                }
180            }
181        }
182
183        // Update sync state
184        self.update_sync_state(stats.episodes_synced, stats.errors)
185            .await;
186
187        info!(
188            correlation_id = %correlation_id,
189            "Sync complete: {}/{} episodes synced, {} errors",
190            stats.episodes_synced, total, stats.errors
191        );
192
193        Ok(stats)
194    }
195
196    /// Start a periodic background sync task
197    ///
198    /// Spawns a background task that syncs recent episodes at the specified interval.
199    /// The task will continue running until the returned `JoinHandle` is dropped or aborted.
200    ///
201    /// # Arguments
202    ///
203    /// * `interval` - How often to run the sync
204    ///
205    /// # Returns
206    ///
207    /// `JoinHandle` that can be used to cancel the background task
208    ///
209    /// # Example
210    ///
211    /// ```ignore
212    /// use std::time::Duration;
213    /// use std::sync::Arc;
214    ///
215    /// let sync = Arc::new(StorageSynchronizer::new(turso, redb));
216    /// let handle = sync.start_periodic_sync(Duration::from_secs(300));
217    ///
218    /// // Later, to stop the sync:
219    /// handle.abort();
220    /// ```
221    pub fn start_periodic_sync(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
222        info!("Starting periodic sync with interval {:?}", interval);
223
224        tokio::spawn(async move {
225            let mut interval_timer = tokio::time::interval(interval);
226            loop {
227                interval_timer.tick().await;
228
229                let since = Utc::now() - chrono::Duration::hours(1);
230                let correlation_id = Uuid::new_v4();
231
232                match self.sync_all_recent_episodes(since).await {
233                    Ok(stats) => {
234                        debug!(
235                            correlation_id = %correlation_id,
236                            "Periodic sync successful: {} episodes synced",
237                            stats.episodes_synced
238                        );
239                    }
240                    Err(e) => {
241                        error!(correlation_id = %correlation_id, "Periodic sync failed: {}", e);
242                    }
243                }
244            }
245        })
246    }
247}