rust_observer 0.2.2

Express telemetry Rust SDK
Documentation
mod exporters;
mod grpc_metrics;
mod http_metrics;
mod metrics_builder;
mod process_metrics;
mod stdout_exporter;
mod sys_metrics;
mod websocket_metrics;

use chrono::Utc;
pub use exporters::{MetricsExporter, MetricsExporterManager};
pub use grpc_metrics::GrpcMetrics;
pub use http_metrics::HttpMetrics;
pub use metrics_builder::MetricsBuilder;
use process_metrics::ProcessMetrics;
pub use stdout_exporter::{print_metrics, StdoutMetricsExporter};
pub use sys_metrics::SystemMetrics;
use sys_metrics::SystemMetricsCollector;
pub use websocket_metrics::WebSocketMetrics;

use crate::context::get_global_context;

/// A telemetry record: the identity of the emitting service plus optional
/// per-category snapshots.
///
/// [`Metrics::new`] copies the identity fields from the global context and
/// leaves every snapshot as `None`; [`Metrics::collect_all_metrics`] stamps
/// the record and fills in the snapshots for the categories that are enabled.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// Unix timestamp in seconds, taken from `Utc::now().timestamp()` at the
    /// start of each collection. It is `0` until the first collection.
    pub timestamp: i64,
    /// Interval value handed to [`Metrics::new`]; stored as-is.
    /// NOTE(review): presumably the collection period in seconds — confirm
    /// against the caller.
    pub interval_secs: u64,
    /// Cloned from the global context's `project_name`.
    pub project_name: String,
    /// Cloned from the global context's `app_name`.
    pub app_name: String,
    /// Cloned from the global context's `service_name`.
    pub service_name: String,
    /// Cloned from the global context's `replica_id`.
    pub replica_id: String,
    /// Result of the system metrics collector; written only when system
    /// collection is enabled.
    pub system_metrics: Option<SystemMetrics>,
    /// Result of `ProcessMetrics::collect()`, which may itself be `None`;
    /// written only when process collection is enabled.
    /// NOTE(review): `ProcessMetrics` is brought in with a private `use` in
    /// this file, unlike `SystemMetrics` — confirm callers can name the type.
    pub process_metrics: Option<ProcessMetrics>,
    /// Snapshot built from `HttpMetrics::flush()`; `None` when the flush
    /// yields nothing. Written only when HTTP collection is enabled.
    pub http_metrics: Option<http_metrics::HttpMetricsSnapshot>,
    /// Snapshot built from `GrpcMetrics::flush()`; `None` when the flush
    /// yields nothing. Written only when gRPC collection is enabled.
    pub grpc_metrics: Option<grpc_metrics::GrpcMetricsSnapshot>,
    /// Snapshot built from `WebSocketMetrics::flush()`; `None` when the flush
    /// yields nothing. Written only when WebSocket collection is enabled.
    pub websocket_metrics: Option<websocket_metrics::WebSocketMetricsSnapshot>,
}

impl Metrics {
    /// Creates an empty record for the given interval.
    ///
    /// The project, app, service and replica identifiers are cloned from the
    /// global context. `timestamp` starts at `0` and every snapshot field
    /// starts as `None`.
    pub fn new(interval_secs: u64) -> Self {
        // The handle must outlive the value returned by `read()`, so keep the
        // two in separate bindings.
        let context_handle = get_global_context();
        let ctx = context_handle.read();

        let project_name = ctx.project_name.clone();
        let app_name = ctx.app_name.clone();
        let service_name = ctx.service_name.clone();
        let replica_id = ctx.replica_id.clone();

        Self {
            timestamp: 0,
            interval_secs,
            project_name,
            app_name,
            service_name,
            replica_id,
            system_metrics: None,
            process_metrics: None,
            http_metrics: None,
            grpc_metrics: None,
            websocket_metrics: None,
        }
    }

    /// Stamps the record with the current UTC time and refreshes the snapshot
    /// of every enabled category.
    ///
    /// Categories are processed in a fixed order: system, process, HTTP,
    /// gRPC, WebSocket. A category whose flag is `false` is skipped entirely,
    /// so its field keeps whatever value it already held. For an enabled
    /// HTTP/gRPC/WebSocket category the field becomes `None` when the
    /// corresponding `flush()` yields nothing.
    pub async fn collect_all_metrics(
        &mut self,
        system_metrics_collector: &SystemMetricsCollector,
        http_enabled: bool,
        grpc_enabled: bool,
        websocket_enabled: bool,
        system_enabled: bool,
        process_enabled: bool,
    ) {
        self.timestamp = Utc::now().timestamp();

        if system_enabled {
            let snapshot = system_metrics_collector.collect();
            self.system_metrics = Some(snapshot);
        }

        if process_enabled {
            // `collect()` already returns an `Option`, so store it directly.
            self.process_metrics = ProcessMetrics::collect();
        }

        if http_enabled {
            let flushed = HttpMetrics::flush().await;
            self.http_metrics = flushed.map(|counters| {
                let (
                    method_count,
                    status_code_distribution,
                    requests_per_interval,
                    average_request_duration,
                ) = counters;
                http_metrics::HttpMetricsSnapshot {
                    requests_per_interval,
                    method_count,
                    status_code_distribution,
                    average_request_duration,
                }
            });
        }

        if grpc_enabled {
            let flushed = GrpcMetrics::flush().await;
            self.grpc_metrics = flushed.map(|counters| {
                let (
                    service_count,
                    status_code_distribution,
                    calls_per_interval,
                    average_call_duration,
                ) = counters;
                grpc_metrics::GrpcMetricsSnapshot {
                    calls_per_interval,
                    service_count,
                    status_code_distribution,
                    average_call_duration,
                }
            });
        }

        if websocket_enabled {
            let flushed = WebSocketMetrics::flush().await;
            self.websocket_metrics = flushed.map(|counters| {
                let (new_connections, disconnections) = counters;
                websocket_metrics::WebSocketMetricsSnapshot {
                    new_connections,
                    disconnections,
                }
            });
        }
    }
}