eventdbx 3.9.8

An event-sourced, nosql, write-side database system.
Documentation
use std::time::Instant;

use anyhow::{Result, anyhow};
use axum::{
    extract::{MatchedPath, Request as AxumRequest},
    http::{HeaderValue, StatusCode, header},
    middleware::Next,
    response::{IntoResponse, Response},
};
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use once_cell::sync::OnceCell;

static PROMETHEUS_HANDLE: OnceCell<PrometheusHandle> = OnceCell::new();
static START_TIME: OnceCell<Instant> = OnceCell::new();

pub fn init() -> Result<()> {
    PROMETHEUS_HANDLE
        .get_or_try_init(|| {
            let builder = PrometheusBuilder::new()
                .set_buckets_for_metric(
                    Matcher::Full("eventdbx_http_request_duration_seconds".into()),
                    &[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
                )
                .map_err(|err| anyhow!("failed to configure prometheus exporter: {err}"))?;
            let handle = builder
                .install_recorder()
                .map_err(|err| anyhow!("failed to initialise prometheus recorder: {err}"))?;

            describe_counter!(
                "eventdbx_http_requests_total",
                "Total number of HTTP requests processed by the EventDBX server."
            );
            describe_histogram!(
                "eventdbx_http_request_duration_seconds",
                "HTTP request latency observed by the EventDBX server."
            );
            describe_counter!(
                "eventdbx_plugin_jobs_enqueued_total",
                "Total number of plugin jobs enqueued for delivery."
            );
            describe_counter!(
                "eventdbx_plugin_jobs_completed_total",
                "Total number of plugin jobs completed successfully."
            );
            describe_counter!(
                "eventdbx_plugin_jobs_failed_total",
                "Total number of plugin jobs that failed."
            );
            describe_gauge!(
                "eventdbx_plugin_queue_jobs",
                "Current number of plugin jobs by state."
            );
            describe_counter!(
                "eventdbx_store_operations_total",
                "Total number of operations executed by the event store."
            );
            describe_histogram!(
                "eventdbx_store_operation_duration_seconds",
                "Duration of operations executed by the event store."
            );
            describe_counter!(
                "eventdbx_capnp_control_requests_total",
                "Total number of Cap'n Proto control requests handled over the TCP socket."
            );
            describe_histogram!(
                "eventdbx_capnp_control_request_duration_seconds",
                "Latency of Cap'n Proto control requests handled over the TCP socket."
            );
            describe_counter!(
                "eventdbx_capnp_replication_requests_total",
                "Total number of replication socket requests handled."
            );
            describe_histogram!(
                "eventdbx_capnp_replication_request_duration_seconds",
                "Latency of replication socket requests handled."
            );
            describe_counter!(
                "eventdbx_cli_proxy_commands_total",
                "Total number of CLI proxy shell-out commands executed."
            );
            describe_histogram!(
                "eventdbx_cli_proxy_command_duration_seconds",
                "Latency of CLI proxy shell-out command execution."
            );

            let _ = START_TIME.set(Instant::now());
            Ok(handle)
        })
        .map(|_| ())
}

pub async fn metrics_handler() -> Response {
    if PROMETHEUS_HANDLE.get().is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            "metrics recorder not initialised",
        )
            .into_response();
    }

    let body = render_metrics();
    let headers = [(
        header::CONTENT_TYPE,
        HeaderValue::from_static("text/plain; version=0.0.4"),
    )];
    (StatusCode::OK, headers, body).into_response()
}

pub async fn track_http_metrics(req: AxumRequest, next: Next) -> Response {
    let method = req.method().clone();
    let matched_path = req
        .extensions()
        .get::<MatchedPath>()
        .map(|p| p.as_str().to_string());
    let path = matched_path.unwrap_or_else(|| req.uri().path().to_string());
    let method_label = method.as_str().to_owned();

    let start = Instant::now();
    let response = next.run(req).await;
    let latency = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();

    counter!(
        "eventdbx_http_requests_total",
        "method" => method_label.clone(),
        "path" => path.clone(),
        "status" => status.clone()
    )
    .increment(1);
    histogram!(
        "eventdbx_http_request_duration_seconds",
        "method" => method_label,
        "path" => path,
        "status" => status
    )
    .record(latency);

    response
}

pub fn record_capnp_control_request(operation: &str, status: &str, duration: f64) {
    let operation_label = operation.to_owned();
    let status_label = status.to_owned();
    counter!(
        "eventdbx_capnp_control_requests_total",
        "operation" => operation_label.clone(),
        "status" => status_label.clone()
    )
    .increment(1);
    histogram!(
        "eventdbx_capnp_control_request_duration_seconds",
        "operation" => operation_label,
        "status" => status_label
    )
    .record(duration);
}

pub fn record_capnp_replication_request(operation: &str, status: &str, duration: f64) {
    let operation_label = operation.to_owned();
    let status_label = status.to_owned();
    counter!(
        "eventdbx_capnp_replication_requests_total",
        "operation" => operation_label.clone(),
        "status" => status_label.clone()
    )
    .increment(1);
    histogram!(
        "eventdbx_capnp_replication_request_duration_seconds",
        "operation" => operation_label,
        "status" => status_label
    )
    .record(duration);
}

pub fn record_cli_proxy_command(
    command: &str,
    status: &str,
    exit_code: Option<i32>,
    duration: f64,
) {
    let exit_code_label = exit_code
        .map(|code| code.to_string())
        .unwrap_or_else(|| "n/a".to_string());
    let command_label = command.to_owned();
    let status_label = status.to_owned();
    counter!(
        "eventdbx_cli_proxy_commands_total",
        "command" => command_label.clone(),
        "status" => status_label.clone(),
        "exit_code" => exit_code_label.clone()
    )
    .increment(1);
    histogram!(
        "eventdbx_cli_proxy_command_duration_seconds",
        "command" => command_label,
        "status" => status_label,
        "exit_code" => exit_code_label
    )
    .record(duration);
}

pub fn render_metrics() -> String {
    if let Some(handle) = PROMETHEUS_HANDLE.get() {
        let mut body = handle.render();
        if let Some(start) = START_TIME.get() {
            let uptime = start.elapsed().as_secs_f64();
            body.push_str(&format!("eventdbx_uptime_seconds{{}} {}\n", uptime));
        }
        body
    } else {
        String::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn render_includes_uptime_once_initialised() {
        init().expect("observability init should succeed");
        let metrics = render_metrics();
        assert!(
            metrics.contains("eventdbx_uptime_seconds"),
            "expected uptime metric in output, got: {metrics:?}"
        );
    }

    #[test]
    fn counter_metrics_are_recorded() {
        init().expect("observability init should succeed");
        metrics::counter!("test_observability_counter_total").increment(1);
        let metrics = render_metrics();
        assert!(
            metrics.contains("test_observability_counter_total"),
            "expected counter metric in output, got: {metrics:?}"
        );
    }

    #[tokio::test]
    async fn metrics_handler_renders_text_plain() {
        init().expect("observability init should succeed");
        let response = metrics_handler().await;
        assert_eq!(response.status(), StatusCode::OK);

        let content_type = response
            .headers()
            .get(header::CONTENT_TYPE)
            .expect("content type header missing");
        assert_eq!(content_type, "text/plain; version=0.0.4");

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("metrics body to bytes");
        let body = String::from_utf8_lossy(&body_bytes);
        assert!(
            body.contains("eventdbx_uptime_seconds"),
            "expected uptime metric in response body, got: {body:?}"
        );
    }

    #[tokio::test]
    async fn router_layer_records_http_metrics() {
        use axum::{Router, body::Body, http::Request, middleware::from_fn, routing::get};
        use tower::util::ServiceExt;

        init().expect("observability init should succeed");

        let app = Router::new()
            .route("/metrics", get(metrics_handler))
            .layer(from_fn(track_http_metrics));

        let request = Request::builder()
            .uri("/metrics")
            .body(Body::empty())
            .unwrap();
        let response = app.clone().oneshot(request).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let second = Request::builder()
            .uri("/metrics")
            .body(Body::empty())
            .unwrap();
        let response = app.oneshot(second).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let body_bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("metrics body to bytes");
        let body = String::from_utf8_lossy(&body_bytes);
        metrics::counter!("test_snapshot_total").increment(1);
        let snapshot = render_metrics();
        assert!(
            snapshot.contains("test_snapshot_total"),
            "expected manual counter to appear in snapshot, got: {snapshot:?}"
        );
        assert!(
            body.contains("eventdbx_http_requests_total"),
            "expected http metrics in response body, got: {body:?}"
        );
    }
}