use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use thiserror::Error;
use tracing::{error, instrument};
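/// Errors that can occur while logging or querying history.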
#[derive(Debug, Error)]
pub enum HistoryError {
#[error("Database query failed: {0}")]
Database(#[from] sqlx::Error),
#[error("Serialization or Deserialization failed: {0}")]
Serialization(#[from] serde_json::Error),
#[error("Invalid input provided: {0}")]
InvalidInput(String),
}
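/// One timestamped event in an entity's history, with free-form JSON metadata.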
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct HistoryEvent {
pub id: i32,
pub entity_id: String,
pub event_type: String,
pub timestamp: DateTime<Utc>,
pub meta: JsonValue,
}
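/// Anomaly-detection thresholds: per-event-type overrides plus a fallback default.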
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
pub default_threshold: usize,
pub per_type_thresholds: HashMap<String, usize>,
}
impl Default for AnomalyConfig {
fn default() -> Self {
Self {
            default_threshold: 5,
            per_type_thresholds: HashMap::new(),
}
}
}
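/// Stores history events in Postgres and scans recent timelines for event
/// bursts that exceed the configured thresholds.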
#[derive(Debug)]
pub struct HistoryService {
db_pool: PgPool,
anomaly_config: AnomalyConfig,
}
impl HistoryService {
pub fn new(db_pool: PgPool, anomaly_config: AnomalyConfig) -> Self {
Self { db_pool, anomaly_config }
}
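    /// Persists a single event row.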
#[instrument(skip(self, event), fields(entity_id = %event.entity_id, event_type = %event.event_type))]
    pub async fn log_event(&self, event: &HistoryEvent) -> Result<(), HistoryError> {
        // `history_events` and its columns are an assumed schema. Binding
        // DateTime<Utc> and serde_json::Value relies on sqlx's `chrono` and
        // `json` features being enabled.
        sqlx::query(
            "INSERT INTO history_events (entity_id, event_type, timestamp, meta) \
             VALUES ($1, $2, $3, $4)",
        )
        .bind(&event.entity_id)
        .bind(&event.event_type)
        .bind(event.timestamp)
        .bind(&event.meta)
        .execute(&self.db_pool)
        .await?;
        Ok(())
}
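    /// Returns up to `limit` events for `entity_id` in chronological order,
    /// optionally restricted to events at or after `since`.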
#[instrument(skip(self), fields(entity_id = %entity_id, limit = %limit, offset = %offset))]
pub async fn get_entity_history(
&self,
entity_id: &str,
since: Option<DateTime<Utc>>,
limit: i64,
offset: i64,
) -> Result<Vec<HistoryEvent>, HistoryError> {
        if limit <= 0 {
            error!(limit, "rejecting non-positive limit");
            return Err(HistoryError::InvalidInput(format!(
                "limit must be positive, got {}",
                limit
            )));
        }
        // Same assumed `history_events` schema as in `log_event`. Binding
        // `since` as an Option encodes NULL when absent, which the
        // `$2 IS NULL OR ...` clause turns into "no lower bound".
        let events = sqlx::query_as::<_, HistoryEvent>(
            "SELECT id, entity_id, event_type, timestamp, meta FROM history_events \
             WHERE entity_id = $1 AND ($2::timestamptz IS NULL OR timestamp >= $2) \
             ORDER BY timestamp ASC LIMIT $3 OFFSET $4",
        )
        .bind(entity_id)
        .bind(since)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.db_pool)
        .await?;
        Ok(events)
}
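    /// Flags burst anomalies in the last `window_mins` minutes of an entity's
    /// history (capped at 1000 events per scan).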
#[instrument(skip(self), fields(entity_id = %entity_id))]
pub async fn detect_timeline_anomalies(
&self,
entity_id: &str,
window_mins: i64,
) -> Result<Vec<HistoryEvent>, HistoryError> {
        let since = Utc::now() - chrono::Duration::minutes(window_mins);
        let events = self.get_entity_history(entity_id, Some(since), 1000, 0).await?;
        Ok(self.flag_anomalies(&events))
    }

    /// Counting pass split out from `detect_timeline_anomalies` so the
    /// threshold logic can be unit-tested without a database. An event is
    /// flagged once the running count for its type exceeds that type's
    /// threshold.
    fn flag_anomalies(&self, events: &[HistoryEvent]) -> Vec<HistoryEvent> {
        let mut counter: HashMap<String, usize> = HashMap::new();
        let mut anomalies = Vec::new();
        for event in events {
            let count = counter.entry(event.event_type.clone()).or_insert(0);
            *count += 1;
            // A per-type override wins; otherwise fall back to the default.
            let threshold = self
                .anomaly_config
                .per_type_thresholds
                .get(&event.event_type)
                .copied()
                .unwrap_or(self.anomaly_config.default_threshold);
            if *count > threshold {
                anomalies.push(event.clone());
            }
        }
        anomalies
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
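    // `connect_lazy` never opens a real connection; these tests exercise only
    // the pure threshold logic in `flag_anomalies`, so no database is needed.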
fn create_mock_service() -> HistoryService {
let mock_pool = PgPool::connect_lazy("postgres://user:pass@localhost/test").unwrap();
let mut config = AnomalyConfig::default();
        config.default_threshold = 2;
        config.per_type_thresholds.insert("CRITICAL_ERROR".to_string(), 0);
HistoryService::new(mock_pool, config)
}
#[tokio::test]
async fn test_anomaly_detection_with_default_threshold() {
let service = create_mock_service();
let mock_events = vec![
HistoryEvent { id: 1, entity_id: "device123".into(), event_type: "LOGIN_SUCCESS".into(), timestamp: Utc::now(), meta: json!({}) },
HistoryEvent { id: 2, entity_id: "device123".into(), event_type: "LOGIN_SUCCESS".into(), timestamp: Utc::now(), meta: json!({}) },
HistoryEvent { id: 3, entity_id: "device123".into(), event_type: "LOGIN_SUCCESS".into(), timestamp: Utc::now(), meta: json!({}) },
];
        // The mock default threshold is 2, so only the third LOGIN_SUCCESS
        // pushes its running count past the threshold.
        let anomalies = service.flag_anomalies(&mock_events);
        assert_eq!(anomalies.len(), 1);
        assert_eq!(anomalies[0].id, 3);
    }
#[tokio::test]
async fn test_anomaly_detection_with_custom_threshold() {
let service = create_mock_service();
let mock_events = vec![
HistoryEvent { id: 1, entity_id: "device123".into(), event_type: "CRITICAL_ERROR".into(), timestamp: Utc::now(), meta: json!({}) },
HistoryEvent { id: 2, entity_id: "device123".into(), event_type: "CRITICAL_ERROR".into(), timestamp: Utc::now(), meta: json!({}) },
];
        // CRITICAL_ERROR's threshold override is 0, so every occurrence is flagged.
        let anomalies = service.flag_anomalies(&mock_events);
        assert_eq!(anomalies.len(), 2);
    }
}