holger-server-lib 0.6.5

Holger server library: config, wiring, gRPC service, Rust API
/// Generated protobuf messages and gRPC service traits for the `holger.v1`
/// package, pulled in at compile time by `tonic::include_proto!`.
pub mod holger_proto {
    tonic::include_proto!("holger.v1");
}

use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;

use holger_proto::repository_service_server::RepositoryService;
use holger_proto::archive_service_server::ArchiveService;
use holger_proto::admin_service_server::AdminService;
use holger_proto::*;

use crate::auth::{self, AuthConfig};
use crate::exposed::fast_routes::FastRoutes;
use traits::RepositoryBackendTrait;

/// Shared state for all gRPC services
///
/// The service traits in this file are implemented on `Arc<HolgerGrpc>`.
pub struct HolgerGrpc {
    /// Repository routing table; queried by name via `lookup` and enumerated
    /// via `all_repos`.
    pub routes: FastRoutes,
    /// Reference instant used to compute `uptime_seconds` in the health RPC.
    /// The constructors set it to `Instant::now()`.
    pub start_time: Instant,
    /// Auth settings handed to `auth::validate_request` when authorizing
    /// write requests.
    pub auth_config: Arc<AuthConfig>,
}

impl HolgerGrpc {
    /// Build the shared gRPC state using `AuthConfig::default()`.
    ///
    /// Shorthand for [`HolgerGrpc::with_auth`] with a default auth config.
    pub fn new(routes: FastRoutes) -> Self {
        Self::with_auth(routes, Arc::new(AuthConfig::default()))
    }

    /// Build the shared gRPC state with an explicit auth configuration.
    ///
    /// `start_time` is captured at the moment of this call.
    pub fn with_auth(routes: FastRoutes, auth_config: Arc<AuthConfig>) -> Self {
        Self {
            routes,
            start_time: Instant::now(),
            auth_config,
        }
    }

    /// Resolve a repository backend by its route name.
    ///
    /// # Errors
    /// Returns `Status::not_found` when no repository is registered under
    /// `name`.
    fn get_repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>, Status> {
        self.routes
            .lookup(name)
            .cloned()
            .ok_or_else(|| Status::not_found(format!("Repository '{}' not found", name)))
    }

    /// Extract the token from an `Authorization` header value of the form
    /// `Bearer <token>`.
    ///
    /// The scheme is compared case-insensitively (HTTP auth schemes are
    /// case-insensitive), so `bearer`/`BEARER` are accepted too. The token is
    /// everything after the first space, returned unmodified. Returns `None`
    /// when the value has no space or uses a different scheme.
    fn bearer_token(header: &str) -> Option<&str> {
        let (scheme, token) = header.split_once(' ')?;
        scheme.eq_ignore_ascii_case("Bearer").then_some(token)
    }

    /// Validate write-request credentials against the configured auth methods.
    /// Accepts a Bearer token or an mTLS client-cert CN. An empty config means
    /// open access.
    ///
    /// # Errors
    /// Returns `Status::unauthenticated` when `auth::validate_request` rejects
    /// the presented credentials; the underlying error is intentionally not
    /// forwarded to the client.
    async fn authorize_write<T>(&self, request: &Request<T>) -> Result<(), Status> {
        // A header value that is not valid visible ASCII (`to_str` fails) is
        // treated the same as an absent header.
        let bearer = request
            .metadata()
            .get("authorization")
            .and_then(|value| value.to_str().ok())
            .and_then(Self::bearer_token);

        // mTLS identity from the TLS handshake, if the server runs with a
        // client CA configured.
        let client_cn = request
            .peer_certs()
            .and_then(|certs| {
                let ders: Vec<rustls::pki_types::CertificateDer<'_>> = certs
                    .iter()
                    .map(|c| rustls::pki_types::CertificateDer::from(c.as_ref().to_vec()))
                    .collect();
                crate::exposed::tls::leaf_common_name(&ders)
            });

        auth::validate_request(&self.auth_config, bearer, client_cn.as_deref())
            .await
            .map(|_| ())
            .map_err(|_| Status::unauthenticated("Valid credentials required for write access"))
    }
}

// ─── RepositoryService ───────────────────────────────────────────────

#[tonic::async_trait]
impl RepositoryService for Arc<HolgerGrpc> {
    async fn fetch_artifact(
        &self,
        request: Request<FetchArtifactRequest>,
    ) -> Result<Response<FetchArtifactResponse>, Status> {
        let req = request.into_inner();
        let repo = self.get_repo(&req.repository)?;
        let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;

        let trait_id = traits::ArtifactId {
            namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
            name: id.name,
            version: id.version,
        };

        let data = repo
            .fetch(&trait_id)
            .map_err(|e| Status::internal(e.to_string()))?
            .ok_or_else(|| Status::not_found("Artifact not found"))?;

        Ok(Response::new(FetchArtifactResponse {
            size_bytes: data.len() as i64,
            content_type: "application/octet-stream".into(),
            data,
        }))
    }

    async fn list_artifacts(
        &self,
        request: Request<ListArtifactsRequest>,
    ) -> Result<Response<ListArtifactsResponse>, Status> {
        let req = request.into_inner();
        let repo = self.get_repo(&req.repository)?;

        // Empty proto string means "no filter" (proto3 has no optional string
        // here); anything else is a real substring filter.
        let name_filter = if req.name_filter.is_empty() {
            None
        } else {
            Some(req.name_filter.as_str())
        };
        // proto `limit` is int32 (the page size); the backend takes a usize.
        let limit = req.limit.max(0) as usize;
        // `page_token` is the opaque offset cursor; empty = first page.
        let page_token = if req.page_token.is_empty() {
            None
        } else {
            Some(req.page_token.as_str())
        };

        let (entries, next_page_token) =
            crate::object::paginate_artifacts(repo.as_ref(), name_filter, limit, page_token)
                .map_err(|e| Status::internal(e.to_string()))?;

        let artifacts = entries
            .into_iter()
            .map(|e| ArtifactEntry {
                id: Some(ArtifactId {
                    // None namespace (Rust-style, no namespace) → empty proto
                    // string, the wire convention this server uses elsewhere.
                    namespace: e.id.namespace.unwrap_or_default(),
                    name: e.id.name,
                    version: e.id.version,
                }),
                size_bytes: e.size_bytes,
                content_type: e.content_type,
            })
            .collect();

        Ok(Response::new(ListArtifactsResponse {
            artifacts,
            next_page_token,
        }))
    }

    async fn put_artifact(
        &self,
        request: Request<PutArtifactRequest>,
    ) -> Result<Response<PutArtifactResponse>, Status> {
        self.authorize_write(&request).await?;
        let req = request.into_inner();
        let repo = self.get_repo(&req.repository)?;

        if !repo.is_writable() {
            return Err(Status::permission_denied("Repository is read-only"));
        }

        let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
        let trait_id = traits::ArtifactId {
            namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
            name: id.name,
            version: id.version,
        };

        repo.put(&trait_id, &req.data)
            .map_err(|e| Status::internal(e.to_string()))?;

        Ok(Response::new(PutArtifactResponse {
            success: true,
            message: "Artifact stored".into(),
        }))
    }

    type StreamArtifactStream = ReceiverStream<Result<ArtifactChunk, Status>>;

    async fn stream_artifact(
        &self,
        request: Request<FetchArtifactRequest>,
    ) -> Result<Response<Self::StreamArtifactStream>, Status> {
        let req = request.into_inner();
        let repo = self.get_repo(&req.repository)?;
        let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;

        let trait_id = traits::ArtifactId {
            namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
            name: id.name,
            version: id.version,
        };

        let data = repo
            .fetch(&trait_id)
            .map_err(|e| Status::internal(e.to_string()))?
            .ok_or_else(|| Status::not_found("Artifact not found"))?;

        let (tx, rx) = tokio::sync::mpsc::channel(16);
        const CHUNK_SIZE: usize = 64 * 1024; // 64KB chunks

        tokio::spawn(async move {
            for chunk in data.chunks(CHUNK_SIZE) {
                if tx.send(Ok(ArtifactChunk { data: chunk.to_vec() })).await.is_err() {
                    break;
                }
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

// ─── ArchiveService ──────────────────────────────────────────────────

#[tonic::async_trait]
impl ArchiveService for Arc<HolgerGrpc> {
    /// List the file paths inside a repository's archive, optionally limited
    /// to those matching a path prefix.
    ///
    /// Fails with `not_found` for an unknown repository and `internal` when
    /// the backend cannot enumerate its archive.
    async fn list_archive_files(
        &self,
        request: Request<ListArchiveFilesRequest>,
    ) -> Result<Response<ListArchiveFilesResponse>, Status> {
        let params = request.into_inner();
        let backend = self.get_repo(&params.repository)?;

        // proto3 has no optional string here, so "" stands for "no prefix
        // filter"; any other value is passed through as the prefix.
        let prefix_filter = match params.prefix.as_str() {
            "" => None,
            prefix => Some(prefix),
        };

        match backend.archive_files(prefix_filter) {
            Ok(paths) => {
                // proto `total_files` is int64; take the count before `paths`
                // is moved into the response.
                let total_files = paths.len() as i64;
                Ok(Response::new(ListArchiveFilesResponse { total_files, paths }))
            }
            Err(err) => Err(Status::internal(err.to_string())),
        }
    }

    /// Report summary information about a repository's archive.
    ///
    /// Fails with `not_found` for an unknown repository and `internal` when
    /// the backend cannot produce the archive info.
    async fn archive_info(
        &self,
        request: Request<ArchiveInfoRequest>,
    ) -> Result<Response<ArchiveInfoResponse>, Status> {
        let params = request.into_inner();
        let backend = self.get_repo(&params.repository)?;

        let info = match backend.archive_info() {
            Ok(info) => info,
            Err(err) => return Err(Status::internal(err.to_string())),
        };

        // The proto counters are int64 while the trait carries u64, hence the
        // casts.
        let reply = ArchiveInfoResponse {
            file_count: info.file_count as i64,
            total_uncompressed_bytes: info.total_uncompressed_bytes as i64,
            archive_path: info.archive_path,
        };
        Ok(Response::new(reply))
    }
}

// ─── AdminService ────────────────────────────────────────────────────

#[tonic::async_trait]
impl AdminService for Arc<HolgerGrpc> {
    /// Liveness probe: always reports `"ok"`, along with the crate version
    /// baked in at compile time and the whole seconds elapsed since
    /// `start_time`.
    async fn health(
        &self,
        _request: Request<HealthRequest>,
    ) -> Result<Response<HealthResponse>, Status> {
        let uptime_seconds = self.start_time.elapsed().as_secs() as i64;
        let reply = HealthResponse {
            status: "ok".into(),
            version: env!("CARGO_PKG_VERSION").into(),
            uptime_seconds,
        };
        Ok(Response::new(reply))
    }

    /// Enumerate every registered repository with its name, format (rendered
    /// via `Debug`), and its writable / has-archive flags.
    async fn list_repositories(
        &self,
        _request: Request<ListRepositoriesRequest>,
    ) -> Result<Response<ListRepositoriesResponse>, Status> {
        let registered = self.routes.all_repos();

        let mut repositories = Vec::new();
        for (name, repo) in registered.iter() {
            repositories.push(RepositoryInfo {
                name: name.clone(),
                repo_type: format!("{:?}", repo.format()),
                writable: repo.is_writable(),
                has_archive: repo.has_archive(),
            });
        }

        Ok(Response::new(ListRepositoriesResponse { repositories }))
    }
}