pub mod holger_proto {
tonic::include_proto!("holger.v1");
}
use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;
use holger_proto::repository_service_server::RepositoryService;
use holger_proto::archive_service_server::ArchiveService;
use holger_proto::admin_service_server::AdminService;
use holger_proto::*;
use crate::auth::{self, AuthConfig};
use crate::exposed::fast_routes::FastRoutes;
use traits::RepositoryBackendTrait;
pub struct HolgerGrpc {
pub routes: FastRoutes,
pub start_time: Instant,
pub auth_config: Arc<AuthConfig>,
}
impl HolgerGrpc {
pub fn new(routes: FastRoutes) -> Self {
Self::with_auth(routes, Arc::new(AuthConfig::default()))
}
pub fn with_auth(routes: FastRoutes, auth_config: Arc<AuthConfig>) -> Self {
Self {
routes,
start_time: Instant::now(),
auth_config,
}
}
fn get_repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>, Status> {
self.routes
.lookup(name)
.cloned()
.ok_or_else(|| Status::not_found(format!("Repository '{}' not found", name)))
}
async fn authorize_write<T>(&self, request: &Request<T>) -> Result<(), Status> {
let bearer = request
.metadata()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "));
let client_cn = request
.peer_certs()
.and_then(|certs| {
let ders: Vec<rustls::pki_types::CertificateDer<'_>> = certs
.iter()
.map(|c| rustls::pki_types::CertificateDer::from(c.as_ref().to_vec()))
.collect();
crate::exposed::tls::leaf_common_name(&ders)
});
auth::validate_request(&self.auth_config, bearer, client_cn.as_deref())
.await
.map(|_| ())
.map_err(|_| Status::unauthenticated("Valid credentials required for write access"))
}
}
#[tonic::async_trait]
impl RepositoryService for Arc<HolgerGrpc> {
async fn fetch_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<FetchArtifactResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let data = repo
.fetch(&trait_id)
.map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found("Artifact not found"))?;
Ok(Response::new(FetchArtifactResponse {
size_bytes: data.len() as i64,
content_type: "application/octet-stream".into(),
data,
}))
}
async fn list_artifacts(
&self,
request: Request<ListArtifactsRequest>,
) -> Result<Response<ListArtifactsResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let name_filter = if req.name_filter.is_empty() {
None
} else {
Some(req.name_filter.as_str())
};
let limit = req.limit.max(0) as usize;
let page_token = if req.page_token.is_empty() {
None
} else {
Some(req.page_token.as_str())
};
let (entries, next_page_token) =
crate::object::paginate_artifacts(repo.as_ref(), name_filter, limit, page_token)
.map_err(|e| Status::internal(e.to_string()))?;
let artifacts = entries
.into_iter()
.map(|e| ArtifactEntry {
id: Some(ArtifactId {
namespace: e.id.namespace.unwrap_or_default(),
name: e.id.name,
version: e.id.version,
}),
size_bytes: e.size_bytes,
content_type: e.content_type,
})
.collect();
Ok(Response::new(ListArtifactsResponse {
artifacts,
next_page_token,
}))
}
async fn put_artifact(
&self,
request: Request<PutArtifactRequest>,
) -> Result<Response<PutArtifactResponse>, Status> {
self.authorize_write(&request).await?;
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
if !repo.is_writable() {
return Err(Status::permission_denied("Repository is read-only"));
}
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
repo.put(&trait_id, &req.data)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(PutArtifactResponse {
success: true,
message: "Artifact stored".into(),
}))
}
type StreamArtifactStream = ReceiverStream<Result<ArtifactChunk, Status>>;
async fn stream_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<Self::StreamArtifactStream>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let data = repo
.fetch(&trait_id)
.map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found("Artifact not found"))?;
let (tx, rx) = tokio::sync::mpsc::channel(16);
const CHUNK_SIZE: usize = 64 * 1024;
tokio::spawn(async move {
for chunk in data.chunks(CHUNK_SIZE) {
if tx.send(Ok(ArtifactChunk { data: chunk.to_vec() })).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tonic::async_trait]
impl ArchiveService for Arc<HolgerGrpc> {
async fn list_archive_files(
&self,
request: Request<ListArchiveFilesRequest>,
) -> Result<Response<ListArchiveFilesResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let prefix = if req.prefix.is_empty() {
None
} else {
Some(req.prefix.as_str())
};
let paths = repo
.archive_files(prefix)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(ListArchiveFilesResponse {
total_files: paths.len() as i64,
paths,
}))
}
async fn archive_info(
&self,
request: Request<ArchiveInfoRequest>,
) -> Result<Response<ArchiveInfoResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let info = repo
.archive_info()
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(ArchiveInfoResponse {
file_count: info.file_count as i64,
total_uncompressed_bytes: info.total_uncompressed_bytes as i64,
archive_path: info.archive_path,
}))
}
}
#[tonic::async_trait]
impl AdminService for Arc<HolgerGrpc> {
async fn health(
&self,
_request: Request<HealthRequest>,
) -> Result<Response<HealthResponse>, Status> {
Ok(Response::new(HealthResponse {
status: "ok".into(),
version: env!("CARGO_PKG_VERSION").into(),
uptime_seconds: self.start_time.elapsed().as_secs() as i64,
}))
}
async fn list_repositories(
&self,
_request: Request<ListRepositoriesRequest>,
) -> Result<Response<ListRepositoriesResponse>, Status> {
let repos = self.routes.all_repos();
let infos = repos
.iter()
.map(|(name, repo)| RepositoryInfo {
name: name.clone(),
repo_type: format!("{:?}", repo.format()),
writable: repo.is_writable(),
has_archive: repo.has_archive(),
})
.collect();
Ok(Response::new(ListRepositoriesResponse {
repositories: infos,
}))
}
}