1pub type Result<T> = std::result::Result<T, Error>;
5
6#[derive(Debug, thiserror::Error)]
8pub enum Error {
9 #[error("Storage error: {0}")]
11 Storage(String),
12
13 #[error("Serialization error: {0}")]
15 Serialization(String),
16
17 #[error("Node not found: {0}")]
19 NodeNotFound(String),
20
21 #[error("Session not found: {0}")]
23 SessionNotFound(String),
24
25 #[error("Invalid node type: expected {expected}, got {actual}")]
27 InvalidNodeType {
28 expected: String,
30 actual: String,
32 },
33
34 #[error("Schema validation error: {0}")]
36 ValidationError(String),
37
38 #[error("Graph traversal error: {0}")]
40 TraversalError(String),
41
42 #[error("Configuration error: {0}")]
44 ConfigError(String),
45
46 #[error("I/O error: {0}")]
48 Io(#[from] std::io::Error),
49
50 #[error("Runtime error: {0}")]
52 RuntimeError(String),
53
54 #[error("Operation timed out after {0}ms")]
56 Timeout(u64),
57
58 #[error("Concurrent modification detected: {0}")]
60 ConcurrentModification(String),
61
62 #[error("Connection pool exhausted")]
64 PoolExhausted,
65
66 #[error("Metrics error: {0}")]
68 Metrics(String),
69
70 #[error("{0}")]
72 Other(String),
73}
74
75impl From<sled::Error> for Error {
76 fn from(err: sled::Error) -> Self {
77 Error::Storage(err.to_string())
78 }
79}
80
81impl From<serde_json::Error> for Error {
82 fn from(err: serde_json::Error) -> Self {
83 Error::Serialization(err.to_string())
84 }
85}
86
87impl From<rmp_serde::encode::Error> for Error {
88 fn from(err: rmp_serde::encode::Error) -> Self {
89 Error::Serialization(err.to_string())
90 }
91}
92
93impl From<rmp_serde::decode::Error> for Error {
94 fn from(err: rmp_serde::decode::Error) -> Self {
95 Error::Serialization(err.to_string())
96 }
97}
98
99impl From<bincode::Error> for Error {
100 fn from(err: bincode::Error) -> Self {
101 Error::Serialization(err.to_string())
102 }
103}
104
105impl From<prometheus::Error> for Error {
106 fn from(err: prometheus::Error) -> Self {
107 Error::Metrics(err.to_string())
108 }
109}
110
111impl Error {
112 pub fn timeout(duration_ms: u64) -> Self {
114 Error::Timeout(duration_ms)
115 }
116
117 pub fn concurrent_modification<S: Into<String>>(context: S) -> Self {
119 Error::ConcurrentModification(context.into())
120 }
121
122 pub fn pool_exhausted() -> Self {
124 Error::PoolExhausted
125 }
126
127 pub fn is_timeout(&self) -> bool {
129 matches!(self, Error::Timeout(_))
130 }
131
132 pub fn is_concurrent_modification(&self) -> bool {
134 matches!(self, Error::ConcurrentModification(_))
135 }
136
137 pub fn is_pool_exhausted(&self) -> bool {
139 matches!(self, Error::PoolExhausted)
140 }
141
142 pub fn is_retryable(&self) -> bool {
144 matches!(
145 self,
146 Error::Timeout(_) | Error::PoolExhausted | Error::ConcurrentModification(_)
147 )
148 }
149}
150
151pub trait ResultExt<T> {
153 fn context<S: Into<String>>(self, context: S) -> Result<T>;
155
156 fn with_context<F, S>(self, f: F) -> Result<T>
158 where
159 F: FnOnce() -> S,
160 S: Into<String>;
161}
162
163impl<T> ResultExt<T> for Result<T> {
164 fn context<S: Into<String>>(self, context: S) -> Result<T> {
165 self.map_err(|e| Error::Other(format!("{}: {}", context.into(), e)))
166 }
167
168 fn with_context<F, S>(self, f: F) -> Result<T>
169 where
170 F: FnOnce() -> S,
171 S: Into<String>,
172 {
173 self.map_err(|e| Error::Other(format!("{}: {}", f().into(), e)))
174 }
175}
176
177pub mod timeout {
179 use super::{Error, Result};
180 use std::future::Future;
181 use std::time::Duration;
182
183 pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T>
205 where
206 F: Future<Output = Result<T>>,
207 {
208 match tokio::time::timeout(duration, future).await {
209 Ok(result) => result,
210 Err(_) => Err(Error::timeout(duration.as_millis() as u64)),
211 }
212 }
213
214 pub async fn with_timeout_ms<F, T>(timeout_ms: u64, future: F) -> Result<T>
216 where
217 F: Future<Output = Result<T>>,
218 {
219 with_timeout(Duration::from_millis(timeout_ms), future).await
220 }
221
222 pub async fn with_timeout_secs<F, T>(timeout_secs: u64, future: F) -> Result<T>
224 where
225 F: Future<Output = Result<T>>,
226 {
227 with_timeout(Duration::from_secs(timeout_secs), future).await
228 }
229}
230
231pub mod retry {
233 use super::Result;
234 use std::future::Future;
235 use std::time::Duration;
236
237 #[derive(Debug, Clone)]
239 pub struct RetryConfig {
240 pub max_attempts: usize,
242 pub initial_delay: Duration,
244 pub max_delay: Duration,
246 pub backoff_multiplier: f64,
248 }
249
250 impl Default for RetryConfig {
251 fn default() -> Self {
252 Self {
253 max_attempts: 3,
254 initial_delay: Duration::from_millis(100),
255 max_delay: Duration::from_secs(5),
256 backoff_multiplier: 2.0,
257 }
258 }
259 }
260
261 impl RetryConfig {
262 pub fn new() -> Self {
264 Self::default()
265 }
266
267 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
269 self.max_attempts = max_attempts;
270 self
271 }
272
273 pub fn with_initial_delay(mut self, delay: Duration) -> Self {
275 self.initial_delay = delay;
276 self
277 }
278
279 pub fn with_max_delay(mut self, delay: Duration) -> Self {
281 self.max_delay = delay;
282 self
283 }
284
285 pub fn with_backoff_multiplier(mut self, multiplier: f64) -> Self {
287 self.backoff_multiplier = multiplier;
288 self
289 }
290 }
291
292 pub async fn with_retry<F, Fut, T>(config: RetryConfig, mut operation: F) -> Result<T>
311 where
312 F: FnMut() -> Fut,
313 Fut: Future<Output = Result<T>>,
314 {
315 let mut attempt = 0;
316 let mut delay = config.initial_delay;
317
318 loop {
319 attempt += 1;
320 match operation().await {
321 Ok(value) => return Ok(value),
322 Err(err) => {
323 if !err.is_retryable() || attempt >= config.max_attempts {
325 return Err(err);
326 }
327
328 tokio::time::sleep(delay).await;
330
331 delay = std::cmp::min(
333 Duration::from_millis(
334 (delay.as_millis() as f64 * config.backoff_multiplier) as u64,
335 ),
336 config.max_delay,
337 );
338 }
339 }
340 }
341 }
342
343 pub async fn with_default_retry<F, Fut, T>(operation: F) -> Result<T>
345 where
346 F: FnMut() -> Fut,
347 Fut: Future<Output = Result<T>>,
348 {
349 with_retry(RetryConfig::default(), operation).await
350 }
351}
352
353#[cfg(test)]
354mod tests {
355 use super::retry::RetryConfig;
356 use super::*;
357 use std::sync::atomic::{AtomicUsize, Ordering};
358 use std::sync::Arc;
359
360 #[test]
361 fn test_timeout_error_creation() {
362 let err = Error::timeout(5000);
363 assert!(err.is_timeout());
364 assert!(err.is_retryable());
365 assert_eq!(format!("{}", err), "Operation timed out after 5000ms");
366 }
367
368 #[test]
369 fn test_concurrent_modification_error() {
370 let err = Error::concurrent_modification("node was modified by another operation");
371 assert!(err.is_concurrent_modification());
372 assert!(err.is_retryable());
373 assert_eq!(
374 format!("{}", err),
375 "Concurrent modification detected: node was modified by another operation"
376 );
377 }
378
379 #[test]
380 fn test_pool_exhausted_error() {
381 let err = Error::pool_exhausted();
382 assert!(err.is_pool_exhausted());
383 assert!(err.is_retryable());
384 assert_eq!(format!("{}", err), "Connection pool exhausted");
385 }
386
387 #[test]
388 fn test_error_type_checks() {
389 let timeout = Error::timeout(1000);
390 let concurrent = Error::concurrent_modification("test");
391 let pool = Error::pool_exhausted();
392 let storage = Error::Storage("test".to_string());
393
394 assert!(timeout.is_timeout());
395 assert!(!timeout.is_concurrent_modification());
396 assert!(!timeout.is_pool_exhausted());
397
398 assert!(!concurrent.is_timeout());
399 assert!(concurrent.is_concurrent_modification());
400 assert!(!concurrent.is_pool_exhausted());
401
402 assert!(!pool.is_timeout());
403 assert!(!pool.is_concurrent_modification());
404 assert!(pool.is_pool_exhausted());
405
406 assert!(!storage.is_timeout());
407 assert!(!storage.is_concurrent_modification());
408 assert!(!storage.is_pool_exhausted());
409 }
410
411 #[test]
412 fn test_retryable_errors() {
413 assert!(Error::timeout(1000).is_retryable());
414 assert!(Error::concurrent_modification("test").is_retryable());
415 assert!(Error::pool_exhausted().is_retryable());
416
417 assert!(!Error::Storage("test".to_string()).is_retryable());
418 assert!(!Error::NodeNotFound("test".to_string()).is_retryable());
419 assert!(!Error::Serialization("test".to_string()).is_retryable());
420 }
421
422 #[test]
423 fn test_result_context() {
424 let result: Result<i32> = Err(Error::Storage("disk full".to_string()));
425 let with_context = result.context("Failed to save node");
426
427 assert!(with_context.is_err());
428 let err = with_context.unwrap_err();
429 assert!(format!("{}", err).contains("Failed to save node"));
430 assert!(format!("{}", err).contains("disk full"));
431 }
432
433 #[test]
434 fn test_result_with_context_lazy() {
435 let result: Result<i32> = Err(Error::Storage("connection lost".to_string()));
436 let with_context = result.with_context(|| format!("Operation failed at {}", 42));
437
438 assert!(with_context.is_err());
439 let err = with_context.unwrap_err();
440 assert!(format!("{}", err).contains("Operation failed at 42"));
441 assert!(format!("{}", err).contains("connection lost"));
442 }
443
444 #[tokio::test]
445 async fn test_timeout_with_fast_operation() {
446 use super::timeout::with_timeout_ms;
447
448 let result = with_timeout_ms(1000, async {
449 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
450 Ok::<_, Error>(42)
451 })
452 .await;
453
454 assert!(result.is_ok());
455 assert_eq!(result.unwrap(), 42);
456 }
457
458 #[tokio::test]
459 async fn test_timeout_with_slow_operation() {
460 use super::timeout::with_timeout_ms;
461
462 let result = with_timeout_ms(100, async {
463 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
464 Ok::<_, Error>(42)
465 })
466 .await;
467
468 assert!(result.is_err());
469 let err = result.unwrap_err();
470 assert!(err.is_timeout());
471 }
472
473 #[tokio::test]
474 async fn test_timeout_with_error() {
475 use super::timeout::with_timeout_ms;
476
477 let result = with_timeout_ms(1000, async {
478 Err::<i32, Error>(Error::Storage("test error".to_string()))
479 })
480 .await;
481
482 assert!(result.is_err());
483 let err = result.unwrap_err();
484 assert!(!err.is_timeout());
485 assert!(matches!(err, Error::Storage(_)));
486 }
487
488 #[tokio::test]
489 async fn test_timeout_secs() {
490 use super::timeout::with_timeout_secs;
491
492 let result = with_timeout_secs(1, async {
493 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
494 Ok::<_, Error>(100)
495 })
496 .await;
497
498 assert!(result.is_ok());
499 assert_eq!(result.unwrap(), 100);
500 }
501
502 #[tokio::test]
503 async fn test_retry_success_on_first_attempt() {
504 use super::retry::{with_retry, RetryConfig};
505
506 let config = RetryConfig::new().with_max_attempts(3);
507 let result = with_retry(config, || async { Ok::<_, Error>(42) }).await;
508
509 assert!(result.is_ok());
510 assert_eq!(result.unwrap(), 42);
511 }
512
513 #[tokio::test]
514 async fn test_retry_success_after_failures() {
515 use super::retry::{with_retry, RetryConfig};
516
517 let attempt_count = Arc::new(AtomicUsize::new(0));
518 let attempt_count_clone = Arc::clone(&attempt_count);
519
520 let config = RetryConfig::new()
521 .with_max_attempts(5)
522 .with_initial_delay(std::time::Duration::from_millis(10));
523
524 let result = with_retry(config, || {
525 let count = Arc::clone(&attempt_count_clone);
526 async move {
527 let attempt = count.fetch_add(1, Ordering::SeqCst);
528 if attempt < 2 {
529 Err(Error::timeout(100))
530 } else {
531 Ok(42)
532 }
533 }
534 })
535 .await;
536
537 assert!(result.is_ok());
538 assert_eq!(result.unwrap(), 42);
539 assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
540 }
541
542 #[tokio::test]
543 async fn test_retry_fails_after_max_attempts() {
544 use super::retry::{with_retry, RetryConfig};
545
546 let attempt_count = Arc::new(AtomicUsize::new(0));
547 let attempt_count_clone = Arc::clone(&attempt_count);
548
549 let config = RetryConfig::new()
550 .with_max_attempts(3)
551 .with_initial_delay(std::time::Duration::from_millis(10));
552
553 let result = with_retry(config, || {
554 let count = Arc::clone(&attempt_count_clone);
555 async move {
556 count.fetch_add(1, Ordering::SeqCst);
557 Err::<i32, Error>(Error::pool_exhausted())
558 }
559 })
560 .await;
561
562 assert!(result.is_err());
563 assert!(result.unwrap_err().is_pool_exhausted());
564 assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
565 }
566
567 #[tokio::test]
568 async fn test_retry_no_retry_for_non_retryable_error() {
569 use super::retry::{with_retry, RetryConfig};
570
571 let attempt_count = Arc::new(AtomicUsize::new(0));
572 let attempt_count_clone = Arc::clone(&attempt_count);
573
574 let config = RetryConfig::new().with_max_attempts(5);
575
576 let result = with_retry(config, || {
577 let count = Arc::clone(&attempt_count_clone);
578 async move {
579 count.fetch_add(1, Ordering::SeqCst);
580 Err::<i32, Error>(Error::Storage("permanent error".to_string()))
581 }
582 })
583 .await;
584
585 assert!(result.is_err());
586 assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
588 }
589
590 #[tokio::test]
591 async fn test_retry_with_concurrent_modification() {
592 use super::retry::{with_retry, RetryConfig};
593
594 let attempt_count = Arc::new(AtomicUsize::new(0));
595 let attempt_count_clone = Arc::clone(&attempt_count);
596
597 let config = RetryConfig::new()
598 .with_max_attempts(4)
599 .with_initial_delay(std::time::Duration::from_millis(10));
600
601 let result = with_retry(config, || {
602 let count = Arc::clone(&attempt_count_clone);
603 async move {
604 let attempt = count.fetch_add(1, Ordering::SeqCst);
605 if attempt < 2 {
606 Err(Error::concurrent_modification("version mismatch"))
607 } else {
608 Ok(100)
609 }
610 }
611 })
612 .await;
613
614 assert!(result.is_ok());
615 assert_eq!(result.unwrap(), 100);
616 assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
617 }
618
619 #[tokio::test]
620 async fn test_retry_exponential_backoff() {
621 use super::retry::{with_retry, RetryConfig};
622 use std::time::Instant;
623
624 let attempt_count = Arc::new(AtomicUsize::new(0));
625 let attempt_count_clone = Arc::clone(&attempt_count);
626
627 let config = RetryConfig::new()
628 .with_max_attempts(3)
629 .with_initial_delay(std::time::Duration::from_millis(50))
630 .with_backoff_multiplier(2.0);
631
632 let start = Instant::now();
633 let result = with_retry(config, || {
634 let count = Arc::clone(&attempt_count_clone);
635 async move {
636 count.fetch_add(1, Ordering::SeqCst);
637 Err::<i32, Error>(Error::timeout(100))
638 }
639 })
640 .await;
641
642 let elapsed = start.elapsed();
643
644 assert!(result.is_err());
645 assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
646 assert!(elapsed.as_millis() >= 150);
648 }
649
650 #[tokio::test]
651 async fn test_default_retry() {
652 use super::retry::with_default_retry;
653
654 let attempt_count = Arc::new(AtomicUsize::new(0));
655 let attempt_count_clone = Arc::clone(&attempt_count);
656
657 let result = with_default_retry(|| {
658 let count = Arc::clone(&attempt_count_clone);
659 async move {
660 let attempt = count.fetch_add(1, Ordering::SeqCst);
661 if attempt < 2 {
662 Err(Error::pool_exhausted())
663 } else {
664 Ok(42)
665 }
666 }
667 })
668 .await;
669
670 assert!(result.is_ok());
671 assert_eq!(result.unwrap(), 42);
672 }
673
674 #[test]
675 fn test_retry_config_builder() {
676 let config = RetryConfig::new()
677 .with_max_attempts(10)
678 .with_initial_delay(std::time::Duration::from_millis(200))
679 .with_max_delay(std::time::Duration::from_secs(30))
680 .with_backoff_multiplier(3.0);
681
682 assert_eq!(config.max_attempts, 10);
683 assert_eq!(config.initial_delay, std::time::Duration::from_millis(200));
684 assert_eq!(config.max_delay, std::time::Duration::from_secs(30));
685 assert_eq!(config.backoff_multiplier, 3.0);
686 }
687}