nornir 0.4.17

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! 📞 MCP tab — "are agents using the MCP, and which tools?"
//!
//! Aggregates the warehouse `mcp_requests` telemetry table (one row per tool
//! call, recorded server-side by `nornir-mcp` — metrics only, no payloads) into
//! per-tool usage stats. Reuses the same generic table scan as the Warehouse
//! browser, so it works against a local warehouse *or* a remote nornir-server
//! with no extra server endpoint.

use std::path::PathBuf;

use eframe::egui::{self};

use crate::warehouse::iceberg::{IcebergWarehouse, TablePreview};

use super::facett_theme::{Theme, RED, AMBER};

/// Telemetry rows are tiny (4 scalar columns); scan a generous window so the
/// aggregate is meaningful. If a warehouse ever exceeds this, the panel says so.
const SCAN_LIMIT: usize = 50_000;
/// Name of the warehouse table this tab scans and aggregates.
const TABLE: &str = "mcp_requests";

/// Where the telemetry table is read from.
enum Src {
    /// Warehouse root directory, opened in-process via `IcebergWarehouse::open`.
    Local(PathBuf),
    /// Remote server; `endpoint` and `token` are handed to
    /// `super::remote::scan_table` on every scan.
    Remote { endpoint: String, token: String },
}

/// Aggregated usage for a single tool — one painted row of the stats grid.
struct ToolStat {
    /// Value of the `tool` column (`"?"` when the cell is missing).
    tool: String,
    /// Number of scanned rows attributed to this tool.
    calls: u64,
    /// Rows whose `status` cell is present and not `"ok"`.
    errors: u64,
    /// Mean of the `latency_ms` values recorded for this tool.
    avg_ms: f64,
}

/// State behind the 📞 MCP tab: the scan source plus the most recent
/// per-tool aggregate of the `mcp_requests` table.
pub struct McpTab {
    /// Local directory or remote endpoint the table is scanned from.
    source: Src,
    /// `nornir-workspace` gRPC header (empty for local).
    workspace: String,
    /// `None` until the first scan; afterwards the per-tool stats or the
    /// scan's error message.
    stats: Option<Result<Vec<ToolStat>, String>>,
    /// Total call count as last computed by `reload`; starts at 0.
    total: u64,
    /// Set by `reload` when a scan returns at least `SCAN_LIMIT` rows, i.e.
    /// the aggregate may only cover part of the table.
    capped: bool,
    /// Whether a scan has been attempted since construction / the last
    /// workspace switch (it is set even when the scan fails).
    loaded: bool,
    /// Palette the pane paints with.
    theme: Theme,
}

impl McpTab {
    /// Build a tab that reads a local warehouse dir directly (`Src::Local`).
    /// The workspace header is left empty.
    pub fn local(root: PathBuf) -> Self {
        Self::with(Src::Local(root), String::new())
    }

    /// Build a tab that reads a remote server's warehouse over the generic
    /// scan gRPC, scoped to `workspace`.
    pub fn remote(endpoint: String, token: String, workspace: String) -> Self {
        Self::with(Src::Remote { endpoint, token }, workspace)
    }

    /// Shared constructor: nothing is scanned yet, so every derived field
    /// starts in its "not loaded" state.
    fn with(source: Src, workspace: String) -> Self {
        Self {
            source,
            workspace,
            stats: None,
            total: 0,
            capped: false,
            loaded: false,
            theme: Theme::default(),
        }
    }

    /// Set the facett palette the pane paints with.
    pub fn set_palette(&mut self, t: Theme) {
        self.theme = t;
    }

    /// Re-scope to a different workspace (the picker switched).
    ///
    /// Every value derived from the previous workspace's scan is dropped, so
    /// `total_calls` / `state_json` cannot report another workspace's numbers
    /// while the new one has not been scanned yet.
    pub(crate) fn set_workspace(&mut self, workspace: String) {
        self.workspace = workspace;
        self.stats = None;
        self.total = 0;
        self.capped = false;
        self.loaded = false;
    }

    /// Fetch up to `SCAN_LIMIT` rows of `TABLE` from the configured source.
    /// Errors from either backend are flattened to their `{:#}` rendering.
    fn scan(&self) -> Result<TablePreview, String> {
        match &self.source {
            Src::Local(root) => IcebergWarehouse::open(root)
                .and_then(|wh| wh.scan_preview(TABLE, SCAN_LIMIT))
                .map_err(|e| format!("{e:#}")),
            Src::Remote { endpoint, token } => {
                // `SCAN_LIMIT` is a small compile-time constant, so the
                // narrowing cast cannot truncate.
                super::remote::scan_table(endpoint, token, TABLE, SCAN_LIMIT as u32, &self.workspace)
                    .map_err(|e| format!("{e:#}"))
            }
        }
    }

    /// Total MCP calls recorded in the warehouse (0 until [`reload`](Self::reload)).
    pub(crate) fn total_calls(&self) -> u64 {
        self.total
    }

    /// Has the telemetry been scanned yet? (Distinguishes "0 calls" from "not
    /// loaded" so the pre-flight MCP-usage warning doesn't false-fire at boot.)
    pub(crate) fn is_loaded(&self) -> bool {
        self.loaded
    }

    /// Scan once if not yet loaded — lets the pre-flight check read the count
    /// without waiting for the user to open the 📞 MCP tab.
    pub(crate) fn ensure_loaded(&mut self) {
        if !self.loaded {
            self.reload();
        }
    }

    /// Run a scan and replace the cached aggregate with its outcome.
    ///
    /// `loaded` is set whether or not the scan succeeds, so a failing source
    /// is not re-scanned on every frame. On failure the counters are reset
    /// rather than left at the values of an earlier successful scan.
    fn reload(&mut self) {
        self.loaded = true;
        match self.scan() {
            Ok(p) => {
                self.capped = p.rows.len() >= SCAN_LIMIT;
                let (stats, total) = aggregate(&p);
                self.total = total;
                self.stats = Some(Ok(stats));
            }
            Err(e) => {
                self.total = 0;
                self.capped = false;
                self.stats = Some(Err(e));
            }
        }
    }

    /// 📞 MCP tab's slice of `state_json` (LAW #6): the per-tool usage rows the
    /// table paints (tool / calls / errors / avg-ms), the total call count, and
    /// the cap/loaded flags. Read-only telemetry tab (no form fields).
    ///
    /// `rows` is `null` before the first scan, an array of per-tool objects
    /// after a successful one, and `{ "error": … }` after a failed one.
    pub fn state_json(&self) -> serde_json::Value {
        serde_json::json!({
            "loaded": self.loaded,
            "total_calls": self.total,
            "capped": self.capped,
            "rows": match &self.stats {
                None => serde_json::Value::Null,
                Some(Ok(stats)) => serde_json::json!(stats.iter().map(|s| serde_json::json!({
                    "tool": s.tool,
                    "calls": s.calls,
                    "errors": s.errors,
                    "avg_ms": s.avg_ms,
                })).collect::<Vec<_>>()),
                Some(Err(e)) => serde_json::json!({ "error": e }),
            },
            "palette": self.theme.name,
        })
    }

    /// Paint the tab: heading with a reload button, a one-line explanation,
    /// then either the error / empty-state message or the per-tool grid.
    ///
    /// The first call triggers the initial scan.
    pub fn draw(&mut self, ui: &mut egui::Ui) {
        let theme = self.theme;
        ui.horizontal(|ui| {
            ui.heading("📞 MCP usage");
            if ui.button("⟳ Reload").clicked() {
                self.reload();
            }
        });
        ui.label(
            "per-tool call counts from the warehouse `mcp_requests` table — recorded by \
             nornir-mcp on every tool dispatch (metrics only: tool, ok/err, latency).",
        );
        ui.separator();

        self.ensure_loaded();

        match &self.stats {
            // `ensure_loaded` above always leaves `stats` populated; this arm
            // only keeps the match exhaustive.
            None => {
                ui.spinner();
            }
            Some(Err(e)) => {
                // A missing table just means nothing has been recorded yet, so
                // show that as a dim hint instead of a red error.
                let lower = e.to_lowercase();
                if lower.contains("does not exist")
                    || lower.contains("not found")
                    || lower.contains("nosuch")
                {
                    ui.colored_label(
                        theme.text_dim,
                        "no MCP calls recorded yet — the table appears once nornir-mcp \
                         has served at least one tool call.",
                    );
                } else {
                    ui.colored_label(RED, e);
                }
            }
            Some(Ok(stats)) => {
                if stats.is_empty() {
                    ui.colored_label(theme.text_dim, "no MCP calls recorded yet.");
                    return;
                }
                let errs: u64 = stats.iter().map(|s| s.errors).sum();
                ui.label(format!(
                    "{} total call(s) across {} tool(s) · {} error(s){}",
                    self.total,
                    stats.len(),
                    errs,
                    if self.capped { format!("  ·  showing the last {SCAN_LIMIT} rows") } else { String::new() },
                ));
                ui.add_space(6.0);

                // Bars are scaled against the busiest tool; the `.max(1)` keeps
                // the divisor non-zero.
                let max = stats.iter().map(|s| s.calls).max().unwrap_or(1).max(1);
                egui::Grid::new("mcp_stats_grid")
                    .striped(true)
                    .num_columns(4)
                    .spacing([18.0, 4.0])
                    .show(ui, |ui| {
                        ui.strong("tool");
                        ui.strong("calls");
                        ui.strong("errors");
                        ui.strong("avg ms");
                        ui.end_row();
                        for s in stats {
                            ui.horizontal(|ui| {
                                let frac = s.calls as f32 / max as f32;
                                // `+ 1.0` so even a near-zero share stays visible.
                                let (rect, _) = ui.allocate_exact_size(
                                    egui::Vec2::new(90.0 * frac + 1.0, 11.0),
                                    egui::Sense::hover(),
                                );
                                ui.painter().rect_filled(rect, 2.0, theme.accent);
                                ui.label(&s.tool);
                            });
                            ui.label(s.calls.to_string());
                            if s.errors > 0 {
                                ui.colored_label(AMBER, s.errors.to_string());
                            } else {
                                ui.label("0");
                            }
                            ui.label(format!("{:.1}", s.avg_ms));
                            ui.end_row();
                        }
                    });
            }
        }
    }
}

/// Group `mcp_requests` rows (stringified cells) by tool. Resolves columns by
/// name so it survives schema column-order changes.
///
/// Returns the per-tool stats — busiest tool first, ties broken by tool name —
/// together with the total number of rows seen.
///
/// Missing cells are tolerated: a row without a `tool` cell is filed under
/// `"?"`, and a row without a `status` cell counts as successful. `avg_ms` is
/// the mean over the rows whose `latency_ms` cell parses to a finite number;
/// rows without a usable latency still count as calls but do not drag the
/// mean toward zero. A tool with no usable latency at all reports `0.0`.
fn aggregate(p: &TablePreview) -> (Vec<ToolStat>, u64) {
    /// Running totals for one tool.
    #[derive(Default)]
    struct Acc {
        calls: u64,
        errors: u64,
        lat_sum: f64,
        /// Number of rows that contributed to `lat_sum`.
        lat_n: u64,
    }

    let col = |name: &str| p.columns.iter().position(|c| c == name);
    let (tool_col, status_col, latency_col) = (col("tool"), col("status"), col("latency_ms"));

    let mut by_tool: std::collections::HashMap<String, Acc> = std::collections::HashMap::new();
    let mut total = 0u64;
    for row in &p.rows {
        let tool = tool_col.and_then(|i| row.get(i)).cloned().unwrap_or_else(|| "?".to_string());
        let ok = status_col.and_then(|i| row.get(i)).map(|s| s == "ok").unwrap_or(true);
        // `f64::from_str` accepts "NaN" / "inf"; a single such cell would
        // poison the whole mean, so only finite samples are kept.
        let latency = latency_col
            .and_then(|i| row.get(i))
            .and_then(|s| s.parse::<f64>().ok())
            .filter(|ms| ms.is_finite());

        let acc = by_tool.entry(tool).or_default();
        acc.calls += 1;
        if !ok {
            acc.errors += 1;
        }
        if let Some(ms) = latency {
            acc.lat_sum += ms;
            acc.lat_n += 1;
        }
        total += 1;
    }

    let mut stats: Vec<ToolStat> = by_tool
        .into_iter()
        .map(|(tool, acc)| ToolStat {
            tool,
            calls: acc.calls,
            errors: acc.errors,
            avg_ms: if acc.lat_n > 0 { acc.lat_sum / acc.lat_n as f64 } else { 0.0 },
        })
        .collect();
    // Tool names are unique map keys, so the comparator is a total order and
    // an unstable sort yields the same deterministic result.
    stats.sort_unstable_by(|a, b| b.calls.cmp(&a.calls).then_with(|| a.tool.cmp(&b.tool)));
    (stats, total)
}