1use crate::OxirsError;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone)]
23pub struct ProductionError {
24 pub error: OxirsError,
26 pub context: ErrorContext,
28 pub timestamp: std::time::SystemTime,
30 pub severity: ErrorSeverity,
32 pub retryable: bool,
34}
35
/// Descriptive details recorded alongside a `ProductionError`.
#[derive(Debug, Clone)]
pub struct ErrorContext {
    /// Name of the operation that was running when the error occurred.
    pub operation: String,
    /// Free-form key/value pairs added through `ProductionError::with_context`.
    pub fields: HashMap<String, String>,
    /// Optional trace text; `ProductionError::new` leaves this as `None`.
    pub trace: Option<String>,
}
46
/// Severity level attached to a `ProductionError`.
///
/// The `Debug` rendering of the variant is what `detailed_message` prints
/// inside the leading square brackets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Diagnostic-level event.
    Debug,
    /// Informational event.
    Info,
    /// Warning-level event.
    Warning,
    /// Error-level event; the level assigned by `ProductionError::new`.
    Error,
    /// Critical-level event.
    Critical,
}
61
62impl ProductionError {
63 pub fn new(error: OxirsError, operation: impl Into<String>) -> Self {
65 Self {
66 error,
67 context: ErrorContext {
68 operation: operation.into(),
69 fields: HashMap::new(),
70 trace: None,
71 },
72 timestamp: std::time::SystemTime::now(),
73 severity: ErrorSeverity::Error,
74 retryable: false,
75 }
76 }
77
78 pub fn with_context(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
80 self.context.fields.insert(key.into(), value.into());
81 self
82 }
83
84 pub fn with_severity(mut self, severity: ErrorSeverity) -> Self {
86 self.severity = severity;
87 self
88 }
89
90 pub fn retryable(mut self) -> Self {
92 self.retryable = true;
93 self
94 }
95
96 pub fn detailed_message(&self) -> String {
98 let mut msg = format!(
99 "[{:?}] {} in operation '{}'",
100 self.severity, self.error, self.context.operation
101 );
102
103 if !self.context.fields.is_empty() {
104 msg.push_str("\nContext:");
105 for (key, value) in &self.context.fields {
106 msg.push_str(&format!("\n {key}: {value}"));
107 }
108 }
109
110 if self.retryable {
111 msg.push_str("\n(Operation is retryable)");
112 }
113
114 msg
115 }
116}
117
/// Health classification reported by a `HealthCheck`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// Status set by `HealthCheck::healthy`.
    Healthy,
    /// Status set by `HealthCheck::degraded`.
    Degraded,
    /// Status set by `HealthCheck::unhealthy`.
    Unhealthy,
    /// Status could not be determined.
    Unknown,
}
130
131#[derive(Debug, Clone)]
133pub struct HealthCheck {
134 pub component: String,
136 pub status: HealthStatus,
138 pub message: String,
140 pub timestamp: Instant,
142 pub response_time: Duration,
144 pub metrics: HashMap<String, f64>,
146}
147
148impl HealthCheck {
149 pub fn healthy(component: impl Into<String>, message: impl Into<String>) -> Self {
151 Self {
152 component: component.into(),
153 status: HealthStatus::Healthy,
154 message: message.into(),
155 timestamp: Instant::now(),
156 response_time: Duration::from_micros(0),
157 metrics: HashMap::new(),
158 }
159 }
160
161 pub fn degraded(component: impl Into<String>, message: impl Into<String>) -> Self {
163 Self {
164 component: component.into(),
165 status: HealthStatus::Degraded,
166 message: message.into(),
167 timestamp: Instant::now(),
168 response_time: Duration::from_micros(0),
169 metrics: HashMap::new(),
170 }
171 }
172
173 pub fn unhealthy(component: impl Into<String>, message: impl Into<String>) -> Self {
175 Self {
176 component: component.into(),
177 status: HealthStatus::Unhealthy,
178 message: message.into(),
179 timestamp: Instant::now(),
180 response_time: Duration::from_micros(0),
181 metrics: HashMap::new(),
182 }
183 }
184
185 pub fn with_metric(mut self, name: impl Into<String>, value: f64) -> Self {
187 self.metrics.insert(name.into(), value);
188 self
189 }
190
191 pub fn with_response_time(mut self, duration: Duration) -> Self {
193 self.response_time = duration;
194 self
195 }
196}
197
198pub struct CircuitBreaker {
200 state: Arc<RwLock<CircuitState>>,
202 failures: AtomicUsize,
204 successes: AtomicUsize,
206 config: CircuitBreakerConfig,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212enum CircuitState {
213 Closed,
215 Open,
217 HalfOpen,
219}
220
221#[derive(Debug, Clone)]
223pub struct CircuitBreakerConfig {
224 pub failure_threshold: usize,
226 pub success_threshold: usize,
228 pub timeout: Duration,
230 pub window: Duration,
232}
233
234impl Default for CircuitBreakerConfig {
235 fn default() -> Self {
236 Self {
237 failure_threshold: 5,
238 success_threshold: 2,
239 timeout: Duration::from_secs(60),
240 window: Duration::from_secs(10),
241 }
242 }
243}
244
245impl CircuitBreaker {
246 pub fn new(config: CircuitBreakerConfig) -> Self {
248 Self {
249 state: Arc::new(RwLock::new(CircuitState::Closed)),
250 failures: AtomicUsize::new(0),
251 successes: AtomicUsize::new(0),
252 config,
253 }
254 }
255
256 pub fn allow_request(&self) -> bool {
258 let state = *self.state.read();
259 match state {
260 CircuitState::Closed => true,
261 CircuitState::Open => false,
262 CircuitState::HalfOpen => true,
263 }
264 }
265
266 pub fn record_success(&self) {
268 let successes = self.successes.fetch_add(1, Ordering::Relaxed) + 1;
269 self.failures.store(0, Ordering::Relaxed);
270
271 let state = *self.state.read();
272 if state == CircuitState::HalfOpen && successes >= self.config.success_threshold {
273 *self.state.write() = CircuitState::Closed;
274 self.successes.store(0, Ordering::Relaxed);
275 }
276 }
277
278 pub fn record_failure(&self) {
280 let failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
281
282 if failures >= self.config.failure_threshold {
283 *self.state.write() = CircuitState::Open;
284 }
287 }
288
289 pub fn state(&self) -> String {
291 format!("{:?}", *self.state.read())
292 }
293
294 pub fn stats(&self) -> CircuitBreakerStats {
296 CircuitBreakerStats {
297 state: format!("{:?}", *self.state.read()),
298 failures: self.failures.load(Ordering::Relaxed),
299 successes: self.successes.load(Ordering::Relaxed),
300 }
301 }
302}
303
/// Point-in-time snapshot returned by `CircuitBreaker::stats`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStats {
    /// `Debug` rendering of the breaker state at the time of the snapshot.
    pub state: String,
    /// Value of the breaker's failure counter.
    pub failures: usize,
    /// Value of the breaker's success counter.
    pub successes: usize,
}
311
312pub struct PerformanceMonitor {
314 latencies: RwLock<HashMap<String, Vec<Duration>>>,
316 counts: RwLock<HashMap<String, AtomicU64>>,
318 errors: RwLock<HashMap<String, AtomicU64>>,
320 start_time: Instant,
322}
323
324impl PerformanceMonitor {
325 pub fn new() -> Self {
327 Self {
328 latencies: RwLock::new(HashMap::new()),
329 counts: RwLock::new(HashMap::new()),
330 errors: RwLock::new(HashMap::new()),
331 start_time: Instant::now(),
332 }
333 }
334
335 pub fn record_operation(&self, operation: &str, duration: Duration, success: bool) {
337 {
339 let mut latencies = self.latencies.write();
340 latencies
341 .entry(operation.to_string())
342 .or_default()
343 .push(duration);
344 }
345
346 {
348 let mut counts = self.counts.write();
349 counts
350 .entry(operation.to_string())
351 .or_insert_with(|| AtomicU64::new(0))
352 .fetch_add(1, Ordering::Relaxed);
353 }
354
355 if !success {
357 let mut errors = self.errors.write();
358 errors
359 .entry(operation.to_string())
360 .or_insert_with(|| AtomicU64::new(0))
361 .fetch_add(1, Ordering::Relaxed);
362 }
363 }
364
365 pub fn stats(&self, operation: &str) -> Option<OperationStats> {
367 let latencies = self.latencies.read();
368 let counts = self.counts.read();
369 let errors = self.errors.read();
370
371 let latency_vec = latencies.get(operation)?;
372 let count = counts.get(operation)?.load(Ordering::Relaxed);
373 let error_count = errors
374 .get(operation)
375 .map_or(0, |e| e.load(Ordering::Relaxed));
376
377 if latency_vec.is_empty() {
378 return None;
379 }
380
381 let mut sorted_latencies = latency_vec.clone();
383 sorted_latencies.sort();
384
385 let total: Duration = sorted_latencies.iter().sum();
386 let avg = total / sorted_latencies.len() as u32;
387
388 let p50 = sorted_latencies[sorted_latencies.len() / 2];
389 let p95 = sorted_latencies[sorted_latencies.len() * 95 / 100];
390 let p99 = sorted_latencies[sorted_latencies.len() * 99 / 100];
391 let min = *sorted_latencies
392 .first()
393 .expect("collection validated to be non-empty");
394 let max = *sorted_latencies
395 .last()
396 .expect("collection validated to be non-empty");
397
398 Some(OperationStats {
399 operation: operation.to_string(),
400 count,
401 error_count,
402 avg_latency: avg,
403 p50_latency: p50,
404 p95_latency: p95,
405 p99_latency: p99,
406 min_latency: min,
407 max_latency: max,
408 })
409 }
410
411 pub fn all_stats(&self) -> Vec<OperationStats> {
413 let operations: Vec<String> = self.counts.read().keys().cloned().collect();
414 operations.iter().filter_map(|op| self.stats(op)).collect()
415 }
416
417 pub fn uptime(&self) -> Duration {
419 self.start_time.elapsed()
420 }
421}
422
423impl Default for PerformanceMonitor {
424 fn default() -> Self {
425 Self::new()
426 }
427}
428
/// Aggregate figures for one operation, produced by `PerformanceMonitor::stats`.
#[derive(Debug, Clone)]
pub struct OperationStats {
    /// Name of the operation the figures belong to.
    pub operation: String,
    /// Number of recorded calls.
    pub count: u64,
    /// Number of recorded unsuccessful calls.
    pub error_count: u64,
    /// Mean of the recorded latencies.
    pub avg_latency: Duration,
    /// Median latency sample.
    pub p50_latency: Duration,
    /// 95th-percentile latency sample.
    pub p95_latency: Duration,
    /// 99th-percentile latency sample.
    pub p99_latency: Duration,
    /// Smallest recorded latency.
    pub min_latency: Duration,
    /// Largest recorded latency.
    pub max_latency: Duration,
}
442
443impl OperationStats {
444 pub fn error_rate(&self) -> f64 {
446 if self.count == 0 {
447 0.0
448 } else {
449 (self.error_count as f64 / self.count as f64) * 100.0
450 }
451 }
452
453 pub fn throughput(&self, duration: Duration) -> f64 {
455 if duration.as_secs_f64() == 0.0 {
456 0.0
457 } else {
458 self.count as f64 / duration.as_secs_f64()
459 }
460 }
461}
462
463pub struct ResourceQuota {
465 max_memory: AtomicUsize,
467 current_memory: AtomicUsize,
469 max_rate: AtomicU64,
471 operation_count: AtomicU64,
473 window_start: RwLock<Instant>,
475 enforced: AtomicBool,
477}
478
479impl ResourceQuota {
480 pub fn new(max_memory: usize, max_rate: u64) -> Self {
482 Self {
483 max_memory: AtomicUsize::new(max_memory),
484 current_memory: AtomicUsize::new(0),
485 max_rate: AtomicU64::new(max_rate),
486 operation_count: AtomicU64::new(0),
487 window_start: RwLock::new(Instant::now()),
488 enforced: AtomicBool::new(true),
489 }
490 }
491
492 pub fn check_memory(&self, bytes: usize) -> bool {
494 if !self.enforced.load(Ordering::Relaxed) {
495 return true;
496 }
497
498 let current = self.current_memory.load(Ordering::Relaxed);
499 let max = self.max_memory.load(Ordering::Relaxed);
500 current + bytes <= max
501 }
502
503 pub fn allocate_memory(&self, bytes: usize) -> Result<(), String> {
505 if !self.check_memory(bytes) {
506 return Err(format!("Memory quota exceeded: requested {bytes} bytes"));
507 }
508
509 self.current_memory.fetch_add(bytes, Ordering::Relaxed);
510 Ok(())
511 }
512
513 pub fn free_memory(&self, bytes: usize) {
515 self.current_memory.fetch_sub(bytes, Ordering::Relaxed);
516 }
517
518 pub fn check_rate(&self) -> bool {
520 if !self.enforced.load(Ordering::Relaxed) {
521 return true;
522 }
523
524 let now = Instant::now();
525 let window_start = *self.window_start.read();
526
527 if now.duration_since(window_start) >= Duration::from_secs(1) {
529 *self.window_start.write() = now;
530 self.operation_count.store(0, Ordering::Relaxed);
531 return true;
532 }
533
534 let count = self.operation_count.load(Ordering::Relaxed);
535 let max = self.max_rate.load(Ordering::Relaxed);
536 count < max
537 }
538
539 pub fn record_operation(&self) -> Result<(), String> {
541 if !self.check_rate() {
542 return Err("Rate limit exceeded".to_string());
543 }
544
545 self.operation_count.fetch_add(1, Ordering::Relaxed);
546 Ok(())
547 }
548
549 pub fn usage(&self) -> QuotaUsage {
551 QuotaUsage {
552 memory_used: self.current_memory.load(Ordering::Relaxed),
553 memory_max: self.max_memory.load(Ordering::Relaxed),
554 operations_count: self.operation_count.load(Ordering::Relaxed),
555 operations_max: self.max_rate.load(Ordering::Relaxed),
556 }
557 }
558
559 pub fn set_enforced(&self, enforced: bool) {
561 self.enforced.store(enforced, Ordering::Relaxed);
562 }
563}
564
/// Point-in-time snapshot returned by `ResourceQuota::usage`.
#[derive(Debug, Clone)]
pub struct QuotaUsage {
    /// Bytes accounted as allocated.
    pub memory_used: usize,
    /// Memory limit in bytes.
    pub memory_max: usize,
    /// Operations counted in the rate window at snapshot time.
    pub operations_count: u64,
    /// Maximum operations per rate window.
    pub operations_max: u64,
}
573
574impl QuotaUsage {
575 pub fn memory_percent(&self) -> f64 {
577 if self.memory_max == 0 {
578 0.0
579 } else {
580 (self.memory_used as f64 / self.memory_max as f64) * 100.0
581 }
582 }
583
584 pub fn rate_percent(&self) -> f64 {
586 if self.operations_max == 0 {
587 0.0
588 } else {
589 (self.operations_count as f64 / self.operations_max as f64) * 100.0
590 }
591 }
592}
593
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods populate the error and show up in the rendered message.
    #[test]
    fn test_production_error() {
        let error = OxirsError::Parse("Test error".to_string());
        let prod_error = ProductionError::new(error, "parse_operation")
            .with_context("file", "test.ttl")
            .with_context("line", "42")
            .with_severity(ErrorSeverity::Error)
            .retryable();

        assert_eq!(prod_error.context.operation, "parse_operation");
        assert_eq!(
            prod_error.context.fields.get("file"),
            Some(&"test.ttl".to_string())
        );
        assert!(prod_error.retryable);
        assert_eq!(prod_error.severity, ErrorSeverity::Error);

        let message = prod_error.detailed_message();
        assert!(message.contains("parse_operation"));
        assert!(message.contains("file: test.ttl"));
        assert!(message.contains("line: 42"));
        assert!(message.contains("(Operation is retryable)"));
    }

    /// Constructor and builders set status, component and metrics.
    #[test]
    fn test_health_check() {
        let health = HealthCheck::healthy("database", "All systems operational")
            .with_metric("response_time_ms", 5.2)
            .with_metric("connections", 10.0)
            .with_response_time(Duration::from_millis(5));

        assert_eq!(health.status, HealthStatus::Healthy);
        assert_eq!(health.component, "database");
        assert_eq!(health.metrics.get("response_time_ms"), Some(&5.2));
        assert_eq!(health.response_time, Duration::from_millis(5));
    }

    /// The breaker stays closed below the threshold and opens once it is reached.
    #[test]
    fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            ..Default::default()
        };
        let breaker = CircuitBreaker::new(config);

        assert!(breaker.allow_request());
        assert_eq!(breaker.state(), "Closed");

        // Two failures are still below the threshold of three.
        breaker.record_failure();
        breaker.record_failure();
        assert!(breaker.allow_request());

        // The third failure reaches the threshold: the breaker opens and
        // rejects requests.
        breaker.record_failure();
        assert!(!breaker.allow_request());
        assert_eq!(breaker.state(), "Open");

        let stats = breaker.stats();
        assert_eq!(stats.failures, 3);
        assert_eq!(stats.state, "Open");
    }

    /// Counts, error rate and latency aggregates reflect the recorded calls.
    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();

        monitor.record_operation("query", Duration::from_millis(10), true);
        monitor.record_operation("query", Duration::from_millis(15), true);
        monitor.record_operation("query", Duration::from_millis(20), true);
        monitor.record_operation("query", Duration::from_millis(25), false);

        let stats = monitor.stats("query").expect("operation should succeed");
        assert_eq!(stats.count, 4);
        assert_eq!(stats.error_count, 1);
        assert_eq!(stats.error_rate(), 25.0);
        assert_eq!(stats.min_latency, Duration::from_millis(10));
        assert_eq!(stats.max_latency, Duration::from_millis(25));
        assert_eq!(stats.avg_latency, Duration::from_micros(17_500));

        assert!(monitor.stats("unknown").is_none());
    }

    /// Memory and rate limits are enforced and reported through `usage`.
    #[test]
    fn test_resource_quota() {
        let quota = ResourceQuota::new(1024, 100);

        assert!(quota.check_memory(512));
        assert!(quota.allocate_memory(512).is_ok());
        assert!(quota.check_memory(512));
        assert!(!quota.check_memory(513));

        quota.free_memory(256);
        assert!(quota.check_memory(768));

        // The whole loop is expected to finish within one rate window.
        for _ in 0..100 {
            assert!(quota.record_operation().is_ok());
        }
        assert!(quota.record_operation().is_err());

        let usage = quota.usage();
        assert_eq!(usage.memory_used, 256);
        assert_eq!(usage.operations_count, 100);
    }
}