llm_memory_graph/error.rs

//! Error types for LLM-Memory-Graph operations
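//!
//! A minimal usage sketch, assuming this module's [`Result`] alias and the
//! helpers defined below (`find_node` here is purely hypothetical):
//!
//! ```
//! use llm_memory_graph::error::{Error, Result};
//!
//! fn find_node(id: &str) -> Result<String> {
//!     // Illustrative only: a real lookup would query the storage backend.
//!     Err(Error::NodeNotFound(id.to_string()))
//! }
//!
//! match find_node("node-42") {
//!     Ok(node) => println!("found {}", node),
//!     Err(e) if e.is_retryable() => println!("transient failure: {}", e),
//!     Err(e) => eprintln!("permanent failure: {}", e),
//! }
//! ```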

/// Result type alias for LLM-Memory-Graph operations
pub type Result<T> = std::result::Result<T, Error>;

/// Error types that can occur during graph operations
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Storage backend error
    #[error("Storage error: {0}")]
    Storage(String),

    /// Serialization/deserialization error
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Node not found error
    #[error("Node not found: {0}")]
    NodeNotFound(String),

    /// Session not found error
    #[error("Session not found: {0}")]
    SessionNotFound(String),

    /// Invalid node type error
    #[error("Invalid node type: expected {expected}, got {actual}")]
    InvalidNodeType {
        /// Expected node type
        expected: String,
        /// Actual node type encountered
        actual: String,
    },

    /// Schema validation error
    #[error("Schema validation error: {0}")]
    ValidationError(String),

    /// Graph traversal error
    #[error("Graph traversal error: {0}")]
    TraversalError(String),

    /// Configuration error
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// I/O error
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Async runtime error
    #[error("Runtime error: {0}")]
    RuntimeError(String),

    /// Async operation timeout
    #[error("Operation timed out after {0}ms")]
    Timeout(u64),

    /// Concurrent modification conflict
    #[error("Concurrent modification detected: {0}")]
    ConcurrentModification(String),

    /// Connection pool exhausted
    #[error("Connection pool exhausted")]
    PoolExhausted,

    /// Metrics error
    #[error("Metrics error: {0}")]
    Metrics(String),

    /// Generic error
    #[error("{0}")]
    Other(String),
}

impl From<sled::Error> for Error {
    fn from(err: sled::Error) -> Self {
        Error::Storage(err.to_string())
    }
}

impl From<serde_json::Error> for Error {
    fn from(err: serde_json::Error) -> Self {
        Error::Serialization(err.to_string())
    }
}

impl From<rmp_serde::encode::Error> for Error {
    fn from(err: rmp_serde::encode::Error) -> Self {
        Error::Serialization(err.to_string())
    }
}

impl From<rmp_serde::decode::Error> for Error {
    fn from(err: rmp_serde::decode::Error) -> Self {
        Error::Serialization(err.to_string())
    }
}

impl From<bincode::Error> for Error {
    fn from(err: bincode::Error) -> Self {
        Error::Serialization(err.to_string())
    }
}

impl From<prometheus::Error> for Error {
    fn from(err: prometheus::Error) -> Self {
        Error::Metrics(err.to_string())
    }
}

impl Error {
    /// Create a timeout error with the specified duration in milliseconds
    pub fn timeout(duration_ms: u64) -> Self {
        Error::Timeout(duration_ms)
    }

    /// Create a concurrent modification error with context
    pub fn concurrent_modification<S: Into<String>>(context: S) -> Self {
        Error::ConcurrentModification(context.into())
    }

    /// Create a pool exhausted error
    pub fn pool_exhausted() -> Self {
        Error::PoolExhausted
    }

    /// Check if this error is a timeout
    pub fn is_timeout(&self) -> bool {
        matches!(self, Error::Timeout(_))
    }

    /// Check if this error is a concurrent modification
    pub fn is_concurrent_modification(&self) -> bool {
        matches!(self, Error::ConcurrentModification(_))
    }

    /// Check if this error is pool exhausted
    pub fn is_pool_exhausted(&self) -> bool {
        matches!(self, Error::PoolExhausted)
    }

    /// Check if this error is retryable
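    ///
    /// Transient failures (timeouts, pool exhaustion, concurrent modification)
    /// report as retryable; everything else does not. A quick check:
    ///
    /// ```
    /// use llm_memory_graph::error::Error;
    ///
    /// assert!(Error::timeout(500).is_retryable());
    /// assert!(Error::pool_exhausted().is_retryable());
    /// assert!(!Error::Storage("disk full".into()).is_retryable());
    /// ```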
    pub fn is_retryable(&self) -> bool {
        matches!(
            self,
            Error::Timeout(_) | Error::PoolExhausted | Error::ConcurrentModification(_)
        )
    }
}

/// Extension trait for adding context to Results in async operations
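///
/// A small usage sketch (the storage failure is fabricated for illustration):
///
/// ```
/// use llm_memory_graph::error::{Error, Result, ResultExt};
///
/// let result: Result<i32> = Err(Error::Storage("disk full".to_string()));
/// let with_ctx = result.context("Failed to save node");
/// assert!(format!("{}", with_ctx.unwrap_err()).contains("Failed to save node"));
/// ```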
pub trait ResultExt<T> {
    /// Add context to an error
    fn context<S: Into<String>>(self, context: S) -> Result<T>;

    /// Add context to an error using a closure (lazy evaluation)
    fn with_context<F, S>(self, f: F) -> Result<T>
    where
        F: FnOnce() -> S,
        S: Into<String>;
}

impl<T> ResultExt<T> for Result<T> {
    fn context<S: Into<String>>(self, context: S) -> Result<T> {
        self.map_err(|e| Error::Other(format!("{}: {}", context.into(), e)))
    }

    fn with_context<F, S>(self, f: F) -> Result<T>
    where
        F: FnOnce() -> S,
        S: Into<String>,
    {
        self.map_err(|e| Error::Other(format!("{}: {}", f().into(), e)))
    }
}

/// Async timeout utilities
pub mod timeout {
    use super::{Error, Result};
    use std::future::Future;
    use std::time::Duration;

    /// Execute an async operation with a timeout
    ///
    /// Returns `Error::Timeout` if the operation doesn't complete within the specified duration.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::error::timeout::with_timeout;
    /// use std::time::Duration;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let result = with_timeout(
    ///     Duration::from_secs(5),
    ///     async {
    ///         // Your async operation here
    ///         Ok::<_, llm_memory_graph::error::Error>(42)
    ///     }
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        match tokio::time::timeout(duration, future).await {
            Ok(result) => result,
            Err(_) => Err(Error::timeout(duration.as_millis() as u64)),
        }
    }

    /// Execute an async operation with a timeout in milliseconds
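    ///
    /// A thin convenience wrapper over [`with_timeout`]; a short sketch:
    ///
    /// ```no_run
    /// use llm_memory_graph::error::timeout::with_timeout_ms;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let value = with_timeout_ms(500, async {
    ///     Ok::<_, llm_memory_graph::error::Error>(1)
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```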
    pub async fn with_timeout_ms<F, T>(timeout_ms: u64, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        with_timeout(Duration::from_millis(timeout_ms), future).await
    }

    /// Execute an async operation with a timeout in seconds
    pub async fn with_timeout_secs<F, T>(timeout_secs: u64, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        with_timeout(Duration::from_secs(timeout_secs), future).await
    }
}

/// Retry utilities for handling transient async errors
pub mod retry {
    use super::Result;
    use std::future::Future;
    use std::time::Duration;

    /// Retry configuration
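    ///
    /// A builder-style sketch of a custom policy:
    ///
    /// ```
    /// use llm_memory_graph::error::retry::RetryConfig;
    /// use std::time::Duration;
    ///
    /// let config = RetryConfig::new()
    ///     .with_max_attempts(5)
    ///     .with_initial_delay(Duration::from_millis(50))
    ///     .with_max_delay(Duration::from_secs(2));
    /// assert_eq!(config.max_attempts, 5);
    /// ```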
    #[derive(Debug, Clone)]
    pub struct RetryConfig {
        /// Maximum number of retry attempts
        pub max_attempts: usize,
        /// Initial delay between retries
        pub initial_delay: Duration,
        /// Maximum delay between retries
        pub max_delay: Duration,
        /// Backoff multiplier (exponential backoff)
        pub backoff_multiplier: f64,
    }

    impl Default for RetryConfig {
        fn default() -> Self {
            Self {
                max_attempts: 3,
                initial_delay: Duration::from_millis(100),
                max_delay: Duration::from_secs(5),
                backoff_multiplier: 2.0,
            }
        }
    }

    impl RetryConfig {
        /// Create a new retry configuration
        pub fn new() -> Self {
            Self::default()
        }

        /// Set maximum number of retry attempts
        pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
            self.max_attempts = max_attempts;
            self
        }

        /// Set initial delay between retries
        pub fn with_initial_delay(mut self, delay: Duration) -> Self {
            self.initial_delay = delay;
            self
        }

        /// Set maximum delay between retries
        pub fn with_max_delay(mut self, delay: Duration) -> Self {
            self.max_delay = delay;
            self
        }

        /// Set backoff multiplier
        pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
            self.backoff_multiplier = multiplier;
            self
        }
    }

    /// Execute an async operation with retry logic
    ///
    /// Retries the operation if it returns a retryable error (timeout, pool exhausted, or concurrent modification).
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use llm_memory_graph::error::retry::{with_retry, RetryConfig};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = RetryConfig::new().with_max_attempts(5);
    /// let result = with_retry(config, || async {
    ///     // Your async operation here
    ///     Ok::<_, llm_memory_graph::error::Error>(42)
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_retry<F, Fut, T>(config: RetryConfig, mut operation: F) -> Result<T>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        let mut attempt = 0;
        let mut delay = config.initial_delay;

        loop {
            attempt += 1;
            match operation().await {
                Ok(value) => return Ok(value),
                Err(err) => {
                    // Only retry if the error is retryable and we haven't exceeded max attempts
                    if !err.is_retryable() || attempt >= config.max_attempts {
                        return Err(err);
                    }

                    // Wait before retrying with exponential backoff
                    tokio::time::sleep(delay).await;

                    // Calculate next delay with exponential backoff
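                    // (with the defaults this grows 100ms -> 200ms -> 400ms -> ..., capped at `max_delay`)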
                    delay = std::cmp::min(
                        Duration::from_millis(
                            (delay.as_millis() as f64 * config.backoff_multiplier) as u64,
                        ),
                        config.max_delay,
                    );
                }
            }
        }
    }

    /// Execute an async operation with retry using default configuration
    pub async fn with_default_retry<F, Fut, T>(operation: F) -> Result<T>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        with_retry(RetryConfig::default(), operation).await
    }
}

#[cfg(test)]
mod tests {
    use super::retry::RetryConfig;
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[test]
    fn test_timeout_error_creation() {
        let err = Error::timeout(5000);
        assert!(err.is_timeout());
        assert!(err.is_retryable());
        assert_eq!(format!("{}", err), "Operation timed out after 5000ms");
    }

    #[test]
    fn test_concurrent_modification_error() {
        let err = Error::concurrent_modification("node was modified by another operation");
        assert!(err.is_concurrent_modification());
        assert!(err.is_retryable());
        assert_eq!(
            format!("{}", err),
            "Concurrent modification detected: node was modified by another operation"
        );
    }

    #[test]
    fn test_pool_exhausted_error() {
        let err = Error::pool_exhausted();
        assert!(err.is_pool_exhausted());
        assert!(err.is_retryable());
        assert_eq!(format!("{}", err), "Connection pool exhausted");
    }

    #[test]
    fn test_error_type_checks() {
        let timeout = Error::timeout(1000);
        let concurrent = Error::concurrent_modification("test");
        let pool = Error::pool_exhausted();
        let storage = Error::Storage("test".to_string());

        assert!(timeout.is_timeout());
        assert!(!timeout.is_concurrent_modification());
        assert!(!timeout.is_pool_exhausted());

        assert!(!concurrent.is_timeout());
        assert!(concurrent.is_concurrent_modification());
        assert!(!concurrent.is_pool_exhausted());

        assert!(!pool.is_timeout());
        assert!(!pool.is_concurrent_modification());
        assert!(pool.is_pool_exhausted());

        assert!(!storage.is_timeout());
        assert!(!storage.is_concurrent_modification());
        assert!(!storage.is_pool_exhausted());
    }

    #[test]
    fn test_retryable_errors() {
        assert!(Error::timeout(1000).is_retryable());
        assert!(Error::concurrent_modification("test").is_retryable());
        assert!(Error::pool_exhausted().is_retryable());

        assert!(!Error::Storage("test".to_string()).is_retryable());
        assert!(!Error::NodeNotFound("test".to_string()).is_retryable());
        assert!(!Error::Serialization("test".to_string()).is_retryable());
    }

    #[test]
    fn test_result_context() {
        let result: Result<i32> = Err(Error::Storage("disk full".to_string()));
        let with_context = result.context("Failed to save node");

        assert!(with_context.is_err());
        let err = with_context.unwrap_err();
        assert!(format!("{}", err).contains("Failed to save node"));
        assert!(format!("{}", err).contains("disk full"));
    }

    #[test]
    fn test_result_with_context_lazy() {
        let result: Result<i32> = Err(Error::Storage("connection lost".to_string()));
        let with_context = result.with_context(|| format!("Operation failed at {}", 42));

        assert!(with_context.is_err());
        let err = with_context.unwrap_err();
        assert!(format!("{}", err).contains("Operation failed at 42"));
        assert!(format!("{}", err).contains("connection lost"));
    }

    #[tokio::test]
    async fn test_timeout_with_fast_operation() {
        use super::timeout::with_timeout_ms;

        let result = with_timeout_ms(1000, async {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            Ok::<_, Error>(42)
        })
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_timeout_with_slow_operation() {
        use super::timeout::with_timeout_ms;

        let result = with_timeout_ms(100, async {
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            Ok::<_, Error>(42)
        })
        .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.is_timeout());
    }

    #[tokio::test]
    async fn test_timeout_with_error() {
        use super::timeout::with_timeout_ms;

        let result = with_timeout_ms(1000, async {
            Err::<i32, Error>(Error::Storage("test error".to_string()))
        })
        .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(!err.is_timeout());
        assert!(matches!(err, Error::Storage(_)));
    }

    #[tokio::test]
    async fn test_timeout_secs() {
        use super::timeout::with_timeout_secs;

        let result = with_timeout_secs(1, async {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            Ok::<_, Error>(100)
        })
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 100);
    }

    #[tokio::test]
    async fn test_retry_success_on_first_attempt() {
        use super::retry::{with_retry, RetryConfig};

        let config = RetryConfig::new().with_max_attempts(3);
        let result = with_retry(config, || async { Ok::<_, Error>(42) }).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_success_after_failures() {
        use super::retry::{with_retry, RetryConfig};

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let config = RetryConfig::new()
            .with_max_attempts(5)
            .with_initial_delay(std::time::Duration::from_millis(10));

        let result = with_retry(config, || {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                let attempt = count.fetch_add(1, Ordering::SeqCst);
                if attempt < 2 {
                    Err(Error::timeout(100))
                } else {
                    Ok(42)
                }
            }
        })
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_retry_fails_after_max_attempts() {
        use super::retry::{with_retry, RetryConfig};

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let config = RetryConfig::new()
            .with_max_attempts(3)
            .with_initial_delay(std::time::Duration::from_millis(10));

        let result = with_retry(config, || {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                count.fetch_add(1, Ordering::SeqCst);
                Err::<i32, Error>(Error::pool_exhausted())
            }
        })
        .await;

        assert!(result.is_err());
        assert!(result.unwrap_err().is_pool_exhausted());
        assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_retry_no_retry_for_non_retryable_error() {
        use super::retry::{with_retry, RetryConfig};

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let config = RetryConfig::new().with_max_attempts(5);

        let result = with_retry(config, || {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                count.fetch_add(1, Ordering::SeqCst);
                Err::<i32, Error>(Error::Storage("permanent error".to_string()))
            }
        })
        .await;

        assert!(result.is_err());
        // Should only try once since storage errors are not retryable
        assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_retry_with_concurrent_modification() {
        use super::retry::{with_retry, RetryConfig};

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let config = RetryConfig::new()
            .with_max_attempts(4)
            .with_initial_delay(std::time::Duration::from_millis(10));

        let result = with_retry(config, || {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                let attempt = count.fetch_add(1, Ordering::SeqCst);
                if attempt < 2 {
                    Err(Error::concurrent_modification("version mismatch"))
                } else {
                    Ok(100)
                }
            }
        })
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 100);
        assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_retry_exponential_backoff() {
        use super::retry::{with_retry, RetryConfig};
        use std::time::Instant;

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let config = RetryConfig::new()
            .with_max_attempts(3)
            .with_initial_delay(std::time::Duration::from_millis(50))
            .with_backoff_multiplier(2.0);

        let start = Instant::now();
        let result = with_retry(config, || {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                count.fetch_add(1, Ordering::SeqCst);
                Err::<i32, Error>(Error::timeout(100))
            }
        })
        .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
        // Should take at least 50ms + 100ms = 150ms (first delay + second delay)
        assert!(elapsed.as_millis() >= 150);
    }

    #[tokio::test]
    async fn test_default_retry() {
        use super::retry::with_default_retry;

        let attempt_count = Arc::new(AtomicUsize::new(0));
        let attempt_count_clone = Arc::clone(&attempt_count);

        let result = with_default_retry(|| {
            let count = Arc::clone(&attempt_count_clone);
            async move {
                let attempt = count.fetch_add(1, Ordering::SeqCst);
                if attempt < 2 {
                    Err(Error::pool_exhausted())
                } else {
                    Ok(42)
                }
            }
        })
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[test]
    fn test_retry_config_builder() {
        let config = RetryConfig::new()
            .with_max_attempts(10)
            .with_initial_delay(std::time::Duration::from_millis(200))
            .with_max_delay(std::time::Duration::from_secs(30))
            .with_backoff_multiplier(3.0);

        assert_eq!(config.max_attempts, 10);
        assert_eq!(config.initial_delay, std::time::Duration::from_millis(200));
        assert_eq!(config.max_delay, std::time::Duration::from_secs(30));
        assert_eq!(config.backoff_multiplier, 3.0);
    }
}