oxigdal_cluster/fault_tolerance/
timeout.rs1use crate::error::{ClusterError, Result};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16use tracing::{debug, warn};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct TimeoutConfig {
21 pub default_timeout: Duration,
23 pub min_timeout: Duration,
25 pub max_timeout: Duration,
27 pub adaptive: bool,
29 pub adaptive_percentile: f64,
31 pub adaptive_window_size: usize,
33 pub adaptive_multiplier: f64,
35}
36
37impl Default for TimeoutConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(30),
41 min_timeout: Duration::from_millis(100),
42 max_timeout: Duration::from_secs(300),
43 adaptive: true,
44 adaptive_percentile: 0.95,
45 adaptive_window_size: 100,
46 adaptive_multiplier: 1.5,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct TimeoutStats {
54 pub total_operations: u64,
56 pub total_timeouts: u64,
58 pub total_success: u64,
60 pub avg_duration_us: u64,
62 pub current_timeout_us: u64,
64 pub p50_latency_us: u64,
66 pub p95_latency_us: u64,
68 pub p99_latency_us: u64,
70}
71
72struct TimeoutManagerInner {
74 config: TimeoutConfig,
76 current_timeout: RwLock<Duration>,
78 latency_history: RwLock<Vec<u64>>,
80 stats: RwLock<TimeoutStats>,
82 total_duration_us: AtomicU64,
84}
85
86#[derive(Clone)]
88pub struct TimeoutManager {
89 inner: Arc<TimeoutManagerInner>,
90}
91
92impl TimeoutManager {
93 pub fn new(config: TimeoutConfig) -> Self {
95 let current_timeout = config.default_timeout;
96 Self {
97 inner: Arc::new(TimeoutManagerInner {
98 config,
99 current_timeout: RwLock::new(current_timeout),
100 latency_history: RwLock::new(Vec::new()),
101 stats: RwLock::new(TimeoutStats {
102 current_timeout_us: current_timeout.as_micros() as u64,
103 ..Default::default()
104 }),
105 total_duration_us: AtomicU64::new(0),
106 }),
107 }
108 }
109
110 pub fn with_defaults() -> Self {
112 Self::new(TimeoutConfig::default())
113 }
114
115 pub fn timeout(&self) -> Duration {
117 *self.inner.current_timeout.read()
118 }
119
120 pub async fn call<F, T>(&self, f: F) -> Result<T>
122 where
123 F: std::future::Future<Output = T>,
124 {
125 let timeout = self.timeout();
126 let start = Instant::now();
127
128 match tokio::time::timeout(timeout, f).await {
129 Ok(result) => {
130 let duration = start.elapsed();
131 self.record_success(duration);
132 Ok(result)
133 }
134 Err(_) => {
135 self.record_timeout();
136 Err(ClusterError::Timeout(format!(
137 "Operation timed out after {:?}",
138 timeout
139 )))
140 }
141 }
142 }
143
144 pub async fn call_with_error<F, T, E>(&self, f: F) -> Result<T>
146 where
147 F: std::future::Future<Output = std::result::Result<T, E>>,
148 E: std::fmt::Display,
149 {
150 let timeout = self.timeout();
151 let start = Instant::now();
152
153 match tokio::time::timeout(timeout, f).await {
154 Ok(Ok(result)) => {
155 let duration = start.elapsed();
156 self.record_success(duration);
157 Ok(result)
158 }
159 Ok(Err(e)) => {
160 let duration = start.elapsed();
161 self.record_success(duration); Err(ClusterError::ExecutionError(e.to_string()))
163 }
164 Err(_) => {
165 self.record_timeout();
166 Err(ClusterError::Timeout(format!(
167 "Operation timed out after {:?}",
168 timeout
169 )))
170 }
171 }
172 }
173
174 pub fn record_success(&self, duration: Duration) {
176 let duration_us = duration.as_micros() as u64;
177
178 {
180 let mut history = self.inner.latency_history.write();
181 history.push(duration_us);
182 if history.len() > self.inner.config.adaptive_window_size {
183 history.remove(0);
184 }
185 }
186
187 let total = self
189 .inner
190 .total_duration_us
191 .fetch_add(duration_us, Ordering::SeqCst)
192 + duration_us;
193
194 {
196 let mut stats = self.inner.stats.write();
197 stats.total_operations += 1;
198 stats.total_success += 1;
199 stats.avg_duration_us = total / stats.total_operations;
200 }
201
202 if self.inner.config.adaptive {
204 self.update_adaptive_timeout();
205 }
206
207 debug!("Timeout manager: recorded success, duration={:?}", duration);
208 }
209
210 pub fn record_timeout(&self) {
212 let mut stats = self.inner.stats.write();
213 stats.total_operations += 1;
214 stats.total_timeouts += 1;
215
216 warn!(
217 "Timeout manager: operation timed out (total timeouts: {})",
218 stats.total_timeouts
219 );
220 }
221
222 fn update_adaptive_timeout(&self) {
224 let history = self.inner.latency_history.read();
225 if history.len() < 10 {
226 return;
227 }
228
229 let mut sorted: Vec<u64> = history.clone();
231 sorted.sort_unstable();
232
233 let p50_idx = (sorted.len() as f64 * 0.50) as usize;
235 let p95_idx = (sorted.len() as f64 * self.inner.config.adaptive_percentile) as usize;
236 let p99_idx = (sorted.len() as f64 * 0.99) as usize;
237
238 let p50 = sorted
239 .get(p50_idx.min(sorted.len() - 1))
240 .copied()
241 .unwrap_or(0);
242 let p95 = sorted
243 .get(p95_idx.min(sorted.len() - 1))
244 .copied()
245 .unwrap_or(0);
246 let p99 = sorted
247 .get(p99_idx.min(sorted.len() - 1))
248 .copied()
249 .unwrap_or(0);
250
251 let adaptive_us = (p95 as f64 * self.inner.config.adaptive_multiplier) as u64;
253 let adaptive_timeout = Duration::from_micros(adaptive_us);
254
255 let clamped = adaptive_timeout
257 .max(self.inner.config.min_timeout)
258 .min(self.inner.config.max_timeout);
259
260 *self.inner.current_timeout.write() = clamped;
262
263 let mut stats = self.inner.stats.write();
265 stats.current_timeout_us = clamped.as_micros() as u64;
266 stats.p50_latency_us = p50;
267 stats.p95_latency_us = p95;
268 stats.p99_latency_us = p99;
269
270 debug!(
271 "Adaptive timeout updated: {:?} (p95={:?})",
272 clamped,
273 Duration::from_micros(p95)
274 );
275 }
276
277 pub fn set_timeout(&self, timeout: Duration) {
279 let clamped = timeout
280 .max(self.inner.config.min_timeout)
281 .min(self.inner.config.max_timeout);
282
283 *self.inner.current_timeout.write() = clamped;
284 self.inner.stats.write().current_timeout_us = clamped.as_micros() as u64;
285 }
286
287 pub fn get_stats(&self) -> TimeoutStats {
289 self.inner.stats.read().clone()
290 }
291
292 pub fn timeout_rate(&self) -> f64 {
294 let stats = self.inner.stats.read();
295 if stats.total_operations == 0 {
296 0.0
297 } else {
298 stats.total_timeouts as f64 / stats.total_operations as f64
299 }
300 }
301
302 pub fn reset_stats(&self) {
304 let current_timeout = *self.inner.current_timeout.read();
305 *self.inner.stats.write() = TimeoutStats {
306 current_timeout_us: current_timeout.as_micros() as u64,
307 ..Default::default()
308 };
309 self.inner.latency_history.write().clear();
310 self.inner.total_duration_us.store(0, Ordering::SeqCst);
311 }
312}
313
/// An absolute point in time by which work must complete.
///
/// It also remembers the budget it was created with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Deadline {
    /// Instant at which the deadline expires.
    expires_at: Instant,
    /// Budget available when the deadline was created (zero if already past).
    original_budget: Duration,
}
322
323impl Deadline {
324 pub fn new(budget: Duration) -> Self {
326 Self {
327 expires_at: Instant::now() + budget,
328 original_budget: budget,
329 }
330 }
331
332 pub fn at(expires_at: Instant) -> Self {
334 let now = Instant::now();
335 let original_budget = if expires_at > now {
336 expires_at - now
337 } else {
338 Duration::ZERO
339 };
340 Self {
341 expires_at,
342 original_budget,
343 }
344 }
345
346 pub fn remaining(&self) -> Duration {
348 let now = Instant::now();
349 if self.expires_at > now {
350 self.expires_at - now
351 } else {
352 Duration::ZERO
353 }
354 }
355
356 pub fn is_expired(&self) -> bool {
358 Instant::now() >= self.expires_at
359 }
360
361 pub fn original_budget(&self) -> Duration {
363 self.original_budget
364 }
365
366 pub fn elapsed(&self) -> Duration {
368 let deadline_start = self.expires_at - self.original_budget;
369 Instant::now().saturating_duration_since(deadline_start)
370 }
371
372 pub fn check(&self) -> Result<Duration> {
374 let remaining = self.remaining();
375 if remaining.is_zero() {
376 Err(ClusterError::Timeout("Deadline expired".to_string()))
377 } else {
378 Ok(remaining)
379 }
380 }
381
382 pub async fn run<F, T>(&self, f: F) -> Result<T>
384 where
385 F: std::future::Future<Output = T>,
386 {
387 let remaining = self.check()?;
388
389 match tokio::time::timeout(remaining, f).await {
390 Ok(result) => Ok(result),
391 Err(_) => Err(ClusterError::Timeout("Deadline exceeded".to_string())),
392 }
393 }
394}
395
396#[derive(Clone)]
398pub struct TimeoutBudget {
399 inner: Arc<TimeoutBudgetInner>,
400}
401
/// State shared by every clone of a [`TimeoutBudget`].
struct TimeoutBudgetInner {
    /// Instant the budget clock started.
    started_at: Instant,
    /// Total wall-clock time available.
    total_budget: Duration,
    /// Sum of durations passed to `record_operation`, in microseconds.
    consumed_us: AtomicU64,
    /// Number of `record_operation` calls.
    operations: AtomicU64,
}
412
413impl TimeoutBudget {
414 pub fn new(total_budget: Duration) -> Self {
416 Self {
417 inner: Arc::new(TimeoutBudgetInner {
418 total_budget,
419 started_at: Instant::now(),
420 operations: AtomicU64::new(0),
421 consumed_us: AtomicU64::new(0),
422 }),
423 }
424 }
425
426 pub fn total(&self) -> Duration {
428 self.inner.total_budget
429 }
430
431 pub fn remaining(&self) -> Duration {
433 let elapsed = self.inner.started_at.elapsed();
434 if elapsed >= self.inner.total_budget {
435 Duration::ZERO
436 } else {
437 self.inner.total_budget - elapsed
438 }
439 }
440
441 pub fn elapsed(&self) -> Duration {
443 self.inner.started_at.elapsed()
444 }
445
446 pub fn is_exhausted(&self) -> bool {
448 self.remaining().is_zero()
449 }
450
451 pub fn allocate(&self, portion: f64) -> Result<Duration> {
453 let remaining = self.remaining();
454 if remaining.is_zero() {
455 return Err(ClusterError::Timeout("Budget exhausted".to_string()));
456 }
457
458 let allocated = Duration::from_secs_f64(remaining.as_secs_f64() * portion.min(1.0));
459 Ok(allocated)
460 }
461
462 pub fn allocate_even(&self, remaining_operations: u32) -> Result<Duration> {
464 if remaining_operations == 0 {
465 return Err(ClusterError::InvalidOperation(
466 "No remaining operations".to_string(),
467 ));
468 }
469
470 let remaining = self.remaining();
471 if remaining.is_zero() {
472 return Err(ClusterError::Timeout("Budget exhausted".to_string()));
473 }
474
475 Ok(remaining / remaining_operations)
476 }
477
478 pub fn record_operation(&self, duration: Duration) {
480 self.inner.operations.fetch_add(1, Ordering::SeqCst);
481 self.inner
482 .consumed_us
483 .fetch_add(duration.as_micros() as u64, Ordering::SeqCst);
484 }
485
486 pub fn operations_completed(&self) -> u64 {
488 self.inner.operations.load(Ordering::SeqCst)
489 }
490
491 pub fn consumed(&self) -> Duration {
493 Duration::from_micros(self.inner.consumed_us.load(Ordering::SeqCst))
494 }
495
496 pub fn to_deadline(&self) -> Deadline {
498 Deadline::new(self.remaining())
499 }
500}
501
502#[derive(Clone)]
504pub struct TimeoutRegistry {
505 managers: Arc<RwLock<HashMap<String, TimeoutManager>>>,
506 default_config: TimeoutConfig,
507}
508
509impl TimeoutRegistry {
510 pub fn new(default_config: TimeoutConfig) -> Self {
512 Self {
513 managers: Arc::new(RwLock::new(HashMap::new())),
514 default_config,
515 }
516 }
517
518 pub fn with_defaults() -> Self {
520 Self::new(TimeoutConfig::default())
521 }
522
523 pub fn get_or_create(&self, key: &str) -> TimeoutManager {
525 let managers = self.managers.read();
526 if let Some(manager) = managers.get(key) {
527 return manager.clone();
528 }
529 drop(managers);
530
531 let mut managers = self.managers.write();
532 managers
533 .entry(key.to_string())
534 .or_insert_with(|| TimeoutManager::new(self.default_config.clone()))
535 .clone()
536 }
537
538 pub fn get(&self, key: &str) -> Option<TimeoutManager> {
540 self.managers.read().get(key).cloned()
541 }
542
543 pub fn register(&self, key: &str, config: TimeoutConfig) -> TimeoutManager {
545 let manager = TimeoutManager::new(config);
546 self.managers
547 .write()
548 .insert(key.to_string(), manager.clone());
549 manager
550 }
551
552 pub fn get_all_stats(&self) -> HashMap<String, TimeoutStats> {
554 self.managers
555 .read()
556 .iter()
557 .map(|(k, v)| (k.clone(), v.get_stats()))
558 .collect()
559 }
560}
561
562impl Default for TimeoutRegistry {
563 fn default() -> Self {
564 Self::with_defaults()
565 }
566}
567
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_timeout_manager_creation() {
        let tm = TimeoutManager::with_defaults();
        assert_eq!(tm.timeout(), Duration::from_secs(30));
        assert_eq!(tm.get_stats().current_timeout_us, 30_000_000);
    }

    #[tokio::test]
    async fn test_timeout_success() {
        let tm = TimeoutManager::with_defaults();

        let result = tm.call(async { 42 }).await;
        assert!(result.is_ok());
        assert_eq!(result.ok(), Some(42));

        let stats = tm.get_stats();
        assert_eq!(stats.total_success, 1);
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_timeouts, 0);
    }

    #[tokio::test]
    async fn test_call_with_error_maps_inner_error() {
        let tm = TimeoutManager::with_defaults();

        let result: Result<i32> = tm.call_with_error(async { Err::<i32, _>("boom") }).await;
        assert!(matches!(result, Err(ClusterError::ExecutionError(ref msg)) if msg == "boom"));

        let stats = tm.get_stats();
        assert_eq!(stats.total_operations, 1);
        assert_eq!(stats.total_timeouts, 0);
    }

    #[tokio::test]
    async fn test_timeout_exceeded() {
        let config = TimeoutConfig {
            default_timeout: Duration::from_millis(10),
            ..Default::default()
        };
        let tm = TimeoutManager::new(config);

        let result = tm
            .call(async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                42
            })
            .await;

        assert!(matches!(result, Err(ClusterError::Timeout(_))));

        let stats = tm.get_stats();
        assert_eq!(stats.total_timeouts, 1);
        assert_eq!(tm.timeout_rate(), 1.0);
    }

    #[test]
    fn test_set_timeout_clamps_to_bounds() {
        let tm = TimeoutManager::with_defaults();

        tm.set_timeout(Duration::from_secs(1_000));
        assert_eq!(tm.timeout(), Duration::from_secs(300));

        tm.set_timeout(Duration::from_millis(1));
        assert_eq!(tm.timeout(), Duration::from_millis(100));
        assert_eq!(tm.get_stats().current_timeout_us, 100_000);
    }

    #[test]
    fn test_reset_stats_keeps_current_timeout() {
        let tm = TimeoutManager::with_defaults();
        tm.record_success(Duration::from_millis(5));
        tm.record_timeout();

        tm.reset_stats();

        let stats = tm.get_stats();
        assert_eq!(stats.total_operations, 0);
        assert_eq!(stats.total_success, 0);
        assert_eq!(stats.total_timeouts, 0);
        assert_eq!(stats.avg_duration_us, 0);
        assert_eq!(stats.current_timeout_us, 30_000_000);
        assert_eq!(tm.timeout_rate(), 0.0);
    }

    #[test]
    fn test_deadline() {
        let deadline = Deadline::new(Duration::from_secs(10));
        assert!(!deadline.is_expired());
        assert!(deadline.remaining() <= Duration::from_secs(10));
        assert_eq!(deadline.original_budget(), Duration::from_secs(10));
        assert!(deadline.check().is_ok());
    }

    #[test]
    fn test_deadline_expired() {
        let deadline = Deadline::new(Duration::ZERO);
        assert!(deadline.is_expired());
        assert_eq!(deadline.remaining(), Duration::ZERO);
        assert!(deadline.check().is_err());
    }

    #[test]
    fn test_deadline_at_past_instant() {
        let deadline = Deadline::at(Instant::now());
        assert_eq!(deadline.original_budget(), Duration::ZERO);
        assert!(deadline.is_expired());
    }

    #[tokio::test]
    async fn test_deadline_run() {
        let deadline = Deadline::new(Duration::from_secs(10));
        assert_eq!(deadline.run(async { 7 }).await.ok(), Some(7));

        let expired = Deadline::new(Duration::ZERO);
        assert!(expired.run(async { 7 }).await.is_err());
    }

    #[test]
    fn test_timeout_budget() {
        let budget = TimeoutBudget::new(Duration::from_secs(10));
        assert_eq!(budget.total(), Duration::from_secs(10));
        assert!(!budget.is_exhausted());

        let allocated = budget.allocate(0.5).ok();
        assert!(matches!(
            allocated,
            Some(d) if d > Duration::from_secs(4) && d <= Duration::from_secs(5)
        ));
        assert!(budget.to_deadline().remaining() <= budget.total());
    }

    #[test]
    fn test_timeout_budget_even_allocation() {
        let budget = TimeoutBudget::new(Duration::from_secs(10));

        let allocated = budget.allocate_even(5);
        assert!(allocated.is_ok());
        assert!(allocated.ok().map(|d| d.as_secs()).unwrap_or(0) >= 1);

        assert!(budget.allocate_even(0).is_err());
    }

    #[test]
    fn test_timeout_budget_records_operations() {
        let budget = TimeoutBudget::new(Duration::from_secs(10));
        budget.record_operation(Duration::from_millis(3));
        budget.record_operation(Duration::from_millis(2));

        assert_eq!(budget.operations_completed(), 2);
        assert_eq!(budget.consumed(), Duration::from_millis(5));
    }

    #[tokio::test]
    async fn test_adaptive_timeout() {
        let config = TimeoutConfig {
            adaptive: true,
            adaptive_window_size: 10,
            default_timeout: Duration::from_secs(30),
            min_timeout: Duration::from_millis(1),
            ..Default::default()
        };
        let tm = TimeoutManager::new(config);

        for _ in 0..15 {
            tm.record_success(Duration::from_millis(100));
        }

        let stats = tm.get_stats();
        assert_eq!(stats.p50_latency_us, 100_000);
        assert_eq!(stats.p95_latency_us, 100_000);
        assert_eq!(stats.p99_latency_us, 100_000);
        // 100 ms percentile latency * default multiplier 1.5.
        assert_eq!(tm.timeout(), Duration::from_millis(150));
        assert_eq!(stats.current_timeout_us, 150_000);
    }

    #[test]
    fn test_adaptive_timeout_needs_minimum_samples() {
        let tm = TimeoutManager::with_defaults();
        for _ in 0..9 {
            tm.record_success(Duration::from_millis(100));
        }
        assert_eq!(tm.timeout(), Duration::from_secs(30));
    }

    #[test]
    fn test_timeout_registry() {
        let registry = TimeoutRegistry::with_defaults();

        let tm1 = registry.get_or_create("service_a");
        let tm2 = registry.get_or_create("service_a");

        assert_eq!(tm1.timeout(), tm2.timeout());
        // Same key must yield the same shared manager, not a fresh one.
        assert!(Arc::ptr_eq(&tm1.inner, &tm2.inner));

        let tm3 = registry.get_or_create("service_b");
        assert!(!Arc::ptr_eq(&tm1.inner, &tm3.inner));
        assert!(registry.get("missing").is_none());

        let custom = registry.register(
            "custom",
            TimeoutConfig {
                default_timeout: Duration::from_secs(5),
                ..Default::default()
            },
        );
        assert_eq!(custom.timeout(), Duration::from_secs(5));
        assert_eq!(registry.get_all_stats().len(), 3);
    }
}