voidmerge 0.0.25

VoidMerge: The open-source, developer-friendly web services platform.
Documentation
//! Metering utilities.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Cached host readings (CPU, memory, disks) obtained through `sysinfo`.
///
/// The accessor methods refresh the cached data lazily via
/// `Sys::check_update`, which re-reads the system at most once every
/// 10 seconds.
struct Sys {
    /// Instant of the most recent refresh; used to throttle re-reads.
    last: std::time::Instant,
    /// What to refresh on `sys`: CPU usage and RAM (see `Default` impl).
    sys_kind: sysinfo::RefreshKind,
    /// Handle holding the cached CPU / memory readings.
    sys: sysinfo::System,
    /// What to refresh on `disks`: storage figures (see `Default` impl).
    disk_kind: sysinfo::DiskRefreshKind,
    /// Cached list of disks with their storage figures.
    disks: sysinfo::Disks,
}

impl Default for Sys {
    fn default() -> Self {
        let last = std::time::Instant::now();

        let sys_kind = sysinfo::RefreshKind::nothing()
            .with_cpu(sysinfo::CpuRefreshKind::nothing().with_cpu_usage())
            .with_memory(sysinfo::MemoryRefreshKind::nothing().with_ram());
        let mut sys = sysinfo::System::new_with_specifics(sys_kind);
        sys.refresh_specifics(sys_kind);

        let disk_kind = sysinfo::DiskRefreshKind::nothing().with_storage();
        let disks =
            sysinfo::Disks::new_with_refreshed_list_specifics(disk_kind);

        Sys {
            last,
            sys_kind,
            sys,
            disk_kind,
            disks,
        }
    }
}

impl Sys {
    fn check_update(&mut self) {
        let now = std::time::Instant::now();
        if now - self.last < std::time::Duration::from_secs(10) {
            return;
        }
        self.last = now;
        self.sys.refresh_specifics(self.sys_kind);
        self.disks.refresh_specifics(true, self.disk_kind);
    }

    pub fn mem_avail(&mut self) -> u64 {
        self.check_update();
        self.sys.available_memory()
    }

    pub fn mem_used(&mut self) -> u64 {
        self.check_update();
        self.sys.used_memory()
    }

    pub fn mem_total(&mut self) -> u64 {
        self.check_update();
        self.sys.total_memory()
    }

    pub fn cpu_usage(&mut self) -> f64 {
        self.check_update();
        let mut usage = 0.0_f64;
        for cpu in self.sys.cpus() {
            usage += cpu.cpu_usage() as f64;
        }
        usage / self.sys.cpus().len() as f64
    }

    pub fn disk_total(
        &mut self,
        disk_total_byte: &dyn opentelemetry::metrics::AsyncInstrument<u64>,
    ) {
        self.check_update();
        for disk in self.disks.list() {
            disk_total_byte.observe(
                disk.total_space(),
                &[opentelemetry::KeyValue::new(
                    "mount",
                    disk.mount_point().to_string_lossy().to_string(),
                )],
            );
        }
    }

    pub fn disk_avail(
        &mut self,
        disk_avail_byte: &dyn opentelemetry::metrics::AsyncInstrument<u64>,
    ) {
        self.check_update();
        for disk in self.disks.list() {
            disk_avail_byte.observe(
                disk.available_space(),
                &[opentelemetry::KeyValue::new(
                    "mount",
                    disk.mount_point().to_string_lossy().to_string(),
                )],
            );
        }
    }
}

static SYS: OnceLock<Mutex<Sys>> = OnceLock::new();
fn sys() -> &'static Mutex<Sys> {
    SYS.get_or_init(Default::default)
}

/// The OpenTelemetry instruments created by this module.
///
/// The three counters are fed by the `meter_*` functions. The observable
/// gauges report host readings through callbacks registered when the
/// instruments are built. The underscore-prefixed fields are never read in
/// this file — presumably they are held only so the gauges stay registered
/// (TODO confirm against the opentelemetry SDK).
struct OtelMeters {
    /// Egress data transfer counter ("vm.egress", unit "byte").
    egress_byte: opentelemetry::metrics::Counter<f64>,
    /// Function memory * duration counter ("vm.fn", unit "mib-milli").
    fn_mib_milli: opentelemetry::metrics::Counter<f64>,
    /// Object storage counter ("vm.obj.storage", unit "byte-min").
    obj_store_byte_min: opentelemetry::metrics::Counter<f64>,

    /// Gauge for available RAM ("vm.sys.mem.avail").
    _mem_avail_byte: opentelemetry::metrics::ObservableGauge<u64>,
    /// Gauge for used RAM ("vm.sys.mem.used").
    _mem_used_byte: opentelemetry::metrics::ObservableGauge<u64>,
    /// Gauge for total RAM ("vm.sys.mem.total").
    _mem_total_byte: opentelemetry::metrics::ObservableGauge<u64>,

    /// Gauge for mean CPU usage ("vm.sys.cpu.usage").
    _cpu_usage_percent: opentelemetry::metrics::ObservableGauge<f64>,

    /// Gauge for per-mount total disk size ("vm.sys.disk.total").
    _disk_total_byte: opentelemetry::metrics::ObservableGauge<u64>,
    /// Gauge for per-mount available disk size ("vm.sys.disk.avail").
    _disk_avail_byte: opentelemetry::metrics::ObservableGauge<u64>,
}

impl Default for OtelMeters {
    /// Create every instrument on the global `"vm"` meter.
    ///
    /// Counters are plain instruments; each observable gauge gets a
    /// callback that locks the shared system sampler and reports the
    /// matching reading. Instruments are built in field order.
    fn default() -> Self {
        let vm = opentelemetry::global::meter("vm");

        Self {
            // --- usage counters ---
            egress_byte: vm
                .f64_counter("vm.egress")
                .with_unit("byte")
                .with_description("Egress data transfer")
                .build(),
            fn_mib_milli: vm
                .f64_counter("vm.fn")
                .with_unit("mib-milli")
                .with_description("Function call memory * duration")
                .build(),
            obj_store_byte_min: vm
                .f64_counter("vm.obj.storage")
                .with_unit("byte-min")
                .with_description("Object storage")
                .build(),

            // --- memory gauges ---
            _mem_avail_byte: vm
                .u64_observable_gauge("vm.sys.mem.avail")
                .with_unit("byte")
                .with_description("Memory (RAM) available")
                .with_callback(|gauge| {
                    let value = sys().lock().unwrap().mem_avail();
                    gauge.observe(value, &[]);
                })
                .build(),
            _mem_used_byte: vm
                .u64_observable_gauge("vm.sys.mem.used")
                .with_unit("byte")
                .with_description("Memory (RAM) used")
                .with_callback(|gauge| {
                    let value = sys().lock().unwrap().mem_used();
                    gauge.observe(value, &[]);
                })
                .build(),
            _mem_total_byte: vm
                .u64_observable_gauge("vm.sys.mem.total")
                .with_unit("byte")
                .with_description("Memory (RAM) total")
                .with_callback(|gauge| {
                    let value = sys().lock().unwrap().mem_total();
                    gauge.observe(value, &[]);
                })
                .build(),

            // --- cpu gauge ---
            _cpu_usage_percent: vm
                .f64_observable_gauge("vm.sys.cpu.usage")
                .with_unit("percent")
                .with_description("CPU usage percentage")
                .with_callback(|gauge| {
                    let value = sys().lock().unwrap().cpu_usage();
                    gauge.observe(value, &[]);
                })
                .build(),

            // --- disk gauges (one observation per mount point) ---
            _disk_total_byte: vm
                .u64_observable_gauge("vm.sys.disk.total")
                .with_unit("byte")
                .with_description("Disk total size")
                .with_callback(|gauge| {
                    let mut sampler = sys().lock().unwrap();
                    sampler.disk_total(gauge);
                })
                .build(),
            _disk_avail_byte: vm
                .u64_observable_gauge("vm.sys.disk.avail")
                .with_unit("byte")
                .with_description("Disk available size")
                .with_callback(|gauge| {
                    let mut sampler = sys().lock().unwrap();
                    sampler.disk_avail(gauge);
                })
                .build(),
        }
    }
}

static OTEL_METERS: OnceLock<OtelMeters> = OnceLock::new();
fn otel() -> &'static OtelMeters {
    OTEL_METERS.get_or_init(Default::default)
}

/// Running usage totals for a single context.
///
/// All fields start at zero (derived `Default`) and are only ever added to
/// by the `meter_*` functions. Serialized with camelCase field names.
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct Agg {
    /// Accumulated egress, as passed to `meter_egress_byte`.
    egress_byte: u128,
    /// Accumulated function memory * duration, as passed to
    /// `meter_fn_mib_milli`.
    fn_mib_milli: u128,
    /// Accumulated object storage usage, as passed to
    /// `meter_obj_store_byte_min`.
    obj_store_byte_min: u128,
}

/// Per-context usage aggregates, keyed by context identifier.
type AggMap = HashMap<Arc<str>, Agg>;

/// Process-wide, lazily created aggregate map.
static METER: OnceLock<Mutex<AggMap>> = OnceLock::new();

/// Access the shared aggregate map, creating an empty one on first use.
fn meter() -> &'static Mutex<AggMap> {
    METER.get_or_init(|| Mutex::new(AggMap::new()))
}

/// Expands to a mutable reference to the `Agg` entry for `$ctx` in the
/// global aggregate map, inserting an all-zero entry if none exists yet.
///
/// `$ctx` is cloned to form the map key. The mutex guard is a temporary of
/// the expanded expression, so the lock is held until the end of the
/// statement the macro is used in — keep that statement short.
macro_rules! meter_ctx {
    ($ctx: ident) => {
        meter().lock().unwrap().entry($ctx.clone()).or_default()
    };
}

/// Initialize metering. Call this once in the binary.
///
/// Eagerly creates the OpenTelemetry instruments, then spawns the
/// background task that periodically logs the per-context aggregates.
///
/// # Panics
///
/// Spawning uses `tokio::task::spawn`, which panics when called outside
/// of a Tokio runtime.
pub fn meter_init() {
    // Build the otel instruments now instead of on first metered event.
    let _ = otel();

    // The join handle is dropped: the task is detached and runs forever.
    tokio::spawn(init_meter_task());
}

/// Hook for receiving meter updates.
///
/// Called with the context, the meter name and the value that was just
/// added for that meter.
pub type MeterHook =
    Arc<dyn Fn(&Arc<str>, &'static str, u128) + 'static + Send + Sync>;

/// Process-wide, lazily created list of registered hooks.
static HOOKS: OnceLock<Mutex<Vec<MeterHook>>> = OnceLock::new();

/// Access the shared hook list, creating an empty one on first use.
fn hooks() -> &'static Mutex<Vec<MeterHook>> {
    HOOKS.get_or_init(|| Mutex::new(Vec::new()))
}

/// Invoke every registered hook, in registration order.
fn hook_trigger(ctx: &Arc<str>, meter: &'static str, value: u128) {
    // Snapshot the list inside a short scope so the lock is released
    // before any hook runs; a hook may therefore register further hooks.
    let snapshot: Vec<MeterHook> = {
        let registered = hooks().lock().unwrap();
        registered.iter().cloned().collect()
    };

    snapshot.iter().for_each(|hook| hook(ctx, meter, value));
}

/// Register a hook for receiving meter updates.
pub fn meter_register_hook(hook: MeterHook) {
    let mut registered = hooks().lock().unwrap();
    registered.push(hook);
}

/// Increment the egress usage for a context.
///
/// The amount is added to the otel egress counter (tagged with `ctx`),
/// added to the in-memory aggregate for `ctx`, and then reported to every
/// registered hook under the meter name `"egress_byte"`.
pub fn meter_egress_byte(ctx: &Arc<str>, egress_byte: u128) {
    let instruments = otel();
    let attrs = [opentelemetry::KeyValue::new("ctx", ctx.to_string())];
    instruments.egress_byte.add(egress_byte as f64, &attrs);

    // The aggregate lock is released at the end of this statement,
    // before any hook runs.
    meter_ctx!(ctx).egress_byte += egress_byte;

    hook_trigger(ctx, "egress_byte", egress_byte);
}

/// Increment the fn memory*duration usage for a context.
///
/// The amount is added to the otel function counter (tagged with `ctx`),
/// added to the in-memory aggregate for `ctx`, and then reported to every
/// registered hook under the meter name `"fn_mib_milli"`.
pub fn meter_fn_mib_milli(ctx: &Arc<str>, fn_mib_milli: u128) {
    let instruments = otel();
    let attrs = [opentelemetry::KeyValue::new("ctx", ctx.to_string())];
    instruments.fn_mib_milli.add(fn_mib_milli as f64, &attrs);

    // The aggregate lock is released at the end of this statement,
    // before any hook runs.
    meter_ctx!(ctx).fn_mib_milli += fn_mib_milli;

    hook_trigger(ctx, "fn_mib_milli", fn_mib_milli);
}

/// Add object storage usage for a context.
///
/// Despite reading like a setter, this accumulates: the amount is added
/// to the otel object-storage counter (unit `byte-min`, tagged with
/// `ctx`), added to the in-memory aggregate for `ctx`, and then reported
/// to every registered hook under the meter name `"obj_store_byte_min"`.
pub fn meter_obj_store_byte_min(ctx: &Arc<str>, obj_store_byte_min: u128) {
    let instruments = otel();
    let attrs = [opentelemetry::KeyValue::new("ctx", ctx.to_string())];
    instruments
        .obj_store_byte_min
        .add(obj_store_byte_min as f64, &attrs);

    // The aggregate lock is released at the end of this statement,
    // before any hook runs.
    meter_ctx!(ctx).obj_store_byte_min += obj_store_byte_min;

    hook_trigger(ctx, "obj_store_byte_min", obj_store_byte_min);
}

/// Background loop that flushes the per-context aggregates to the log.
///
/// Every five minutes the aggregate map is swapped for an empty one and
/// each drained entry is emitted as one `METER`-targeted info event.
/// Never returns.
async fn init_meter_task() {
    let period = std::time::Duration::from_secs(60 * 5);

    loop {
        tokio::time::sleep(period).await;

        // Take the whole map in a short scope so the std mutex guard is
        // released before logging and before the next await point.
        let drained: AggMap = {
            let mut live = meter().lock().unwrap();
            std::mem::take(&mut *live)
        };

        for (ctx, totals) in drained {
            tracing::info!(
                target: "METER",
                %ctx,
                egress_byte = totals.egress_byte as f64,
                fn_mib_milli = totals.fn_mib_milli as f64,
                obj_store_byte_min = totals.obj_store_byte_min as f64,
            );
        }
    }
}