axiomsync 1.0.0

Core data-processing engine for AxiomSync local retrieval runtime.
Documentation
use std::collections::HashMap;
use std::path::Path;

use chrono::Utc;

use crate::catalog::{
    parse_trace_metrics_snapshot_document, trace_metrics_snapshot_uri, trace_metrics_snapshots_uri,
};
use crate::error::Result;
use crate::models::{
    TraceIndexEntry, TraceMetricsReport, TraceMetricsSample, TraceMetricsSnapshotDocument,
    TraceMetricsSnapshotSummary, TraceMetricsTrendReport, TraceRequestTypeMetrics,
};
use crate::quality::{percentile_u128, to_trace_metrics_snapshot_summary};
use crate::uri::AxiomUri;

use super::AxiomSync;

impl AxiomSync {
    pub fn list_traces(&self, limit: usize) -> Result<Vec<TraceIndexEntry>> {
        self.state.list_trace_index(limit.max(1))
    }

    pub fn trace_metrics(&self, limit: usize, include_replays: bool) -> Result<TraceMetricsReport> {
        let entries = self.list_traces(limit.max(1))?;
        let mut analyzed = Vec::<TraceMetricsSample>::new();
        let mut skipped_missing = 0usize;
        let mut skipped_invalid = 0usize;

        for entry in &entries {
            if !include_replays && entry.request_type.ends_with("_replay") {
                continue;
            }

            match self.get_trace(&entry.trace_id) {
                Ok(Some(trace)) => {
                    analyzed.push(TraceMetricsSample {
                        trace_id: entry.trace_id.clone(),
                        request_type: entry.request_type.clone(),
                        latency_ms: trace.metrics.latency_ms,
                        explored_nodes: trace.metrics.explored_nodes,
                        convergence_rounds: trace.metrics.convergence_rounds,
                        created_at: entry.created_at.clone(),
                    });
                }
                Ok(None) => skipped_missing += 1,
                Err(_) => skipped_invalid += 1,
            }
        }

        let mut grouped =
            HashMap::<String, (Vec<u128>, usize, usize, u32)>::with_capacity(analyzed.len());
        for item in &analyzed {
            let row = grouped
                .entry(item.request_type.clone())
                .or_insert_with(|| (Vec::new(), 0, 0, 0));
            row.0.push(item.latency_ms);
            row.1 += 1;
            row.2 += item.explored_nodes;
            row.3 += item.convergence_rounds;
        }

        let mut by_request_type = grouped
            .into_iter()
            .map(
                |(request_type, (mut latencies, traces, explored_sum, convergence_sum))| {
                    latencies.sort_unstable();
                    let avg_latency_ms =
                        average_u128(latencies.iter().copied().sum::<u128>(), traces);
                    let avg_explored_nodes = average_usize(explored_sum, traces);
                    let avg_convergence_rounds = average_u32(convergence_sum, traces);
                    TraceRequestTypeMetrics {
                        request_type,
                        traces,
                        p50_latency_ms: percentile_u128(&latencies, 5_000),
                        p95_latency_ms: percentile_u128(&latencies, 9_500),
                        avg_latency_ms,
                        avg_explored_nodes,
                        avg_convergence_rounds,
                    }
                },
            )
            .collect::<Vec<_>>();
        by_request_type.sort_by(|a, b| {
            b.traces
                .cmp(&a.traces)
                .then_with(|| a.request_type.cmp(&b.request_type))
        });

        let mut slowest_samples = analyzed.clone();
        slowest_samples.sort_by(|a, b| {
            b.latency_ms
                .cmp(&a.latency_ms)
                .then_with(|| b.created_at.cmp(&a.created_at))
        });
        slowest_samples.truncate(20);

        Ok(TraceMetricsReport {
            window_limit: limit.max(1),
            include_replays,
            indexed_traces_scanned: entries.len(),
            traces_analyzed: analyzed.len(),
            traces_skipped_missing: skipped_missing,
            traces_skipped_invalid: skipped_invalid,
            by_request_type,
            slowest_samples,
        })
    }

    pub fn create_trace_metrics_snapshot(
        &self,
        limit: usize,
        include_replays: bool,
    ) -> Result<TraceMetricsSnapshotSummary> {
        let report = self.trace_metrics(limit.max(1), include_replays)?;
        let snapshot_id = uuid::Uuid::new_v4().to_string();
        let created_at = Utc::now().to_rfc3339();
        let uri = trace_metrics_snapshot_uri(&snapshot_id)?;
        let report_uri = uri.to_string();
        let doc = TraceMetricsSnapshotDocument {
            version: 1,
            snapshot_id,
            created_at,
            report,
        };
        self.fs
            .write(&uri, &serde_json::to_string_pretty(&doc)?, true)?;
        Ok(to_trace_metrics_snapshot_summary(&doc, &report_uri))
    }

    pub fn list_trace_metrics_snapshots(
        &self,
        limit: usize,
    ) -> Result<Vec<TraceMetricsSnapshotSummary>> {
        let docs = self.list_trace_metrics_snapshot_documents(limit.max(1))?;
        Ok(docs
            .into_iter()
            .map(|(report_uri, doc)| to_trace_metrics_snapshot_summary(&doc, &report_uri))
            .collect())
    }

    pub fn trace_metrics_trend(
        &self,
        limit: usize,
        request_type: Option<&str>,
    ) -> Result<TraceMetricsTrendReport> {
        let request_type = request_type
            .map(str::trim)
            .filter(|x| !x.is_empty())
            .unwrap_or("find")
            .to_ascii_lowercase();
        let docs = self.list_trace_metrics_snapshot_documents(limit.max(2))?;
        if docs.is_empty() {
            return Ok(TraceMetricsTrendReport {
                request_type,
                latest: None,
                previous: None,
                latest_p95_latency_ms: None,
                previous_p95_latency_ms: None,
                delta_p95_latency_ms: None,
                latest_avg_explored_nodes: None,
                previous_avg_explored_nodes: None,
                delta_avg_explored_nodes: None,
                status: "no_data".to_string(),
            });
        }

        let latest = docs
            .first()
            .map(|(uri, doc)| to_trace_metrics_snapshot_summary(doc, uri));
        let previous = docs
            .get(1)
            .map(|(uri, doc)| to_trace_metrics_snapshot_summary(doc, uri));
        let request_type_match = request_type.as_str();
        let latest_metrics = docs.first().and_then(|(_, doc)| {
            doc.report
                .by_request_type
                .iter()
                .find(|x| x.request_type == request_type_match)
        });
        let previous_metrics = docs.get(1).and_then(|(_, doc)| {
            doc.report
                .by_request_type
                .iter()
                .find(|x| x.request_type == request_type_match)
        });

        let latest_p95_latency_ms = latest_metrics.map(|x| x.p95_latency_ms);
        let previous_p95_latency_ms = previous_metrics.map(|x| x.p95_latency_ms);
        let delta_p95_latency_ms = latest_p95_latency_ms
            .zip(previous_p95_latency_ms)
            .and_then(|(latest, previous)| delta_u128_to_i128(latest, previous));

        let latest_avg_explored_nodes = latest_metrics.map(|x| x.avg_explored_nodes);
        let previous_avg_explored_nodes = previous_metrics.map(|x| x.avg_explored_nodes);
        let delta_avg_explored_nodes =
            match (latest_avg_explored_nodes, previous_avg_explored_nodes) {
                (Some(l), Some(p)) => Some(l - p),
                _ => None,
            };

        let status = if previous.is_none() {
            "insufficient_history".to_string()
        } else if latest_metrics.is_none() || previous_metrics.is_none() {
            "missing_request_type".to_string()
        } else if let Some(delta) = delta_p95_latency_ms {
            match delta.cmp(&0) {
                std::cmp::Ordering::Less => "improved".to_string(),
                std::cmp::Ordering::Greater => "regressed".to_string(),
                std::cmp::Ordering::Equal => "stable".to_string(),
            }
        } else {
            "mixed".to_string()
        };

        Ok(TraceMetricsTrendReport {
            request_type,
            latest,
            previous,
            latest_p95_latency_ms,
            previous_p95_latency_ms,
            delta_p95_latency_ms,
            latest_avg_explored_nodes,
            previous_avg_explored_nodes,
            delta_avg_explored_nodes,
            status,
        })
    }

    fn list_trace_metrics_snapshot_documents(
        &self,
        limit: usize,
    ) -> Result<Vec<(String, TraceMetricsSnapshotDocument)>> {
        let limit = limit.max(1);
        let dir = trace_metrics_snapshots_uri()?;
        if !self.fs.exists(&dir) {
            return Ok(Vec::new());
        }
        let entries = self.fs.list(&dir, false)?;
        let mut docs = Vec::<(String, TraceMetricsSnapshotDocument)>::new();
        for entry in entries {
            if entry.is_dir || !has_json_extension(&entry.name) {
                continue;
            }
            let uri = AxiomUri::parse(&entry.uri)?;
            let raw = self.fs.read(&uri)?;
            let Ok(doc) = parse_trace_metrics_snapshot_document(&raw) else {
                continue;
            };
            docs.push((uri.to_string(), doc));
        }
        docs.sort_by(|a, b| {
            b.1.created_at
                .cmp(&a.1.created_at)
                .then_with(|| b.1.snapshot_id.cmp(&a.1.snapshot_id))
        });
        docs.truncate(limit);
        Ok(docs)
    }
}

fn average_u128(total: u128, count: usize) -> f32 {
    if count == 0 {
        return 0.0;
    }
    let average = u128_to_f64(total) / usize_to_f64(count);
    f64_to_f32(average)
}

fn average_usize(total: usize, count: usize) -> f32 {
    if count == 0 {
        return 0.0;
    }
    let average = usize_to_f64(total) / usize_to_f64(count);
    f64_to_f32(average)
}

fn average_u32(total: u32, count: usize) -> f32 {
    if count == 0 {
        return 0.0;
    }
    let average = f64::from(total) / usize_to_f64(count);
    f64_to_f32(average)
}

fn delta_u128_to_i128(latest: u128, previous: u128) -> Option<i128> {
    let latest = i128::try_from(latest).ok()?;
    let previous = i128::try_from(previous).ok()?;
    Some(latest - previous)
}

fn has_json_extension(name: &str) -> bool {
    Path::new(name)
        .extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext.eq_ignore_ascii_case("json"))
}

const fn u128_to_f64(value: u128) -> f64 {
    #[allow(
        clippy::cast_precision_loss,
        reason = "trace metrics aggregate large counters and intentionally project to floating domain"
    )]
    {
        value as f64
    }
}

const fn usize_to_f64(value: usize) -> f64 {
    #[allow(
        clippy::cast_precision_loss,
        reason = "trace metrics ratios operate in floating domain by design"
    )]
    {
        value as f64
    }
}

const fn f64_to_f32(value: f64) -> f32 {
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        reason = "final report fields are f32; upstream values are bounded summary metrics"
    )]
    {
        value as f32
    }
}