holger-server-lib 0.6.7

Holger server library: config, wiring, gRPC service, Rust API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
// holger-server-lib/src/lib.rs
//
// Two modes of operation:
//   1. Rust function bindings — use Holger struct directly as a library
//   2. gRPC server — configure exposed_endpoints in RON, call holger.start()

use serde::{Deserialize, Serialize};

use std::{
    fs::File,
    io::BufReader,
    path::Path,
};

use anyhow::Result;
use ron::de::from_reader;

use std::collections::HashMap;
use std::sync::Arc;
pub use traits::RepositoryBackendTrait;
use crate::exposed::ExposedEndpoint;
use crate::exposed::fast_routes::FastRoutes;
pub use crate::repository::Repository;
pub use crate::storage::StorageEndpoint;

pub mod audit;
pub mod auth;
pub mod exposed;
pub mod grpc;
pub mod object;
pub mod proxy;
pub mod repository;
pub mod storage;
pub mod upstream_rust;

pub use crate::object::{LocalHolger, RemoteHolger};
pub use traits::HolgerObject;
// Re-export the core artifact types so in-process consumers (e.g. nornir's
// embedded-holger viz pane) can enumerate a repository's backend directly —
// `holger.repositories[i].backend_repository` is an
// `Option<Arc<dyn RepositoryBackendTrait>>`. Together with the
// `RepositoryBackendTrait` re-export above, this lets them call `list`/`fetch`
// on it without depending on `holger-traits` separately.
pub use traits::{ArtifactEntry, ArtifactId};

// ========================= WIRING ENGINE =========================

/// Resolve the name-based references of a deserialized [`Holger`] into direct
/// links and build the route table of every exposed endpoint.
///
/// The pass runs in four steps:
///
/// 1. Index repositories, exposed endpoints and storage endpoints by their
///    `ron_name`.
/// 2. For every repository, resolve `ron_upstreams` into `wired_upstreams` and
///    resolve the storage/exposed endpoints named by `ron_in` / `ron_out`.
/// 3. Record the reverse links: each exposed endpoint learns the repositories
///    whose `ron_out` names it, each storage endpoint learns the repositories
///    whose `ron_in` / `ron_out` name it.
/// 4. Build `aggregated_routes` for each exposed endpoint. A repository with
///    wired upstreams is wrapped in a [`crate::proxy::ProxyBackend`];
///    repositories without an instantiated backend are skipped.
///
/// The link lists are cleared before they are repopulated, so calling this
/// function again on the same `Holger` does not duplicate upstreams or routes.
///
/// The wired links are raw pointers to elements of the `Vec`s owned by
/// `holger`. They stay meaningful only while those vectors are not resized,
/// reordered or dropped; re-run the wiring after any such change.
///
/// # Errors
///
/// Returns an error when a repository names an upstream repository, storage
/// endpoint or exposed endpoint that does not exist in the configuration. The
/// configuration may be left partially wired in that case.
pub fn wire_holger(holger: &mut Holger) -> Result<()> {
    // Step 1: name → element pointer. On duplicate names the last entry wins.
    let repo_map: HashMap<String, *const Repository> = holger
        .repositories
        .iter()
        .map(|repo| (repo.ron_name.clone(), repo as *const Repository))
        .collect();
    let exposed_map: HashMap<String, *const ExposedEndpoint> = holger
        .exposed_endpoints
        .iter()
        .map(|exp| (exp.ron_name.clone(), exp as *const ExposedEndpoint))
        .collect();
    let storage_map: HashMap<String, *const StorageEndpoint> = holger
        .storage_endpoints
        .iter()
        .map(|st| (st.ron_name.clone(), st as *const StorageEndpoint))
        .collect();

    // Step 2: wire repository IO references.
    for repo in &mut holger.repositories {
        // Forget links from an earlier pass so re-wiring stays idempotent.
        repo.wired_upstreams.clear();
        for name in &repo.ron_upstreams {
            let upstream = *repo_map
                .get(name)
                .ok_or_else(|| anyhow::anyhow!("Missing upstream repo: {}", name))?;
            repo.wired_upstreams.push(upstream);
        }

        if let Some(io) = &mut repo.ron_in {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }

        if let Some(io) = &mut repo.ron_out {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }
    }

    // Step 3: wire reverse links (endpoint → repos).
    for exp in &mut holger.exposed_endpoints {
        exp.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_out {
                if io.ron_exposed_endpoint == exp.ron_name {
                    exp.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    for st in &mut holger.storage_endpoints {
        st.wired_in_repositories.clear();
        st.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_in {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_in_repositories.push(repo as *const Repository);
                }
            }
            if let Some(io) = &repo.ron_out {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Step 4: build route tables for each exposed endpoint, wrapping repos that
    // have configured upstreams in a ProxyBackend so 404s fall through
    // automatically.
    for exp in &mut holger.exposed_endpoints {
        let mut routes: Vec<(String, Arc<dyn RepositoryBackendTrait>)> = Vec::new();

        for &repo_ptr in &exp.wired_out_repositories {
            if repo_ptr.is_null() { continue; }
            // SAFETY: `repo_ptr` was taken in step 3 from an element of
            // `holger.repositories`, and that vector has not been resized or
            // dropped since, so it still addresses a live `Repository`.
            let repo: &Repository = unsafe { &*repo_ptr };
            let Some(backend_arc) = &repo.backend_repository else { continue };

            let backend: Arc<dyn RepositoryBackendTrait> = if repo.wired_upstreams.is_empty() {
                backend_arc.clone()
            } else {
                let upstream_backends: Vec<Arc<dyn RepositoryBackendTrait>> = repo
                    .wired_upstreams
                    .iter()
                    .filter_map(|&ptr| {
                        if ptr.is_null() { return None; }
                        // SAFETY: `ptr` was taken in step 1 from an element of
                        // `holger.repositories`, which has not been resized or
                        // dropped since.
                        // NOTE(review): that pointer was derived from a shared
                        // borrow made before the `&mut` pass of step 2 — worth
                        // confirming under Miri that this aliasing is accepted.
                        let upstream: &Repository = unsafe { &*ptr };
                        // Upstreams without an instantiated backend are skipped.
                        upstream.backend_repository.clone()
                    })
                    .collect();
                Arc::new(crate::proxy::ProxyBackend::new(backend_arc.clone(), upstream_backends))
            };

            routes.push((repo.ron_name.clone(), backend));
        }

        // An endpoint with nothing to serve gets no route table at all.
        exp.aggregated_routes = if routes.is_empty() {
            None
        } else {
            Some(FastRoutes::new(routes))
        };
    }

    Ok(())
}

// ========================= ROOT HOLGER =========================

/// Root of a Holger configuration and the entry point of the Rust API.
///
/// The three lists are read from the RON config (see `read_ron_config`); their
/// entries refer to each other by `ron_name` until `wire_holger` resolves those
/// names into direct links.
#[derive(Serialize, Deserialize)]
pub struct Holger {
    /// Repositories, looked up by `ron_name` in the Rust API (`fetch`, `put`).
    pub repositories: Vec<Repository>,
    /// Network endpoints; `start` serves every one that has a route table.
    pub exposed_endpoints: Vec<ExposedEndpoint>,
    /// Storage endpoints that repositories name in `ron_in` / `ron_out`.
    pub storage_endpoints: Vec<StorageEndpoint>,

    /// Optional auth configuration. Absent or empty methods = open access.
    #[serde(default)]
    pub auth: Option<auth::AuthConfig>,

    /// Audit-log configuration. Defaults to ON (append-only Arrow IPC) so a
    /// config that says nothing still gets an audit trail.
    #[serde(default)]
    pub audit: AuditSettings,
}

/// Audit-log configuration. On by default; the default backend is the
/// append-only Arrow IPC log ([`audit::ArrowIpcAuditLog`]).
///
/// Every field has its own serde default, so a partially specified `audit`
/// section only overrides the fields it mentions.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AuditSettings {
    /// Directory for append-only Arrow IPC audit segments. Relative paths
    /// resolve against the server's working directory.
    /// Defaults to `"holger-audit"` when omitted.
    #[serde(default = "default_audit_dir")]
    pub dir: String,
    /// Master switch. `false` installs a no-op audit log (audit disabled).
    /// Defaults to `true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,
}

/// Serde default for [`AuditSettings::dir`]: the relative directory
/// `holger-audit`.
fn default_audit_dir() -> String {
    String::from("holger-audit")
}

/// Serde default helper that yields `true`; used for `AuditSettings::enabled`
/// so the audit log is on unless the config switches it off.
fn default_true() -> bool {
    true
}

impl Default for AuditSettings {
    fn default() -> Self {
        Self {
            dir: default_audit_dir(),
            enabled: true,
        }
    }
}

// ========================= LOAD RON CONFIG =========================

/// Load a [`Holger`] configuration from the RON file at `path`.
///
/// The result is only deserialized: backends are not instantiated and nothing
/// is wired yet.
///
/// # Errors
///
/// Fails when the file cannot be opened or its contents do not deserialize
/// into a [`Holger`].
pub fn read_ron_config<P: AsRef<Path>>(path: P) -> Result<Holger> {
    // Buffer the reads for the RON deserializer.
    let source = BufReader::new(File::open(path)?);
    Ok(from_reader(source)?)
}

/// Autogenerate the RON config for the **dev release pair** that nornir embeds:
///
/// * `/cache`    — a writable-znippy primary with a `crates-io` (`rust-remote`)
///   upstream → a transparent caching crates.io mirror (write-through fills it).
/// * `/sparring` — a writable-znippy registry → the release rehearsal target;
///   `seal` freezes it into a static drift snapshot.
///
/// nornir writes this to a file and launches `holger start --config <file>` as a
/// subprocess (no library linking, no version coupling). cargo speaks the HTTP
/// SPARSE protocol, so it talks to `http_addr` (the FastRoutes gateway), e.g.
/// `sparse+http://<http_addr>/cache/index/`; `grpc_url` is the control plane.
/// Archives live under `data_dir`.
///
/// All three inputs are interpolated into RON string literals, so backslashes
/// and double quotes in them are escaped. Without that, a Windows-style
/// `data_dir` such as `C:\Users\me` would produce a config that fails to parse.
/// Inputs without those characters yield exactly the same text as before.
pub fn dev_pair_ron(data_dir: &Path, grpc_url: &str, http_addr: &str) -> String {
    /// Escape `raw` for use inside a double-quoted RON string literal.
    fn ron_escape(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            if matches!(ch, '\\' | '"') {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }

    // Shadow the inputs with their escaped forms so the template below can keep
    // using the plain `{name}` placeholders.
    let d = ron_escape(&data_dir.display().to_string());
    let grpc_url = ron_escape(grpc_url);
    let http_addr = ron_escape(http_addr);
    format!(
        r#"(
    exposed_endpoints: [
        ( ron_name: "dev", ron_url: "{grpc_url}", ron_http_url: Some("{http_addr}") ),
    ],
    storage_endpoints: [
        ( ron_name: "cache-store", ron_storage_type: "znippy", ron_path: "{d}/cache/" ),
        ( ron_name: "sparring-store", ron_storage_type: "znippy", ron_path: "{d}/sparring/" ),
    ],
    repositories: [
        ( ron_name: "crates-io", ron_repo_type: "rust-remote", ron_upstreams: [], ron_in: None, ron_out: None ),
        ( ron_name: "cache", ron_repo_type: "rust", ron_writable_archive: Some("{d}/cache.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "cache-store", ron_exposed_endpoint: "dev" )) ),
        ( ron_name: "sparring", ron_repo_type: "rust", ron_writable_archive: Some("{d}/sparring.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "sparring-store", ron_exposed_endpoint: "dev" )) ),
    ],
)
"#
    )
}

impl Holger {
    // ─── Rust API (direct function bindings) ─────────────────────────

    /// Fetch the bytes of artifact `id` from the repository named `repository`.
    ///
    /// The backend's answer is passed through unchanged; `Ok(None)` presumably
    /// means the backend has no such artifact (confirm against the trait docs).
    ///
    /// # Errors
    ///
    /// Fails when the repository is unknown, has no backend, or the backend
    /// itself reports an error.
    pub fn fetch(&self, repository: &str, id: &traits::ArtifactId) -> Result<Option<Vec<u8>>> {
        self.get_repo(repository)?
            .fetch(id)
            .map_err(Into::into)
    }

    /// Store `data` as artifact `id` in the repository named `repository`.
    ///
    /// # Errors
    ///
    /// Fails when the repository is unknown, has no backend, is not writable,
    /// or the backend rejects the write.
    pub fn put(&self, repository: &str, id: &traits::ArtifactId, data: &[u8]) -> Result<()> {
        let backend = self.get_repo(repository)?;
        if backend.is_writable() {
            backend.put(id, data).map_err(Into::into)
        } else {
            // Refuse before touching the backend.
            Err(anyhow::anyhow!("Repository '{}' is read-only", repository))
        }
    }

    /// Names (`ron_name`) of every configured repository, in configuration
    /// order.
    pub fn list_repositories(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.repositories.len());
        for repo in &self.repositories {
            names.push(repo.ron_name.as_str());
        }
        names
    }

    /// Look up the backend of the first repository whose `ron_name` is `name`.
    ///
    /// # Errors
    ///
    /// Fails when no repository has that name, or when the matching repository
    /// has no backend set.
    fn get_repo(&self, name: &str) -> Result<&Arc<dyn RepositoryBackendTrait>> {
        let Some(repo) = self.repositories.iter().find(|repo| repo.ron_name == name) else {
            anyhow::bail!("Repository '{}' not found", name);
        };
        repo.backend_repository
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Repository '{}' has no backend", name))
    }

    // ─── gRPC server (when configured) ──────────────────────────────

    /// Start a gRPC server — and, where `ron_http_url` is set, an HTTP gateway
    /// — for every exposed endpoint that has a route table.
    ///
    /// Endpoints whose `aggregated_routes` is `None` are skipped. All endpoints
    /// share one auth configuration and one audit sink.
    ///
    /// NOTE(review): the gRPC server is launched with `tokio::spawn`, so this
    /// presumably has to be called from inside a Tokio runtime — confirm with
    /// callers.
    ///
    /// # Errors
    ///
    /// Stops at the first endpoint whose gRPC or HTTP address does not parse,
    /// whose TLS settings cannot be turned into a server config, or whose HTTP
    /// gateway fails to start. Servers started before the failure keep running.
    pub fn start(&self) -> Result<()> {
        crate::exposed::tls::ensure_crypto_provider();
        let auth_config = Arc::new(self.auth.clone().unwrap_or_default());
        // One shared, append-only audit sink across every endpoint.
        let audit = self.build_audit_log();

        for ep in &self.exposed_endpoints {
            // Nothing routed to this endpoint: nothing to serve.
            let Some(routes) = &ep.aggregated_routes else { continue };

            self.start_grpc_endpoint(
                &ep.ron_url,
                routes.clone(),
                auth_config.clone(),
                audit.clone(),
                ep.ron_tls.as_ref(),
            )?;

            // The HTTP gateway is optional per endpoint.
            let Some(http_url) = &ep.ron_http_url else { continue };

            let addr: std::net::SocketAddr = http_url.parse()
                .map_err(|e| anyhow::anyhow!("Invalid HTTP address '{}': {}", http_url, e))?;
            let tls = ep
                .ron_tls
                .as_ref()
                .map(|t| t.rustls_gateway_config())
                .transpose()?;
            let settings = Arc::new(crate::exposed::http::GatewaySettings {
                routes: routes.clone(),
                auth_config: auth_config.clone(),
                tls,
                // 1 GiB default body cap.
                max_body_bytes: ep.ron_max_body_bytes.unwrap_or(1024 * 1024 * 1024),
                audit: audit.clone(),
            });
            crate::exposed::http::start_http_gateway(addr, settings)?;
        }
        Ok(())
    }

    /// Create the audit sink shared by all endpoints, as configured in
    /// `self.audit`.
    ///
    /// Never fails: when auditing is switched off, or the Arrow-IPC backend
    /// cannot be opened, a no-op log is returned instead. The second case is
    /// reported at error level — keeping the server available is preferred over
    /// refusing to start because of a missing audit directory.
    fn build_audit_log(&self) -> Arc<dyn audit::AuditLog> {
        let settings = &self.audit;
        if settings.enabled {
            match audit::default_audit_log(&settings.dir) {
                Ok(log) => {
                    log::info!(
                        "audit log: append-only Arrow IPC segments under {:?}",
                        settings.dir
                    );
                    log
                }
                Err(e) => {
                    // Loud, but not fatal.
                    log::error!(
                        "audit DISABLED — could not open audit dir {:?}: {e}",
                        settings.dir
                    );
                    Arc::new(audit::NoopAuditLog)
                }
            }
        } else {
            log::info!("audit log disabled by config");
            Arc::new(audit::NoopAuditLog)
        }
    }

    /// Spawn a gRPC server on `addr` that serves the repository, archive and
    /// admin services backed by `routes`.
    ///
    /// The server runs in a task created with `tokio::spawn`; this method
    /// returns as soon as the task is spawned. Errors raised by the running
    /// server are only printed to stderr. Without `tls` the server is
    /// cleartext and a security warning is logged.
    ///
    /// # Errors
    ///
    /// Fails when `addr` is not a valid socket address or the TLS settings
    /// cannot be applied.
    fn start_grpc_endpoint(
        &self,
        addr: &str,
        routes: FastRoutes,
        auth_config: Arc<auth::AuthConfig>,
        audit: Arc<dyn audit::AuditLog>,
        tls: Option<&crate::exposed::tls::TlsSettings>,
    ) -> Result<()> {
        use crate::grpc::holger_proto::{
            repository_service_server::RepositoryServiceServer,
            archive_service_server::ArchiveServiceServer,
            admin_service_server::AdminServiceServer,
        };
        use crate::grpc::HolgerGrpc;

        // One state object, shared by all three services.
        let grpc_state = Arc::new(HolgerGrpc::with_auth(routes, auth_config).with_audit(audit));
        let listen_addr: std::net::SocketAddr = addr.parse()
            .map_err(|e| anyhow::anyhow!("Invalid gRPC address '{}': {}", addr, e))?;

        let mut builder = tonic::transport::Server::builder();
        // Apply TLS when configured and remember which scheme gets announced.
        let scheme = if let Some(settings) = tls {
            builder = builder.tls_config(settings.tonic_config()?)
                .map_err(|e| anyhow::anyhow!("gRPC TLS config: {}", e))?;
            "https/h2"
        } else {
            log::warn!(
                "SECURITY: gRPC server on {} runs WITHOUT TLS (cleartext h2c). \
                 Set ron_tls for any non-loopback use.",
                listen_addr
            );
            "h2c (cleartext)"
        };

        tokio::spawn(async move {
            println!("gRPC server listening on {} [{}]", listen_addr, scheme);
            let outcome = builder
                .add_service(RepositoryServiceServer::new(grpc_state.clone()))
                .add_service(ArchiveServiceServer::new(grpc_state.clone()))
                .add_service(AdminServiceServer::new(grpc_state.clone()))
                .serve(listen_addr)
                .await;
            if let Err(e) = outcome {
                eprintln!("gRPC server error: {}", e);
            }
        });

        Ok(())
    }

    /// Call `backend_from_config` on every storage endpoint and then on every
    /// repository, in configuration order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first failure; entries after it are left
    /// untouched.
    pub fn instantiate_backends(&mut self) -> Result<()> {
        // Storage endpoints first, repositories second.
        self.storage_endpoints
            .iter_mut()
            .try_for_each(|endpoint| endpoint.backend_from_config().map(|_| ()))?;
        self.repositories
            .iter_mut()
            .try_for_each(|repo| repo.backend_from_config().map(|_| ()))?;
        Ok(())
    }
}

// Unit tests for the autogenerated dev-pair configuration.
#[cfg(test)]
mod tests {
    use super::*;

    /// The autogenerated dev pair must (a) parse, (b) instantiate every backend,
    /// (c) wire (upstreams + endpoints all resolve), and (d) behave: `/sparring`
    /// is writable + serves what it's given, `/crates-io` is read-only.
    #[test]
    fn dev_pair_ron_parses_instantiates_and_wires() {
        // Archives are written under a throw-away directory.
        let dir = tempfile::tempdir().unwrap();
        let ron = dev_pair_ron(dir.path(), "https://127.0.0.1:18443", "127.0.0.1:18444");

        // (a) The generated text deserializes, with repositories in template order.
        let mut holger: Holger =
            from_reader(std::io::Cursor::new(ron.as_bytes())).expect("dev-pair RON must parse");
        assert_eq!(holger.list_repositories(), vec!["crates-io", "cache", "sparring"]);

        // (b) + (c) Backends are built first, then wired.
        holger.instantiate_backends().expect("every backend must build");
        wire_holger(&mut holger).expect("upstreams + endpoints must all wire");

        // /sparring is writable and round-trips a crate.
        let id = traits::ArtifactId { namespace: None, name: "demo".into(), version: "0.1.0".into() };
        holger.put("sparring", &id, b"crate-bytes").expect("sparring is writable");
        assert_eq!(
            holger.fetch("sparring", &id).unwrap().as_deref(),
            Some(b"crate-bytes".as_ref()),
        );

        // /crates-io upstream is read-only (a put must be rejected).
        assert!(holger.put("crates-io", &id, b"x").is_err(), "crates-io upstream is read-only");
    }
}