tokitai-operator 0.1.0

Verified DL kernel compiler: formally-checked GEMM, p-adic, sheaf, contract-carrying ops. Paper-artifact grade.
Documentation
//! JSONL-backed metrics logger for the training step driver.
//!
//! `MetricsEntry` is the per-step record: loss, grad norm,
//! throughput, and any user-supplied scalar fields. The log is
//! append-only JSONL so it is `tail -f`-able and trivially
//! parseable by `jq` / `pandas`.
//!
//! Used by `src/training/step.rs` (the training step driver) and
//! surfaced via the `metrics_smoke` integration test. Each line of
//! the log is one JSON-serialized `MetricsEntry`; the struct
//! definition below is the per-field description of the format.
//!
// Phase 2.8 metrics logger.
//
// JSONL-backed, append-only training metrics log. Used by the
// training step driver in `src/training/` to record loss, grad norm,
// throughput, and any other scalar field per optimizer step.
//
// Design choices (see also the README section "metrics_log format"):
//   * JSONL (newline-delimited JSON) so the file is `tail -f`-able
//     and trivially parseable by external tools (jq, pandas, ...).
//   * In-memory `Vec<MetricsEntry>` cache so callers can iterate or
//     compute summaries without re-reading the file from disk.
//   * `extra: BTreeMap<String, f32>` for forward-compatible fields
//     (router_entropy, per_expert_loss, grad_clip_ratio, ...) that
//     are useful for MoE training but should not bloat the core
//     struct. BTreeMap is used for deterministic key ordering in the
//     on-disk JSONL.
//   * All public methods that touch disk return `std::io::Result`
//     so this module has no dependency on the crate's `Error` enum
//     (which is reserved for domain/IR/planner semantics).

use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};

/// A single training step's worth of metrics.
///
/// One value of this type is one line of the JSONL log: the derived
/// `Serialize`/`Deserialize` impls use the Rust field names as JSON keys.
///
/// The `extra` map is a free-form key-value bag for forward-compatible
/// fields that don't deserve a top-level slot yet (e.g. `router_entropy`,
/// `per_expert_loss`, `grad_clip_ratio`). It is serialized as a nested
/// object under the `"extra"` JSON key (it is not flattened into the
/// top-level record).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MetricsEntry {
    /// Optimizer step this record describes.
    pub step: u32,
    /// Wall-clock time in milliseconds. `MetricsLog::summary` reads the
    /// last entry's value as the total, i.e. it treats this field as
    /// cumulative "ms since training start".
    pub wall_time_ms: f64,
    /// Loss value recorded for this step.
    pub loss: f32,
    /// Gradient norm recorded for this step (presumably the value returned
    /// by this module's `grad_norm` helper; confirm against the step driver).
    pub grad_norm: f32,
    /// Throughput for this step, in samples per second.
    pub throughput_samples_per_sec: f32,
    /// Optional additional scalar fields, keyed by name. Because of
    /// `#[serde(default)]`, a JSON line without an `"extra"` key
    /// deserializes to an empty map instead of failing.
    #[serde(default)]
    pub extra: BTreeMap<String, f32>,
}

/// Aggregate statistics computed from a `MetricsLog`.
///
/// `total_wall_time_ms` is the wall_time_ms of the most recent
/// entry (i.e. cumulative training time, treating wall_time_ms as
/// "ms since training start"). For an empty log, all numeric
/// fields are zero.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsSummary {
    /// Number of entries held in the log's in-memory buffer when the
    /// summary was computed.
    pub total_steps: u32,
    /// `wall_time_ms` of the last entry in the buffer (0.0 for an empty log).
    pub total_wall_time_ms: f64,
    /// Smallest `loss` over all entries.
    pub loss_min: f32,
    /// Arithmetic mean of `loss` over all entries; `MetricsLog::summary`
    /// accumulates the sum in `f64` and narrows the quotient to `f32`.
    pub loss_mean: f32,
    /// Largest `loss` over all entries.
    pub loss_max: f32,
}

/// Append-only JSONL-backed training metrics log.
///
/// On `open`, any existing JSONL entries in the file are loaded
/// into the in-memory buffer; subsequent `record` calls append to
/// the buffer and flush a single JSON line to disk.
pub struct MetricsLog {
    /// In-memory copy of the log: entries parsed from the file by `open`,
    /// followed by every entry successfully written by `record`, in
    /// insertion order.
    entries: Vec<MetricsEntry>,
    /// Location of the backing JSONL file; `record` re-opens it in append
    /// mode on every call.
    path: PathBuf,
}

impl MetricsLog {
    /// Open the JSONL log at `path` and load any existing entries into the
    /// in-memory buffer.
    ///
    /// Missing parent directories are created. The file itself is *not*
    /// created here: a missing file simply yields an empty log, and the
    /// first [`record`](Self::record) call creates it. Empty or
    /// whitespace-only lines are skipped.
    ///
    /// # Errors
    ///
    /// Returns the underlying IO error if the parent directory cannot be
    /// created or the file cannot be opened or read, and an `InvalidData`
    /// error naming the 1-based line number if a non-blank line does not
    /// parse as a `MetricsEntry`.
    pub fn open(path: &Path) -> std::io::Result<Self> {
        if let Some(parent) = path.parent() {
            // `parent()` of a bare file name is the empty path, which
            // `create_dir_all` must not be asked to create.
            if !parent.as_os_str().is_empty() {
                std::fs::create_dir_all(parent)?;
            }
        }
        let mut entries: Vec<MetricsEntry> = Vec::new();
        // Open directly and treat `NotFound` as "no log yet" instead of
        // probing with `exists()` first: the probe-then-open sequence is
        // racy, and `exists()` also hides every other metadata error.
        match File::open(path) {
            Ok(file) => {
                for (lineno, line) in BufReader::new(file).lines().enumerate() {
                    let line = line?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    let entry: MetricsEntry = serde_json::from_str(&line).map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!("metrics_log parse error at line {}: {e}", lineno + 1),
                        )
                    })?;
                    entries.push(entry);
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
        Ok(Self {
            entries,
            path: path.to_path_buf(),
        })
    }

    /// Append `entry` to the log: write it to disk as a single JSON line
    /// terminated by `\n`, then push it onto the in-memory buffer.
    ///
    /// The disk write happens before the in-memory push, so a failed write
    /// never leaves an entry in memory that is missing from disk. The line
    /// and its terminating newline are handed to the OS in one `write_all`
    /// call, so the record and its terminator are not split across two
    /// separate writes.
    ///
    /// NOTE(review): `serde_json` is believed to emit `null` for non-finite
    /// floats, which `open` would later reject as a malformed line; confirm
    /// whether callers can pass NaN/inf values here.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` if the entry cannot be serialized, or the
    /// underlying IO error if the file cannot be opened or written.
    pub fn record(&mut self, entry: MetricsEntry) -> std::io::Result<()> {
        let mut line = serde_json::to_string(&entry)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        line.push('\n');
        let mut f = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        f.write_all(line.as_bytes())?;
        self.entries.push(entry);
        Ok(())
    }

    /// Compute aggregate loss statistics + step / wall-time totals
    /// from the in-memory entries. Returns an all-zeros summary
    /// for an empty log.
    ///
    /// NaN losses never satisfy the `<` / `>` comparisons, so they are
    /// ignored by `loss_min` / `loss_max` but still propagate into
    /// `loss_mean`. If every loss is NaN, `loss_min` stays `+inf` and
    /// `loss_max` stays `-inf`.
    pub fn summary(&self) -> MetricsSummary {
        let last = match self.entries.last() {
            Some(entry) => entry,
            None => {
                return MetricsSummary {
                    total_steps: 0,
                    total_wall_time_ms: 0.0,
                    loss_min: 0.0,
                    loss_mean: 0.0,
                    loss_max: 0.0,
                };
            }
        };
        let mut loss_min = f32::INFINITY;
        let mut loss_max = f32::NEG_INFINITY;
        // Accumulate in f64 so long runs do not lose precision in the mean.
        let mut loss_sum: f64 = 0.0;
        for e in &self.entries {
            if e.loss < loss_min {
                loss_min = e.loss;
            }
            if e.loss > loss_max {
                loss_max = e.loss;
            }
            loss_sum += f64::from(e.loss);
        }
        let count = self.entries.len();
        MetricsSummary {
            // Saturate rather than silently wrap if the buffer ever holds
            // more than `u32::MAX` entries.
            total_steps: u32::try_from(count).unwrap_or(u32::MAX),
            // Total wall time = last entry's wall_time_ms (cumulative
            // training time, where each entry's wall_time_ms is "ms
            // since training start").
            total_wall_time_ms: last.wall_time_ms,
            loss_min,
            loss_mean: (loss_sum / count as f64) as f32,
            loss_max,
        }
    }

    /// Iterate over all in-memory entries in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = &MetricsEntry> {
        self.entries.iter()
    }

    /// Number of entries currently in the in-memory buffer.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True if the in-memory buffer holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}

/// Computes the Euclidean (L2) norm of a flat slice of gradient values.
///
/// Every element is widened to `f64` before it is squared and added to the
/// running total, so the intermediate sum of squares cannot overflow
/// `f32`; only the final square root is narrowed back to `f32`. An empty
/// slice yields `0.0`.
pub fn grad_norm(grads: &[f32]) -> f32 {
    // Explicit fold seeded with +0.0: accumulates left to right, one
    // element at a time.
    let squared_total = grads.iter().fold(0.0_f64, |acc, &value| {
        let wide = f64::from(value);
        acc + wide * wide
    });
    squared_total.sqrt() as f32
}