axiomsync 1.0.0

Core data-processing engine for AxiomSync local retrieval runtime.
Documentation
use chrono::{DateTime, Utc};
use rusqlite::{params, types::Type};

use crate::error::Result;

use super::OmReflectionApplyOutcome;

pub(super) fn update_reflection_apply_metrics_tx(
    tx: &rusqlite::Transaction<'_>,
    outcome: OmReflectionApplyOutcome,
    latency_ms: u64,
) -> Result<()> {
    let (applied_delta, stale_delta, idempotent_delta) = match outcome {
        OmReflectionApplyOutcome::Applied => (1u64, 0u64, 0u64),
        OmReflectionApplyOutcome::StaleGeneration => (0u64, 1u64, 0u64),
        OmReflectionApplyOutcome::IdempotentEvent => (0u64, 0u64, 1u64),
    };
    tx.execute(
        r"
        INSERT INTO om_runtime_metrics(
            id,
            reflect_apply_attempts_total,
            reflect_apply_applied_total,
            reflect_apply_stale_generation_total,
            reflect_apply_idempotent_total,
            reflect_apply_latency_ms_total,
            reflect_apply_latency_ms_max,
            updated_at
        )
        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
        ON CONFLICT(id) DO UPDATE SET
            reflect_apply_attempts_total = om_runtime_metrics.reflect_apply_attempts_total + excluded.reflect_apply_attempts_total,
            reflect_apply_applied_total = om_runtime_metrics.reflect_apply_applied_total + excluded.reflect_apply_applied_total,
            reflect_apply_stale_generation_total = om_runtime_metrics.reflect_apply_stale_generation_total + excluded.reflect_apply_stale_generation_total,
            reflect_apply_idempotent_total = om_runtime_metrics.reflect_apply_idempotent_total + excluded.reflect_apply_idempotent_total,
            reflect_apply_latency_ms_total = om_runtime_metrics.reflect_apply_latency_ms_total + excluded.reflect_apply_latency_ms_total,
            reflect_apply_latency_ms_max = MAX(om_runtime_metrics.reflect_apply_latency_ms_max, excluded.reflect_apply_latency_ms_max),
            updated_at = excluded.updated_at
        ",
        params![
            1i64,
            1i64,
            u64_to_i64_saturating(applied_delta),
            u64_to_i64_saturating(stale_delta),
            u64_to_i64_saturating(idempotent_delta),
            u64_to_i64_saturating(latency_ms),
            u64_to_i64_saturating(latency_ms),
            Utc::now().to_rfc3339(),
        ],
    )?;
    Ok(())
}

pub(super) fn elapsed_millis_u64(value: u128) -> u64 {
    u64::try_from(value).unwrap_or(u64::MAX)
}

fn u64_to_i64_saturating(value: u64) -> i64 {
    i64::try_from(value).unwrap_or(i64::MAX)
}

pub(super) fn i64_to_u32_saturating(value: i64) -> u32 {
    if value <= 0 {
        0
    } else {
        u32::try_from(value).unwrap_or(u32::MAX)
    }
}

pub(super) fn i64_to_u64_saturating(value: i64) -> u64 {
    if value <= 0 {
        0
    } else {
        u64::try_from(value).unwrap_or(u64::MAX)
    }
}

pub(super) fn usize_to_i64_saturating(value: usize) -> i64 {
    i64::try_from(value).unwrap_or(i64::MAX)
}

pub(super) fn bool_to_i64(value: bool) -> i64 {
    i64::from(u8::from(value))
}

pub(super) fn parse_required_rfc3339(idx: usize, raw: &str) -> rusqlite::Result<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(raw)
        .map(|x| x.with_timezone(&Utc))
        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(idx, Type::Text, Box::new(err)))
}

pub(super) fn parse_optional_rfc3339(
    idx: usize,
    raw: Option<&str>,
) -> rusqlite::Result<Option<DateTime<Utc>>> {
    raw.map(|value| parse_required_rfc3339(idx, value))
        .transpose()
}

pub(super) fn ratio_u64(numerator: u64, denominator: u64) -> f64 {
    if denominator == 0 {
        return 0.0;
    }
    #[allow(
        clippy::cast_precision_loss,
        reason = "ratio is a telemetry metric where floating precision tradeoff is acceptable"
    )]
    {
        numerator as f64 / denominator as f64
    }
}

pub(super) fn parse_string_vec_json(idx: usize, raw: &str) -> rusqlite::Result<Vec<String>> {
    serde_json::from_str::<Vec<String>>(raw)
        .map_err(|err| rusqlite::Error::FromSqlConversionFailure(idx, Type::Text, Box::new(err)))
}