pub mod holger_proto {
tonic::include_proto!("holger.v1");
}
use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;
use holger_proto::repository_service_server::RepositoryService;
use holger_proto::archive_service_server::ArchiveService;
use holger_proto::admin_service_server::AdminService;
use holger_proto::*;
use crate::exposed::fast_routes::FastRoutes;
use holger_traits::RepositoryBackendTrait;
pub struct HolgerGrpc {
pub routes: FastRoutes,
pub start_time: Instant,
}
impl HolgerGrpc {
pub fn new(routes: FastRoutes) -> Self {
Self {
routes,
start_time: Instant::now(),
}
}
fn get_repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>, Status> {
self.routes
.lookup(name)
.cloned()
.ok_or_else(|| Status::not_found(format!("Repository '{}' not found", name)))
}
}
#[tonic::async_trait]
impl RepositoryService for Arc<HolgerGrpc> {
async fn fetch_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<FetchArtifactResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = holger_traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let data = repo
.fetch(&trait_id)
.map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found("Artifact not found"))?;
Ok(Response::new(FetchArtifactResponse {
size_bytes: data.len() as i64,
content_type: "application/octet-stream".into(),
data,
}))
}
async fn list_artifacts(
&self,
request: Request<ListArtifactsRequest>,
) -> Result<Response<ListArtifactsResponse>, Status> {
let req = request.into_inner();
let _repo = self.get_repo(&req.repository)?;
Ok(Response::new(ListArtifactsResponse {
artifacts: vec![],
next_page_token: String::new(),
}))
}
async fn put_artifact(
&self,
request: Request<PutArtifactRequest>,
) -> Result<Response<PutArtifactResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
if !repo.is_writable() {
return Err(Status::permission_denied("Repository is read-only"));
}
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = holger_traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
repo.put(&trait_id, &req.data)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(PutArtifactResponse {
success: true,
message: "Artifact stored".into(),
}))
}
type StreamArtifactStream = ReceiverStream<Result<ArtifactChunk, Status>>;
async fn stream_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<Self::StreamArtifactStream>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let trait_id = holger_traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let data = repo
.fetch(&trait_id)
.map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found("Artifact not found"))?;
let (tx, rx) = tokio::sync::mpsc::channel(16);
const CHUNK_SIZE: usize = 64 * 1024;
tokio::spawn(async move {
for chunk in data.chunks(CHUNK_SIZE) {
if tx.send(Ok(ArtifactChunk { data: chunk.to_vec() })).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tonic::async_trait]
impl ArchiveService for Arc<HolgerGrpc> {
async fn list_archive_files(
&self,
request: Request<ListArchiveFilesRequest>,
) -> Result<Response<ListArchiveFilesResponse>, Status> {
let req = request.into_inner();
let repo = self.get_repo(&req.repository)?;
let _ = repo;
let _ = req.prefix;
Ok(Response::new(ListArchiveFilesResponse {
paths: vec![],
total_files: 0,
}))
}
async fn archive_info(
&self,
request: Request<ArchiveInfoRequest>,
) -> Result<Response<ArchiveInfoResponse>, Status> {
let req = request.into_inner();
let _repo = self.get_repo(&req.repository)?;
Ok(Response::new(ArchiveInfoResponse {
file_count: 0,
total_uncompressed_bytes: 0,
archive_path: String::new(),
}))
}
}
#[tonic::async_trait]
impl AdminService for Arc<HolgerGrpc> {
async fn health(
&self,
_request: Request<HealthRequest>,
) -> Result<Response<HealthResponse>, Status> {
Ok(Response::new(HealthResponse {
status: "ok".into(),
version: env!("CARGO_PKG_VERSION").into(),
uptime_seconds: self.start_time.elapsed().as_secs() as i64,
}))
}
async fn list_repositories(
&self,
_request: Request<ListRepositoriesRequest>,
) -> Result<Response<ListRepositoriesResponse>, Status> {
let repos = self.routes.all_repos();
let infos = repos
.iter()
.map(|(name, repo)| RepositoryInfo {
name: name.clone(),
repo_type: format!("{:?}", repo.format()),
writable: repo.is_writable(),
has_archive: true, })
.collect();
Ok(Response::new(ListRepositoriesResponse {
repositories: infos,
}))
}
}