Skip to main content

oxigdal_cluster/fault_tolerance/
timeout.rs

//! Timeout management for fault tolerance.
//!
//! Provides configurable timeout management with support for:
//! - Static timeouts
//! - Dynamic adaptive timeouts
//! - Deadline propagation
//! - Timeout budgets
//! - Per-key timeout managers via a registry
8
9use crate::error::{ClusterError, Result};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16use tracing::{debug, warn};
17
18/// Timeout configuration.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct TimeoutConfig {
21    /// Default timeout duration
22    pub default_timeout: Duration,
23    /// Minimum allowed timeout
24    pub min_timeout: Duration,
25    /// Maximum allowed timeout
26    pub max_timeout: Duration,
27    /// Enable adaptive timeout adjustment
28    pub adaptive: bool,
29    /// Percentile for adaptive timeout (e.g., 0.95 for p95)
30    pub adaptive_percentile: f64,
31    /// Window size for adaptive calculations
32    pub adaptive_window_size: usize,
33    /// Multiplier for adaptive timeout (safety margin)
34    pub adaptive_multiplier: f64,
35}
36
37impl Default for TimeoutConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(30),
41            min_timeout: Duration::from_millis(100),
42            max_timeout: Duration::from_secs(300),
43            adaptive: true,
44            adaptive_percentile: 0.95,
45            adaptive_window_size: 100,
46            adaptive_multiplier: 1.5,
47        }
48    }
49}
50
51/// Timeout statistics.
52#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct TimeoutStats {
54    /// Total operations
55    pub total_operations: u64,
56    /// Total timeouts
57    pub total_timeouts: u64,
58    /// Total successes within timeout
59    pub total_success: u64,
60    /// Average duration in microseconds
61    pub avg_duration_us: u64,
62    /// Current adaptive timeout in microseconds
63    pub current_timeout_us: u64,
64    /// P50 latency in microseconds
65    pub p50_latency_us: u64,
66    /// P95 latency in microseconds
67    pub p95_latency_us: u64,
68    /// P99 latency in microseconds
69    pub p99_latency_us: u64,
70}
71
72/// Internal state for timeout manager.
73struct TimeoutManagerInner {
74    /// Configuration
75    config: TimeoutConfig,
76    /// Current timeout (may be adaptive)
77    current_timeout: RwLock<Duration>,
78    /// Latency history for adaptive calculations
79    latency_history: RwLock<Vec<u64>>,
80    /// Statistics
81    stats: RwLock<TimeoutStats>,
82    /// Total duration for averaging
83    total_duration_us: AtomicU64,
84}
85
86/// Timeout manager for configurable timeout management.
87#[derive(Clone)]
88pub struct TimeoutManager {
89    inner: Arc<TimeoutManagerInner>,
90}
91
92impl TimeoutManager {
93    /// Create a new timeout manager.
94    pub fn new(config: TimeoutConfig) -> Self {
95        let current_timeout = config.default_timeout;
96        Self {
97            inner: Arc::new(TimeoutManagerInner {
98                config,
99                current_timeout: RwLock::new(current_timeout),
100                latency_history: RwLock::new(Vec::new()),
101                stats: RwLock::new(TimeoutStats {
102                    current_timeout_us: current_timeout.as_micros() as u64,
103                    ..Default::default()
104                }),
105                total_duration_us: AtomicU64::new(0),
106            }),
107        }
108    }
109
110    /// Create with default configuration.
111    pub fn with_defaults() -> Self {
112        Self::new(TimeoutConfig::default())
113    }
114
115    /// Get the current timeout duration.
116    pub fn timeout(&self) -> Duration {
117        *self.inner.current_timeout.read()
118    }
119
120    /// Execute a future with timeout.
121    pub async fn call<F, T>(&self, f: F) -> Result<T>
122    where
123        F: std::future::Future<Output = T>,
124    {
125        let timeout = self.timeout();
126        let start = Instant::now();
127
128        match tokio::time::timeout(timeout, f).await {
129            Ok(result) => {
130                let duration = start.elapsed();
131                self.record_success(duration);
132                Ok(result)
133            }
134            Err(_) => {
135                self.record_timeout();
136                Err(ClusterError::Timeout(format!(
137                    "Operation timed out after {:?}",
138                    timeout
139                )))
140            }
141        }
142    }
143
144    /// Execute a fallible future with timeout.
145    pub async fn call_with_error<F, T, E>(&self, f: F) -> Result<T>
146    where
147        F: std::future::Future<Output = std::result::Result<T, E>>,
148        E: std::fmt::Display,
149    {
150        let timeout = self.timeout();
151        let start = Instant::now();
152
153        match tokio::time::timeout(timeout, f).await {
154            Ok(Ok(result)) => {
155                let duration = start.elapsed();
156                self.record_success(duration);
157                Ok(result)
158            }
159            Ok(Err(e)) => {
160                let duration = start.elapsed();
161                self.record_success(duration); // Not a timeout
162                Err(ClusterError::ExecutionError(e.to_string()))
163            }
164            Err(_) => {
165                self.record_timeout();
166                Err(ClusterError::Timeout(format!(
167                    "Operation timed out after {:?}",
168                    timeout
169                )))
170            }
171        }
172    }
173
174    /// Record a successful operation.
175    pub fn record_success(&self, duration: Duration) {
176        let duration_us = duration.as_micros() as u64;
177
178        // Update latency history
179        {
180            let mut history = self.inner.latency_history.write();
181            history.push(duration_us);
182            if history.len() > self.inner.config.adaptive_window_size {
183                history.remove(0);
184            }
185        }
186
187        // Update total duration
188        let total = self
189            .inner
190            .total_duration_us
191            .fetch_add(duration_us, Ordering::SeqCst)
192            + duration_us;
193
194        // Update stats
195        {
196            let mut stats = self.inner.stats.write();
197            stats.total_operations += 1;
198            stats.total_success += 1;
199            stats.avg_duration_us = total / stats.total_operations;
200        }
201
202        // Update adaptive timeout if enabled
203        if self.inner.config.adaptive {
204            self.update_adaptive_timeout();
205        }
206
207        debug!("Timeout manager: recorded success, duration={:?}", duration);
208    }
209
210    /// Record a timeout.
211    pub fn record_timeout(&self) {
212        let mut stats = self.inner.stats.write();
213        stats.total_operations += 1;
214        stats.total_timeouts += 1;
215
216        warn!(
217            "Timeout manager: operation timed out (total timeouts: {})",
218            stats.total_timeouts
219        );
220    }
221
222    /// Update adaptive timeout based on latency history.
223    fn update_adaptive_timeout(&self) {
224        let history = self.inner.latency_history.read();
225        if history.len() < 10 {
226            return;
227        }
228
229        // Sort for percentile calculation
230        let mut sorted: Vec<u64> = history.clone();
231        sorted.sort_unstable();
232
233        // Calculate percentiles
234        let p50_idx = (sorted.len() as f64 * 0.50) as usize;
235        let p95_idx = (sorted.len() as f64 * self.inner.config.adaptive_percentile) as usize;
236        let p99_idx = (sorted.len() as f64 * 0.99) as usize;
237
238        let p50 = sorted
239            .get(p50_idx.min(sorted.len() - 1))
240            .copied()
241            .unwrap_or(0);
242        let p95 = sorted
243            .get(p95_idx.min(sorted.len() - 1))
244            .copied()
245            .unwrap_or(0);
246        let p99 = sorted
247            .get(p99_idx.min(sorted.len() - 1))
248            .copied()
249            .unwrap_or(0);
250
251        // Calculate adaptive timeout
252        let adaptive_us = (p95 as f64 * self.inner.config.adaptive_multiplier) as u64;
253        let adaptive_timeout = Duration::from_micros(adaptive_us);
254
255        // Clamp to configured bounds
256        let clamped = adaptive_timeout
257            .max(self.inner.config.min_timeout)
258            .min(self.inner.config.max_timeout);
259
260        // Update current timeout
261        *self.inner.current_timeout.write() = clamped;
262
263        // Update stats
264        let mut stats = self.inner.stats.write();
265        stats.current_timeout_us = clamped.as_micros() as u64;
266        stats.p50_latency_us = p50;
267        stats.p95_latency_us = p95;
268        stats.p99_latency_us = p99;
269
270        debug!(
271            "Adaptive timeout updated: {:?} (p95={:?})",
272            clamped,
273            Duration::from_micros(p95)
274        );
275    }
276
277    /// Set a fixed timeout (disables adaptive).
278    pub fn set_timeout(&self, timeout: Duration) {
279        let clamped = timeout
280            .max(self.inner.config.min_timeout)
281            .min(self.inner.config.max_timeout);
282
283        *self.inner.current_timeout.write() = clamped;
284        self.inner.stats.write().current_timeout_us = clamped.as_micros() as u64;
285    }
286
287    /// Get timeout statistics.
288    pub fn get_stats(&self) -> TimeoutStats {
289        self.inner.stats.read().clone()
290    }
291
292    /// Get timeout rate.
293    pub fn timeout_rate(&self) -> f64 {
294        let stats = self.inner.stats.read();
295        if stats.total_operations == 0 {
296            0.0
297        } else {
298            stats.total_timeouts as f64 / stats.total_operations as f64
299        }
300    }
301
302    /// Reset statistics.
303    pub fn reset_stats(&self) {
304        let current_timeout = *self.inner.current_timeout.read();
305        *self.inner.stats.write() = TimeoutStats {
306            current_timeout_us: current_timeout.as_micros() as u64,
307            ..Default::default()
308        };
309        self.inner.latency_history.write().clear();
310        self.inner.total_duration_us.store(0, Ordering::SeqCst);
311    }
312}
313
314/// Deadline for propagating timeout budgets across operations.
315#[derive(Debug, Clone)]
316pub struct Deadline {
317    /// When the deadline expires
318    expires_at: Instant,
319    /// Original budget
320    original_budget: Duration,
321}
322
323impl Deadline {
324    /// Create a new deadline with the given budget.
325    pub fn new(budget: Duration) -> Self {
326        Self {
327            expires_at: Instant::now() + budget,
328            original_budget: budget,
329        }
330    }
331
332    /// Create a deadline from an absolute time.
333    pub fn at(expires_at: Instant) -> Self {
334        let now = Instant::now();
335        let original_budget = if expires_at > now {
336            expires_at - now
337        } else {
338            Duration::ZERO
339        };
340        Self {
341            expires_at,
342            original_budget,
343        }
344    }
345
346    /// Get remaining time until deadline.
347    pub fn remaining(&self) -> Duration {
348        let now = Instant::now();
349        if self.expires_at > now {
350            self.expires_at - now
351        } else {
352            Duration::ZERO
353        }
354    }
355
356    /// Check if deadline has expired.
357    pub fn is_expired(&self) -> bool {
358        Instant::now() >= self.expires_at
359    }
360
361    /// Get original budget.
362    pub fn original_budget(&self) -> Duration {
363        self.original_budget
364    }
365
366    /// Get time elapsed since deadline was created.
367    pub fn elapsed(&self) -> Duration {
368        let deadline_start = self.expires_at - self.original_budget;
369        Instant::now().saturating_duration_since(deadline_start)
370    }
371
372    /// Check deadline and return remaining time or error if expired.
373    pub fn check(&self) -> Result<Duration> {
374        let remaining = self.remaining();
375        if remaining.is_zero() {
376            Err(ClusterError::Timeout("Deadline expired".to_string()))
377        } else {
378            Ok(remaining)
379        }
380    }
381
382    /// Execute a future with this deadline.
383    pub async fn run<F, T>(&self, f: F) -> Result<T>
384    where
385        F: std::future::Future<Output = T>,
386    {
387        let remaining = self.check()?;
388
389        match tokio::time::timeout(remaining, f).await {
390            Ok(result) => Ok(result),
391            Err(_) => Err(ClusterError::Timeout("Deadline exceeded".to_string())),
392        }
393    }
394}
395
396/// Timeout budget for distributing time across multiple operations.
397#[derive(Clone)]
398pub struct TimeoutBudget {
399    inner: Arc<TimeoutBudgetInner>,
400}
401
402struct TimeoutBudgetInner {
403    /// Total budget
404    total_budget: Duration,
405    /// Start time
406    started_at: Instant,
407    /// Operations completed
408    operations: AtomicU64,
409    /// Time consumed
410    consumed_us: AtomicU64,
411}
412
413impl TimeoutBudget {
414    /// Create a new timeout budget.
415    pub fn new(total_budget: Duration) -> Self {
416        Self {
417            inner: Arc::new(TimeoutBudgetInner {
418                total_budget,
419                started_at: Instant::now(),
420                operations: AtomicU64::new(0),
421                consumed_us: AtomicU64::new(0),
422            }),
423        }
424    }
425
426    /// Get total budget.
427    pub fn total(&self) -> Duration {
428        self.inner.total_budget
429    }
430
431    /// Get remaining budget.
432    pub fn remaining(&self) -> Duration {
433        let elapsed = self.inner.started_at.elapsed();
434        if elapsed >= self.inner.total_budget {
435            Duration::ZERO
436        } else {
437            self.inner.total_budget - elapsed
438        }
439    }
440
441    /// Get elapsed time.
442    pub fn elapsed(&self) -> Duration {
443        self.inner.started_at.elapsed()
444    }
445
446    /// Check if budget is exhausted.
447    pub fn is_exhausted(&self) -> bool {
448        self.remaining().is_zero()
449    }
450
451    /// Allocate a portion of the remaining budget for an operation.
452    pub fn allocate(&self, portion: f64) -> Result<Duration> {
453        let remaining = self.remaining();
454        if remaining.is_zero() {
455            return Err(ClusterError::Timeout("Budget exhausted".to_string()));
456        }
457
458        let allocated = Duration::from_secs_f64(remaining.as_secs_f64() * portion.min(1.0));
459        Ok(allocated)
460    }
461
462    /// Allocate budget evenly for remaining operations.
463    pub fn allocate_even(&self, remaining_operations: u32) -> Result<Duration> {
464        if remaining_operations == 0 {
465            return Err(ClusterError::InvalidOperation(
466                "No remaining operations".to_string(),
467            ));
468        }
469
470        let remaining = self.remaining();
471        if remaining.is_zero() {
472            return Err(ClusterError::Timeout("Budget exhausted".to_string()));
473        }
474
475        Ok(remaining / remaining_operations)
476    }
477
478    /// Record an operation's duration.
479    pub fn record_operation(&self, duration: Duration) {
480        self.inner.operations.fetch_add(1, Ordering::SeqCst);
481        self.inner
482            .consumed_us
483            .fetch_add(duration.as_micros() as u64, Ordering::SeqCst);
484    }
485
486    /// Get number of operations completed.
487    pub fn operations_completed(&self) -> u64 {
488        self.inner.operations.load(Ordering::SeqCst)
489    }
490
491    /// Get consumed time.
492    pub fn consumed(&self) -> Duration {
493        Duration::from_micros(self.inner.consumed_us.load(Ordering::SeqCst))
494    }
495
496    /// Create a deadline from the remaining budget.
497    pub fn to_deadline(&self) -> Deadline {
498        Deadline::new(self.remaining())
499    }
500}
501
502/// Timeout manager registry for managing multiple timeout managers.
503#[derive(Clone)]
504pub struct TimeoutRegistry {
505    managers: Arc<RwLock<HashMap<String, TimeoutManager>>>,
506    default_config: TimeoutConfig,
507}
508
509impl TimeoutRegistry {
510    /// Create a new timeout registry.
511    pub fn new(default_config: TimeoutConfig) -> Self {
512        Self {
513            managers: Arc::new(RwLock::new(HashMap::new())),
514            default_config,
515        }
516    }
517
518    /// Create with default configuration.
519    pub fn with_defaults() -> Self {
520        Self::new(TimeoutConfig::default())
521    }
522
523    /// Get or create a timeout manager for the given key.
524    pub fn get_or_create(&self, key: &str) -> TimeoutManager {
525        let managers = self.managers.read();
526        if let Some(manager) = managers.get(key) {
527            return manager.clone();
528        }
529        drop(managers);
530
531        let mut managers = self.managers.write();
532        managers
533            .entry(key.to_string())
534            .or_insert_with(|| TimeoutManager::new(self.default_config.clone()))
535            .clone()
536    }
537
538    /// Get a timeout manager by key.
539    pub fn get(&self, key: &str) -> Option<TimeoutManager> {
540        self.managers.read().get(key).cloned()
541    }
542
543    /// Register a timeout manager with custom configuration.
544    pub fn register(&self, key: &str, config: TimeoutConfig) -> TimeoutManager {
545        let manager = TimeoutManager::new(config);
546        self.managers
547            .write()
548            .insert(key.to_string(), manager.clone());
549        manager
550    }
551
552    /// Get all timeout stats.
553    pub fn get_all_stats(&self) -> HashMap<String, TimeoutStats> {
554        self.managers
555            .read()
556            .iter()
557            .map(|(k, v)| (k.clone(), v.get_stats()))
558            .collect()
559    }
560}
561
562impl Default for TimeoutRegistry {
563    fn default() -> Self {
564        Self::with_defaults()
565    }
566}
567
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_timeout_manager_creation() {
        let tm = TimeoutManager::with_defaults();
        assert_eq!(tm.timeout(), Duration::from_secs(30));
    }

    #[tokio::test]
    async fn test_timeout_success() {
        let tm = TimeoutManager::with_defaults();

        let result = tm.call(async { 42 }).await;
        assert_eq!(result.ok(), Some(42));

        let stats = tm.get_stats();
        assert_eq!(stats.total_success, 1);
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_timeouts, 0);
    }

    #[tokio::test]
    async fn test_call_with_error_propagates_inner_error() {
        let tm = TimeoutManager::with_defaults();

        let result = tm
            .call_with_error(async { Err::<u32, &str>("boom") })
            .await;
        assert!(matches!(
            result,
            Err(ClusterError::ExecutionError(ref msg)) if msg.as_str() == "boom"
        ));

        // Completing in time, even with an error, is not a timeout.
        let stats = tm.get_stats();
        assert_eq!(stats.total_timeouts, 0);
        assert_eq!(stats.total_success, 1);
    }

    #[tokio::test]
    async fn test_timeout_exceeded() {
        let config = TimeoutConfig {
            default_timeout: Duration::from_millis(10),
            ..Default::default()
        };
        let tm = TimeoutManager::new(config);

        let result = tm
            .call(async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                42
            })
            .await;

        assert!(matches!(result, Err(ClusterError::Timeout(_))));

        let stats = tm.get_stats();
        assert_eq!(stats.total_timeouts, 1);
        assert_eq!(stats.total_operations, 1);
    }

    #[test]
    fn test_set_timeout_clamps_to_bounds() {
        let tm = TimeoutManager::with_defaults();

        tm.set_timeout(Duration::from_millis(1));
        assert_eq!(tm.timeout(), Duration::from_millis(100));

        tm.set_timeout(Duration::from_secs(3600));
        assert_eq!(tm.timeout(), Duration::from_secs(300));
        assert_eq!(tm.get_stats().current_timeout_us, 300_000_000);
    }

    #[test]
    fn test_timeout_rate_and_reset() {
        let tm = TimeoutManager::with_defaults();
        tm.record_success(Duration::from_millis(10));
        tm.record_timeout();
        assert_eq!(tm.timeout_rate(), 0.5);

        tm.reset_stats();
        let stats = tm.get_stats();
        assert_eq!(stats.total_operations, 0);
        assert_eq!(stats.total_success, 0);
        assert_eq!(stats.total_timeouts, 0);
        // The current timeout survives a reset.
        assert_eq!(stats.current_timeout_us, 30_000_000);
        assert_eq!(tm.timeout_rate(), 0.0);
    }

    #[test]
    fn test_deadline() {
        let deadline = Deadline::new(Duration::from_secs(10));
        assert!(!deadline.is_expired());
        assert!(deadline.remaining() <= Duration::from_secs(10));
        assert_eq!(deadline.original_budget(), Duration::from_secs(10));
    }

    #[test]
    fn test_deadline_expired() {
        let deadline = Deadline::new(Duration::ZERO);
        assert!(deadline.is_expired());
        assert_eq!(deadline.remaining(), Duration::ZERO);
    }

    #[test]
    fn test_deadline_check() {
        assert!(Deadline::new(Duration::from_secs(10)).check().is_ok());
        assert!(Deadline::new(Duration::ZERO).check().is_err());
    }

    #[tokio::test]
    async fn test_deadline_run() {
        let deadline = Deadline::new(Duration::from_secs(10));
        assert_eq!(deadline.run(async { 7 }).await.ok(), Some(7));

        let expired = Deadline::new(Duration::ZERO);
        assert!(expired.run(async { 7 }).await.is_err());
    }

    #[test]
    fn test_timeout_budget() {
        let budget = TimeoutBudget::new(Duration::from_secs(10));
        assert_eq!(budget.total(), Duration::from_secs(10));
        assert!(!budget.is_exhausted());

        let allocated = budget.allocate(0.5).unwrap();
        assert!(allocated <= Duration::from_secs(5));
    }

    #[test]
    fn test_timeout_budget_even_allocation() {
        let budget = TimeoutBudget::new(Duration::from_secs(10));

        // Should be approximately 2 seconds each.
        let allocated = budget.allocate_even(5).unwrap();
        assert!(allocated <= Duration::from_secs(2));
        assert!(allocated >= Duration::from_secs(1));

        assert!(budget.allocate_even(0).is_err());
    }

    #[test]
    fn test_timeout_budget_exhausted() {
        let budget = TimeoutBudget::new(Duration::ZERO);
        assert!(budget.is_exhausted());
        assert!(budget.allocate(0.5).is_err());
    }

    #[tokio::test]
    async fn test_adaptive_timeout() {
        let config = TimeoutConfig {
            adaptive: true,
            adaptive_window_size: 10,
            default_timeout: Duration::from_secs(30),
            min_timeout: Duration::from_millis(1),
            ..Default::default()
        };
        let tm = TimeoutManager::new(config);

        // Record more samples than the window holds, all with the same latency.
        for _ in 0..15 {
            tm.record_success(Duration::from_millis(100));
        }

        // p95 of a constant 100 ms window times the 1.5 default multiplier.
        assert_eq!(tm.timeout(), Duration::from_millis(150));

        let stats = tm.get_stats();
        assert_eq!(stats.p50_latency_us, 100_000);
        assert_eq!(stats.current_timeout_us, 150_000);
    }

    #[test]
    fn test_timeout_registry() {
        let registry = TimeoutRegistry::with_defaults();

        let tm1 = registry.get_or_create("service_a");
        let tm2 = registry.get_or_create("service_a");

        // Both handles must refer to the same manager, not merely equal configs.
        tm1.record_success(Duration::from_millis(5));
        assert_eq!(tm2.get_stats().total_success, 1);

        assert!(registry.get("missing").is_none());

        registry.register(
            "service_b",
            TimeoutConfig {
                default_timeout: Duration::from_secs(5),
                ..Default::default()
            },
        );
        assert_eq!(
            registry.get_or_create("service_b").timeout(),
            Duration::from_secs(5)
        );
        assert_eq!(registry.get_all_stats().len(), 2);
    }
}