use serde::{Deserialize, Serialize};
use sklears_core::error::{Result as SklResult, SklearsError};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use uuid::Uuid;

use crate::fault_core::Priority;

use super::event_system::{CircuitBreakerEvent, CircuitBreakerEventType};

/// Central coordinator for circuit-breaker analytics: owns the registered
/// processors, the event data store, and the scheduler for recurring
/// analysis jobs.
pub struct CircuitBreakerAnalytics {
    /// Registered analytics processors, keyed by name.
    processors: HashMap<String, Box<dyn AnalyticsProcessor + Send + Sync>>,
    /// Shared store for recorded circuit-breaker events.
    data_store: Arc<AnalyticsDataStore>,
    /// Scheduler that drives recurring analytics jobs.
    scheduler: Arc<AnalyticsScheduler>,
    /// Top-level analytics configuration.
    config: AnalyticsConfig,
}

impl std::fmt::Debug for CircuitBreakerAnalytics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CircuitBreakerAnalytics")
            .field(
                "processors",
                &format!("<{} processors>", self.processors.len()),
            )
            .field("data_store", &self.data_store)
            .field("scheduler", &self.scheduler)
            .field("config", &self.config)
            .finish()
    }
}

/// A pluggable analysis pass over a batch of circuit-breaker events.
pub trait AnalyticsProcessor: Send + Sync {
    /// Runs the analysis over the supplied events.
    fn process(&self, data: &[CircuitBreakerEvent]) -> AnalyticsResult;

    /// Returns the processor's unique name.
    fn name(&self) -> &str;

    /// Returns the processor's configuration as key-value pairs.
    fn config(&self) -> HashMap<String, String>;
}

/// The outcome of a single analytics run.
#[derive(Debug, Clone)]
pub struct AnalyticsResult {
    pub id: String,
    pub analysis_type: String,
    pub timestamp: SystemTime,
    pub insights: Vec<AnalyticsInsight>,
    pub metrics: HashMap<String, f64>,
    pub recommendations: Vec<AnalyticsRecommendation>,
}

/// A single observation derived from the event data.
#[derive(Debug, Clone)]
pub struct AnalyticsInsight {
    pub insight_type: String,
    pub description: String,
    /// Confidence score for the insight (higher is stronger).
    pub confidence: f64,
    pub evidence: Vec<String>,
}

/// An actionable suggestion derived from one or more insights.
#[derive(Debug, Clone)]
pub struct AnalyticsRecommendation {
    pub recommendation_type: String,
    pub description: String,
    pub priority: Priority,
    pub effort: ImplementationEffort,
    /// Expected impact score of applying the recommendation (higher is better).
    pub expected_impact: f64,
}

/// Rough estimate of the work needed to apply a recommendation.
#[derive(Debug, Clone, PartialEq)]
pub enum ImplementationEffort {
    Low,
    Medium,
    High,
    VeryHigh,
}

/// Storage layer for circuit-breaker events, with pluggable backends.
pub struct AnalyticsDataStore {
    /// Backend that physically stores the events.
    backend: Box<dyn DataStorageBackend + Send + Sync>,
    /// Secondary indexes over the stored events, keyed by index name.
    indexes: HashMap<String, DataIndex>,
    /// Storage configuration (retention, compression, backups).
    config: DataStorageConfig,
}

impl std::fmt::Debug for AnalyticsDataStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AnalyticsDataStore")
            .field("backend", &"<storage backend>")
            .field("indexes", &self.indexes)
            .field("config", &self.config)
            .finish()
    }
}

/// Abstraction over the physical storage of circuit-breaker events.
pub trait DataStorageBackend: Send + Sync {
    /// Persists a batch of events.
    fn store(&self, data: &[CircuitBreakerEvent]) -> SklResult<()>;

    /// Returns the events matching the given query.
    fn query(&self, query: &DataQuery) -> SklResult<Vec<CircuitBreakerEvent>>;

    /// Deletes events matching the criteria and returns how many were removed.
    fn delete(&self, criteria: &DeleteCriteria) -> SklResult<u64>;

    /// Returns statistics about the stored data.
    fn statistics(&self) -> StorageStatistics;
}

/// Query over stored circuit-breaker events.
#[derive(Debug, Clone)]
pub struct DataQuery {
    pub time_range: Option<(SystemTime, SystemTime)>,
    pub event_types: Vec<CircuitBreakerEventType>,
    pub circuit_ids: Vec<String>,
    pub filters: Vec<QueryFilter>,
    pub limit: Option<usize>,
    pub order_by: Vec<OrderBy>,
}

/// A single field-level filter within a query.
#[derive(Debug, Clone)]
pub struct QueryFilter {
    pub field: String,
    pub operator: QueryOperator,
    pub value: String,
}

/// Comparison operators available to query filters.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryOperator {
    Equals,
    NotEquals,
    GreaterThan,
    LessThan,
    Contains,
    In,
    NotIn,
}

/// Sort specification for query results.
#[derive(Debug, Clone)]
pub struct OrderBy {
    pub field: String,
    pub direction: OrderDirection,
}

/// Sort direction for an `OrderBy` clause.
#[derive(Debug, Clone, PartialEq)]
pub enum OrderDirection {
    Ascending,
    Descending,
}

/// Criteria selecting which stored events to delete.
#[derive(Debug, Clone)]
pub struct DeleteCriteria {
    pub time_range: Option<(SystemTime, SystemTime)>,
    pub event_types: Vec<CircuitBreakerEventType>,
    pub filters: Vec<QueryFilter>,
}

/// Summary statistics reported by a storage backend.
#[derive(Debug, Clone)]
pub struct StorageStatistics {
    pub total_events: u64,
    /// Approximate size of the stored data (backend-defined units).
    pub storage_size: u64,
    pub oldest_event: Option<SystemTime>,
    pub newest_event: Option<SystemTime>,
}

/// A secondary index over stored events.
#[derive(Debug)]
pub struct DataIndex {
    pub name: String,
    pub fields: Vec<String>,
    pub index_type: IndexType,
    pub statistics: IndexStatistics,
}

/// Supported index layouts.
#[derive(Debug, Clone, PartialEq)]
pub enum IndexType {
    BTree,
    Hash,
    Composite,
    FullText,
}

/// Runtime statistics for a single index.
#[derive(Debug, Clone)]
pub struct IndexStatistics {
    pub size: u64,
    pub entries: u64,
    pub hit_rate: f64,
    pub last_update: SystemTime,
}

/// Configuration for the analytics data store.
#[derive(Debug, Clone)]
pub struct DataStorageConfig {
    pub backend_type: String,
    pub retention_period: Duration,
    pub compression: bool,
    pub encryption: bool,
    pub backup: BackupConfig,
}

/// Backup policy for the analytics data store.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    pub enabled: bool,
    pub frequency: Duration,
    pub retention: Duration,
    pub destination: String,
}

/// Schedules and tracks recurring analytics jobs.
#[derive(Debug)]
pub struct AnalyticsScheduler {
    /// Jobs known to the scheduler.
    jobs: Arc<RwLock<Vec<AnalyticsJob>>>,
    /// Executor that runs the jobs.
    executor: Arc<JobExecutor>,
    /// Scheduler configuration.
    config: SchedulerConfig,
}

/// A scheduled analytics job bound to a named processor.
#[derive(Debug, Clone)]
pub struct AnalyticsJob {
    pub id: String,
    pub name: String,
    pub schedule: JobSchedule,
    /// Name of the registered processor this job invokes.
    pub processor: String,
    pub config: HashMap<String, String>,
    pub status: JobStatus,
}

/// When and how often a job runs.
#[derive(Debug, Clone)]
pub struct JobSchedule {
    pub schedule_type: ScheduleType,
    pub parameters: HashMap<String, String>,
    pub next_execution: SystemTime,
}

/// Supported scheduling strategies.
#[derive(Debug, Clone, PartialEq)]
pub enum ScheduleType {
    Interval(Duration),
    Cron(String),
    OneTime(SystemTime),
    Triggered(String),
}

/// Lifecycle state of a scheduled job.
#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
}

/// Runs scheduled jobs and records execution metrics.
#[derive(Debug)]
pub struct JobExecutor {
    thread_pool: Arc<ThreadPool>,
    metrics: Arc<Mutex<ExecutionMetrics>>,
}

/// Placeholder thread pool; job execution is not yet implemented here.
#[derive(Debug)]
pub struct ThreadPool;

/// Aggregate execution metrics for the job executor.
#[derive(Debug, Default)]
pub struct ExecutionMetrics {
    pub total_jobs: u64,
    pub successful_jobs: u64,
    pub failed_jobs: u64,
    pub avg_execution_time: Duration,
}

/// Configuration for the analytics scheduler.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    pub enabled: bool,
    pub thread_pool_size: usize,
    pub job_timeout: Duration,
    pub max_concurrent_jobs: usize,
}

/// Top-level configuration for circuit-breaker analytics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsConfig {
    pub enabled: bool,
    pub collection_interval: Duration,
    pub analysis_frequency: Duration,
    pub retention_period: Duration,
    /// Whether to analyze events as they arrive instead of in batches.
    pub real_time: bool,
}

/// Analyzes request-level performance, such as failure rates.
#[derive(Debug)]
pub struct PerformanceAnalyticsProcessor {
    config: PerformanceAnalyticsConfig,
}

/// Configuration for [`PerformanceAnalyticsProcessor`].
#[derive(Debug, Clone)]
pub struct PerformanceAnalyticsConfig {
    /// Named metric thresholds, e.g. the maximum acceptable failure rate.
    pub thresholds: HashMap<String, f64>,
    pub analysis_window: Duration,
    pub min_sample_size: usize,
}

/// Detects recurring patterns in circuit-breaker events.
#[derive(Debug)]
pub struct PatternAnalyticsProcessor {
    config: PatternAnalyticsConfig,
}

/// Configuration for [`PatternAnalyticsProcessor`].
#[derive(Debug, Clone)]
pub struct PatternAnalyticsConfig {
    pub algorithms: Vec<String>,
    pub min_confidence: f64,
    pub window_size: usize,
}

/// Flags anomalous event sequences.
#[derive(Debug)]
pub struct AnomalyAnalyticsProcessor {
    config: AnomalyAnalyticsConfig,
}

/// Configuration for [`AnomalyAnalyticsProcessor`].
#[derive(Debug, Clone)]
pub struct AnomalyAnalyticsConfig {
    pub detection_threshold: f64,
    pub statistical_methods: Vec<String>,
    pub sensitivity: f64,
}

/// Simple storage backend that keeps all events in memory.
#[derive(Debug)]
pub struct InMemoryDataStorageBackend {
    data: Arc<RwLock<Vec<CircuitBreakerEvent>>>,
}

impl Default for CircuitBreakerAnalytics {
    fn default() -> Self {
        Self::new()
    }
}

impl CircuitBreakerAnalytics {
    /// Creates an analytics coordinator backed by in-memory storage and a
    /// default scheduler configuration.
    #[must_use]
    pub fn new() -> Self {
        Self {
            processors: HashMap::new(),
            data_store: Arc::new(AnalyticsDataStore {
                backend: Box::new(InMemoryDataStorageBackend::new()),
                indexes: HashMap::new(),
                config: DataStorageConfig {
                    backend_type: "memory".to_string(),
                    // Keep raw events for seven days by default.
                    retention_period: Duration::from_secs(604_800),
                    compression: false,
                    encryption: false,
                    backup: BackupConfig {
                        enabled: false,
                        frequency: Duration::from_secs(86_400),
                        retention: Duration::from_secs(2_592_000),
                        destination: "/tmp/backups".to_string(),
                    },
                },
            }),
            scheduler: Arc::new(AnalyticsScheduler {
                jobs: Arc::new(RwLock::new(Vec::new())),
                executor: Arc::new(JobExecutor {
                    thread_pool: Arc::new(ThreadPool),
                    metrics: Arc::new(Mutex::new(ExecutionMetrics::default())),
                }),
                config: SchedulerConfig {
                    enabled: true,
                    thread_pool_size: 4,
                    job_timeout: Duration::from_secs(300),
                    max_concurrent_jobs: 10,
                },
            }),
            config: AnalyticsConfig {
                enabled: true,
                collection_interval: Duration::from_secs(60),
                analysis_frequency: Duration::from_secs(300),
                // Keep analytics data for thirty days by default.
                retention_period: Duration::from_secs(2_592_000),
                real_time: false,
            },
        }
    }

    /// Creates an analytics coordinator with the given configuration.
    #[must_use]
    pub fn with_config(config: AnalyticsConfig) -> Self {
        let mut analytics = Self::new();
        analytics.config = config;
        analytics
    }

    /// Registers an analytics processor under the given name.
    pub fn register_processor(
        &mut self,
        name: String,
        processor: Box<dyn AnalyticsProcessor + Send + Sync>,
    ) {
        self.processors.insert(name, processor);
    }

    /// Persists a batch of events to the underlying storage backend.
    pub fn store_data(&self, events: &[CircuitBreakerEvent]) -> SklResult<()> {
        self.data_store.backend.store(events)
    }

    /// Returns the stored events matching the given query.
    pub fn query_data(&self, query: &DataQuery) -> SklResult<Vec<CircuitBreakerEvent>> {
        self.data_store.backend.query(query)
    }

    /// Runs the named processor over the given events, or returns a
    /// configuration error if no processor is registered under that name.
    pub fn analyze(
        &self,
        processor_name: &str,
        data: &[CircuitBreakerEvent],
    ) -> SklResult<AnalyticsResult> {
        let processor = self.processors.get(processor_name).ok_or_else(|| {
            SklearsError::Configuration(format!("Unknown processor: {processor_name}"))
        })?;

        Ok(processor.process(data))
    }

    /// Adds a job to the scheduler's queue.
    pub fn schedule_job(&self, job: AnalyticsJob) -> SklResult<()> {
        let mut jobs = self.scheduler.jobs.write().unwrap();
        jobs.push(job);
        Ok(())
    }

    /// Collects insights from every registered processor over recent events,
    /// optionally scoped to a single circuit and time range.
    pub fn get_insights(
        &self,
        circuit_id: Option<&str>,
        time_range: Option<(SystemTime, SystemTime)>,
    ) -> SklResult<Vec<AnalyticsInsight>> {
        let query = DataQuery {
            time_range,
            event_types: vec![],
            circuit_ids: circuit_id
                .map(|id| vec![id.to_string()])
                .unwrap_or_default(),
            filters: vec![],
            limit: Some(1000),
            order_by: vec![OrderBy {
                field: "timestamp".to_string(),
                direction: OrderDirection::Descending,
            }],
        };

        let events = self.query_data(&query)?;
        let mut insights = Vec::new();

        for processor in self.processors.values() {
            let result = processor.process(&events);
            insights.extend(result.insights);
        }

        Ok(insights)
    }

    /// Derives recommendations from high-confidence insights (confidence
    /// above 0.8) for the given circuit.
    pub fn get_recommendations(
        &self,
        circuit_id: Option<&str>,
    ) -> SklResult<Vec<AnalyticsRecommendation>> {
        let insights = self.get_insights(circuit_id, None)?;
        let mut recommendations = Vec::new();

        for insight in insights {
            if insight.confidence > 0.8 {
                let recommendation = self.generate_recommendation_from_insight(&insight);
                recommendations.push(recommendation);
            }
        }

        Ok(recommendations)
    }

    /// Returns statistics reported by the storage backend.
    #[must_use]
    pub fn get_storage_statistics(&self) -> StorageStatistics {
        self.data_store.backend.statistics()
    }

    /// Deletes events older than the given retention period and returns the
    /// number of deleted events.
    pub fn cleanup_data(&self, retention_period: Duration) -> SklResult<u64> {
        let cutoff_time = SystemTime::now() - retention_period;
        let criteria = DeleteCriteria {
            time_range: Some((SystemTime::UNIX_EPOCH, cutoff_time)),
            event_types: vec![],
            filters: vec![],
        };

        self.data_store.backend.delete(&criteria)
    }

    /// Maps a single insight to a concrete recommendation based on its type.
    fn generate_recommendation_from_insight(
        &self,
        insight: &AnalyticsInsight,
    ) -> AnalyticsRecommendation {
        match insight.insight_type.as_str() {
            "high_failure_rate" => AnalyticsRecommendation {
                recommendation_type: "threshold_adjustment".to_string(),
                description: "Consider lowering failure threshold due to high failure rate"
                    .to_string(),
                priority: Priority::High,
                effort: ImplementationEffort::Low,
                expected_impact: 0.8,
            },
            "slow_recovery" => AnalyticsRecommendation {
                recommendation_type: "recovery_optimization".to_string(),
                description: "Optimize recovery strategy to reduce recovery time".to_string(),
                priority: Priority::Medium,
                effort: ImplementationEffort::Medium,
                expected_impact: 0.6,
            },
            _ => AnalyticsRecommendation {
                recommendation_type: "general".to_string(),
                description: "Review circuit breaker configuration".to_string(),
                priority: Priority::Low,
                effort: ImplementationEffort::Low,
                expected_impact: 0.3,
            },
        }
    }
}
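
// Illustrative usage sketch, not part of the original API surface: the
// function name and job values below are placeholders chosen for the example.
// It registers the bundled performance processor and schedules an hourly
// analysis job against it.
#[allow(dead_code)]
fn example_schedule_performance_job(
    analytics: &mut CircuitBreakerAnalytics,
) -> SklResult<()> {
    analytics.register_processor(
        "performance".to_string(),
        Box::new(PerformanceAnalyticsProcessor::new()),
    );

    analytics.schedule_job(AnalyticsJob {
        id: Uuid::new_v4().to_string(),
        name: "hourly-performance-analysis".to_string(),
        schedule: JobSchedule {
            schedule_type: ScheduleType::Interval(Duration::from_secs(3600)),
            parameters: HashMap::new(),
            next_execution: SystemTime::now(),
        },
        processor: "performance".to_string(),
        config: HashMap::new(),
        status: JobStatus::Pending,
    })
}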

impl Default for PerformanceAnalyticsProcessor {
    fn default() -> Self {
        Self::new()
    }
}

impl PerformanceAnalyticsProcessor {
    #[must_use]
    pub fn new() -> Self {
        Self {
            config: PerformanceAnalyticsConfig::default(),
        }
    }

    #[must_use]
    pub fn with_config(config: PerformanceAnalyticsConfig) -> Self {
        Self { config }
    }
}

impl AnalyticsProcessor for PerformanceAnalyticsProcessor {
    fn process(&self, data: &[CircuitBreakerEvent]) -> AnalyticsResult {
        let mut insights = Vec::new();
        let mut metrics = HashMap::new();

        // Compute the failure rate over the supplied batch of events.
        let total_requests = data.len();
        let failed_requests = data
            .iter()
            .filter(|event| event.event_type == CircuitBreakerEventType::RequestFailed)
            .count();

        let failure_rate = if total_requests > 0 {
            failed_requests as f64 / total_requests as f64
        } else {
            0.0
        };

        metrics.insert("failure_rate".to_string(), failure_rate);
        metrics.insert("total_requests".to_string(), total_requests as f64);

        // Flag failure rates above 10% as a high-confidence insight.
        if failure_rate > 0.1 {
            insights.push(AnalyticsInsight {
                insight_type: "high_failure_rate".to_string(),
                description: format!("High failure rate detected: {:.2}%", failure_rate * 100.0),
                confidence: 0.9,
                evidence: vec![format!(
                    "{} failures out of {} requests",
                    failed_requests, total_requests
                )],
            });
        }

        AnalyticsResult {
            id: Uuid::new_v4().to_string(),
            analysis_type: "performance".to_string(),
            timestamp: SystemTime::now(),
            insights,
            metrics,
            recommendations: vec![],
        }
    }

    fn name(&self) -> &'static str {
        "performance"
    }

    fn config(&self) -> HashMap<String, String> {
        let mut config = HashMap::new();
        config.insert("type".to_string(), "performance".to_string());
        config.insert(
            "min_sample_size".to_string(),
            self.config.min_sample_size.to_string(),
        );
        config
    }
}

impl Default for PatternAnalyticsProcessor {
    fn default() -> Self {
        Self::new()
    }
}

impl PatternAnalyticsProcessor {
    #[must_use]
    pub fn new() -> Self {
        Self {
            config: PatternAnalyticsConfig::default(),
        }
    }
}

impl AnalyticsProcessor for PatternAnalyticsProcessor {
    fn process(&self, _data: &[CircuitBreakerEvent]) -> AnalyticsResult {
        // Pattern detection is not implemented yet; return an empty result.
        let insights = Vec::new();
        let metrics = HashMap::new();

        AnalyticsResult {
            id: Uuid::new_v4().to_string(),
            analysis_type: "pattern".to_string(),
            timestamp: SystemTime::now(),
            insights,
            metrics,
            recommendations: vec![],
        }
    }

    fn name(&self) -> &'static str {
        "pattern"
    }

    fn config(&self) -> HashMap<String, String> {
        let mut config = HashMap::new();
        config.insert("type".to_string(), "pattern".to_string());
        config.insert(
            "min_confidence".to_string(),
            self.config.min_confidence.to_string(),
        );
        config
    }
}
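
// A minimal sketch completing the anomaly processor, mirroring the pattern
// processor above; this impl is an assumption, not original code. The config
// values in `new` are placeholders, and the statistical methods named in
// `AnomalyAnalyticsConfig` are not applied here.
impl Default for AnomalyAnalyticsProcessor {
    fn default() -> Self {
        Self::new()
    }
}

impl AnomalyAnalyticsProcessor {
    #[must_use]
    pub fn new() -> Self {
        Self {
            // Placeholder configuration values (assumptions).
            config: AnomalyAnalyticsConfig {
                detection_threshold: 0.95,
                statistical_methods: vec!["zscore".to_string()],
                sensitivity: 0.5,
            },
        }
    }
}

impl AnalyticsProcessor for AnomalyAnalyticsProcessor {
    fn process(&self, _data: &[CircuitBreakerEvent]) -> AnalyticsResult {
        // Anomaly detection is not implemented yet; return an empty result.
        AnalyticsResult {
            id: Uuid::new_v4().to_string(),
            analysis_type: "anomaly".to_string(),
            timestamp: SystemTime::now(),
            insights: Vec::new(),
            metrics: HashMap::new(),
            recommendations: vec![],
        }
    }

    fn name(&self) -> &'static str {
        "anomaly"
    }

    fn config(&self) -> HashMap<String, String> {
        let mut config = HashMap::new();
        config.insert("type".to_string(), "anomaly".to_string());
        config.insert(
            "detection_threshold".to_string(),
            self.config.detection_threshold.to_string(),
        );
        config
    }
}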

impl Default for InMemoryDataStorageBackend {
    fn default() -> Self {
        Self::new()
    }
}

impl InMemoryDataStorageBackend {
    #[must_use]
    pub fn new() -> Self {
        Self {
            data: Arc::new(RwLock::new(Vec::new())),
        }
    }
}

impl DataStorageBackend for InMemoryDataStorageBackend {
    fn store(&self, events: &[CircuitBreakerEvent]) -> SklResult<()> {
        let mut data = self.data.write().unwrap();
        data.extend_from_slice(events);
        Ok(())
    }

    fn query(&self, _query: &DataQuery) -> SklResult<Vec<CircuitBreakerEvent>> {
        // Simplified: the query is ignored and every stored event is returned.
        let data = self.data.read().unwrap();
        Ok(data.clone())
    }

    fn delete(&self, _criteria: &DeleteCriteria) -> SklResult<u64> {
        // Deletion is not implemented for the in-memory backend.
        Ok(0)
    }

    fn statistics(&self) -> StorageStatistics {
        let data = self.data.read().unwrap();
        StorageStatistics {
            total_events: data.len() as u64,
            storage_size: 0,
            oldest_event: data.first().map(|e| e.timestamp),
            newest_event: data.last().map(|e| e.timestamp),
        }
    }
}

impl Default for AnalyticsConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            collection_interval: Duration::from_secs(60),
            analysis_frequency: Duration::from_secs(300),
            retention_period: Duration::from_secs(2_592_000),
            real_time: false,
        }
    }
}

impl Default for PerformanceAnalyticsConfig {
    fn default() -> Self {
        Self {
            thresholds: {
                let mut thresholds = HashMap::new();
                thresholds.insert("failure_rate".to_string(), 0.1);
                thresholds.insert("response_time".to_string(), 1000.0);
                thresholds
            },
            analysis_window: Duration::from_secs(300),
            min_sample_size: 100,
        }
    }
}

impl Default for PatternAnalyticsConfig {
    fn default() -> Self {
        Self {
            algorithms: vec!["frequency".to_string(), "temporal".to_string()],
            min_confidence: 0.8,
            window_size: 1000,
        }
    }
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            thread_pool_size: 4,
            job_timeout: Duration::from_secs(300),
            max_concurrent_jobs: 10,
        }
    }
}

impl Default for BackupConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            frequency: Duration::from_secs(86_400),
            retention: Duration::from_secs(2_592_000),
            destination: "/tmp/backups".to_string(),
        }
    }
}
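
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the intended wiring, written as an assumption about
    // how the facade is meant to be used; event construction is omitted
    // because `CircuitBreakerEvent` is defined in the event_system module.
    #[test]
    fn analyze_with_registered_processor() {
        let mut analytics = CircuitBreakerAnalytics::new();
        analytics.register_processor(
            "performance".to_string(),
            Box::new(PerformanceAnalyticsProcessor::new()),
        );

        // With no events, the performance processor reports a zero failure
        // rate and produces no insights.
        let result = analytics.analyze("performance", &[]).unwrap();
        assert_eq!(result.analysis_type, "performance");
        assert_eq!(result.metrics.get("failure_rate"), Some(&0.0));
        assert!(result.insights.is_empty());

        // Unknown processor names surface a configuration error.
        assert!(analytics.analyze("missing", &[]).is_err());

        // The default in-memory backend starts out empty.
        assert_eq!(analytics.get_storage_statistics().total_events, 0);
    }
}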