#![allow(clippy::too_many_arguments)]
#![allow(dead_code)]
use crate::error::{MetricsError, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Interface for a data source that can be boxed and held by `DataSourceManager`.
///
/// Implementors must also be `Debug + Send + Sync`. No implementation lives in
/// this file, so the method notes below state only what the signatures show;
/// anything beyond that is marked as an assumption.
pub trait DataSource: std::fmt::Debug + Send + Sync {
/// Identifier of this source. `DataSourceManager::register_source` uses it
/// as the registry key.
fn id(&self) -> &str;
/// Returns data from the source as a JSON value.
/// NOTE(review): presumably the most recent payload — confirm with implementors.
fn get_data(&self) -> Result<Value>;
/// Registers `callback`, which receives raw `Value`s (not `DataUpdate`s),
/// and returns a `String`.
/// NOTE(review): the returned string is presumably the id accepted by
/// `unsubscribe` — confirm with implementors.
fn subscribe(&mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Result<String>;
/// Removes the subscription identified by `subscription_id`.
fn unsubscribe(&mut self, subscription_id: &str) -> Result<()>;
/// Opens the connection to the underlying source.
fn connect(&mut self) -> Result<()>;
/// Closes the connection to the underlying source.
fn disconnect(&mut self) -> Result<()>;
/// Reports whether the source currently considers itself connected.
fn is_connected(&self) -> bool;
/// Borrows the configuration currently held by the source.
fn config(&self) -> &DataSourceConfig;
/// Replaces the source's configuration with `config`.
fn update_config(&mut self, config: DataSourceConfig) -> Result<()>;
/// Returns `(timestamp, value)` pairs for the window described by `start` and `end`.
/// NOTE(review): whether the bounds are inclusive is not visible here — confirm.
fn get_history(&self, start: Instant, end: Instant) -> Result<Vec<(Instant, Value)>>;
}
/// Complete configuration for one data source.
///
/// This is the type returned by `DataSource::config` and accepted by
/// `DataSource::update_config`. The `Default` impl in this file yields a
/// WebSocket source with id `"default"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceConfig {
/// Identifier stored in the configuration (default: `"default"`).
pub id: String,
/// Kind of source (default: `DataSourceType::WebSocket`).
pub source_type: DataSourceType,
/// Connection settings: URL, headers, auth, retry, pooling, timeout.
pub connection: ConnectionConfig,
/// Payload format and validation settings.
pub format: DataFormatConfig,
/// Cache settings.
pub cache: CacheConfig,
/// Error-handling settings.
pub error_handling: ErrorHandlingConfig,
}
/// Kind of data source selected in `DataSourceConfig::source_type`.
///
/// Nothing in this file dispatches on the variant; interpretation is left to
/// `DataSource` implementors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataSourceType {
/// WebSocket source (the default in `DataSourceConfig::default`).
WebSocket,
/// HTTP polling source.
HttpPolling,
/// Server-sent-events source.
ServerSentEvents,
/// File-based source.
File,
/// Database-based source.
Database,
/// Any other kind, labelled by a free-form string.
Custom(String),
}
/// Connection settings for a data source.
///
/// Defaults (from the `Default` impl in this file): URL
/// `"ws://localhost:8080"`, no headers, no auth, default retry and pooling,
/// 30-second timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionConfig {
/// Endpoint of the source.
pub url: String,
/// Header name/value pairs.
/// NOTE(review): presumably attached to outgoing requests — confirm with implementors.
pub headers: HashMap<String, String>,
/// Optional authentication settings; `None` by default.
pub auth: Option<AuthConfig>,
/// Retry settings.
pub retry: RetryConfig,
/// Connection-pool settings.
pub pooling: ConnectionPoolConfig,
/// Timeout duration (default: 30 seconds).
/// NOTE(review): which phase this limits (connect, read, ...) is not visible here.
pub timeout: Duration,
}
/// Authentication settings attached to a `ConnectionConfig`.
///
/// The type derives `Debug` and `Serialize`, so `credentials` will appear in
/// debug output and serialized forms of this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
/// Authentication scheme.
pub auth_type: AuthType,
/// Credential key/value pairs.
/// NOTE(review): expected keys per scheme are not defined in this file.
pub credentials: HashMap<String, String>,
/// Optional URL for refreshing credentials.
pub refresh_url: Option<String>,
/// Optional lifetime of the credentials.
pub expires_in: Option<Duration>,
}
/// Authentication scheme named in `AuthConfig::auth_type`.
///
/// No code in this file acts on the variant; handling is up to `DataSource`
/// implementors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthType {
/// Basic authentication.
Basic,
/// Bearer-token authentication.
Bearer,
/// API-key authentication.
ApiKey,
/// OAuth 2 authentication.
OAuth2,
/// Any other scheme, labelled by a free-form string.
Custom(String),
}
/// Retry settings for a connection.
///
/// Defaults (from the `Default` impl in this file): 3 attempts, 100 ms initial
/// delay, 30 s maximum delay, multiplier 2.0, jitter enabled. The retry loop
/// itself is not implemented in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
/// Maximum number of attempts.
/// NOTE(review): whether this counts the first try is not visible here — confirm.
pub max_attempts: u32,
/// Delay before the first retry.
pub initial_delay: Duration,
/// Upper bound on the delay between retries.
pub max_delay: Duration,
/// Multiplier applied to the delay.
/// NOTE(review): presumably applied after each failed attempt — confirm.
pub backoff_multiplier: f64,
/// Whether jitter is applied to delays.
pub jitter: bool,
}
/// Connection-pool settings.
///
/// Defaults (from the `Default` impl in this file): disabled, sizes 1..=10,
/// 10 s connection timeout, 300 s idle timeout. No pool is implemented here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionPoolConfig {
/// Whether pooling is turned on.
pub enabled: bool,
/// Maximum pool size.
pub max_size: u32,
/// Minimum pool size.
pub min_size: u32,
/// Timeout associated with obtaining a connection.
pub connection_timeout: Duration,
/// Timeout associated with idle connections.
pub idle_timeout: Duration,
}
/// Payload format settings for a data source.
///
/// Defaults (from the `Default` impl in this file): JSON format, no schema,
/// no field mappings, default validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFormatConfig {
/// Wire format of the payload.
pub format: DataFormat,
/// Optional schema, stored as an arbitrary JSON value.
/// NOTE(review): the schema dialect is not specified in this file.
pub schema: Option<Value>,
/// String-to-string field mappings.
/// NOTE(review): direction (source name -> target name or the reverse) is not visible here.
pub field_mappings: HashMap<String, String>,
/// Validation settings.
pub validation: ValidationConfig,
}
/// Payload format named in `DataFormatConfig::format`.
///
/// No parsing code for any of these formats exists in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFormat {
/// JSON (the default in `DataFormatConfig::default`).
Json,
/// CSV.
Csv,
/// XML.
Xml,
/// Protocol Buffers.
Protobuf,
/// MessagePack.
MessagePack,
/// Any other format, labelled by a free-form string.
Custom(String),
}
/// Validation settings for incoming data.
///
/// Defaults (from the `Default` impl in this file): disabled, no rules,
/// `ValidationAction::Warn` on failure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationConfig {
/// Whether validation is turned on.
pub enabled: bool,
/// Rules to apply.
pub rules: Vec<ValidationRule>,
/// Action selected for validation failures.
pub on_failure: ValidationAction,
}
/// One validation rule, listed in `ValidationConfig::rules`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationRule {
/// Name of the field the rule refers to.
/// NOTE(review): whether nested paths are supported is not visible here.
pub field: String,
/// Kind of check.
pub rule_type: ValidationRuleType,
/// Extra rule parameters as arbitrary JSON values; keys are not defined in this file.
pub parameters: HashMap<String, Value>,
}
/// Kind of check carried by a `ValidationRule`.
///
/// No evaluator for these rules exists in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationRuleType {
/// The field is required.
Required,
/// Type check; the expected type is given as a string.
Type(String),
/// Numeric range check; either bound may be omitted.
/// NOTE(review): bound inclusivity is not specified here.
Range { min: Option<f64>, max: Option<f64> },
/// Pattern check; the pattern is given as a string.
/// NOTE(review): presumably a regular expression — confirm.
Pattern(String),
/// Any other check, labelled by a free-form string.
Custom(String),
}
/// Action selected by `ValidationConfig::on_failure`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationAction {
/// Reject the data.
Reject,
/// Warn (the default in `ValidationConfig::default`).
Warn,
/// Use default values.
/// NOTE(review): where the defaults come from is not visible in this file.
DefaultValues,
/// Any other action, labelled by a free-form string.
Custom(String),
}
/// Cache settings for a data source.
///
/// Defaults (from the `Default` impl in this file): enabled, size 1000,
/// 300 s TTL, `CacheStrategy::LRU`. No cache is implemented in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
/// Whether caching is turned on.
pub enabled: bool,
/// Cache size.
/// NOTE(review): presumably a number of entries rather than bytes — confirm.
pub size: usize,
/// Time-to-live for cached data.
pub ttl: Duration,
/// Cache strategy.
pub strategy: CacheStrategy,
}
/// Cache strategy named in `CacheConfig::strategy`.
///
/// The variant names suggest the usual eviction policies, but no eviction
/// logic exists in this file to confirm their exact behaviour.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CacheStrategy {
/// Presumably least-recently-used (the default in `CacheConfig::default`).
LRU,
/// Presumably first-in-first-out.
FIFO,
/// Presumably expiry by time-to-live.
TTL,
/// Any other strategy, labelled by a free-form string.
Custom(String),
}
/// Error-handling settings for a data source.
///
/// Defaults (from the `Default` impl in this file): retry on error, no
/// circuit breaker, no fallback source, notify on error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorHandlingConfig {
/// Whether to retry after an error.
pub retry_on_error: bool,
/// Optional circuit-breaker settings; `None` by default.
pub circuit_breaker: Option<CircuitBreakerConfig>,
/// Optional fallback source, given as a string.
/// NOTE(review): presumably the id of another registered source — confirm.
pub fallback_source: Option<String>,
/// Whether to send a notification on error.
pub notify_on_error: bool,
}
/// Circuit-breaker settings, optionally attached to `ErrorHandlingConfig`.
///
/// This type has no `Default` impl in this file, and no circuit breaker is
/// implemented here; the field notes are assumptions from the usual pattern.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
/// Failure count threshold.
/// NOTE(review): presumably the number of failures that opens the breaker — confirm.
pub failure_threshold: u32,
/// Success count threshold.
/// NOTE(review): presumably the number of successes that closes the breaker — confirm.
pub success_threshold: u32,
/// Timeout duration for the breaker.
pub timeout: Duration,
/// Timeout duration for the half-open state.
pub half_open_timeout: Duration,
}
/// A change notification for one data source.
///
/// Produced by `ChangeDetector::detect_changes` and passed to callbacks
/// registered through `DataSourceManager::subscribe`. Unlike the config types
/// in this file, it derives only `Debug` and `Clone`.
#[derive(Debug, Clone)]
pub struct DataUpdate {
/// Id of the source the update belongs to.
pub source_id: String,
/// When the update was created; `detect_changes` sets it to `Instant::now()`.
pub timestamp: Instant,
/// The data carried by the update; `detect_changes` stores a clone of the new value.
pub data: Value,
/// Kind of change.
pub change_type: ChangeType,
/// Names of the affected fields; `detect_changes` always reports `["*"]`.
pub affected_fields: Vec<String>,
}
/// Kind of change carried by a `DataUpdate`.
///
/// Within this file only `Insert` and `Update` are ever produced (by
/// `ChangeDetector::detect_changes`); the remaining variants are available
/// to other producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeType {
/// First value seen for a source id.
Insert,
/// The value differs from the previously recorded one.
Update,
/// Data was deleted.
Delete,
/// Data was replaced.
Replace,
/// Data was refreshed.
Refresh,
}
/// Registry of data sources and the callbacks subscribed to them.
///
/// Every field sits behind `Arc<Mutex<..>>`, and the methods in this file
/// take `&self` and lock the field they need.
pub struct DataSourceManager {
// Registered sources keyed by the id returned from `DataSource::id`.
sources: Arc<Mutex<HashMap<String, Box<dyn DataSource>>>>,
// Callbacks keyed by source id; each source id maps to a list of callbacks.
subscriptions: Arc<Mutex<HashMap<String, Vec<Box<dyn Fn(DataUpdate) + Send + Sync>>>>>,
// Created in `new`; not otherwise used by the manager methods in this file.
change_detector: Arc<Mutex<ChangeDetector>>,
}
/// Detects changes by remembering the last value seen for each source id.
#[derive(Debug)]
pub struct ChangeDetector {
// Set to `ChangeDetectionConfig::default()` in `new`; not read elsewhere in this file.
config: ChangeDetectionConfig,
// Last value passed to `detect_changes` for each source id.
previous_states: HashMap<String, Value>,
// Initialised empty in `new`; nothing in this file appends to it.
history: VecDeque<DataUpdate>,
}
/// Settings for change detection.
///
/// Defaults (from the `Default` impl in this file): enabled, `Deep` strategy,
/// depth 10, no ignored fields, default notification settings.
/// `ChangeDetector::detect_changes` in this file does not read any of these.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeDetectionConfig {
/// Whether change detection is turned on.
pub enabled: bool,
/// Comparison strategy.
pub strategy: ChangeDetectionStrategy,
/// Depth setting.
/// NOTE(review): presumably the maximum nesting depth compared — confirm.
pub depth: u32,
/// Field names to ignore.
pub ignore_fields: Vec<String>,
/// Notification settings.
pub notification: ChangeNotificationConfig,
}
/// Comparison strategy named in `ChangeDetectionConfig::strategy`.
///
/// `ChangeDetector::detect_changes` in this file does not consult the
/// strategy; it always compares whole values with `!=`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChangeDetectionStrategy {
/// Deep comparison (the default in `ChangeDetectionConfig::default`).
Deep,
/// Shallow comparison.
Shallow,
/// Hash-based comparison.
Hash,
/// Timestamp-based comparison.
Timestamp,
/// Any other strategy, labelled by a free-form string.
Custom(String),
}
/// Settings for delivering change notifications.
///
/// Defaults (from the `Default` impl in this file): batching off, batch size
/// 10, 100 ms batch timeout, no filters. No batching logic exists in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeNotificationConfig {
/// Whether notifications are batched.
pub batch_notifications: bool,
/// Batch size.
pub batch_size: usize,
/// Batch timeout.
/// NOTE(review): presumably the longest a partial batch waits before delivery — confirm.
pub batch_timeout: Duration,
/// Filters applied to notifications.
pub filters: Vec<NotificationFilter>,
}
/// One notification filter, listed in `ChangeNotificationConfig::filters`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationFilter {
/// What the filter matches on.
pub filter_type: FilterType,
/// Extra filter parameters as arbitrary JSON values; keys are not defined in this file.
pub parameters: HashMap<String, Value>,
}
/// What a `NotificationFilter` matches on.
///
/// No filter evaluation exists in this file, so whether a match keeps or
/// drops a notification is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FilterType {
/// Match on a field, given by name.
Field(String),
/// Match on a JSON value.
Value(Value),
/// Match on a kind of change.
ChangeType(ChangeType),
/// Any other filter, labelled by a free-form string.
Custom(String),
}
impl DataSourceManager {
    /// Creates a manager with no registered sources, no subscriptions, and a
    /// freshly constructed [`ChangeDetector`].
    pub fn new() -> Self {
        Self {
            sources: Arc::new(Mutex::new(HashMap::new())),
            subscriptions: Arc::new(Mutex::new(HashMap::new())),
            change_detector: Arc::new(Mutex::new(ChangeDetector::new())),
        }
    }

    /// Locks `mutex`, taking the guard even if the lock is poisoned.
    ///
    /// The maps guarded here are only touched through single `HashMap`
    /// operations, so they stay structurally valid if a holder panicked
    /// (for example, a panicking `Drop` of a replaced source). Recovering the
    /// guard keeps one such panic from turning every later call on the
    /// manager into a panic as well.
    fn lock_recovering<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Registers `source` under the id it reports via [`DataSource::id`].
    ///
    /// A source already registered under the same id is replaced and dropped.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    pub fn register_source(&self, source: Box<dyn DataSource>) -> Result<()> {
        let id = source.id().to_string();
        Self::lock_recovering(&self.sources).insert(id, source);
        Ok(())
    }

    /// Removes the source registered as `source_id` together with every
    /// callback subscribed to that id.
    ///
    /// Unknown ids are ignored. The two maps are locked one after the other,
    /// never both at once.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    pub fn unregister_source(&self, source_id: &str) -> Result<()> {
        Self::lock_recovering(&self.sources).remove(source_id);
        Self::lock_recovering(&self.subscriptions).remove(source_id);
        Ok(())
    }

    /// Returns `Some(source_id)` (as an owned string) when a source is
    /// registered under `source_id`, and `None` otherwise.
    ///
    /// Only the id is returned; the source itself stays inside the manager.
    pub fn get_source(&self, source_id: &str) -> Option<String> {
        Self::lock_recovering(&self.sources)
            .contains_key(source_id)
            .then(|| source_id.to_string())
    }

    /// Adds `callback` to the list of callbacks kept for `source_id` and
    /// returns a generated subscription id of the form `<source_id>_<random u64>`.
    ///
    /// The id is not checked against the registered sources, so a callback can
    /// be added for a source that has not been registered. The returned
    /// subscription id is not stored next to the callback; within this impl,
    /// callbacks are only removed in bulk by
    /// [`DataSourceManager::unregister_source`].
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(..)`.
    pub fn subscribe<F>(&self, source_id: &str, callback: F) -> Result<String>
    where
        F: Fn(DataUpdate) + Send + Sync + 'static,
    {
        let subscription_id = format!("{}_{}", source_id, scirs2_core::random::random::<u64>());
        Self::lock_recovering(&self.subscriptions)
            .entry(source_id.to_string())
            .or_default()
            .push(Box::new(callback));
        Ok(subscription_id)
    }
}
impl ChangeDetector {
    /// Builds a detector with the default configuration, no remembered
    /// states, and an empty history.
    pub fn new() -> Self {
        let config = ChangeDetectionConfig::default();
        Self {
            config,
            previous_states: HashMap::new(),
            history: VecDeque::new(),
        }
    }

    /// Compares `data` with the value last recorded for `source_id` and then
    /// records `data` as the new last value.
    ///
    /// Returns at most one update:
    /// - `ChangeType::Insert` when nothing was recorded for `source_id` yet,
    /// - `ChangeType::Update` when the recorded value differs from `data`,
    /// - nothing when the two are equal.
    ///
    /// The update carries a clone of `data`, the current `Instant`, and
    /// `["*"]` as its affected fields. This method reads neither `config`
    /// nor `history`.
    pub fn detect_changes(&mut self, source_id: &str, data: &Value) -> Vec<DataUpdate> {
        // Classify first; the stored state is overwritten only afterwards.
        let detected = match self.previous_states.get(source_id) {
            None => Some(ChangeType::Insert),
            Some(previous) if previous != data => Some(ChangeType::Update),
            Some(_) => None,
        };

        let updates: Vec<DataUpdate> = detected
            .into_iter()
            .map(|change_type| DataUpdate {
                source_id: source_id.to_string(),
                timestamp: Instant::now(),
                data: data.clone(),
                change_type,
                affected_fields: vec!["*".to_string()],
            })
            .collect();

        self.previous_states
            .insert(source_id.to_string(), data.clone());
        updates
    }
}
impl Default for ChangeDetectionConfig {
    /// Detection enabled, `Deep` strategy, depth 10, no ignored fields, and
    /// default notification settings.
    fn default() -> Self {
        let notification = ChangeNotificationConfig::default();
        let ignore_fields = Vec::new();
        Self {
            enabled: true,
            strategy: ChangeDetectionStrategy::Deep,
            depth: 10,
            ignore_fields,
            notification,
        }
    }
}
impl Default for ChangeNotificationConfig {
    /// Batching off, batch size 10, 100 ms batch timeout, no filters.
    fn default() -> Self {
        let batch_timeout = Duration::from_millis(100);
        let filters = Vec::new();
        Self {
            batch_notifications: false,
            batch_size: 10,
            batch_timeout,
            filters,
        }
    }
}
impl Default for DataSourceConfig {
    /// A WebSocket source with id `"default"` and default connection, format,
    /// cache, and error-handling settings.
    fn default() -> Self {
        let id = String::from("default");
        let connection = ConnectionConfig::default();
        let format = DataFormatConfig::default();
        let cache = CacheConfig::default();
        let error_handling = ErrorHandlingConfig::default();
        Self {
            id,
            source_type: DataSourceType::WebSocket,
            connection,
            format,
            cache,
            error_handling,
        }
    }
}
impl Default for ConnectionConfig {
    /// URL `ws://localhost:8080`, no headers, no auth, default retry and
    /// pooling settings, 30-second timeout.
    fn default() -> Self {
        let url = String::from("ws://localhost:8080");
        let retry = RetryConfig::default();
        let pooling = ConnectionPoolConfig::default();
        let timeout = Duration::from_secs(30);
        Self {
            url,
            headers: HashMap::new(),
            auth: None,
            retry,
            pooling,
            timeout,
        }
    }
}
impl Default for RetryConfig {
    /// Three attempts, 100 ms initial delay, 30 s maximum delay, multiplier
    /// 2.0, jitter enabled.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(30);
        Self {
            max_attempts: 3,
            initial_delay,
            max_delay,
            backoff_multiplier: 2.0,
            jitter: true,
        }
    }
}
impl Default for ConnectionPoolConfig {
    /// Pooling disabled, sizes between 1 and 10, 10 s connection timeout,
    /// 300 s idle timeout.
    fn default() -> Self {
        let connection_timeout = Duration::from_secs(10);
        let idle_timeout = Duration::from_secs(300);
        Self {
            enabled: false,
            max_size: 10,
            min_size: 1,
            connection_timeout,
            idle_timeout,
        }
    }
}
impl Default for DataFormatConfig {
    /// JSON format, no schema, no field mappings, default validation settings.
    fn default() -> Self {
        let field_mappings = HashMap::new();
        let validation = ValidationConfig::default();
        Self {
            format: DataFormat::Json,
            schema: None,
            field_mappings,
            validation,
        }
    }
}
impl Default for ValidationConfig {
    /// Validation disabled, no rules, `Warn` on failure.
    fn default() -> Self {
        let rules = Vec::new();
        let on_failure = ValidationAction::Warn;
        Self {
            enabled: false,
            rules,
            on_failure,
        }
    }
}
impl Default for CacheConfig {
    /// Caching enabled, size 1000, 300 s TTL, `LRU` strategy.
    fn default() -> Self {
        let ttl = Duration::from_secs(300);
        let strategy = CacheStrategy::LRU;
        Self {
            enabled: true,
            size: 1000,
            ttl,
            strategy,
        }
    }
}
impl Default for ErrorHandlingConfig {
    /// Retry on error, no circuit breaker, no fallback source, notify on error.
    fn default() -> Self {
        let circuit_breaker = None;
        let fallback_source = None;
        Self {
            retry_on_error: true,
            circuit_breaker,
            fallback_source,
            notify_on_error: true,
        }
    }
}