iocaine 3.0.0

The deadliest poison known to AI
Documentation
// SPDX-FileCopyrightText: 2025 Gergely Nagy
// SPDX-FileContributor: Gergely Nagy
//
// SPDX-License-Identifier: MIT

use anyhow::Result;
use axum::{Router, routing::get};
use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tokio::time::{self, Duration, Instant};
use tokio_listener::Listener;

use crate::{morgue::shutdown_signal, queer::HRT};

/// Version of this crate, read from `CARGO_PKG_VERSION` at compile time.
/// Used as the value of the `version` label on the `iocaine_version` gauge.
const VERSION: &str = env!("CARGO_PKG_VERSION");

/// State of the metrics endpoint: the metric registry plus the optional
/// on-disk persistence settings.
#[derive(Clone, Default)]
pub struct TenXProgrammer {
    /// Registry of the metrics owned by this instance.
    pub registry: MetricRegistry,
    /// File the metrics are persisted to and loaded from. With `None`,
    /// persisting is a no-op and loading yields empty metrics.
    pub persist_path: Option<PathBuf>,
    /// Delay between two runs of the background persister task.
    pub persist_interval: Duration,
    /// Abort handle of the background persister task; set by `serve` once the
    /// task has been spawned, `None` before that.
    persister_abort: Option<tokio::task::AbortHandle>,
}

/// Handle to a Prometheus registry together with the counters that were
/// registered through it. Both fields sit behind an `Arc`, so clones share
/// the same registry and the same counter map.
#[derive(Clone, Default)]
pub struct MetricRegistry {
    /// The underlying Prometheus registry.
    registry: Arc<Registry>,
    /// Counters added via `register`, keyed by metric name.
    counters: Arc<RwLock<HashMap<String, LabeledIntCounterVec>>>,
}

impl MetricRegistry {
    #[must_use]
    pub fn gather(&self) -> Vec<prometheus::proto::MetricFamily> {
        self.registry.gather()
    }

    pub fn register(&self, c: LabeledIntCounterVec) -> Result<LabeledIntCounterVec> {
        match self.registry.register(Box::new(c.counter.clone())) {
            Ok(()) => {
                self.counters
                    .write()
                    .map_err(|e| {
                        tracing::error!("failed to lock metrics registry for writing");
                        anyhow::anyhow!("{e}")
                    })?
                    .insert(c.name.clone(), c.clone());
                Ok(c)
            }
            Err(prometheus::Error::AlreadyReg) => {
                let counter = self
                    .counters
                    .read()
                    .map_err(|e| {
                        tracing::error!("failed to lock metrics registry for reading");
                        anyhow::anyhow!("{e}")
                    })?
                    .get(&c.name)
                    .unwrap()
                    .clone();

                Ok(counter)
            }
            Err(e) => Err(e.into()),
        }
    }
}

/// Metric values read back from the persistence file.
#[derive(Deserialize, Debug, Default, Clone)]
pub struct PersistedMetrics {
    /// Persisted samples, keyed by metric name. The field is flattened, so the
    /// keys of the deserialized document itself are the metric names.
    #[serde(flatten)]
    pub metrics: HashMap<String, Vec<PersistedMetric>>,
}

/// A single persisted sample of a metric: one combination of label values and
/// the value recorded for it.
#[derive(Deserialize, Debug, Default, Clone)]
pub struct PersistedMetric {
    /// Label name to label value mapping identifying the sample.
    pub labels: HashMap<String, String>,
    /// The recorded value. It is cast to `u64` when applied to a counter.
    pub value: f64,
}

impl PersistedMetrics {
    /// Restores `counter` from the persisted samples stored under its name.
    ///
    /// Does nothing when no samples were persisted for `counter.name`.
    /// Samples that `counter` rejects are skipped; the rejection is logged by
    /// the counter itself.
    pub fn update(&self, counter: &LabeledIntCounterVec) {
        if let Some(samples) = self.metrics.get(&counter.name) {
            tracing::debug!({ metric = counter.name }, "updating persisted metric");
            samples.iter().for_each(|sample| {
                counter.set(&sample.labels, sample.value);
            });
        }
    }
}

/// An `IntCounterVec` bundled with its own name and label names, so that
/// label values can be validated and looked up by name.
#[derive(Clone)]
pub struct LabeledIntCounterVec {
    /// The wrapped Prometheus counter vector.
    pub counter: IntCounterVec,
    /// Name the counter was created with.
    pub name: String,
    /// Label names, in the order they were passed when creating the counter.
    pub labels: Vec<String>,
}

impl LabeledIntCounterVec {
    /// Creates a counter vector called `name`, described by `desc`, with the
    /// given label names.
    ///
    /// # Errors
    ///
    /// Returns (and logs) the error reported by `IntCounterVec::new` when the
    /// counter cannot be created.
    pub fn new<S: AsRef<str>, T: AsRef<str>>(name: S, desc: S, labels: &[T]) -> Result<Self> {
        let metric_name = name.as_ref();
        let label_names: Vec<&str> = labels.iter().map(|label| label.as_ref()).collect();

        let created = IntCounterVec::new(Opts::new(metric_name, desc.as_ref()), &label_names);
        let counter = match created {
            Ok(counter) => counter,
            Err(e) => {
                tracing::error!(
                    { name = metric_name, labels = format!("{:?}", label_names) },
                    "unable to create IntCounterVec metric: {e}"
                );
                return Err(e.into());
            }
        };

        Ok(Self {
            counter,
            name: metric_name.to_owned(),
            labels: label_names.into_iter().map(String::from).collect(),
        })
    }

    /// Checks that `actual` label values match the number of label names of
    /// this counter; logs an error and yields `None` when they do not.
    fn ensure_label_count(&self, actual: usize) -> Option<()> {
        let expected = self.labels.len();
        if actual == expected {
            return Some(());
        }

        tracing::error!(
            { name = self.name, expected, actual },
            "number of label values do not match",
        );
        None
    }

    /// Increments the counter identified by `label_values` by one.
    ///
    /// Returns `None`, without touching any counter, when the number of label
    /// values differs from the number of label names.
    pub fn inc<S: AsRef<str> + std::fmt::Debug>(&self, label_values: &[S]) -> Option<()> {
        self.ensure_label_count(label_values.len())?;
        self.counter.with_label_values(label_values).inc();
        Some(())
    }

    /// Increments the counter identified by `label_values` by `amount`.
    ///
    /// Returns `None`, without touching any counter, when the number of label
    /// values differs from the number of label names.
    pub fn inc_by<S: AsRef<str> + std::fmt::Debug>(
        &self,
        amount: u64,
        label_values: &[S],
    ) -> Option<()> {
        self.ensure_label_count(label_values.len())?;
        self.counter.with_label_values(label_values).inc_by(amount);
        Some(())
    }

    /// Sets the counter identified by `labels` (label name to label value) to
    /// `value`, by resetting it and then incrementing it by `value as u64`.
    ///
    /// Returns `None`, after logging an error, when `labels` has a different
    /// number of entries than this counter has label names, or when one of the
    /// label names is missing from `labels`.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_sign_loss)]
    pub fn set(&self, labels: &HashMap<String, String>, value: f64) -> Option<()> {
        let expected = self.labels.len();
        if labels.len() != expected {
            tracing::error!(
                { metric = self.name, expected, actual = labels.len() },
                "number of label values do not match",
            );
            return None;
        }

        // Look the values up in the counter's own label order; collecting into
        // an `Option` stops at the first label that is missing.
        let ordered: Vec<&String> = self
            .labels
            .iter()
            .map(|name| {
                let found = labels.get(name);
                if found.is_none() {
                    tracing::error!(
                        { metric = self.name, name },
                        "label not found in persisted metric"
                    );
                }
                found
            })
            .collect::<Option<_>>()?;

        let series = self.counter.with_label_values(&ordered);
        series.reset();
        series.inc_by(value as u64);

        Some(())
    }
}

impl TenXProgrammer {
    pub fn new(persist_path: &Option<PathBuf>, persist_interval: Duration) -> Result<Self> {
        let registry = Registry::new();

        let version_opts = Opts::new(
            "iocaine_version",
            "Version of the running iocaine (in the 'version' label)",
        );
        let version = IntGaugeVec::new(version_opts, &["version"])?;
        version.with_label_values(&[VERSION]).set(1);
        registry.register(Box::new(version))?;

        let tenx = Self {
            registry: MetricRegistry {
                registry: Arc::new(registry),
                counters: Arc::default(),
            },
            persist_path: persist_path.clone(),
            persist_interval,
            persister_abort: None,
        };

        Ok(tenx)
    }

    pub fn persist(&self) -> Result<()> {
        let Some(persist_path) = &self.persist_path else {
            return Ok(());
        };

        tracing::debug!(
            { persist_path = persist_path.display().to_string() },
            "persisting metrics"
        );

        let mut f = File::create(persist_path)?;

        let encoder = HRT::new();
        let metrics = self.registry.gather();
        Ok(encoder.encode(&metrics, &mut f)?)
    }

    pub fn load_metrics(&self) -> Result<PersistedMetrics> {
        let Some(ref persist_path) = self.persist_path else {
            return Ok(PersistedMetrics::default());
        };

        tracing::debug!(
            { persist_path = persist_path.display().to_string() },
            "loading persisted metrics"
        );

        let Ok(data) = std::fs::read_to_string(persist_path) else {
            return Ok(PersistedMetrics::default());
        };

        Ok(serde_json::from_str(&data)?)
    }

    pub fn abort(&self) {
        if let Some(handle) = &self.persister_abort {
            handle.abort();
        }
    }

    pub async fn serve(mut self, listener: Listener) -> Result<()> {
        if let Some(ref persist_path) = self.persist_path {
            let tenx = self.clone();
            let persist_path = persist_path.display().to_string();
            let handle = tokio::spawn(async move {
                let sleep = time::sleep(self.persist_interval);
                tokio::pin!(sleep);
                loop {
                    sleep.as_mut().await;
                    if let Err(e) = tenx.persist() {
                        tracing::error!({ persist_path }, "unable to persist metrics: {e}");
                    }
                    sleep.as_mut().reset(Instant::now() + self.persist_interval);
                }
            });
            self.persister_abort = Some(handle.abort_handle());
        }

        let registry = self.registry.clone();
        let app = Router::new()
            .route(
                "/metrics",
                get(async move || {
                    let encoder = TextEncoder::new();
                    let mut buffer = Vec::<u8>::new();

                    let metrics = registry.gather();
                    encoder.encode(&metrics, &mut buffer).unwrap();
                    let metrics = prometheus::gather();
                    encoder.encode(&metrics, &mut buffer).unwrap();

                    String::from_utf8(buffer).unwrap()
                }),
            )
            .layer(tower_http::trace::TraceLayer::new_for_http());

        Ok(axum::serve(listener, app)
            .with_graceful_shutdown(shutdown_signal(Some(self)))
            .await?)
    }
}