// oxirs_core — production.rs

1//! Production Hardening Features for OxiRS Core
2//!
3//! Beta.1 Feature: Production-Ready Hardening
4//!
5//! This module provides production-grade features for reliability, observability,
6//! and operational excellence:
7//! - Enhanced error handling with context
8//! - Health checking and diagnostics
9//! - Performance monitoring and metrics
10//! - Circuit breakers and failsafes
11//! - Graceful degradation
12//! - Resource limits and quotas
13
14use crate::OxirsError;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
/// Enhanced error type with contextual information for production debugging.
///
/// Wraps an [`OxirsError`] together with the failing operation's name,
/// free-form key/value context, a wall-clock timestamp, a severity level,
/// and a retryability flag, so operators can triage failures from logs
/// without a debugger attached.
#[derive(Debug, Clone)]
pub struct ProductionError {
    /// The underlying error
    pub error: OxirsError,
    /// Error context (operation name plus free-form key/value fields)
    pub context: ErrorContext,
    /// Wall-clock timestamp captured when this wrapper was constructed
    pub timestamp: std::time::SystemTime,
    /// Severity level (defaults to `Error` in [`ProductionError::new`])
    pub severity: ErrorSeverity,
    /// Whether the caller may safely retry the failed operation
    pub retryable: bool,
}
35
/// Error context information attached to a [`ProductionError`].
#[derive(Debug, Clone)]
pub struct ErrorContext {
    /// Name of the operation that failed (e.g. "parse_operation")
    pub operation: String,
    /// Additional free-form context fields (file, line, query id, ...)
    pub fields: HashMap<String, String>,
    /// Stack trace if available — not populated by this module itself;
    /// callers may fill it in
    pub trace: Option<String>,
}
46
/// Error severity levels, declared from least severe (`Debug`) to most
/// severe (`Critical`).
///
/// Derives `PartialOrd`/`Ord` (ordering follows declaration order) so
/// severities can be compared and used as filter thresholds, e.g.
/// `severity >= ErrorSeverity::Warning`. `Hash` allows use as a map key
/// for per-severity counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ErrorSeverity {
    /// Debug information
    Debug,
    /// Informational
    Info,
    /// Warning - degraded but functional
    Warning,
    /// Error - operation failed
    Error,
    /// Critical - system integrity at risk
    Critical,
}
61
62impl ProductionError {
63    /// Create a new production error
64    pub fn new(error: OxirsError, operation: impl Into<String>) -> Self {
65        Self {
66            error,
67            context: ErrorContext {
68                operation: operation.into(),
69                fields: HashMap::new(),
70                trace: None,
71            },
72            timestamp: std::time::SystemTime::now(),
73            severity: ErrorSeverity::Error,
74            retryable: false,
75        }
76    }
77
78    /// Add context field
79    pub fn with_context(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
80        self.context.fields.insert(key.into(), value.into());
81        self
82    }
83
84    /// Set severity level
85    pub fn with_severity(mut self, severity: ErrorSeverity) -> Self {
86        self.severity = severity;
87        self
88    }
89
90    /// Mark as retryable
91    pub fn retryable(mut self) -> Self {
92        self.retryable = true;
93        self
94    }
95
96    /// Get formatted error message with all context
97    pub fn detailed_message(&self) -> String {
98        let mut msg = format!(
99            "[{:?}] {} in operation '{}'",
100            self.severity, self.error, self.context.operation
101        );
102
103        if !self.context.fields.is_empty() {
104            msg.push_str("\nContext:");
105            for (key, value) in &self.context.fields {
106                msg.push_str(&format!("\n  {key}: {value}"));
107            }
108        }
109
110        if self.retryable {
111            msg.push_str("\n(Operation is retryable)");
112        }
113
114        msg
115    }
116}
117
/// Health status of a component
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HealthStatus {
    /// Component is healthy
    Healthy,
    /// Component is degraded but functional
    Degraded,
    /// Component is unhealthy
    Unhealthy,
    /// Component status unknown — the safe default before any check has run
    #[default]
    Unknown,
}

/// Health check result
#[derive(Debug, Clone)]
pub struct HealthCheck {
    /// Component name
    pub component: String,
    /// Health status
    pub status: HealthStatus,
    /// Human-readable status message
    pub message: String,
    /// Monotonic timestamp taken when this result was created
    pub timestamp: Instant,
    /// How long the check took (zero unless set via `with_response_time`)
    pub response_time: Duration,
    /// Additional named metrics (e.g. response times, connection counts)
    pub metrics: HashMap<String, f64>,
}

impl HealthCheck {
    /// Shared constructor: the public constructors all delegate here so the
    /// field defaults live in exactly one place.
    fn with_status(
        component: impl Into<String>,
        status: HealthStatus,
        message: impl Into<String>,
    ) -> Self {
        Self {
            component: component.into(),
            status,
            message: message.into(),
            timestamp: Instant::now(),
            response_time: Duration::from_micros(0),
            metrics: HashMap::new(),
        }
    }

    /// Create a healthy check
    pub fn healthy(component: impl Into<String>, message: impl Into<String>) -> Self {
        Self::with_status(component, HealthStatus::Healthy, message)
    }

    /// Create a degraded check
    pub fn degraded(component: impl Into<String>, message: impl Into<String>) -> Self {
        Self::with_status(component, HealthStatus::Degraded, message)
    }

    /// Create an unhealthy check
    pub fn unhealthy(component: impl Into<String>, message: impl Into<String>) -> Self {
        Self::with_status(component, HealthStatus::Unhealthy, message)
    }

    /// Add a named metric (builder style)
    pub fn with_metric(mut self, name: impl Into<String>, value: f64) -> Self {
        self.metrics.insert(name.into(), value);
        self
    }

    /// Set the measured response time (builder style)
    pub fn with_response_time(mut self, duration: Duration) -> Self {
        self.response_time = duration;
        self
    }
}
197
198/// Circuit breaker for fault tolerance
199pub struct CircuitBreaker {
200    /// Circuit state
201    state: Arc<RwLock<CircuitState>>,
202    /// Failure count
203    failures: AtomicUsize,
204    /// Success count
205    successes: AtomicUsize,
206    /// Configuration
207    config: CircuitBreakerConfig,
208}
209
210/// Circuit breaker state
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212enum CircuitState {
213    /// Circuit is closed (normal operation)
214    Closed,
215    /// Circuit is open (failing fast)
216    Open,
217    /// Circuit is half-open (testing recovery)
218    HalfOpen,
219}
220
221/// Circuit breaker configuration
222#[derive(Debug, Clone)]
223pub struct CircuitBreakerConfig {
224    /// Failure threshold to open circuit
225    pub failure_threshold: usize,
226    /// Success threshold to close circuit
227    pub success_threshold: usize,
228    /// Timeout before trying half-open
229    pub timeout: Duration,
230    /// Window for counting failures
231    pub window: Duration,
232}
233
234impl Default for CircuitBreakerConfig {
235    fn default() -> Self {
236        Self {
237            failure_threshold: 5,
238            success_threshold: 2,
239            timeout: Duration::from_secs(60),
240            window: Duration::from_secs(10),
241        }
242    }
243}
244
245impl CircuitBreaker {
246    /// Create a new circuit breaker
247    pub fn new(config: CircuitBreakerConfig) -> Self {
248        Self {
249            state: Arc::new(RwLock::new(CircuitState::Closed)),
250            failures: AtomicUsize::new(0),
251            successes: AtomicUsize::new(0),
252            config,
253        }
254    }
255
256    /// Check if operation should be allowed
257    pub fn allow_request(&self) -> bool {
258        let state = *self.state.read();
259        match state {
260            CircuitState::Closed => true,
261            CircuitState::Open => false,
262            CircuitState::HalfOpen => true,
263        }
264    }
265
266    /// Record a successful operation
267    pub fn record_success(&self) {
268        let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
269        self.failures.store(0, Ordering::Relaxed);
270
271        let state = *self.state.read();
272        if state == CircuitState::HalfOpen && successes >= self.config.success_threshold {
273            *self.state.write() = CircuitState::Closed;
274            self.successes.store(0, Ordering::Relaxed);
275        }
276    }
277
278    /// Record a failed operation
279    pub fn record_failure(&self) {
280        let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
281
282        if failures >= self.config.failure_threshold {
283            *self.state.write() = CircuitState::Open;
284            // Schedule transition to half-open after timeout
285            // In production, this would use a timer/scheduler
286        }
287    }
288
289    /// Get current state
290    pub fn state(&self) -> String {
291        format!("{:?}", *self.state.read())
292    }
293
294    /// Get statistics
295    pub fn stats(&self) -> CircuitBreakerStats {
296        CircuitBreakerStats {
297            state: format!("{:?}", *self.state.read()),
298            failures: self.failures.load(Ordering::Relaxed),
299            successes: self.successes.load(Ordering::Relaxed),
300        }
301    }
302}
303
/// Point-in-time snapshot of a [`CircuitBreaker`]'s state and counters.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStats {
    /// Debug-formatted circuit state ("Closed", "Open", or "HalfOpen")
    pub state: String,
    /// Failure count since the last recorded success
    pub failures: usize,
    /// Success count (used to close the circuit from half-open)
    pub successes: usize,
}
311
/// Performance monitoring and metrics collection.
///
/// Aggregates per-operation latency samples, call counts, and error counts.
/// NOTE(review): latency samples are stored unbounded per operation name —
/// a long-running process should periodically reset or cap them.
pub struct PerformanceMonitor {
    /// Raw latency samples per operation name (grows without bound)
    latencies: RwLock<HashMap<String, Vec<Duration>>>,
    /// Total invocation count per operation name
    counts: RwLock<HashMap<String, AtomicU64>>,
    /// Failed invocation count per operation name
    errors: RwLock<HashMap<String, AtomicU64>>,
    /// Monitor creation time, used by `uptime()`
    start_time: Instant,
}
323
324impl PerformanceMonitor {
325    /// Create a new performance monitor
326    pub fn new() -> Self {
327        Self {
328            latencies: RwLock::new(HashMap::new()),
329            counts: RwLock::new(HashMap::new()),
330            errors: RwLock::new(HashMap::new()),
331            start_time: Instant::now(),
332        }
333    }
334
335    /// Record an operation execution
336    pub fn record_operation(&self, operation: &str, duration: Duration, success: bool) {
337        // Record latency
338        {
339            let mut latencies = self.latencies.write();
340            latencies
341                .entry(operation.to_string())
342                .or_default()
343                .push(duration);
344        }
345
346        // Increment count
347        {
348            let mut counts = self.counts.write();
349            counts
350                .entry(operation.to_string())
351                .or_insert_with(|| AtomicU64::new(0))
352                .fetch_add(1, Ordering::Relaxed);
353        }
354
355        // Record error if failed
356        if !success {
357            let mut errors = self.errors.write();
358            errors
359                .entry(operation.to_string())
360                .or_insert_with(|| AtomicU64::new(0))
361                .fetch_add(1, Ordering::Relaxed);
362        }
363    }
364
365    /// Get statistics for an operation
366    pub fn stats(&self, operation: &str) -> Option<OperationStats> {
367        let latencies = self.latencies.read();
368        let counts = self.counts.read();
369        let errors = self.errors.read();
370
371        let latency_vec = latencies.get(operation)?;
372        let count = counts.get(operation)?.load(Ordering::Relaxed);
373        let error_count = errors
374            .get(operation)
375            .map_or(0, |e| e.load(Ordering::Relaxed));
376
377        if latency_vec.is_empty() {
378            return None;
379        }
380
381        // Calculate statistics
382        let mut sorted_latencies = latency_vec.clone();
383        sorted_latencies.sort();
384
385        let total: Duration = sorted_latencies.iter().sum();
386        let avg = total / sorted_latencies.len() as u32;
387
388        let p50 = sorted_latencies[sorted_latencies.len() / 2];
389        let p95 = sorted_latencies[sorted_latencies.len() * 95 / 100];
390        let p99 = sorted_latencies[sorted_latencies.len() * 99 / 100];
391        let min = *sorted_latencies
392            .first()
393            .expect("collection validated to be non-empty");
394        let max = *sorted_latencies
395            .last()
396            .expect("collection validated to be non-empty");
397
398        Some(OperationStats {
399            operation: operation.to_string(),
400            count,
401            error_count,
402            avg_latency: avg,
403            p50_latency: p50,
404            p95_latency: p95,
405            p99_latency: p99,
406            min_latency: min,
407            max_latency: max,
408        })
409    }
410
411    /// Get all statistics
412    pub fn all_stats(&self) -> Vec<OperationStats> {
413        let operations: Vec<String> = self.counts.read().keys().cloned().collect();
414        operations.iter().filter_map(|op| self.stats(op)).collect()
415    }
416
417    /// Get uptime
418    pub fn uptime(&self) -> Duration {
419        self.start_time.elapsed()
420    }
421}
422
423impl Default for PerformanceMonitor {
424    fn default() -> Self {
425        Self::new()
426    }
427}
428
/// Aggregated latency and throughput statistics for one named operation,
/// as produced by `PerformanceMonitor::stats`.
#[derive(Debug, Clone)]
pub struct OperationStats {
    pub operation: String,
    pub count: u64,
    pub error_count: u64,
    pub avg_latency: Duration,
    pub p50_latency: Duration,
    pub p95_latency: Duration,
    pub p99_latency: Duration,
    pub min_latency: Duration,
    pub max_latency: Duration,
}

impl OperationStats {
    /// Percentage of recorded operations that failed; 0.0 when nothing
    /// has been recorded.
    pub fn error_rate(&self) -> f64 {
        if self.count == 0 {
            return 0.0;
        }
        (self.error_count as f64 / self.count as f64) * 100.0
    }

    /// Operations per second over `duration`; 0.0 for a zero duration.
    pub fn throughput(&self, duration: Duration) -> f64 {
        let secs = duration.as_secs_f64();
        if secs == 0.0 {
            return 0.0;
        }
        self.count as f64 / secs
    }
}
462
/// Resource quota manager enforcing a memory ceiling (bytes) and an
/// operations-per-second rate limit over a rolling 1-second window.
pub struct ResourceQuota {
    /// Maximum memory usage in bytes
    max_memory: AtomicUsize,
    /// Current memory usage estimate (maintained by allocate/free calls,
    /// not measured from the allocator)
    current_memory: AtomicUsize,
    /// Maximum operation rate (ops/sec)
    max_rate: AtomicU64,
    /// Operations recorded in the current 1-second window
    operation_count: AtomicU64,
    /// Start of the current rate-limit window
    window_start: RwLock<Instant>,
    /// Whether quota limits are enforced (when false, all checks pass)
    enforced: AtomicBool,
}
478
479impl ResourceQuota {
480    /// Create a new resource quota manager
481    pub fn new(max_memory: usize, max_rate: u64) -> Self {
482        Self {
483            max_memory: AtomicUsize::new(max_memory),
484            current_memory: AtomicUsize::new(0),
485            max_rate: AtomicU64::new(max_rate),
486            operation_count: AtomicU64::new(0),
487            window_start: RwLock::new(Instant::now()),
488            enforced: AtomicBool::new(true),
489        }
490    }
491
492    /// Check if memory quota allows allocation
493    pub fn check_memory(&self, bytes: usize) -> bool {
494        if !self.enforced.load(Ordering::Relaxed) {
495            return true;
496        }
497
498        let current = self.current_memory.load(Ordering::Relaxed);
499        let max = self.max_memory.load(Ordering::Relaxed);
500        current + bytes <= max
501    }
502
503    /// Allocate memory (update quota)
504    pub fn allocate_memory(&self, bytes: usize) -> Result<(), String> {
505        if !self.check_memory(bytes) {
506            return Err(format!("Memory quota exceeded: requested {bytes} bytes"));
507        }
508
509        self.current_memory.fetch_add(bytes, Ordering::Relaxed);
510        Ok(())
511    }
512
513    /// Free memory (update quota)
514    pub fn free_memory(&self, bytes: usize) {
515        self.current_memory.fetch_sub(bytes, Ordering::Relaxed);
516    }
517
518    /// Check if rate limit allows operation
519    pub fn check_rate(&self) -> bool {
520        if !self.enforced.load(Ordering::Relaxed) {
521            return true;
522        }
523
524        let now = Instant::now();
525        let window_start = *self.window_start.read();
526
527        // Reset window if needed
528        if now.duration_since(window_start) >= Duration::from_secs(1) {
529            *self.window_start.write() = now;
530            self.operation_count.store(0, Ordering::Relaxed);
531            return true;
532        }
533
534        let count = self.operation_count.load(Ordering::Relaxed);
535        let max = self.max_rate.load(Ordering::Relaxed);
536        count < max
537    }
538
539    /// Record an operation (update rate limit)
540    pub fn record_operation(&self) -> Result<(), String> {
541        if !self.check_rate() {
542            return Err("Rate limit exceeded".to_string());
543        }
544
545        self.operation_count.fetch_add(1, Ordering::Relaxed);
546        Ok(())
547    }
548
549    /// Get current quota usage
550    pub fn usage(&self) -> QuotaUsage {
551        QuotaUsage {
552            memory_used: self.current_memory.load(Ordering::Relaxed),
553            memory_max: self.max_memory.load(Ordering::Relaxed),
554            operations_count: self.operation_count.load(Ordering::Relaxed),
555            operations_max: self.max_rate.load(Ordering::Relaxed),
556        }
557    }
558
559    /// Enable or disable enforcement
560    pub fn set_enforced(&self, enforced: bool) {
561        self.enforced.store(enforced, Ordering::Relaxed);
562    }
563}
564
/// Snapshot of current quota usage, as returned by `ResourceQuota::usage`.
#[derive(Debug, Clone)]
pub struct QuotaUsage {
    pub memory_used: usize,
    pub memory_max: usize,
    pub operations_count: u64,
    pub operations_max: u64,
}

impl QuotaUsage {
    /// Memory usage as a percentage of the configured maximum; 0.0 when
    /// no maximum is configured.
    pub fn memory_percent(&self) -> f64 {
        if self.memory_max == 0 {
            return 0.0;
        }
        (self.memory_used as f64 / self.memory_max as f64) * 100.0
    }

    /// Rate usage as a percentage of the configured maximum; 0.0 when no
    /// maximum is configured.
    pub fn rate_percent(&self) -> f64 {
        if self.operations_max == 0 {
            return 0.0;
        }
        (self.operations_count as f64 / self.operations_max as f64) * 100.0
    }
}
593
#[cfg(test)]
mod tests {
    use super::*;

    // Builder-style construction round-trips: context fields, severity, and
    // the retryable flag all surface in `detailed_message`.
    #[test]
    fn test_production_error() {
        let error = OxirsError::Parse("Test error".to_string());
        let prod_error = ProductionError::new(error, "parse_operation")
            .with_context("file", "test.ttl")
            .with_context("line", "42")
            .with_severity(ErrorSeverity::Error)
            .retryable();

        assert_eq!(prod_error.context.operation, "parse_operation");
        assert_eq!(
            prod_error.context.fields.get("file"),
            Some(&"test.ttl".to_string())
        );
        assert!(prod_error.retryable);
        assert_eq!(prod_error.severity, ErrorSeverity::Error);

        // detailed_message must include the operation name and each
        // "key: value" context line.
        let message = prod_error.detailed_message();
        assert!(message.contains("parse_operation"));
        assert!(message.contains("file: test.ttl"));
    }

    // Healthy checks carry the component name, attached metrics, and the
    // explicitly-set response time.
    #[test]
    fn test_health_check() {
        let health = HealthCheck::healthy("database", "All systems operational")
            .with_metric("response_time_ms", 5.2)
            .with_metric("connections", 10.0)
            .with_response_time(Duration::from_millis(5));

        assert_eq!(health.status, HealthStatus::Healthy);
        assert_eq!(health.component, "database");
        assert_eq!(health.metrics.get("response_time_ms"), Some(&5.2));
    }

    // With failure_threshold = 3, the breaker keeps allowing requests until
    // the third consecutive failure is recorded.
    #[test]
    fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            ..Default::default()
        };
        let breaker = CircuitBreaker::new(config);

        // Initially closed
        assert!(breaker.allow_request());

        // Record failures below the threshold
        breaker.record_failure();
        breaker.record_failure();
        assert!(breaker.allow_request()); // Still closed

        breaker.record_failure();
        // Should be open now
        // Note: State transition happens asynchronously in production

        let stats = breaker.stats();
        assert_eq!(stats.failures, 3);
    }

    // Four recorded calls (one failed) yield count 4, one error, and a
    // 25% error rate.
    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();

        // Record some operations
        monitor.record_operation("query", Duration::from_millis(10), true);
        monitor.record_operation("query", Duration::from_millis(15), true);
        monitor.record_operation("query", Duration::from_millis(20), true);
        monitor.record_operation("query", Duration::from_millis(25), false);

        let stats = monitor.stats("query").expect("operation should succeed");
        assert_eq!(stats.count, 4);
        assert_eq!(stats.error_count, 1);
        assert_eq!(stats.error_rate(), 25.0);
    }

    // Memory ceiling of 1024 bytes and a 100 ops/sec rate limit: the quota
    // rejects the allocation/operation that would exceed either bound.
    #[test]
    fn test_resource_quota() {
        let quota = ResourceQuota::new(1024, 100);

        // Test memory quota: 512 + 513 would exceed the 1024-byte ceiling
        assert!(quota.check_memory(512));
        assert!(quota.allocate_memory(512).is_ok());
        assert!(quota.check_memory(512));
        assert!(!quota.check_memory(513));

        quota.free_memory(256);
        assert!(quota.check_memory(768));

        // Test rate limit: exactly 100 operations fit in the window
        for _ in 0..100 {
            assert!(quota.record_operation().is_ok());
        }
        // Should hit rate limit on the 101st
        assert!(quota.record_operation().is_err());

        let usage = quota.usage();
        assert_eq!(usage.memory_used, 256);
        assert_eq!(usage.operations_count, 100);
    }
}