do_memory_storage_turso/
resilient.rs1use async_trait::async_trait;
28use do_memory_core::memory::attribution::{
29 RecommendationFeedback, RecommendationSession, RecommendationStats,
30};
31use do_memory_core::storage::circuit_breaker::{
32 CircuitBreaker, CircuitBreakerConfig, CircuitState,
33};
34use do_memory_core::{Episode, Heuristic, Pattern, Result, StorageBackend};
35use std::sync::Arc;
36use tracing::{info, warn};
37use uuid::Uuid;
38
39#[cfg(test)]
40use do_memory_core::Error;
41
42use crate::TursoStorage;
43
44pub struct ResilientStorage {
51 storage: Arc<TursoStorage>,
53 circuit_breaker: Arc<CircuitBreaker>,
55}
56
57impl ResilientStorage {
58 pub fn new(storage: TursoStorage, config: CircuitBreakerConfig) -> Self {
84 info!("Creating resilient storage with circuit breaker protection");
85
86 Self {
87 storage: Arc::new(storage),
88 circuit_breaker: Arc::new(CircuitBreaker::new(config)),
89 }
90 }
91
92 pub async fn circuit_state(&self) -> CircuitState {
111 self.circuit_breaker.state().await
112 }
113
114 pub async fn circuit_stats(
128 &self,
129 ) -> do_memory_core::storage::circuit_breaker::CircuitBreakerStats {
130 self.circuit_breaker.stats().await
131 }
132
133 pub async fn reset_circuit(&self) {
137 self.circuit_breaker.reset().await;
138 }
139
140 pub async fn health_check(&self) -> Result<bool> {
144 let circuit_state = self.circuit_state().await;
145
146 if circuit_state != CircuitState::Closed {
147 warn!("Health check: circuit breaker is {:?}", circuit_state);
148 return Ok(false);
149 }
150
151 self.circuit_breaker
153 .call(|| async { self.storage.health_check().await })
154 .await
155 }
156}
157
158#[async_trait]
159impl StorageBackend for ResilientStorage {
160 async fn store_episode(&self, episode: &Episode) -> Result<()> {
161 let storage = Arc::clone(&self.storage);
162 let episode = episode.clone();
163
164 self.circuit_breaker
165 .call(move || {
166 let storage = Arc::clone(&storage);
167 async move { storage.store_episode(&episode).await }
168 })
169 .await
170 }
171
172 async fn get_episode(&self, id: Uuid) -> Result<Option<Episode>> {
173 let storage = Arc::clone(&self.storage);
174
175 self.circuit_breaker
176 .call(move || {
177 let storage = Arc::clone(&storage);
178 async move { storage.get_episode(id).await }
179 })
180 .await
181 }
182
183 async fn delete_episode(&self, id: Uuid) -> Result<()> {
184 let storage = Arc::clone(&self.storage);
185
186 self.circuit_breaker
187 .call(move || {
188 let storage = Arc::clone(&storage);
189 async move { storage.delete_episode(id).await }
190 })
191 .await
192 }
193
194 async fn store_pattern(&self, pattern: &Pattern) -> Result<()> {
195 let storage = Arc::clone(&self.storage);
196 let pattern = pattern.clone();
197
198 self.circuit_breaker
199 .call(move || {
200 let storage = Arc::clone(&storage);
201 async move { storage.store_pattern(&pattern).await }
202 })
203 .await
204 }
205
206 async fn get_pattern(&self, id: do_memory_core::episode::PatternId) -> Result<Option<Pattern>> {
207 let storage = Arc::clone(&self.storage);
208
209 self.circuit_breaker
210 .call(move || {
211 let storage = Arc::clone(&storage);
212 async move { storage.get_pattern(id).await }
213 })
214 .await
215 }
216
217 async fn store_heuristic(&self, heuristic: &Heuristic) -> Result<()> {
218 let storage = Arc::clone(&self.storage);
219 let heuristic = heuristic.clone();
220
221 self.circuit_breaker
222 .call(move || {
223 let storage = Arc::clone(&storage);
224 async move { storage.store_heuristic(&heuristic).await }
225 })
226 .await
227 }
228
229 async fn get_heuristic(&self, id: Uuid) -> Result<Option<Heuristic>> {
230 let storage = Arc::clone(&self.storage);
231
232 self.circuit_breaker
233 .call(move || {
234 let storage = Arc::clone(&storage);
235 async move { storage.get_heuristic(id).await }
236 })
237 .await
238 }
239
240 async fn query_episodes_since(
241 &self,
242 since: chrono::DateTime<chrono::Utc>,
243 limit: Option<usize>,
244 ) -> Result<Vec<Episode>> {
245 let storage = Arc::clone(&self.storage);
246
247 self.circuit_breaker
248 .call(move || {
249 let storage = Arc::clone(&storage);
250 async move { storage.query_episodes_since(since, limit).await }
251 })
252 .await
253 }
254
255 async fn query_episodes_by_metadata(
256 &self,
257 key: &str,
258 value: &str,
259 limit: Option<usize>,
260 ) -> Result<Vec<Episode>> {
261 let storage = Arc::clone(&self.storage);
262 let key_string = key.to_string();
263 let value_string = value.to_string();
264 let limit_param = limit;
265
266 self.circuit_breaker
267 .call(move || {
268 let storage = Arc::clone(&storage);
269 let key_string = key_string;
270 let value_string = value_string;
271 async move {
272 storage
273 .query_episodes_by_metadata(&key_string, &value_string, limit_param)
274 .await
275 }
276 })
277 .await
278 }
279
280 async fn store_embedding(&self, id: &str, embedding: Vec<f32>) -> Result<()> {
281 let storage = Arc::clone(&self.storage);
282 let id_string = id.to_string();
283
284 self.circuit_breaker
285 .call(move || {
286 let storage = Arc::clone(&storage);
287 async move { storage.store_embedding(&id_string, embedding).await }
288 })
289 .await
290 }
291
292 async fn get_embedding(&self, id: &str) -> Result<Option<Vec<f32>>> {
293 let storage = Arc::clone(&self.storage);
294 let id_string = id.to_string();
295
296 self.circuit_breaker
297 .call(move || {
298 let storage = Arc::clone(&storage);
299 let id = id_string;
300 async move { storage.get_embedding(&id).await }
301 })
302 .await
303 }
304
305 async fn delete_embedding(&self, id: &str) -> Result<bool> {
306 let storage = Arc::clone(&self.storage);
307 let id_string = id.to_string();
308
309 self.circuit_breaker
310 .call(move || {
311 let storage = Arc::clone(&storage);
312 let id = id_string;
313 async move { storage.delete_embedding(&id).await }
314 })
315 .await
316 }
317
318 async fn store_embeddings_batch(&self, embeddings: Vec<(String, Vec<f32>)>) -> Result<()> {
319 let storage = Arc::clone(&self.storage);
320
321 self.circuit_breaker
322 .call(move || {
323 let storage = Arc::clone(&storage);
324 async move { storage.store_embeddings_batch(embeddings).await }
325 })
326 .await
327 }
328
329 async fn get_embeddings_batch(&self, ids: &[String]) -> Result<Vec<Option<Vec<f32>>>> {
330 let storage = Arc::clone(&self.storage);
331 let ids_vec = ids.to_vec();
332
333 self.circuit_breaker
334 .call(move || {
335 let storage = Arc::clone(&storage);
336 let ids = ids_vec;
337 async move { storage.get_embeddings_batch(&ids).await }
338 })
339 .await
340 }
341
342 async fn store_recommendation_session(&self, session: &RecommendationSession) -> Result<()> {
343 let storage = Arc::clone(&self.storage);
344 let session = session.clone();
345
346 self.circuit_breaker
347 .call(move || {
348 let storage = Arc::clone(&storage);
349 async move { storage.store_recommendation_session(&session).await }
350 })
351 .await
352 }
353
354 async fn get_recommendation_session(
355 &self,
356 session_id: Uuid,
357 ) -> Result<Option<RecommendationSession>> {
358 let storage = Arc::clone(&self.storage);
359
360 self.circuit_breaker
361 .call(move || {
362 let storage = Arc::clone(&storage);
363 async move { storage.get_recommendation_session(session_id).await }
364 })
365 .await
366 }
367
368 async fn get_recommendation_session_for_episode(
369 &self,
370 episode_id: Uuid,
371 ) -> Result<Option<RecommendationSession>> {
372 let storage = Arc::clone(&self.storage);
373
374 self.circuit_breaker
375 .call(move || {
376 let storage = Arc::clone(&storage);
377 async move {
378 storage
379 .get_recommendation_session_for_episode(episode_id)
380 .await
381 }
382 })
383 .await
384 }
385
386 async fn store_recommendation_feedback(&self, feedback: &RecommendationFeedback) -> Result<()> {
387 let storage = Arc::clone(&self.storage);
388 let feedback = feedback.clone();
389
390 self.circuit_breaker
391 .call(move || {
392 let storage = Arc::clone(&storage);
393 async move { storage.store_recommendation_feedback(&feedback).await }
394 })
395 .await
396 }
397
398 async fn get_recommendation_feedback(
399 &self,
400 session_id: Uuid,
401 ) -> Result<Option<RecommendationFeedback>> {
402 let storage = Arc::clone(&self.storage);
403
404 self.circuit_breaker
405 .call(move || {
406 let storage = Arc::clone(&storage);
407 async move { storage.get_recommendation_feedback(session_id).await }
408 })
409 .await
410 }
411
412 async fn get_recommendation_stats(&self) -> Result<RecommendationStats> {
413 let storage = Arc::clone(&self.storage);
414
415 self.circuit_breaker
416 .call(move || {
417 let storage = Arc::clone(&storage);
418 async move { storage.get_recommendation_stats().await }
419 })
420 .await
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use do_memory_core::storage::circuit_breaker::CircuitBreakerConfig;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds a `ResilientStorage` over a fresh local database in a temp dir.
    ///
    /// The `TempDir` is returned alongside the storage so the caller keeps the
    /// directory alive for the duration of the test. Every setup failure,
    /// including temp-dir creation, is reported through the returned `Result`
    /// instead of panicking inside the helper.
    async fn create_test_storage() -> Result<(ResilientStorage, TempDir)> {
        let dir = TempDir::new()
            .map_err(|e| Error::Storage(format!("Failed to create temp dir: {}", e)))?;
        let db_path = dir.path().join("test.db");

        let db = libsql::Builder::new_local(&db_path)
            .build()
            .await
            .map_err(|e| Error::Storage(format!("Failed to create test database: {}", e)))?;

        let turso = TursoStorage::from_database(db)?;
        turso.initialize_schema().await?;

        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            timeout: Duration::from_secs(1),
            ..Default::default()
        };

        Ok((ResilientStorage::new(turso, config), dir))
    }

    #[tokio::test]
    async fn test_resilient_storage_creation() {
        // `expect` prints the underlying error on failure, which a bare
        // `assert!(result.is_ok())` would discard.
        let (_storage, _dir) = create_test_storage()
            .await
            .expect("resilient storage should be created");
    }

    #[tokio::test]
    async fn test_health_check_with_closed_circuit() {
        let (storage, _dir) = create_test_storage().await.unwrap();

        let healthy = storage.health_check().await.unwrap();
        assert!(healthy);
        assert_eq!(storage.circuit_state().await, CircuitState::Closed);
    }

    #[tokio::test]
    async fn test_circuit_stats_tracking() {
        let (storage, _dir) = create_test_storage().await.unwrap();

        let episode = Episode::new(
            "test".to_string(),
            Default::default(),
            do_memory_core::TaskType::CodeGeneration,
        );
        storage
            .store_episode(&episode)
            .await
            .expect("store_episode should succeed");

        // One successful call routed through the breaker must be counted once.
        let stats = storage.circuit_stats().await;
        assert_eq!(stats.total_calls, 1);
        assert_eq!(stats.successful_calls, 1);
        assert_eq!(stats.failed_calls, 0);
    }

    #[tokio::test]
    async fn test_circuit_reset() {
        let (storage, _dir) = create_test_storage().await.unwrap();

        storage.reset_circuit().await;

        assert_eq!(storage.circuit_state().await, CircuitState::Closed);
        let stats = storage.circuit_stats().await;
        assert_eq!(stats.consecutive_failures, 0);
    }
}