Skip to main content

csv_adapter_core/
hardening.rs

//! Production hardening utilities for the core module.
//!
//! This module provides:
//! - Bounded queues that cap the size of in-memory collections
//! - Circuit breakers for failure detection
//! - Timeout configuration
//! - Memory limit configuration for caches and registries
8
9use std::collections::VecDeque;
10use std::time::Duration;
11
/// Maximum number of items in bounded queues.
///
/// This is the capacity used by `BoundedQueue::default()`.
pub const MAX_SEAL_REGISTRY_SIZE: usize = 1_000;

/// Maximum number of entries in caches.
pub const MAX_CACHE_SIZE: usize = 1_000;

/// Maximum number of entries in registries.
pub const MAX_REGISTRY_SIZE: usize = 10_000;

/// Default timeout for RPC calls.
pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30);

/// Default timeout for health checks.
pub const DEFAULT_HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5);

/// Default number of recorded failures at which a circuit breaker opens.
pub const DEFAULT_CIRCUIT_MAX_FAILURES: usize = 5;

/// Default time a circuit breaker stays open before it may half-open.
pub const DEFAULT_CIRCUIT_RESET_TIMEOUT: Duration = Duration::from_secs(60);
32
/// Bounded FIFO queue for enforcing size limits on collections.
///
/// Prevents unbounded growth of caches, registries, and other
/// in-memory data structures that could lead to memory exhaustion.
/// Once the queue holds `max_size` items, further pushes are rejected
/// until an item is popped.
#[derive(Clone, Debug)]
pub struct BoundedQueue<T> {
    /// Stored items in insertion order; the front is the oldest.
    queue: VecDeque<T>,
    /// Upper bound on the number of stored items.
    max_size: usize,
}

impl<T> BoundedQueue<T> {
    /// Creates an empty queue that holds at most `max_size` items.
    ///
    /// A `max_size` of `0` produces a queue that rejects every push.
    pub fn new(max_size: usize) -> Self {
        Self {
            queue: VecDeque::new(),
            max_size,
        }
    }

    /// Appends `item` to the back of the queue, handing it back on rejection.
    ///
    /// Unlike [`push`](Self::push), a rejected item is not dropped, so the
    /// caller can retry or route it elsewhere.
    ///
    /// # Errors
    ///
    /// Returns `Err(item)` when the queue is already at maximum capacity.
    pub fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.is_full() {
            return Err(item);
        }
        self.queue.push_back(item);
        Ok(())
    }

    /// Pushes an item to the back of the queue.
    ///
    /// Returns `true` if the item was added, `false` if the queue is full.
    /// A rejected item is dropped; use [`try_push`](Self::try_push) to get
    /// it back instead.
    pub fn push(&mut self, item: T) -> bool {
        self.try_push(item).is_ok()
    }

    /// Pops the oldest item from the front of the queue (FIFO order).
    ///
    /// Returns `None` when the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns the number of items currently in the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the queue holds no items.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns `true` if the queue is at maximum capacity.
    pub fn is_full(&self) -> bool {
        self.queue.len() >= self.max_size
    }

    /// Returns the maximum number of items this queue will hold.
    pub fn max_size(&self) -> usize {
        self.max_size
    }
}
83
84impl<T> Default for BoundedQueue<T> {
85    fn default() -> Self {
86        Self::new(MAX_SEAL_REGISTRY_SIZE)
87    }
88}
89
/// Circuit breaker state for managing service availability
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation — requests are allowed
    Closed,
    /// Failure threshold reached — requests are blocked until the reset
    /// timeout has elapsed
    Open,
    /// Testing recovery — requests are allowed through until a success
    /// closes the circuit or a failure re-opens it
    HalfOpen,
}

/// Circuit breaker for failure detection and automatic service isolation
///
/// Transitions from `Closed` to `Open` when the failure count reaches the
/// threshold, then to `HalfOpen` once the reset timeout has elapsed since
/// the last recorded failure. A recorded success returns it to `Closed`.
///
/// All mutating methods take `&mut self`; the type performs no internal
/// synchronization.
#[derive(Clone, Debug)]
pub struct CircuitBreaker {
    /// Failures recorded since the last success.
    failure_count: usize,
    /// Failure count at which the circuit opens.
    max_failures: usize,
    /// When the most recent failure was recorded. Uses the monotonic clock
    /// so wall-clock adjustments cannot stretch or shorten the open period.
    last_failure_time: Option<std::time::Instant>,
    /// How long the circuit stays open after the last failure.
    reset_timeout: Duration,
    /// Current state of the circuit.
    state: CircuitState,
}

impl CircuitBreaker {
    /// Creates a closed circuit breaker with the given failure threshold
    /// and reset timeout.
    pub fn new(max_failures: usize, reset_timeout: Duration) -> Self {
        Self {
            failure_count: 0,
            max_failures,
            last_failure_time: None,
            reset_timeout,
            state: CircuitState::Closed,
        }
    }

    /// Records a failure and trips the circuit open once the threshold is reached.
    ///
    /// The failure count is not reset when the circuit half-opens, so a
    /// failure recorded in `HalfOpen` is still at or above the threshold and
    /// re-opens the circuit immediately.
    pub fn record_failure(&mut self) {
        // Saturate instead of overflowing on a pathological failure streak.
        self.failure_count = self.failure_count.saturating_add(1);
        self.last_failure_time = Some(std::time::Instant::now());

        if self.failure_count >= self.max_failures {
            self.state = CircuitState::Open;
        }
    }

    /// Records a success, clearing the failure count and closing the circuit.
    pub fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitState::Closed;
    }

    /// Checks whether a request should be allowed through.
    ///
    /// Returns `true` if the circuit is closed or half-open. If the circuit
    /// is open, returns `true` only when more than `reset_timeout` has
    /// elapsed since the last failure, in which case the circuit moves to
    /// `HalfOpen`; otherwise returns `false`.
    pub fn allow_request(&mut self) -> bool {
        match self.state {
            CircuitState::Closed | CircuitState::HalfOpen => true,
            CircuitState::Open => {
                // With no recorded failure time there is nothing to wait for.
                let cooled_down = match self.last_failure_time {
                    Some(failed_at) => failed_at.elapsed() > self.reset_timeout,
                    None => true,
                };
                if cooled_down {
                    self.state = CircuitState::HalfOpen;
                }
                cooled_down
            }
        }
    }

    /// Returns the current circuit state.
    pub fn state(&self) -> &CircuitState {
        &self.state
    }

    /// Returns the number of failures recorded since the last success.
    pub fn failure_count(&self) -> usize {
        self.failure_count
    }
}
175
176impl Default for CircuitBreaker {
177    fn default() -> Self {
178        Self::new(DEFAULT_CIRCUIT_MAX_FAILURES, DEFAULT_CIRCUIT_RESET_TIMEOUT)
179    }
180}
181
/// Timeout configuration for RPC calls and health checks
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimeoutConfig {
    /// Timeout for individual RPC calls
    pub rpc_call: Duration,
    /// Timeout for health check requests
    pub health_check: Duration,
}
190
191impl Default for TimeoutConfig {
192    fn default() -> Self {
193        Self {
194            rpc_call: DEFAULT_RPC_TIMEOUT,
195            health_check: DEFAULT_HEALTH_CHECK_TIMEOUT,
196        }
197    }
198}
199
/// Memory limits configuration for caches and registries
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MemoryLimits {
    /// Maximum number of entries in caches
    pub cache_size: usize,
    /// Maximum number of entries in registries
    pub registry_size: usize,
}
208
209impl Default for MemoryLimits {
210    fn default() -> Self {
211        Self {
212            cache_size: MAX_CACHE_SIZE,
213            registry_size: MAX_REGISTRY_SIZE,
214        }
215    }
216}