scirs2_metrics/visualization/advanced_interactive/
data_sources.rs

1//! Data source management for real-time visualization
2//!
3//! This module provides data source abstractions for streaming real-time data
4//! to interactive dashboard widgets.
5
6#![allow(clippy::too_many_arguments)]
7#![allow(dead_code)]
8
9use crate::error::{MetricsError, Result};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::collections::{HashMap, VecDeque};
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15
16/// Data source trait for real-time data
17pub trait DataSource: std::fmt::Debug + Send + Sync {
18    /// Get data source ID
19    fn id(&self) -> &str;
20
21    /// Get latest data
22    fn get_data(&self) -> Result<Value>;
23
24    /// Subscribe to data updates
25    fn subscribe(&mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Result<String>;
26
27    /// Unsubscribe from updates
28    fn unsubscribe(&mut self, subscription_id: &str) -> Result<()>;
29
30    /// Connect to data source
31    fn connect(&mut self) -> Result<()>;
32
33    /// Disconnect from data source
34    fn disconnect(&mut self) -> Result<()>;
35
36    /// Check if connected
37    fn is_connected(&self) -> bool;
38
39    /// Get configuration
40    fn config(&self) -> &DataSourceConfig;
41
42    /// Update configuration
43    fn update_config(&mut self, config: DataSourceConfig) -> Result<()>;
44
45    /// Get data history
46    fn get_history(&self, start: Instant, end: Instant) -> Result<Vec<(Instant, Value)>>;
47}
48
49/// Data source configuration
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct DataSourceConfig {
52    /// Data source ID
53    pub id: String,
54    /// Data source type
55    pub source_type: DataSourceType,
56    /// Connection configuration
57    pub connection: ConnectionConfig,
58    /// Data format configuration
59    pub format: DataFormatConfig,
60    /// Caching configuration
61    pub cache: CacheConfig,
62    /// Error handling configuration
63    pub error_handling: ErrorHandlingConfig,
64}
65
66/// Data source type enumeration
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub enum DataSourceType {
69    /// WebSocket data source
70    WebSocket,
71    /// HTTP polling data source
72    HttpPolling,
73    /// Server-sent events
74    ServerSentEvents,
75    /// File-based data source
76    File,
77    /// Database connection
78    Database,
79    /// Custom data source
80    Custom(String),
81}
82
83/// Connection configuration
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ConnectionConfig {
86    /// Connection URL
87    pub url: String,
88    /// Connection headers
89    pub headers: HashMap<String, String>,
90    /// Authentication configuration
91    pub auth: Option<AuthConfig>,
92    /// Retry configuration
93    pub retry: RetryConfig,
94    /// Connection pooling
95    pub pooling: ConnectionPoolConfig,
96    /// Timeout settings
97    pub timeout: Duration,
98}
99
100/// Authentication configuration
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct AuthConfig {
103    /// Authentication type
104    pub auth_type: AuthType,
105    /// Credentials
106    pub credentials: HashMap<String, String>,
107    /// Token refresh URL
108    pub refresh_url: Option<String>,
109    /// Token expiration
110    pub expires_in: Option<Duration>,
111}
112
113/// Authentication type enumeration
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub enum AuthType {
116    /// Basic authentication
117    Basic,
118    /// Bearer token
119    Bearer,
120    /// API key
121    ApiKey,
122    /// OAuth 2.0
123    OAuth2,
124    /// Custom authentication
125    Custom(String),
126}
127
128/// Retry configuration
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct RetryConfig {
131    /// Maximum retry attempts
132    pub max_attempts: u32,
133    /// Initial retry delay
134    pub initial_delay: Duration,
135    /// Maximum retry delay
136    pub max_delay: Duration,
137    /// Backoff multiplier
138    pub backoff_multiplier: f64,
139    /// Jitter enabled
140    pub jitter: bool,
141}
142
143/// Connection pooling configuration
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct ConnectionPoolConfig {
146    /// Enable connection pooling
147    pub enabled: bool,
148    /// Maximum pool size
149    pub max_size: u32,
150    /// Minimum pool size
151    pub min_size: u32,
152    /// Connection timeout
153    pub connection_timeout: Duration,
154    /// Idle timeout
155    pub idle_timeout: Duration,
156}
157
158/// Data format configuration
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct DataFormatConfig {
161    /// Data format
162    pub format: DataFormat,
163    /// Schema configuration
164    pub schema: Option<Value>,
165    /// Field mappings
166    pub field_mappings: HashMap<String, String>,
167    /// Data validation
168    pub validation: ValidationConfig,
169}
170
171/// Data format enumeration
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub enum DataFormat {
174    /// JSON format
175    Json,
176    /// CSV format
177    Csv,
178    /// XML format
179    Xml,
180    /// Protocol Buffers
181    Protobuf,
182    /// MessagePack
183    MessagePack,
184    /// Custom format
185    Custom(String),
186}
187
188/// Validation configuration
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct ValidationConfig {
191    /// Enable validation
192    pub enabled: bool,
193    /// Validation rules
194    pub rules: Vec<ValidationRule>,
195    /// Action on validation failure
196    pub on_failure: ValidationAction,
197}
198
199/// Validation rule
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct ValidationRule {
202    /// Field path
203    pub field: String,
204    /// Rule type
205    pub rule_type: ValidationRuleType,
206    /// Rule parameters
207    pub parameters: HashMap<String, Value>,
208}
209
210/// Validation rule type enumeration
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub enum ValidationRuleType {
213    /// Required field
214    Required,
215    /// Type check
216    Type(String),
217    /// Range check
218    Range { min: Option<f64>, max: Option<f64> },
219    /// Pattern match
220    Pattern(String),
221    /// Custom validation
222    Custom(String),
223}
224
225/// Validation action enumeration
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub enum ValidationAction {
228    /// Reject invalid data
229    Reject,
230    /// Log warning and proceed
231    Warn,
232    /// Apply default values
233    DefaultValues,
234    /// Custom action
235    Custom(String),
236}
237
238/// Cache configuration
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct CacheConfig {
241    /// Enable caching
242    pub enabled: bool,
243    /// Cache size (number of entries)
244    pub size: usize,
245    /// Cache TTL
246    pub ttl: Duration,
247    /// Cache strategy
248    pub strategy: CacheStrategy,
249}
250
251/// Cache strategy enumeration
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub enum CacheStrategy {
254    /// Least Recently Used
255    LRU,
256    /// First In, First Out
257    FIFO,
258    /// Time-based expiration
259    TTL,
260    /// Custom strategy
261    Custom(String),
262}
263
264/// Error handling configuration
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct ErrorHandlingConfig {
267    /// Retry on error
268    pub retry_on_error: bool,
269    /// Circuit breaker configuration
270    pub circuit_breaker: Option<CircuitBreakerConfig>,
271    /// Fallback data source
272    pub fallback_source: Option<String>,
273    /// Error notification
274    pub notify_on_error: bool,
275}
276
277/// Circuit breaker configuration
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct CircuitBreakerConfig {
280    /// Failure threshold
281    pub failure_threshold: u32,
282    /// Success threshold
283    pub success_threshold: u32,
284    /// Timeout duration
285    pub timeout: Duration,
286    /// Half-open retry timeout
287    pub half_open_timeout: Duration,
288}
289
290/// Data update notification
291#[derive(Debug, Clone)]
292pub struct DataUpdate {
293    /// Source ID
294    pub source_id: String,
295    /// Timestamp
296    pub timestamp: Instant,
297    /// Updated data
298    pub data: Value,
299    /// Change type
300    pub change_type: ChangeType,
301    /// Affected fields
302    pub affected_fields: Vec<String>,
303}
304
305/// Change type enumeration
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub enum ChangeType {
308    /// New data added
309    Insert,
310    /// Existing data updated
311    Update,
312    /// Data deleted
313    Delete,
314    /// Data replaced
315    Replace,
316    /// Full refresh
317    Refresh,
318}
319
320/// Data source manager
321pub struct DataSourceManager {
322    /// Registered data sources
323    sources: Arc<Mutex<HashMap<String, Box<dyn DataSource>>>>,
324    /// Subscriptions
325    subscriptions: Arc<Mutex<HashMap<String, Vec<Box<dyn Fn(DataUpdate) + Send + Sync>>>>>,
326    /// Change detector
327    change_detector: Arc<Mutex<ChangeDetector>>,
328}
329
330/// Change detector for data updates
331#[derive(Debug)]
332pub struct ChangeDetector {
333    /// Change detection configuration
334    config: ChangeDetectionConfig,
335    /// Previous data states
336    previous_states: HashMap<String, Value>,
337    /// Change history
338    history: VecDeque<DataUpdate>,
339}
340
341/// Change detection configuration
342#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct ChangeDetectionConfig {
344    /// Enable change detection
345    pub enabled: bool,
346    /// Detection strategy
347    pub strategy: ChangeDetectionStrategy,
348    /// Comparison depth
349    pub depth: u32,
350    /// Ignore fields
351    pub ignore_fields: Vec<String>,
352    /// Notification configuration
353    pub notification: ChangeNotificationConfig,
354}
355
356/// Change detection strategy enumeration
357#[derive(Debug, Clone, Serialize, Deserialize)]
358pub enum ChangeDetectionStrategy {
359    /// Deep comparison
360    Deep,
361    /// Shallow comparison
362    Shallow,
363    /// Hash-based comparison
364    Hash,
365    /// Timestamp-based
366    Timestamp,
367    /// Custom strategy
368    Custom(String),
369}
370
371/// Change notification configuration
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct ChangeNotificationConfig {
374    /// Batch notifications
375    pub batch_notifications: bool,
376    /// Batch size
377    pub batch_size: usize,
378    /// Batch timeout
379    pub batch_timeout: Duration,
380    /// Notification filters
381    pub filters: Vec<NotificationFilter>,
382}
383
384/// Notification filter
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct NotificationFilter {
387    /// Filter type
388    pub filter_type: FilterType,
389    /// Filter parameters
390    pub parameters: HashMap<String, Value>,
391}
392
393/// Filter type enumeration
394#[derive(Debug, Clone, Serialize, Deserialize)]
395pub enum FilterType {
396    /// Field-based filter
397    Field(String),
398    /// Value-based filter
399    Value(Value),
400    /// Change type filter
401    ChangeType(ChangeType),
402    /// Custom filter
403    Custom(String),
404}
405
406impl DataSourceManager {
407    /// Create new data source manager
408    pub fn new() -> Self {
409        Self {
410            sources: Arc::new(Mutex::new(HashMap::new())),
411            subscriptions: Arc::new(Mutex::new(HashMap::new())),
412            change_detector: Arc::new(Mutex::new(ChangeDetector::new())),
413        }
414    }
415
416    /// Register data source
417    pub fn register_source(&self, source: Box<dyn DataSource>) -> Result<()> {
418        let id = source.id().to_string();
419        self.sources
420            .lock()
421            .expect("Operation failed")
422            .insert(id, source);
423        Ok(())
424    }
425
426    /// Unregister data source
427    pub fn unregister_source(&self, source_id: &str) -> Result<()> {
428        self.sources
429            .lock()
430            .expect("Operation failed")
431            .remove(source_id);
432        self.subscriptions
433            .lock()
434            .expect("Operation failed")
435            .remove(source_id);
436        Ok(())
437    }
438
439    /// Get data source
440    pub fn get_source(&self, source_id: &str) -> Option<String> {
441        // Simplified - would return actual source reference
442        self.sources
443            .lock()
444            .expect("Operation failed")
445            .get(source_id)
446            .map(|_| source_id.to_string())
447    }
448
449    /// Subscribe to data updates
450    pub fn subscribe<F>(&self, source_id: &str, callback: F) -> Result<String>
451    where
452        F: Fn(DataUpdate) + Send + Sync + 'static,
453    {
454        let subscription_id = format!("{}_{}", source_id, scirs2_core::random::random::<u64>());
455        self.subscriptions
456            .lock()
457            .expect("Operation failed")
458            .entry(source_id.to_string())
459            .or_default()
460            .push(Box::new(callback));
461        Ok(subscription_id)
462    }
463}
464
465impl ChangeDetector {
466    /// Create new change detector
467    pub fn new() -> Self {
468        Self {
469            config: ChangeDetectionConfig::default(),
470            previous_states: HashMap::new(),
471            history: VecDeque::new(),
472        }
473    }
474
475    /// Detect changes in data
476    pub fn detect_changes(&mut self, source_id: &str, data: &Value) -> Vec<DataUpdate> {
477        // Simplified change detection logic
478        let mut updates = Vec::new();
479
480        if let Some(previous) = self.previous_states.get(source_id) {
481            if previous != data {
482                updates.push(DataUpdate {
483                    source_id: source_id.to_string(),
484                    timestamp: Instant::now(),
485                    data: data.clone(),
486                    change_type: ChangeType::Update,
487                    affected_fields: vec!["*".to_string()],
488                });
489            }
490        } else {
491            updates.push(DataUpdate {
492                source_id: source_id.to_string(),
493                timestamp: Instant::now(),
494                data: data.clone(),
495                change_type: ChangeType::Insert,
496                affected_fields: vec!["*".to_string()],
497            });
498        }
499
500        self.previous_states
501            .insert(source_id.to_string(), data.clone());
502        updates
503    }
504}
505
506impl Default for ChangeDetectionConfig {
507    fn default() -> Self {
508        Self {
509            enabled: true,
510            strategy: ChangeDetectionStrategy::Deep,
511            depth: 10,
512            ignore_fields: Vec::new(),
513            notification: ChangeNotificationConfig::default(),
514        }
515    }
516}
517
518impl Default for ChangeNotificationConfig {
519    fn default() -> Self {
520        Self {
521            batch_notifications: false,
522            batch_size: 10,
523            batch_timeout: Duration::from_millis(100),
524            filters: Vec::new(),
525        }
526    }
527}
528
529impl Default for DataSourceConfig {
530    fn default() -> Self {
531        Self {
532            id: "default".to_string(),
533            source_type: DataSourceType::WebSocket,
534            connection: ConnectionConfig::default(),
535            format: DataFormatConfig::default(),
536            cache: CacheConfig::default(),
537            error_handling: ErrorHandlingConfig::default(),
538        }
539    }
540}
541
542impl Default for ConnectionConfig {
543    fn default() -> Self {
544        Self {
545            url: "ws://localhost:8080".to_string(),
546            headers: HashMap::new(),
547            auth: None,
548            retry: RetryConfig::default(),
549            pooling: ConnectionPoolConfig::default(),
550            timeout: Duration::from_secs(30),
551        }
552    }
553}
554
555impl Default for RetryConfig {
556    fn default() -> Self {
557        Self {
558            max_attempts: 3,
559            initial_delay: Duration::from_millis(100),
560            max_delay: Duration::from_secs(30),
561            backoff_multiplier: 2.0,
562            jitter: true,
563        }
564    }
565}
566
567impl Default for ConnectionPoolConfig {
568    fn default() -> Self {
569        Self {
570            enabled: false,
571            max_size: 10,
572            min_size: 1,
573            connection_timeout: Duration::from_secs(10),
574            idle_timeout: Duration::from_secs(300),
575        }
576    }
577}
578
579impl Default for DataFormatConfig {
580    fn default() -> Self {
581        Self {
582            format: DataFormat::Json,
583            schema: None,
584            field_mappings: HashMap::new(),
585            validation: ValidationConfig::default(),
586        }
587    }
588}
589
590impl Default for ValidationConfig {
591    fn default() -> Self {
592        Self {
593            enabled: false,
594            rules: Vec::new(),
595            on_failure: ValidationAction::Warn,
596        }
597    }
598}
599
600impl Default for CacheConfig {
601    fn default() -> Self {
602        Self {
603            enabled: true,
604            size: 1000,
605            ttl: Duration::from_secs(300),
606            strategy: CacheStrategy::LRU,
607        }
608    }
609}
610
611impl Default for ErrorHandlingConfig {
612    fn default() -> Self {
613        Self {
614            retry_on_error: true,
615            circuit_breaker: None,
616            fallback_source: None,
617            notify_on_error: true,
618        }
619    }
620}