nornir 0.4.54

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Embed holger IN-PROCESS — the nornir⇆holger marriage.
//!
//! Instead of spawning `holger-server dev-pair` as a child process, nornir links
//! `holger-server-lib` and runs the **dev pair** (`/cache` crates.io-mirror +
//! writable `/sparring` registry) on a tokio runtime in-process. Three consumers
//! use this: `nornir cache up`, `release stage --execute`, and (optionally) the
//! `nornir-server` daemon at boot. Gated behind the `embed-holger` feature.
//!
//! Holger's own CLI composes startup as:
//!   write `dev_pair_ron` → `read_ron_config` → `instantiate_backends`
//!   → `wire_holger` → `start()`  (then block while the spawned servers run).
//! `start()` is sync and uses `tokio::spawn`, so it must run inside a tokio
//! runtime context; afterwards the gRPC + HTTP listeners live on that runtime.

use anyhow::{Context, Result};
use std::path::Path;
use std::time::Duration;

/// Build the dev-pair config under `data_dir` and start holger's gRPC + HTTP
/// servers on the **current** tokio runtime (the caller must already be inside
/// one — e.g. `nornir-server`'s `#[tokio::main]`). Returns as soon as the servers
/// are spawned; use [`wait_ready`] to confirm the HTTP gateway is accepting.
pub fn start_in_current_runtime(data_dir: &Path, grpc: &str, http: &str) -> Result<()> {
    std::fs::create_dir_all(data_dir)
        .with_context(|| format!("create holger data dir {}", data_dir.display()))?;
    let cfg_path = data_dir.join("holger-dev-pair.ron");
    std::fs::write(&cfg_path, holger_server_lib::dev_pair_ron(data_dir, grpc, http))
        .with_context(|| format!("write holger config {}", cfg_path.display()))?;
    let mut holger = holger_server_lib::read_ron_config(&cfg_path)
        .with_context(|| format!("read holger config {}", cfg_path.display()))?;
    holger.instantiate_backends().context("holger: instantiate backends")?;
    holger_server_lib::wire_holger(&mut holger).context("holger: wire routes")?;
    holger.start().context("holger: start servers")?;
    Ok(())
}

/// A holger dev-pair running on its **own** tokio runtime — for the sync CLI
/// paths (`nornir cache up`, `release stage`) that aren't already inside a
/// runtime. Hold this for as long as the servers should run; dropping it drops
/// the runtime and stops the servers (the rehearsal registry is ephemeral).
pub struct EmbeddedHolger {
    /// Owns the worker threads the gRPC/HTTP listeners run on. Dropping it (last
    /// field) shuts them down.
    _rt: tokio::runtime::Runtime,
    pub grpc: String,
    pub http: String,
}

impl EmbeddedHolger {
    /// Spin up a dedicated multi-thread runtime, start the dev pair on it, and
    /// return the live handle. Does NOT wait for readiness — call
    /// [`EmbeddedHolger::wait_ready`].
    pub fn start(data_dir: &Path, grpc: &str, http: &str) -> Result<Self> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(2)
            .thread_name("holger-embed")
            .build()
            .context("build holger tokio runtime")?;
        {
            let _guard = rt.enter();
            start_in_current_runtime(data_dir, grpc, http)?;
        }
        Ok(Self { _rt: rt, grpc: grpc.to_string(), http: http.to_string() })
    }

    /// Poll the HTTP gateway until the sparse index serves (or time out).
    pub fn wait_ready(&self, timeout: Duration) -> Result<()> {
        wait_ready(&self.http, timeout)
    }
}

/// One repository's read-only summary, enumerated through holger's **functional
/// (in-process) API** — NO gRPC/HTTP server is started. Built by
/// [`read_registry`].
///
/// `Serialize`/`Deserialize` so the server can ship a `Vec<RepoSummary>` over
/// `Viz.HolgerRegistry` (JSON-carried, like the other Viz read RPCs) to a thin
/// viz client that has no local holger to read.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct RepoSummary {
    /// Repository name as configured in the dev-pair RON (`crates-io`, `cache`,
    /// `sparring`).
    pub name: String,
    /// `ron_repo_type` (`rust`, `rust-remote`) — tells a cache mirror apart from
    /// a writable rehearsal registry.
    pub repo_type: String,
    /// `true` for a writable backend (the `/sparring` rehearsal registry + the
    /// `/cache` write-through primary); `false` for the read-only `crates-io`
    /// upstream.
    pub writable: bool,
    /// Distinct crate **names** held in this repo's backing znippy archive
    /// (deduped across versions). 0 for the upstream (no local archive to list).
    pub crate_count: usize,
    /// Total artifact entries (one per crate *version*) the backend lists.
    pub artifact_count: usize,
    /// A few crate ids (`name vX.Y.Z`) for the pane — most-recent-first as the
    /// archive lists them, capped.
    pub recent: Vec<String>,
}

/// Read the on-disk dev-pair registry under `data_dir` through holger's
/// **functional API**, WITHOUT starting any server: write/read the dev-pair RON,
/// `instantiate_backends`, then iterate `holger.repositories` and call each
/// backend's `list(None, limit)` (the same enumeration the gRPC `list_artifacts`
/// serves) to summarise the `/cache` mirror + `/sparring` rehearsal registry.
///
/// `grpc`/`http` only fill the RON's address fields (the dev-pair builder needs
/// them); no listener is bound. Returns one [`RepoSummary`] per configured repo,
/// in config order (`crates-io`, `cache`, `sparring`).
pub fn read_registry(
    data_dir: &Path,
    grpc: &str,
    http: &str,
    per_repo_limit: usize,
) -> Result<Vec<RepoSummary>> {
    use std::collections::BTreeSet;
    std::fs::create_dir_all(data_dir)
        .with_context(|| format!("create holger data dir {}", data_dir.display()))?;
    let cfg_path = data_dir.join("holger-dev-pair.ron");
    std::fs::write(&cfg_path, holger_server_lib::dev_pair_ron(data_dir, grpc, http))
        .with_context(|| format!("write holger config {}", cfg_path.display()))?;
    let mut holger = holger_server_lib::read_ron_config(&cfg_path)
        .with_context(|| format!("read holger config {}", cfg_path.display()))?;
    holger.instantiate_backends().context("holger: instantiate backends")?;

    let mut out = Vec::with_capacity(holger.repositories.len());
    for repo in &holger.repositories {
        let mut summary = RepoSummary {
            name: repo.ron_name.clone(),
            repo_type: repo.ron_repo_type.clone(),
            ..Default::default()
        };
        if let Some(backend) = &repo.backend_repository {
            summary.writable = backend.is_writable();
            // `list(None, limit)` enumerates the backing znippy archive's crate
            // entries (parsed back into ArtifactId). The upstream `crates-io`
            // has no local archive → empty, exactly as intended.
            match backend.list(None, per_repo_limit) {
                Ok(entries) => {
                    summary.artifact_count = entries.len();
                    let mut names: BTreeSet<String> = BTreeSet::new();
                    for e in &entries {
                        names.insert(e.id.name.clone());
                    }
                    summary.crate_count = names.len();
                    summary.recent = entries
                        .iter()
                        .rev()
                        .take(8)
                        .map(|e| format!("{} v{}", e.id.name, e.id.version))
                        .collect();
                }
                Err(e) => {
                    // A backend that can't list (e.g. an empty/absent archive)
                    // is reported as zero, not a hard failure — the pane still
                    // wants the repo row + writable flag.
                    eprintln!("holger read_registry: list({}) failed: {e:#}", repo.ron_name);
                }
            }
        }
        out.push(summary);
    }
    Ok(out)
}

/// Poll holger's HTTP sparse gateway at `http_addr` (host:port) until it serves
/// the `/cache` index `config.json` (proof the routes are wired), or time out.
pub fn wait_ready(http_addr: &str, timeout: Duration) -> Result<()> {
    let url = format!("http://{http_addr}/cache/index/config.json");
    let deadline = std::time::Instant::now() + timeout;
    loop {
        if ureq::get(&url)
            .timeout(Duration::from_millis(500))
            .call()
            .is_ok()
        {
            return Ok(());
        }
        if std::time::Instant::now() >= deadline {
            anyhow::bail!("holger HTTP gateway never became ready at {url} (waited {timeout:?})");
        }
        std::thread::sleep(Duration::from_millis(150));
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `read_registry` reads the dev-pair registry through holger's functional
    /// (in-process) API WITHOUT starting a server: it lists the three configured
    /// repos, and after a `put` into the writable `/sparring` registry (also a
    /// functional-API call) the re-read enumerates that crate via `list`.
    #[test]
    fn read_registry_enumerates_dev_pair_via_functional_api() {
        let dir = tempfile::tempdir().unwrap();
        let data = dir.path();
        let grpc = "127.0.0.1:18443";
        let http = "127.0.0.1:18444";

        // Empty registry: three repos, all zero crates (no server started).
        let repos = read_registry(data, grpc, http, 1000).expect("read empty dev pair");
        let names: Vec<&str> = repos.iter().map(|r| r.name.as_str()).collect();
        assert_eq!(
            names,
            vec!["crates-io", "cache", "sparring"],
            "functional API lists the dev-pair repos"
        );
        let sparring = repos.iter().find(|r| r.name == "sparring").unwrap();
        assert!(sparring.writable, "/sparring is a writable registry");
        assert_eq!(sparring.crate_count, 0, "nothing published yet");

        // Publish a crate into /sparring via the SAME functional API (construct →
        // instantiate_backends → put), then re-read and see it enumerated.
        let cfg_path = data.join("holger-dev-pair.ron");
        std::fs::write(&cfg_path, holger_server_lib::dev_pair_ron(data, grpc, http)).unwrap();
        let mut holger = holger_server_lib::read_ron_config(&cfg_path).unwrap();
        holger.instantiate_backends().unwrap();
        let id = holger_server_lib::ArtifactId {
            namespace: None,
            name: "demo".into(),
            version: "0.1.0".into(),
        };
        holger.put("sparring", &id, b"crate-bytes").expect("publish to /sparring");

        let repos = read_registry(data, grpc, http, 1000).expect("re-read after publish");
        let sparring = repos.iter().find(|r| r.name == "sparring").unwrap();
        assert_eq!(sparring.crate_count, 1, "the published crate is enumerated");
        assert!(
            sparring.recent.iter().any(|s| s.contains("demo")),
            "recent list carries the published crate, got {:?}",
            sparring.recent
        );
    }
}