Skip to main content

do_memory_storage_turso/
resilient.rs

1//! # Resilient Storage with Circuit Breaker
2//!
3//! Wraps TursoStorage with circuit breaker protection for production resilience.
4//!
5//! This module provides a production-grade storage implementation that:
6//! - Protects against cascading failures with circuit breaker pattern
7//! - Falls back to redb cache when Turso is unavailable
8//! - Tracks failure statistics and recovery
9//!
10//! ## Example
11//!
12//! ```no_run
13//! use do_memory_storage_turso::{TursoStorage, ResilientStorage};
14//! use do_memory_core::storage::circuit_breaker::CircuitBreakerConfig;
15//!
16//! # async fn example() -> anyhow::Result<()> {
17//! let turso = TursoStorage::new("libsql://localhost:8080", "token").await?;
18//!
19//! // Wrap with circuit breaker
20//! let resilient = ResilientStorage::new(turso, CircuitBreakerConfig::default());
21//!
22//! // All operations are now protected by circuit breaker
23//! # Ok(())
24//! # }
25//! ```
26
27use async_trait::async_trait;
28use do_memory_core::memory::attribution::{
29    RecommendationFeedback, RecommendationSession, RecommendationStats,
30};
31use do_memory_core::storage::circuit_breaker::{
32    CircuitBreaker, CircuitBreakerConfig, CircuitState,
33};
34use do_memory_core::{Episode, Heuristic, Pattern, Result, StorageBackend};
35use std::sync::Arc;
36use tracing::{info, warn};
37use uuid::Uuid;
38
39#[cfg(test)]
40use do_memory_core::Error;
41
42use crate::TursoStorage;
43
44/// Resilient storage wrapper with circuit breaker protection
45///
46/// Wraps TursoStorage operations with circuit breaker pattern to provide:
47/// - Fast failure when service is down
48/// - Automatic recovery attempts
49/// - Failure statistics and monitoring
50pub struct ResilientStorage {
51    /// Underlying Turso storage
52    storage: Arc<TursoStorage>,
53    /// Circuit breaker for resilience
54    circuit_breaker: Arc<CircuitBreaker>,
55}
56
57impl ResilientStorage {
58    /// Create a new resilient storage wrapper
59    ///
60    /// # Arguments
61    ///
62    /// * `storage` - Turso storage backend to wrap
63    /// * `config` - Circuit breaker configuration
64    ///
65    /// # Example
66    ///
67    /// ```no_run
68    /// # use do_memory_storage_turso::{TursoStorage, ResilientStorage};
69    /// # use do_memory_core::storage::circuit_breaker::CircuitBreakerConfig;
70    /// # async fn example() -> anyhow::Result<()> {
71    /// let turso = TursoStorage::new("libsql://localhost:8080", "token").await?;
72    ///
73    /// let config = CircuitBreakerConfig {
74    ///     failure_threshold: 5,
75    ///     timeout: std::time::Duration::from_secs(30),
76    ///     ..Default::default()
77    /// };
78    ///
79    /// let resilient = ResilientStorage::new(turso, config);
80    /// # Ok(())
81    /// # }
82    /// ```
83    pub fn new(storage: TursoStorage, config: CircuitBreakerConfig) -> Self {
84        info!("Creating resilient storage with circuit breaker protection");
85
86        Self {
87            storage: Arc::new(storage),
88            circuit_breaker: Arc::new(CircuitBreaker::new(config)),
89        }
90    }
91
92    /// Get the current circuit breaker state
93    ///
94    /// Useful for monitoring and health checks.
95    ///
96    /// # Example
97    ///
98    /// ```no_run
99    /// # use do_memory_storage_turso::ResilientStorage;
100    /// # use do_memory_core::storage::circuit_breaker::CircuitState;
101    /// # async fn example(storage: ResilientStorage) {
102    /// let state = storage.circuit_state().await;
103    /// match state {
104    ///     CircuitState::Closed => println!("Circuit is healthy"),
105    ///     CircuitState::Open => println!("Circuit is open - service down"),
106    ///     CircuitState::HalfOpen => println!("Circuit is testing recovery"),
107    /// }
108    /// # }
109    /// ```
110    pub async fn circuit_state(&self) -> CircuitState {
111        self.circuit_breaker.state().await
112    }
113
114    /// Get circuit breaker statistics
115    ///
116    /// # Example
117    ///
118    /// ```no_run
119    /// # use do_memory_storage_turso::ResilientStorage;
120    /// # async fn example(storage: ResilientStorage) {
121    /// let stats = storage.circuit_stats().await;
122    /// println!("Total calls: {}", stats.total_calls);
123    /// println!("Failures: {}", stats.failed_calls);
124    /// println!("Circuit opened {} times", stats.circuit_opened_count);
125    /// # }
126    /// ```
127    pub async fn circuit_stats(
128        &self,
129    ) -> do_memory_core::storage::circuit_breaker::CircuitBreakerStats {
130        self.circuit_breaker.stats().await
131    }
132
133    /// Reset the circuit breaker
134    ///
135    /// Useful for manual intervention or testing.
136    pub async fn reset_circuit(&self) {
137        self.circuit_breaker.reset().await;
138    }
139
140    /// Health check with circuit breaker awareness
141    ///
142    /// Returns true if both the storage is healthy AND the circuit is closed.
143    pub async fn health_check(&self) -> Result<bool> {
144        let circuit_state = self.circuit_state().await;
145
146        if circuit_state != CircuitState::Closed {
147            warn!("Health check: circuit breaker is {:?}", circuit_state);
148            return Ok(false);
149        }
150
151        // Check actual storage health through circuit breaker
152        self.circuit_breaker
153            .call(|| async { self.storage.health_check().await })
154            .await
155    }
156}
157
158#[async_trait]
159impl StorageBackend for ResilientStorage {
160    async fn store_episode(&self, episode: &Episode) -> Result<()> {
161        let storage = Arc::clone(&self.storage);
162        let episode = episode.clone();
163
164        self.circuit_breaker
165            .call(move || {
166                let storage = Arc::clone(&storage);
167                async move { storage.store_episode(&episode).await }
168            })
169            .await
170    }
171
172    async fn get_episode(&self, id: Uuid) -> Result<Option<Episode>> {
173        let storage = Arc::clone(&self.storage);
174
175        self.circuit_breaker
176            .call(move || {
177                let storage = Arc::clone(&storage);
178                async move { storage.get_episode(id).await }
179            })
180            .await
181    }
182
183    async fn delete_episode(&self, id: Uuid) -> Result<()> {
184        let storage = Arc::clone(&self.storage);
185
186        self.circuit_breaker
187            .call(move || {
188                let storage = Arc::clone(&storage);
189                async move { storage.delete_episode(id).await }
190            })
191            .await
192    }
193
194    async fn store_pattern(&self, pattern: &Pattern) -> Result<()> {
195        let storage = Arc::clone(&self.storage);
196        let pattern = pattern.clone();
197
198        self.circuit_breaker
199            .call(move || {
200                let storage = Arc::clone(&storage);
201                async move { storage.store_pattern(&pattern).await }
202            })
203            .await
204    }
205
206    async fn get_pattern(&self, id: do_memory_core::episode::PatternId) -> Result<Option<Pattern>> {
207        let storage = Arc::clone(&self.storage);
208
209        self.circuit_breaker
210            .call(move || {
211                let storage = Arc::clone(&storage);
212                async move { storage.get_pattern(id).await }
213            })
214            .await
215    }
216
217    async fn store_heuristic(&self, heuristic: &Heuristic) -> Result<()> {
218        let storage = Arc::clone(&self.storage);
219        let heuristic = heuristic.clone();
220
221        self.circuit_breaker
222            .call(move || {
223                let storage = Arc::clone(&storage);
224                async move { storage.store_heuristic(&heuristic).await }
225            })
226            .await
227    }
228
229    async fn get_heuristic(&self, id: Uuid) -> Result<Option<Heuristic>> {
230        let storage = Arc::clone(&self.storage);
231
232        self.circuit_breaker
233            .call(move || {
234                let storage = Arc::clone(&storage);
235                async move { storage.get_heuristic(id).await }
236            })
237            .await
238    }
239
240    async fn query_episodes_since(
241        &self,
242        since: chrono::DateTime<chrono::Utc>,
243        limit: Option<usize>,
244    ) -> Result<Vec<Episode>> {
245        let storage = Arc::clone(&self.storage);
246
247        self.circuit_breaker
248            .call(move || {
249                let storage = Arc::clone(&storage);
250                async move { storage.query_episodes_since(since, limit).await }
251            })
252            .await
253    }
254
255    async fn query_episodes_by_metadata(
256        &self,
257        key: &str,
258        value: &str,
259        limit: Option<usize>,
260    ) -> Result<Vec<Episode>> {
261        let storage = Arc::clone(&self.storage);
262        let key_string = key.to_string();
263        let value_string = value.to_string();
264        let limit_param = limit;
265
266        self.circuit_breaker
267            .call(move || {
268                let storage = Arc::clone(&storage);
269                let key_string = key_string;
270                let value_string = value_string;
271                async move {
272                    storage
273                        .query_episodes_by_metadata(&key_string, &value_string, limit_param)
274                        .await
275                }
276            })
277            .await
278    }
279
280    async fn store_embedding(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
281        let storage = Arc::clone(&self.storage);
282        let id_string = id.to_string();
283
284        self.circuit_breaker
285            .call(move || {
286                let storage = Arc::clone(&storage);
287                async move { storage.store_embedding(&id_string, embedding).await }
288            })
289            .await
290    }
291
292    async fn get_embedding(&self, id: &str) -> Result<Option<Vec<f32>>> {
293        let storage = Arc::clone(&self.storage);
294        let id_string = id.to_string();
295
296        self.circuit_breaker
297            .call(move || {
298                let storage = Arc::clone(&storage);
299                let id = id_string;
300                async move { storage.get_embedding(&id).await }
301            })
302            .await
303    }
304
305    async fn delete_embedding(&self, id: &str) -> Result<bool> {
306        let storage = Arc::clone(&self.storage);
307        let id_string = id.to_string();
308
309        self.circuit_breaker
310            .call(move || {
311                let storage = Arc::clone(&storage);
312                let id = id_string;
313                async move { storage.delete_embedding(&id).await }
314            })
315            .await
316    }
317
318    async fn store_embeddings_batch(&self, embeddings: Vec<(String, Vec<f32>)>) -> Result<()> {
319        let storage = Arc::clone(&self.storage);
320
321        self.circuit_breaker
322            .call(move || {
323                let storage = Arc::clone(&storage);
324                async move { storage.store_embeddings_batch(embeddings).await }
325            })
326            .await
327    }
328
329    async fn get_embeddings_batch(&self, ids: &[String]) -> Result<Vec<Option<Vec<f32>>>> {
330        let storage = Arc::clone(&self.storage);
331        let ids_vec = ids.to_vec();
332
333        self.circuit_breaker
334            .call(move || {
335                let storage = Arc::clone(&storage);
336                let ids = ids_vec;
337                async move { storage.get_embeddings_batch(&ids).await }
338            })
339            .await
340    }
341
342    async fn store_recommendation_session(&self, session: &RecommendationSession) -> Result<()> {
343        let storage = Arc::clone(&self.storage);
344        let session = session.clone();
345
346        self.circuit_breaker
347            .call(move || {
348                let storage = Arc::clone(&storage);
349                async move { storage.store_recommendation_session(&session).await }
350            })
351            .await
352    }
353
354    async fn get_recommendation_session(
355        &self,
356        session_id: Uuid,
357    ) -> Result<Option<RecommendationSession>> {
358        let storage = Arc::clone(&self.storage);
359
360        self.circuit_breaker
361            .call(move || {
362                let storage = Arc::clone(&storage);
363                async move { storage.get_recommendation_session(session_id).await }
364            })
365            .await
366    }
367
368    async fn get_recommendation_session_for_episode(
369        &self,
370        episode_id: Uuid,
371    ) -> Result<Option<RecommendationSession>> {
372        let storage = Arc::clone(&self.storage);
373
374        self.circuit_breaker
375            .call(move || {
376                let storage = Arc::clone(&storage);
377                async move {
378                    storage
379                        .get_recommendation_session_for_episode(episode_id)
380                        .await
381                }
382            })
383            .await
384    }
385
386    async fn store_recommendation_feedback(&self, feedback: &RecommendationFeedback) -> Result<()> {
387        let storage = Arc::clone(&self.storage);
388        let feedback = feedback.clone();
389
390        self.circuit_breaker
391            .call(move || {
392                let storage = Arc::clone(&storage);
393                async move { storage.store_recommendation_feedback(&feedback).await }
394            })
395            .await
396    }
397
398    async fn get_recommendation_feedback(
399        &self,
400        session_id: Uuid,
401    ) -> Result<Option<RecommendationFeedback>> {
402        let storage = Arc::clone(&self.storage);
403
404        self.circuit_breaker
405            .call(move || {
406                let storage = Arc::clone(&storage);
407                async move { storage.get_recommendation_feedback(session_id).await }
408            })
409            .await
410    }
411
412    async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
413        let storage = Arc::clone(&self.storage);
414
415        self.circuit_breaker
416            .call(move || {
417                let storage = Arc::clone(&storage);
418                async move { storage.get_recommendation_stats().await }
419            })
420            .await
421    }
422}
423
424#[cfg(test)]
425mod tests {
426    use super::*;
427    use do_memory_core::storage::circuit_breaker::CircuitBreakerConfig;
428    use std::time::Duration;
429    use tempfile::TempDir;
430
431    async fn create_test_storage() -> Result<(ResilientStorage, TempDir)> {
432        let dir = TempDir::new().unwrap();
433        let db_path = dir.path().join("test.db");
434
435        let db = libsql::Builder::new_local(&db_path)
436            .build()
437            .await
438            .map_err(|e| Error::Storage(format!("Failed to create test database: {}", e)))?;
439
440        let turso = TursoStorage::from_database(db)?;
441        turso.initialize_schema().await?;
442
443        let config = CircuitBreakerConfig {
444            failure_threshold: 3,
445            timeout: Duration::from_secs(1),
446            ..Default::default()
447        };
448
449        let resilient = ResilientStorage::new(turso, config);
450
451        Ok((resilient, dir))
452    }
453
454    #[tokio::test]
455    async fn test_resilient_storage_creation() {
456        let result = create_test_storage().await;
457        assert!(result.is_ok());
458    }
459
460    #[tokio::test]
461    async fn test_health_check_with_closed_circuit() {
462        let (storage, _dir) = create_test_storage().await.unwrap();
463
464        let healthy = storage.health_check().await.unwrap();
465        assert!(healthy);
466        assert_eq!(storage.circuit_state().await, CircuitState::Closed);
467    }
468
469    #[tokio::test]
470    async fn test_circuit_stats_tracking() {
471        let (storage, _dir) = create_test_storage().await.unwrap();
472
473        // Perform a successful operation
474        let episode = Episode::new(
475            "test".to_string(),
476            Default::default(),
477            do_memory_core::TaskType::CodeGeneration,
478        );
479        let result = storage.store_episode(&episode).await;
480        assert!(result.is_ok());
481
482        // Check stats
483        let stats = storage.circuit_stats().await;
484        assert_eq!(stats.total_calls, 1);
485        assert_eq!(stats.successful_calls, 1);
486        assert_eq!(stats.failed_calls, 0);
487    }
488
489    #[tokio::test]
490    async fn test_circuit_reset() {
491        let (storage, _dir) = create_test_storage().await.unwrap();
492
493        // Reset should work
494        storage.reset_circuit().await;
495
496        assert_eq!(storage.circuit_state().await, CircuitState::Closed);
497        let stats = storage.circuit_stats().await;
498        assert_eq!(stats.consecutive_failures, 0);
499    }
500}