holger-server-lib 0.6.5

Holger server library: config, wiring, gRPC service, Rust API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! `HolgerObject` adapters — two transports behind one interface.
//!
//! An external partner holds an `Arc<dyn HolgerObject>` (see `holger-traits`)
//! and never cares what's behind the vtable, exactly like a Java interface
//! reference. Two concrete implementations live here:
//!
//!   * [`LocalHolger`] — in-process: calls the local engine directly (no
//!     network, no serialization). Built from the same [`FastRoutes`] table the
//!     gRPC server uses, so it is `Send + Sync`.
//!   * [`RemoteHolger`] — remote: forwards every call over gRPC using the
//!     tonic-generated client. The language-neutral contract is `holger.proto`.
//!
//! Pick the transport once at startup; the CLI / UI-backend code is identical
//! either way.

use std::sync::Arc;
use std::time::Instant;

use anyhow::{anyhow, Result};
use async_trait::async_trait;

use traits::{
    ArchiveInfo, ArtifactEntry, ArtifactId, Health, HolgerObject, RepositoryBackendTrait,
    RepositoryInfo,
};

use crate::exposed::fast_routes::FastRoutes;

/// Paginate a backend listing with an opaque offset cursor.
///
/// `RepositoryBackendTrait::list` makes no stable-order guarantee across calls
/// (a `HashMap` / `read_dir` may reorder), so offset paging can't slice the raw
/// listing directly. We fetch the full matching set once, sort it into a stable
/// total order (namespace, name, version), then return the requested window.
///
/// * `name_filter` — forwarded unchanged to the backend's `list`.
/// * `page_size` — maximum number of entries in the page; `0` means
///   "everything from the offset" (no further paging).
/// * `page_token` — the integer offset to start at. `None`, an empty string,
///   or any token that does not parse as an unsigned integer all start at `0`.
///
/// Returns `(page, next_token)`, where `next_token` is the next offset as a
/// decimal string, or an empty string once the listing is exhausted.
///
/// # Errors
/// Propagates any error returned by the backend's `list`.
///
/// NB: fetching and sorting the whole set per page is O(n log n) — fine for the
/// archive sizes holger serves today; a cursor pushed into the backend would
/// scale further.
pub(crate) fn paginate_artifacts(
    repo: &dyn RepositoryBackendTrait,
    name_filter: Option<&str>,
    page_size: usize,
    page_token: Option<&str>,
) -> Result<(Vec<ArtifactEntry>, String)> {
    let offset: usize = page_token.and_then(|t| t.parse().ok()).unwrap_or(0);
    // A backend limit of `0` is used here to request the full matching set.
    // NOTE(review): presumably 0 = "no limit" in the backend trait — confirm.
    let mut all = repo.list(name_filter, 0)?;
    let total = all.len();
    // Past the end: nothing to return, so skip the sort entirely.
    if offset >= total {
        return Ok((Vec::new(), String::new()));
    }
    all.sort_by(|a, b| {
        a.id
            .namespace
            .cmp(&b.id.namespace)
            .then_with(|| a.id.name.cmp(&b.id.name))
            .then_with(|| a.id.version.cmp(&b.id.version))
    });
    let end = if page_size == 0 {
        total
    } else {
        offset.saturating_add(page_size).min(total)
    };
    let next = if end < total {
        end.to_string()
    } else {
        String::new()
    };
    // The full listing is owned and discarded afterwards, so move the window
    // out instead of cloning every entry (and its strings).
    let page: Vec<ArtifactEntry> = all.drain(offset..end).collect();
    Ok((page, next))
}

// ─── LocalHolger: direct in-process transport ────────────────────────

/// In-process `HolgerObject` that calls repository backends directly.
///
/// Repository names are resolved through a [`FastRoutes`] table; there is no
/// network hop and no serialization on this path.
pub struct LocalHolger {
    routes: FastRoutes,
    start: Instant,
}

impl LocalHolger {
    /// Wrap `routes`, recording the current instant as the origin of the
    /// uptime reported by `health`.
    pub fn new(routes: FastRoutes) -> Self {
        let start = Instant::now();
        Self { routes, start }
    }

    /// Resolve the backend registered under `name`.
    ///
    /// # Errors
    /// Fails with `Repository '<name>' not found` when no route matches.
    fn repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>> {
        match self.routes.lookup(name) {
            Some(backend) => Ok(Arc::clone(backend)),
            None => Err(anyhow!("Repository '{}' not found", name)),
        }
    }
}

#[async_trait]
impl HolgerObject for LocalHolger {
    /// Read an artifact straight from the resolved backend (no network).
    async fn fetch(&self, repository: &str, id: &ArtifactId) -> Result<Option<Vec<u8>>> {
        let backend = self.repo(repository)?;
        backend.fetch(id)
    }

    /// Store an artifact in the resolved backend, refusing read-only repos.
    async fn put(&self, repository: &str, id: &ArtifactId, data: &[u8]) -> Result<()> {
        let backend = self.repo(repository)?;
        anyhow::ensure!(
            backend.is_writable(),
            "Repository '{}' is read-only",
            repository
        );
        backend.put(id, data)
    }

    /// Describe every routed repository.
    async fn list_repositories(&self) -> Result<Vec<RepositoryInfo>> {
        let infos: Vec<RepositoryInfo> = self
            .routes
            .all_repos()
            .into_iter()
            .map(|(name, backend)| {
                let repo_type = format!("{:?}", backend.format());
                let writable = backend.is_writable();
                RepositoryInfo {
                    name,
                    repo_type,
                    writable,
                    // NOTE(review): reported as `true` for every backend here —
                    // confirm all local backends are archive-backed.
                    has_archive: true,
                }
            })
            .collect();
        Ok(infos)
    }

    /// List artifacts straight off the backend (no network).
    ///
    /// Mirrors the gRPC transport's `ListArtifacts` without serialization: the
    /// repo is resolved and its listing paged with [`paginate_artifacts`]
    /// (`page_token` is the offset cursor, `limit` the page size).
    async fn list_artifacts(
        &self,
        repository: &str,
        name_filter: Option<String>,
        limit: u32,
        page_token: Option<String>,
    ) -> Result<(Vec<ArtifactEntry>, String)> {
        let backend = self.repo(repository)?;
        let page_size = limit as usize;
        paginate_artifacts(
            &*backend,
            name_filter.as_deref(),
            page_size,
            page_token.as_deref(),
        )
    }

    /// Raw file paths of the backing archive, taken from the backend's
    /// `archive_files` (no network).
    async fn list_archive_files(
        &self,
        repository: &str,
        prefix: Option<String>,
    ) -> Result<Vec<String>> {
        let backend = self.repo(repository)?;
        backend.archive_files(prefix.as_deref())
    }

    /// Archive statistics taken from the backend's `archive_info` (no network).
    async fn archive_info(&self, repository: &str) -> Result<ArchiveInfo> {
        let backend = self.repo(repository)?;
        backend.archive_info()
    }

    /// Report liveness, the crate version, and seconds since construction.
    async fn health(&self) -> Result<Health> {
        let uptime_seconds = self.start.elapsed().as_secs() as i64;
        Ok(Health {
            status: String::from("ok"),
            version: String::from(env!("CARGO_PKG_VERSION")),
            uptime_seconds,
        })
    }
}

// ─── RemoteHolger: gRPC transport ────────────────────────────────────

use crate::grpc::holger_proto::{
    admin_service_client::AdminServiceClient,
    archive_service_client::ArchiveServiceClient,
    repository_service_client::RepositoryServiceClient,
    ArchiveInfoRequest, ArtifactId as ProtoArtifactId, FetchArtifactRequest, HealthRequest,
    ListArchiveFilesRequest, ListArtifactsRequest, ListRepositoriesRequest, PutArtifactRequest,
};
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
use tonic::Code;
use tonic::metadata::{Ascii, MetadataValue};
use tonic::service::interceptor::InterceptedService;
use tonic::service::Interceptor;
use tonic::{Request, Status};

/// Tonic interceptor that adds an `authorization: Bearer <token>` header to
/// every outgoing request when a token is configured.
///
/// Holding `None` makes it a pass-through, so one client construction path
/// serves both authenticated and open servers.
#[derive(Clone)]
pub struct BearerAuth(Option<MetadataValue<Ascii>>);

impl Interceptor for BearerAuth {
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        // No token configured: forward the request untouched.
        let Some(header) = self.0.as_ref() else {
            return Ok(req);
        };
        req.metadata_mut().insert("authorization", header.clone());
        Ok(req)
    }
}

/// Remote `HolgerObject`. Forwards every call to a Holger gRPC server using the
/// tonic-generated client. Cheap to clone (the channel is reference-counted).
pub struct RemoteHolger {
    channel: Channel,
    auth: BearerAuth,
}

impl RemoteHolger {
    /// Connect to a Holger gRPC endpoint, e.g. `http://127.0.0.1:50051`, with no
    /// credentials (open servers / read-only browsing).
    pub async fn connect(endpoint: impl Into<String>) -> Result<Self> {
        let channel = Channel::from_shared(endpoint.into())?.connect().await?;
        Ok(Self { channel, auth: BearerAuth(None) })
    }

    /// Connect with an OIDC bearer token injected on every request — the auth
    /// path a UI uses for write access (validated server-side by
    /// `auth::validate_request`).
    pub async fn connect_with_token(
        endpoint: impl Into<String>,
        token: impl AsRef<str>,
    ) -> Result<Self> {
        let channel = Channel::from_shared(endpoint.into())?.connect().await?;
        Ok(Self { channel, auth: bearer(Some(token.as_ref()))? })
    }

    /// Connect over TLS (and, when a `client_identity` is supplied, mTLS).
    ///
    /// * `ca_pem` — PEM bytes of the CA to verify the server certificate
    ///   against. `None` falls back to the platform/webpki roots tonic uses by
    ///   default.
    /// * `client_identity` — `(cert_pem, key_pem)` for the **client**
    ///   certificate that authenticates *us* to the server. `Some(..)` is what
    ///   turns plain TLS into mTLS (the holger CN → role auth path).
    /// * `token` — an optional bearer token stamped on every request, exactly as
    ///   in [`RemoteHolger::connect_with_token`]; orthogonal to TLS so a server
    ///   can require either or both.
    ///
    /// NOTE: this only establishes the client side. There is no end-to-end mTLS
    /// integration test here — exercising it needs generated certs and a TLS
    /// server (a manual / mannequin step), so this path is verified to *compile*
    /// only. See the task brief (feature B).
    pub async fn connect_with_tls(
        endpoint: impl Into<String>,
        ca_pem: Option<Vec<u8>>,
        client_identity: Option<(Vec<u8>, Vec<u8>)>,
        token: Option<String>,
    ) -> Result<Self> {
        let mut tls = ClientTlsConfig::new();
        if let Some(ca) = ca_pem {
            tls = tls.ca_certificate(Certificate::from_pem(ca));
        }
        if let Some((cert, key)) = client_identity {
            tls = tls.identity(Identity::from_pem(cert, key));
        }
        let channel = Channel::from_shared(endpoint.into())?
            .tls_config(tls)?
            .connect()
            .await?;
        Ok(Self { channel, auth: bearer(token.as_deref())? })
    }

    /// Build from an already-established channel (no credentials).
    pub fn from_channel(channel: Channel) -> Self {
        Self { channel, auth: BearerAuth(None) }
    }

    fn repo_client(&self) -> RepositoryServiceClient<InterceptedService<Channel, BearerAuth>> {
        RepositoryServiceClient::with_interceptor(self.channel.clone(), self.auth.clone())
    }

    fn admin_client(&self) -> AdminServiceClient<InterceptedService<Channel, BearerAuth>> {
        AdminServiceClient::with_interceptor(self.channel.clone(), self.auth.clone())
    }

    fn archive_client(&self) -> ArchiveServiceClient<InterceptedService<Channel, BearerAuth>> {
        ArchiveServiceClient::with_interceptor(self.channel.clone(), self.auth.clone())
    }
}

/// Build a [`BearerAuth`] interceptor from an optional token.
///
/// `None` gives the pass-through interceptor used for open servers. `Some(t)`
/// encodes `Bearer <t>` as an ASCII metadata value once, up front, so each
/// request only clones and inserts it. Every token-bearing constructor goes
/// through here, which keeps parsing and validation in one place.
///
/// # Errors
/// Fails with `invalid bearer token: ...` when the header value cannot be
/// parsed as ASCII metadata.
fn bearer(token: Option<&str>) -> Result<BearerAuth> {
    let header = token
        .map(|t| {
            format!("Bearer {t}")
                .parse::<MetadataValue<Ascii>>()
                .map_err(|e| anyhow!("invalid bearer token: {e}"))
        })
        .transpose()?;
    Ok(BearerAuth(header))
}

/// Convert a trait-level [`ArtifactId`] into its protobuf message.
///
/// The proto `namespace` field is a plain string, so an absent namespace is
/// sent as the empty string.
fn to_proto_id(id: &ArtifactId) -> ProtoArtifactId {
    let namespace = match &id.namespace {
        Some(ns) => ns.clone(),
        None => String::new(),
    };
    ProtoArtifactId {
        namespace,
        name: id.name.clone(),
        version: id.version.clone(),
    }
}

#[async_trait]
impl HolgerObject for RemoteHolger {
    async fn fetch(&self, repository: &str, id: &ArtifactId) -> Result<Option<Vec<u8>>> {
        let mut client = self.repo_client();
        let request = FetchArtifactRequest {
            repository: repository.to_string(),
            id: Some(to_proto_id(id)),
        };
        match client.fetch_artifact(request).await {
            Ok(resp) => Ok(Some(resp.into_inner().data)),
            Err(status) if status.code() == Code::NotFound => Ok(None),
            Err(status) => Err(anyhow!("gRPC fetch failed: {}", status)),
        }
    }

    async fn put(&self, repository: &str, id: &ArtifactId, data: &[u8]) -> Result<()> {
        let mut client = self.repo_client();
        let request = PutArtifactRequest {
            repository: repository.to_string(),
            id: Some(to_proto_id(id)),
            data: data.to_vec(),
        };
        let resp = client
            .put_artifact(request)
            .await
            .map_err(|s| anyhow!("gRPC put failed: {}", s))?
            .into_inner();
        if resp.success {
            Ok(())
        } else {
            Err(anyhow!("put rejected: {}", resp.message))
        }
    }

    async fn list_repositories(&self) -> Result<Vec<RepositoryInfo>> {
        let mut client = self.admin_client();
        let resp = client
            .list_repositories(ListRepositoriesRequest {})
            .await
            .map_err(|s| anyhow!("gRPC list_repositories failed: {}", s))?
            .into_inner();
        Ok(resp
            .repositories
            .into_iter()
            .map(|r| RepositoryInfo {
                name: r.name,
                repo_type: r.repo_type,
                writable: r.writable,
                has_archive: r.has_archive,
            })
            .collect())
    }

    async fn list_artifacts(
        &self,
        repository: &str,
        name_filter: Option<String>,
        limit: u32,
        page_token: Option<String>,
    ) -> Result<(Vec<ArtifactEntry>, String)> {
        let mut client = self.repo_client();
        let request = ListArtifactsRequest {
            repository: repository.to_string(),
            name_filter: name_filter.unwrap_or_default(),
            // proto `limit` is int32; the trait takes u32 (no negative page size).
            limit: limit as i32,
            page_token: page_token.unwrap_or_default(),
        };
        let resp = client
            .list_artifacts(request)
            .await
            .map_err(|s| anyhow!("gRPC list_artifacts failed: {}", s))?
            .into_inner();
        let entries = resp
            .artifacts
            .into_iter()
            .map(|a| {
                let id = a.id.unwrap_or_default();
                ArtifactEntry {
                    id: ArtifactId {
                        // proto carries an empty namespace string for the
                        // namespace-less ecosystems (Rust); map that back to None.
                        namespace: if id.namespace.is_empty() {
                            None
                        } else {
                            Some(id.namespace)
                        },
                        name: id.name,
                        version: id.version,
                    },
                    size_bytes: a.size_bytes,
                    content_type: a.content_type,
                }
            })
            .collect();
        Ok((entries, resp.next_page_token))
    }

    /// Browse the backing archive's raw file paths over gRPC
    /// (`ArchiveService::ListArchiveFiles`). An empty `prefix` Option maps to the
    /// empty proto string (proto3 has no optional string here) = "no filter".
    async fn list_archive_files(
        &self,
        repository: &str,
        prefix: Option<String>,
    ) -> Result<Vec<String>> {
        let mut client = self.archive_client();
        let request = ListArchiveFilesRequest {
            repository: repository.to_string(),
            prefix: prefix.unwrap_or_default(),
        };
        let resp = client
            .list_archive_files(request)
            .await
            .map_err(|s| anyhow!("gRPC list_archive_files failed: {}", s))?
            .into_inner();
        Ok(resp.paths)
    }

    /// Archive stats over gRPC (`ArchiveService::ArchiveInfo`). The proto carries
    /// int64 counters; cast back to the trait's u64.
    async fn archive_info(&self, repository: &str) -> Result<ArchiveInfo> {
        let mut client = self.archive_client();
        let request = ArchiveInfoRequest {
            repository: repository.to_string(),
        };
        let resp = client
            .archive_info(request)
            .await
            .map_err(|s| anyhow!("gRPC archive_info failed: {}", s))?
            .into_inner();
        Ok(ArchiveInfo {
            file_count: resp.file_count as u64,
            total_uncompressed_bytes: resp.total_uncompressed_bytes as u64,
            archive_path: resp.archive_path,
        })
    }

    async fn health(&self) -> Result<Health> {
        let mut client = self.admin_client();
        let resp = client
            .health(HealthRequest {})
            .await
            .map_err(|s| anyhow!("gRPC health failed: {}", s))?
            .into_inner();
        Ok(Health {
            status: resp.status,
            version: resp.version,
            uptime_seconds: resp.uptime_seconds,
        })
    }
}