holger-server-lib 0.6.6

Holger server library: config, wiring, gRPC service, Rust API
// holger-server-lib/src/lib.rs
//
// Two modes of operation:
//   1. Rust function bindings — use Holger struct directly as a library
//   2. gRPC server — configure exposed_endpoints in RON, call holger.start()

use serde::{Deserialize, Serialize};

use std::{
    fs::File,
    io::BufReader,
    path::Path,
};

use anyhow::Result;
use ron::de::from_reader;

use std::collections::HashMap;
use std::sync::Arc;
use traits::RepositoryBackendTrait;
use crate::exposed::ExposedEndpoint;
use crate::exposed::fast_routes::FastRoutes;
pub use crate::repository::Repository;
pub use crate::storage::StorageEndpoint;

pub mod auth;
pub mod exposed;
pub mod grpc;
pub mod object;
pub mod proxy;
pub mod repository;
pub mod storage;
pub mod upstream_rust;

pub use crate::object::{LocalHolger, RemoteHolger};
pub use traits::HolgerObject;

// ========================= WIRING ENGINE =========================

pub fn wire_holger(holger: &mut Holger) -> Result<()> {
    let mut repo_map = HashMap::new();
    let mut exposed_map = HashMap::new();
    let mut storage_map = HashMap::new();

    for repo in &*holger.repositories {
        repo_map.insert(repo.ron_name.clone(), repo as *const Repository);
    }
    for exp in &*holger.exposed_endpoints {
        exposed_map.insert(exp.ron_name.clone(), exp as *const ExposedEndpoint);
    }
    for st in &*holger.storage_endpoints {
        storage_map.insert(st.ron_name.clone(), st as *const StorageEndpoint);
    }

    // Wire repository IO references
    for repo in &mut holger.repositories {
        for name in &repo.ron_upstreams {
            if let Some(ptr) = repo_map.get(name) {
                repo.wired_upstreams.push(*ptr);
            } else {
                return Err(anyhow::anyhow!("Missing upstream repo: {}", name));
            }
        }

        if let Some(io) = &mut repo.ron_in {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }

        if let Some(io) = &mut repo.ron_out {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }
    }

    // Wire reverse links (endpoint → repos)
    for exp in &mut holger.exposed_endpoints {
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_out {
                if io.ron_exposed_endpoint == exp.ron_name {
                    exp.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    for st in &mut holger.storage_endpoints {
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_in {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_in_repositories.push(repo as *const Repository);
                }
            }
            if let Some(io) = &repo.ron_out {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Build route tables for each exposed endpoint, wrapping repos that have
    // configured upstreams in a ProxyBackend so 404s fall through automatically.
    for exp in &mut holger.exposed_endpoints {
        let mut routes: Vec<(String, Arc<dyn RepositoryBackendTrait>)> = Vec::new();

        for &repo_ptr in &exp.wired_out_repositories {
            if repo_ptr.is_null() { continue; }
            let repo: &Repository = unsafe { &*repo_ptr };
            let Some(backend_arc) = &repo.backend_repository else { continue };

            let backend: Arc<dyn RepositoryBackendTrait> = if repo.wired_upstreams.is_empty() {
                backend_arc.clone()
            } else {
                let upstream_backends: Vec<Arc<dyn RepositoryBackendTrait>> = repo
                    .wired_upstreams
                    .iter()
                    .filter_map(|&ptr| {
                        if ptr.is_null() { return None; }
                        let upstream: &Repository = unsafe { &*ptr };
                        upstream.backend_repository.clone()
                    })
                    .collect();
                Arc::new(crate::proxy::ProxyBackend::new(backend_arc.clone(), upstream_backends))
            };

            routes.push((repo.ron_name.clone(), backend));
        }

        exp.aggregated_routes = if routes.is_empty() {
            None
        } else {
            Some(FastRoutes::new(routes))
        };
    }

    Ok(())
}

// ========================= ROOT HOLGER =========================

#[derive(Serialize, Deserialize)]
pub struct Holger {
    pub repositories: Vec<Repository>,
    pub exposed_endpoints: Vec<ExposedEndpoint>,
    pub storage_endpoints: Vec<StorageEndpoint>,

    /// Optional auth configuration. Absent or empty methods = open access.
    #[serde(default)]
    pub auth: Option<auth::AuthConfig>,
}

// ========================= LOAD RON CONFIG =========================

pub fn read_ron_config<P: AsRef<Path>>(path: P) -> Result<Holger> {
    let file = File::open(path)?;
    let reader = BufReader::new(file);
    let holger: Holger = from_reader(reader)?;
    Ok(holger)
}

/// Autogenerate the RON config for the **dev release pair** that nornir embeds:
///
/// * `/cache`    — a writable-znippy primary with a `crates-io` (`rust-remote`)
///   upstream → a transparent caching crates.io mirror (write-through fills it).
/// * `/sparring` — a writable-znippy registry → the release rehearsal target;
///   `seal` freezes it into a static drift snapshot.
///
/// nornir writes this to a file and launches `holger start --config <file>` as a
/// subprocess (no library linking, no version coupling). cargo speaks the HTTP
/// SPARSE protocol, so it talks to `http_addr` (the FastRoutes gateway), e.g.
/// `sparse+http://<http_addr>/cache/index/`; `grpc_url` is the control plane.
/// Archives live under `data_dir`.
pub fn dev_pair_ron(data_dir: &Path, grpc_url: &str, http_addr: &str) -> String {
    let d = data_dir.display();
    format!(
        r#"(
    exposed_endpoints: [
        ( ron_name: "dev", ron_url: "{grpc_url}", ron_http_url: Some("{http_addr}") ),
    ],
    storage_endpoints: [
        ( ron_name: "cache-store", ron_storage_type: "znippy", ron_path: "{d}/cache/" ),
        ( ron_name: "sparring-store", ron_storage_type: "znippy", ron_path: "{d}/sparring/" ),
    ],
    repositories: [
        ( ron_name: "crates-io", ron_repo_type: "rust-remote", ron_upstreams: [], ron_in: None, ron_out: None ),
        ( ron_name: "cache", ron_repo_type: "rust", ron_writable_archive: Some("{d}/cache.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "cache-store", ron_exposed_endpoint: "dev" )) ),
        ( ron_name: "sparring", ron_repo_type: "rust", ron_writable_archive: Some("{d}/sparring.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "sparring-store", ron_exposed_endpoint: "dev" )) ),
    ],
)
"#
    )
}

impl Holger {
    // ─── Rust API (direct function bindings) ─────────────────────────

    /// Fetch an artifact by ID from a named repository
    pub fn fetch(&self, repository: &str, id: &traits::ArtifactId) -> Result<Option<Vec<u8>>> {
        let repo = self.get_repo(repository)?;
        repo.fetch(id).map_err(Into::into)
    }

    /// Store an artifact (write-enabled repos only)
    pub fn put(&self, repository: &str, id: &traits::ArtifactId, data: &[u8]) -> Result<()> {
        let repo = self.get_repo(repository)?;
        if !repo.is_writable() {
            anyhow::bail!("Repository '{}' is read-only", repository);
        }
        repo.put(id, data).map_err(Into::into)
    }

    /// List all configured repository names
    pub fn list_repositories(&self) -> Vec<&str> {
        self.repositories.iter().map(|r| r.ron_name.as_str()).collect()
    }

    /// Get a repository backend by name
    fn get_repo(&self, name: &str) -> Result<&Arc<dyn RepositoryBackendTrait>> {
        for repo in &self.repositories {
            if repo.ron_name == name {
                return repo.backend_repository.as_ref()
                    .ok_or_else(|| anyhow::anyhow!("Repository '{}' has no backend", name));
            }
        }
        anyhow::bail!("Repository '{}' not found", name)
    }

    // ─── gRPC server (when configured) ──────────────────────────────

    /// Start gRPC servers (and optional HTTP gateways) for all configured
    /// exposed_endpoints
    pub fn start(&self) -> Result<()> {
        crate::exposed::tls::ensure_crypto_provider();
        let auth_config = Arc::new(self.auth.clone().unwrap_or_default());
        for ep in &self.exposed_endpoints {
            if let Some(routes) = &ep.aggregated_routes {
                self.start_grpc_endpoint(&ep.ron_url, routes.clone(), auth_config.clone(), ep.ron_tls.as_ref())?;

                if let Some(http_url) = &ep.ron_http_url {
                    let addr: std::net::SocketAddr = http_url.parse()
                        .map_err(|e| anyhow::anyhow!("Invalid HTTP address '{}': {}", http_url, e))?;
                    let tls = match &ep.ron_tls {
                        Some(t) => Some(t.rustls_gateway_config()?),
                        None => None,
                    };
                    let settings = Arc::new(crate::exposed::http::GatewaySettings {
                        routes: routes.clone(),
                        auth_config: auth_config.clone(),
                        tls,
                        // 1 GiB default body cap.
                        max_body_bytes: ep.ron_max_body_bytes.unwrap_or(1024 * 1024 * 1024),
                    });
                    crate::exposed::http::start_http_gateway(addr, settings)?;
                }
            }
        }
        Ok(())
    }

    fn start_grpc_endpoint(
        &self,
        addr: &str,
        routes: FastRoutes,
        auth_config: Arc<auth::AuthConfig>,
        tls: Option<&crate::exposed::tls::TlsSettings>,
    ) -> Result<()> {
        use crate::grpc::holger_proto::{
            repository_service_server::RepositoryServiceServer,
            archive_service_server::ArchiveServiceServer,
            admin_service_server::AdminServiceServer,
        };
        use crate::grpc::HolgerGrpc;

        let grpc_state = Arc::new(HolgerGrpc::with_auth(routes, auth_config));
        let listen_addr: std::net::SocketAddr = addr.parse()
            .map_err(|e| anyhow::anyhow!("Invalid gRPC address '{}': {}", addr, e))?;

        let tls_was_set = tls.is_some();
        let mut builder = tonic::transport::Server::builder();
        match tls {
            Some(t) => {
                builder = builder.tls_config(t.tonic_config()?)
                    .map_err(|e| anyhow::anyhow!("gRPC TLS config: {}", e))?;
            }
            None => log::warn!(
                "SECURITY: gRPC server on {} runs WITHOUT TLS (cleartext h2c). \
                 Set ron_tls for any non-loopback use.",
                listen_addr
            ),
        }

        tokio::spawn(async move {
            let scheme = if tls_was_set { "https/h2" } else { "h2c (cleartext)" };
            println!("gRPC server listening on {} [{}]", listen_addr, scheme);
            if let Err(e) = builder
                .add_service(RepositoryServiceServer::new(grpc_state.clone()))
                .add_service(ArchiveServiceServer::new(grpc_state.clone()))
                .add_service(AdminServiceServer::new(grpc_state.clone()))
                .serve(listen_addr)
                .await
            {
                eprintln!("gRPC server error: {}", e);
            }
        });

        Ok(())
    }

    pub fn instantiate_backends(&mut self) -> Result<()> {
        for se in &mut self.storage_endpoints {
            se.backend_from_config()?;
        }
        for repo in &mut self.repositories {
            repo.backend_from_config()?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The autogenerated dev pair must (a) parse, (b) instantiate every backend,
    /// (c) wire (upstreams + endpoints all resolve), and (d) behave: `/sparring`
    /// is writable + serves what it's given, `/crates-io` is read-only.
    #[test]
    fn dev_pair_ron_parses_instantiates_and_wires() {
        let dir = tempfile::tempdir().unwrap();
        let ron = dev_pair_ron(dir.path(), "https://127.0.0.1:18443", "127.0.0.1:18444");

        let mut holger: Holger =
            from_reader(std::io::Cursor::new(ron.as_bytes())).expect("dev-pair RON must parse");
        assert_eq!(holger.list_repositories(), vec!["crates-io", "cache", "sparring"]);

        holger.instantiate_backends().expect("every backend must build");
        wire_holger(&mut holger).expect("upstreams + endpoints must all wire");

        // /sparring is writable and round-trips a crate.
        let id = traits::ArtifactId { namespace: None, name: "demo".into(), version: "0.1.0".into() };
        holger.put("sparring", &id, b"crate-bytes").expect("sparring is writable");
        assert_eq!(
            holger.fetch("sparring", &id).unwrap().as_deref(),
            Some(b"crate-bytes".as_ref()),
        );

        // /crates-io upstream is read-only (a put must be rejected).
        assert!(holger.put("crates-io", &id, b"x").is_err(), "crates-io upstream is read-only");
    }
}