1#![allow(clippy::too_many_arguments)]
7#![allow(dead_code)]
8
9use crate::error::{MetricsError, Result};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::collections::{HashMap, VecDeque};
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15
16pub trait DataSource: std::fmt::Debug + Send + Sync {
18 fn id(&self) -> &str;
20
21 fn get_data(&self) -> Result<Value>;
23
24 fn subscribe(&mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Result<String>;
26
27 fn unsubscribe(&mut self, subscription_id: &str) -> Result<()>;
29
30 fn connect(&mut self) -> Result<()>;
32
33 fn disconnect(&mut self) -> Result<()>;
35
36 fn is_connected(&self) -> bool;
38
39 fn config(&self) -> &DataSourceConfig;
41
42 fn update_config(&mut self, config: DataSourceConfig) -> Result<()>;
44
45 fn get_history(&self, start: Instant, end: Instant) -> Result<Vec<(Instant, Value)>>;
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct DataSourceConfig {
52 pub id: String,
54 pub source_type: DataSourceType,
56 pub connection: ConnectionConfig,
58 pub format: DataFormatConfig,
60 pub cache: CacheConfig,
62 pub error_handling: ErrorHandlingConfig,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub enum DataSourceType {
69 WebSocket,
71 HttpPolling,
73 ServerSentEvents,
75 File,
77 Database,
79 Custom(String),
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ConnectionConfig {
86 pub url: String,
88 pub headers: HashMap<String, String>,
90 pub auth: Option<AuthConfig>,
92 pub retry: RetryConfig,
94 pub pooling: ConnectionPoolConfig,
96 pub timeout: Duration,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct AuthConfig {
103 pub auth_type: AuthType,
105 pub credentials: HashMap<String, String>,
107 pub refresh_url: Option<String>,
109 pub expires_in: Option<Duration>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub enum AuthType {
116 Basic,
118 Bearer,
120 ApiKey,
122 OAuth2,
124 Custom(String),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct RetryConfig {
131 pub max_attempts: u32,
133 pub initial_delay: Duration,
135 pub max_delay: Duration,
137 pub backoff_multiplier: f64,
139 pub jitter: bool,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct ConnectionPoolConfig {
146 pub enabled: bool,
148 pub max_size: u32,
150 pub min_size: u32,
152 pub connection_timeout: Duration,
154 pub idle_timeout: Duration,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct DataFormatConfig {
161 pub format: DataFormat,
163 pub schema: Option<Value>,
165 pub field_mappings: HashMap<String, String>,
167 pub validation: ValidationConfig,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub enum DataFormat {
174 Json,
176 Csv,
178 Xml,
180 Protobuf,
182 MessagePack,
184 Custom(String),
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct ValidationConfig {
191 pub enabled: bool,
193 pub rules: Vec<ValidationRule>,
195 pub on_failure: ValidationAction,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct ValidationRule {
202 pub field: String,
204 pub rule_type: ValidationRuleType,
206 pub parameters: HashMap<String, Value>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub enum ValidationRuleType {
213 Required,
215 Type(String),
217 Range { min: Option<f64>, max: Option<f64> },
219 Pattern(String),
221 Custom(String),
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub enum ValidationAction {
228 Reject,
230 Warn,
232 DefaultValues,
234 Custom(String),
236}
237
238#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct CacheConfig {
241 pub enabled: bool,
243 pub size: usize,
245 pub ttl: Duration,
247 pub strategy: CacheStrategy,
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
253pub enum CacheStrategy {
254 LRU,
256 FIFO,
258 TTL,
260 Custom(String),
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct ErrorHandlingConfig {
267 pub retry_on_error: bool,
269 pub circuit_breaker: Option<CircuitBreakerConfig>,
271 pub fallback_source: Option<String>,
273 pub notify_on_error: bool,
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct CircuitBreakerConfig {
280 pub failure_threshold: u32,
282 pub success_threshold: u32,
284 pub timeout: Duration,
286 pub half_open_timeout: Duration,
288}
289
290#[derive(Debug, Clone)]
292pub struct DataUpdate {
293 pub source_id: String,
295 pub timestamp: Instant,
297 pub data: Value,
299 pub change_type: ChangeType,
301 pub affected_fields: Vec<String>,
303}
304
305#[derive(Debug, Clone, Serialize, Deserialize)]
307pub enum ChangeType {
308 Insert,
310 Update,
312 Delete,
314 Replace,
316 Refresh,
318}
319
320pub struct DataSourceManager {
322 sources: Arc<Mutex<HashMap<String, Box<dyn DataSource>>>>,
324 subscriptions: Arc<Mutex<HashMap<String, Vec<Box<dyn Fn(DataUpdate) + Send + Sync>>>>>,
326 change_detector: Arc<Mutex<ChangeDetector>>,
328}
329
330#[derive(Debug)]
332pub struct ChangeDetector {
333 config: ChangeDetectionConfig,
335 previous_states: HashMap<String, Value>,
337 history: VecDeque<DataUpdate>,
339}
340
341#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct ChangeDetectionConfig {
344 pub enabled: bool,
346 pub strategy: ChangeDetectionStrategy,
348 pub depth: u32,
350 pub ignore_fields: Vec<String>,
352 pub notification: ChangeNotificationConfig,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
358pub enum ChangeDetectionStrategy {
359 Deep,
361 Shallow,
363 Hash,
365 Timestamp,
367 Custom(String),
369}
370
371#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct ChangeNotificationConfig {
374 pub batch_notifications: bool,
376 pub batch_size: usize,
378 pub batch_timeout: Duration,
380 pub filters: Vec<NotificationFilter>,
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct NotificationFilter {
387 pub filter_type: FilterType,
389 pub parameters: HashMap<String, Value>,
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395pub enum FilterType {
396 Field(String),
398 Value(Value),
400 ChangeType(ChangeType),
402 Custom(String),
404}
405
406impl DataSourceManager {
407 pub fn new() -> Self {
409 Self {
410 sources: Arc::new(Mutex::new(HashMap::new())),
411 subscriptions: Arc::new(Mutex::new(HashMap::new())),
412 change_detector: Arc::new(Mutex::new(ChangeDetector::new())),
413 }
414 }
415
416 pub fn register_source(&self, source: Box<dyn DataSource>) -> Result<()> {
418 let id = source.id().to_string();
419 self.sources
420 .lock()
421 .expect("Operation failed")
422 .insert(id, source);
423 Ok(())
424 }
425
426 pub fn unregister_source(&self, source_id: &str) -> Result<()> {
428 self.sources
429 .lock()
430 .expect("Operation failed")
431 .remove(source_id);
432 self.subscriptions
433 .lock()
434 .expect("Operation failed")
435 .remove(source_id);
436 Ok(())
437 }
438
439 pub fn get_source(&self, source_id: &str) -> Option<String> {
441 self.sources
443 .lock()
444 .expect("Operation failed")
445 .get(source_id)
446 .map(|_| source_id.to_string())
447 }
448
449 pub fn subscribe<F>(&self, source_id: &str, callback: F) -> Result<String>
451 where
452 F: Fn(DataUpdate) + Send + Sync + 'static,
453 {
454 let subscription_id = format!("{}_{}", source_id, scirs2_core::random::random::<u64>());
455 self.subscriptions
456 .lock()
457 .expect("Operation failed")
458 .entry(source_id.to_string())
459 .or_default()
460 .push(Box::new(callback));
461 Ok(subscription_id)
462 }
463}
464
465impl ChangeDetector {
466 pub fn new() -> Self {
468 Self {
469 config: ChangeDetectionConfig::default(),
470 previous_states: HashMap::new(),
471 history: VecDeque::new(),
472 }
473 }
474
475 pub fn detect_changes(&mut self, source_id: &str, data: &Value) -> Vec<DataUpdate> {
477 let mut updates = Vec::new();
479
480 if let Some(previous) = self.previous_states.get(source_id) {
481 if previous != data {
482 updates.push(DataUpdate {
483 source_id: source_id.to_string(),
484 timestamp: Instant::now(),
485 data: data.clone(),
486 change_type: ChangeType::Update,
487 affected_fields: vec!["*".to_string()],
488 });
489 }
490 } else {
491 updates.push(DataUpdate {
492 source_id: source_id.to_string(),
493 timestamp: Instant::now(),
494 data: data.clone(),
495 change_type: ChangeType::Insert,
496 affected_fields: vec!["*".to_string()],
497 });
498 }
499
500 self.previous_states
501 .insert(source_id.to_string(), data.clone());
502 updates
503 }
504}
505
506impl Default for ChangeDetectionConfig {
507 fn default() -> Self {
508 Self {
509 enabled: true,
510 strategy: ChangeDetectionStrategy::Deep,
511 depth: 10,
512 ignore_fields: Vec::new(),
513 notification: ChangeNotificationConfig::default(),
514 }
515 }
516}
517
518impl Default for ChangeNotificationConfig {
519 fn default() -> Self {
520 Self {
521 batch_notifications: false,
522 batch_size: 10,
523 batch_timeout: Duration::from_millis(100),
524 filters: Vec::new(),
525 }
526 }
527}
528
529impl Default for DataSourceConfig {
530 fn default() -> Self {
531 Self {
532 id: "default".to_string(),
533 source_type: DataSourceType::WebSocket,
534 connection: ConnectionConfig::default(),
535 format: DataFormatConfig::default(),
536 cache: CacheConfig::default(),
537 error_handling: ErrorHandlingConfig::default(),
538 }
539 }
540}
541
542impl Default for ConnectionConfig {
543 fn default() -> Self {
544 Self {
545 url: "ws://localhost:8080".to_string(),
546 headers: HashMap::new(),
547 auth: None,
548 retry: RetryConfig::default(),
549 pooling: ConnectionPoolConfig::default(),
550 timeout: Duration::from_secs(30),
551 }
552 }
553}
554
555impl Default for RetryConfig {
556 fn default() -> Self {
557 Self {
558 max_attempts: 3,
559 initial_delay: Duration::from_millis(100),
560 max_delay: Duration::from_secs(30),
561 backoff_multiplier: 2.0,
562 jitter: true,
563 }
564 }
565}
566
567impl Default for ConnectionPoolConfig {
568 fn default() -> Self {
569 Self {
570 enabled: false,
571 max_size: 10,
572 min_size: 1,
573 connection_timeout: Duration::from_secs(10),
574 idle_timeout: Duration::from_secs(300),
575 }
576 }
577}
578
579impl Default for DataFormatConfig {
580 fn default() -> Self {
581 Self {
582 format: DataFormat::Json,
583 schema: None,
584 field_mappings: HashMap::new(),
585 validation: ValidationConfig::default(),
586 }
587 }
588}
589
590impl Default for ValidationConfig {
591 fn default() -> Self {
592 Self {
593 enabled: false,
594 rules: Vec::new(),
595 on_failure: ValidationAction::Warn,
596 }
597 }
598}
599
600impl Default for CacheConfig {
601 fn default() -> Self {
602 Self {
603 enabled: true,
604 size: 1000,
605 ttl: Duration::from_secs(300),
606 strategy: CacheStrategy::LRU,
607 }
608 }
609}
610
611impl Default for ErrorHandlingConfig {
612 fn default() -> Self {
613 Self {
614 retry_on_error: true,
615 circuit_breaker: None,
616 fallback_source: None,
617 notify_on_error: true,
618 }
619 }
620}