datafold 0.1.55

A personal database for data sovereignty with AI-powered ingestion
Documentation
//! Statistics tracking for event monitoring
//!
//! This module contains all statistics-related types and their implementations.

use std::time::{SystemTime, UNIX_EPOCH};

/// Statistics about system activity tracked by the event monitor
///
/// Holds one global counter per event kind plus per-key breakdowns for
/// transforms, queries and mutations. Every `increment_*` method on this type
/// bumps its own counter and `total_events` together. All fields start at
/// zero / empty via the derived `Default`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct EventStatistics {
    /// Number of recorded field-value-set events.
    pub field_value_sets: u64,
    /// Number of recorded atom creation events.
    pub atom_creations: u64,
    /// Number of recorded atom update events.
    pub atom_updates: u64,
    /// Number of recorded molecule creation events.
    pub molecule_creations: u64,
    /// Number of recorded molecule update events.
    pub molecule_updates: u64,
    /// Number of recorded schema load events.
    pub schema_loads: u64,
    /// Number of recorded schema change events.
    pub schema_changes: u64,
    /// Number of recorded transform trigger events.
    pub transform_triggers: u64,
    /// Number of recorded transform executions (successes plus failures).
    pub transform_executions: u64,
    /// Number of transform executions recorded as successful.
    pub transform_successes: u64,
    /// Number of transform executions recorded as failed.
    pub transform_failures: u64,
    /// Number of recorded transform registration events.
    pub transform_registrations: u64,
    /// Number of recorded query executions.
    pub query_executions: u64,
    /// Number of recorded mutation executions.
    pub mutation_executions: u64,
    /// Sum of all events recorded through the `increment_*` methods.
    pub total_events: u64,
    /// Start of the monitoring period. Not written by the methods on this
    /// type; NOTE(review): presumably Unix seconds set by the owning monitor
    /// at startup -- confirm against the caller.
    pub monitoring_start_time: u64,
    /// Track execution times for performance monitoring
    ///
    /// One entry is appended per recorded transform execution.
    pub transform_execution_times: Vec<(String, u64)>, // (transform_id, execution_time_ms)
    /// Track success/failure rates per transform
    ///
    /// Keyed by transform id.
    pub transform_stats: std::collections::HashMap<String, TransformStats>,
    /// Track query performance by schema and type
    ///
    /// Keyed by the string `"{schema}:{query_type}"`.
    pub query_stats: std::collections::HashMap<String, QueryStats>,
    /// Track mutation performance by schema and operation
    ///
    /// Keyed by the string `"{schema}:{operation}"`.
    pub mutation_stats: std::collections::HashMap<String, MutationStats>,
}

/// Statistics for individual transforms
///
/// One instance is kept per transform id and updated each time an execution
/// of that transform is recorded.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct TransformStats {
    /// Number of recorded executions (successes plus failures).
    pub executions: u64,
    /// Number of executions recorded as successful.
    pub successes: u64,
    /// Number of executions recorded as failed.
    pub failures: u64,
    /// Sum of the reported execution times, in milliseconds.
    pub total_execution_time_ms: u64,
    /// `total_execution_time_ms / executions`, recomputed on every update.
    pub avg_execution_time_ms: f64,
    /// Wall-clock time, in seconds since the Unix epoch, at which the most
    /// recent execution was recorded.
    pub last_execution_time: u64,
}

/// Statistics for query operations
///
/// One instance is kept per `"{schema}:{query_type}"` key and updated each
/// time a matching query execution is recorded.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct QueryStats {
    /// Number of recorded query executions.
    pub executions: u64,
    /// Sum of the reported execution times, in milliseconds.
    pub total_execution_time_ms: u64,
    /// `total_execution_time_ms / executions`, recomputed on every update.
    pub avg_execution_time_ms: f64,
    /// Sum of the reported result counts across all executions.
    pub total_results: usize,
    /// `total_results / executions`, recomputed on every update.
    pub avg_result_count: f64,
    /// Wall-clock time, in seconds since the Unix epoch, at which the most
    /// recent execution was recorded.
    pub last_execution_time: u64,
}

/// Statistics for mutation operations
///
/// One instance is kept per `"{schema}:{operation}"` key and updated each
/// time a matching mutation execution is recorded.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct MutationStats {
    /// Number of recorded mutation executions.
    pub executions: u64,
    /// Sum of the reported execution times, in milliseconds.
    pub total_execution_time_ms: u64,
    /// `total_execution_time_ms / executions`, recomputed on every update.
    pub avg_execution_time_ms: f64,
    /// Sum of the number of affected fields across all executions.
    pub total_fields_affected: usize,
    /// `total_fields_affected / executions`, recomputed on every update.
    pub avg_fields_affected: f64,
    /// Wall-clock time, in seconds since the Unix epoch, at which the most
    /// recent execution was recorded.
    pub last_execution_time: u64,
}

impl EventStatistics {
    /// Current wall-clock time in whole seconds since the Unix epoch.
    ///
    /// Falls back to 0 when the system clock reports a time before the epoch,
    /// so that recording a statistic never panics on a misconfigured clock.
    fn current_unix_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0)
    }

    /// Records one field-value-set event.
    pub fn increment_field_value_sets(&mut self) {
        self.field_value_sets += 1;
        self.total_events += 1;
    }

    /// Records one atom creation event.
    pub fn increment_atom_creations(&mut self) {
        self.atom_creations += 1;
        self.total_events += 1;
    }

    /// Records one atom update event.
    pub fn increment_atom_updates(&mut self) {
        self.atom_updates += 1;
        self.total_events += 1;
    }

    /// Records one molecule creation event.
    pub fn increment_molecule_creations(&mut self) {
        self.molecule_creations += 1;
        self.total_events += 1;
    }

    /// Records one molecule update event.
    pub fn increment_molecule_updates(&mut self) {
        self.molecule_updates += 1;
        self.total_events += 1;
    }

    /// Records one schema load event.
    pub fn increment_schema_loads(&mut self) {
        self.schema_loads += 1;
        self.total_events += 1;
    }

    /// Records one schema change event.
    pub fn increment_schema_changes(&mut self) {
        self.schema_changes += 1;
        self.total_events += 1;
    }

    /// Records one transform trigger event.
    pub fn increment_transform_triggers(&mut self) {
        self.transform_triggers += 1;
        self.total_events += 1;
    }

    /// Records one transform registration event.
    pub fn increment_transform_registrations(&mut self) {
        self.transform_registrations += 1;
        self.total_events += 1;
    }

    /// Records one transform execution.
    ///
    /// Updates the global execution/success/failure counters, appends the
    /// timing to `transform_execution_times`, and refreshes the per-transform
    /// entry in `transform_stats` (created on first use).
    ///
    /// * `transform_id` - key under which per-transform statistics are kept.
    /// * `success` - whether the execution succeeded.
    /// * `execution_time_ms` - duration of the execution in milliseconds.
    pub fn increment_transform_executions(
        &mut self,
        transform_id: &str,
        success: bool,
        execution_time_ms: u64,
    ) {
        self.transform_executions += 1;
        self.total_events += 1;

        if success {
            self.transform_successes += 1;
        } else {
            self.transform_failures += 1;
        }

        // NOTE(review): this history gains one entry per execution and is
        // never trimmed here, so it grows for as long as the monitor runs.
        self.transform_execution_times
            .push((transform_id.to_string(), execution_time_ms));

        let stats = self
            .transform_stats
            .entry(transform_id.to_string())
            .or_default();
        stats.executions += 1;
        if success {
            stats.successes += 1;
        } else {
            stats.failures += 1;
        }
        stats.total_execution_time_ms += execution_time_ms;
        // `executions` was just incremented, so the divisor is never zero.
        stats.avg_execution_time_ms =
            stats.total_execution_time_ms as f64 / stats.executions as f64;
        stats.last_execution_time = Self::current_unix_secs();
    }

    /// Records one query execution.
    ///
    /// Updates the global query counter and the `query_stats` entry keyed by
    /// `"{schema}:{query_type}"` (created on first use).
    ///
    /// * `schema` - schema the query ran against.
    /// * `query_type` - kind of query; combined with `schema` to form the key.
    /// * `execution_time_ms` - duration of the query in milliseconds.
    /// * `result_count` - number of results the query produced.
    pub fn increment_query_executions(
        &mut self,
        schema: &str,
        query_type: &str,
        execution_time_ms: u64,
        result_count: usize,
    ) {
        self.query_executions += 1;
        self.total_events += 1;

        let key = format!("{}:{}", schema, query_type);
        let stats = self.query_stats.entry(key).or_default();
        stats.executions += 1;
        stats.total_execution_time_ms += execution_time_ms;
        // `executions` was just incremented, so the divisors are never zero.
        stats.avg_execution_time_ms =
            stats.total_execution_time_ms as f64 / stats.executions as f64;
        stats.total_results += result_count;
        stats.avg_result_count = stats.total_results as f64 / stats.executions as f64;
        stats.last_execution_time = Self::current_unix_secs();
    }

    /// Records one mutation execution described by `event`.
    ///
    /// Updates the global mutation counter and the `mutation_stats` entry
    /// keyed by `"{schema}:{operation}"` (created on first use), using the
    /// event's execution time and the number of entries in its
    /// `fields_affected` collection.
    pub fn increment_mutation_executions(
        &mut self,
        event: &super::message_bus::query_events::MutationExecuted,
    ) {
        self.mutation_executions += 1;
        self.total_events += 1;

        // Format straight from the borrowed event; no owned copies of the
        // schema or operation are needed to build the key.
        let key = format!("{}:{}", event.schema, event.operation);
        let fields_affected = event.fields_affected.len();

        let stats = self.mutation_stats.entry(key).or_default();
        stats.executions += 1;
        stats.total_execution_time_ms += event.execution_time_ms;
        // `executions` was just incremented, so the divisors are never zero.
        stats.avg_execution_time_ms =
            stats.total_execution_time_ms as f64 / stats.executions as f64;
        stats.total_fields_affected += fields_affected;
        stats.avg_fields_affected = stats.total_fields_affected as f64 / stats.executions as f64;
        stats.last_execution_time = Self::current_unix_secs();
    }

    /// Get overall transform performance metrics
    ///
    /// Returns `(success_rate, avg_execution_time_ms, successes, failures)`:
    ///
    /// * `success_rate` - `transform_successes / transform_executions`, or
    ///   `0.0` when no execution has been recorded.
    /// * `avg_execution_time_ms` - mean of all entries in
    ///   `transform_execution_times`, or `0.0` when that list is empty.
    /// * `successes` / `failures` - the global transform counters.
    pub fn get_transform_performance_summary(&self) -> (f64, f64, u64, u64) {
        let overall_success_rate = if self.transform_executions > 0 {
            self.transform_successes as f64 / self.transform_executions as f64
        } else {
            0.0
        };

        let overall_avg_time = if self.transform_execution_times.is_empty() {
            0.0
        } else {
            let total_time: u64 = self
                .transform_execution_times
                .iter()
                .map(|&(_, time)| time)
                .sum();
            total_time as f64 / self.transform_execution_times.len() as f64
        };

        (
            overall_success_rate,
            overall_avg_time,
            self.transform_successes,
            self.transform_failures,
        )
    }
}