/// Generated bindings for the `holger.v1` API, textually included from
/// `generated/holger.v1.rs`.
///
/// The service traits and request/response message types used throughout this
/// file (`RepositoryService`, `ArchiveService`, `AdminService`, `ArtifactId`, ...)
/// are imported from this module.
pub mod holger_proto {
include!("generated/holger.v1.rs");
}
use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;
use holger_proto::repository_service_server::RepositoryService;
use holger_proto::archive_service_server::ArchiveService;
use holger_proto::admin_service_server::AdminService;
use holger_proto::*;
use crate::audit::{AuditAction, AuditEvent, AuditLog, NoopAuditLog};
use crate::auth::{self, AuthConfig};
use crate::exposed::fast_routes::FastRoutes;
use traits::RepositoryBackendTrait;
/// Shared state behind the gRPC services implemented in this file.
///
/// The service traits are implemented for `Arc<HolgerGrpc>`, so one instance
/// can be shared between the repository, archive and admin services.
pub struct HolgerGrpc {
/// Lookup table used to resolve a repository name to its backend.
pub routes: FastRoutes,
/// Instant captured at construction; the `health` RPC reports the time
/// elapsed since then as uptime.
pub start_time: Instant,
/// Authentication / RBAC configuration consulted before write operations.
pub auth_config: Arc<AuthConfig>,
/// Sink for audit events; defaults to `NoopAuditLog` until replaced via
/// `with_audit`.
pub audit: Arc<dyn AuditLog>,
}
/// Extracts the token from an `Authorization` header value that uses the
/// `Bearer` scheme.
///
/// The scheme name is compared ASCII case-insensitively, because HTTP
/// authentication schemes are case-insensitive tokens; `Bearer x`, `bearer x`
/// and `BEARER x` all yield `Some("x")`. Everything after the first space is
/// returned untouched, so the canonical `Bearer <token>` form produces exactly
/// the same token as a plain `strip_prefix("Bearer ")` would.
///
/// Returns `None` when the value contains no space or uses another scheme.
fn bearer_token(header: &str) -> Option<&str> {
    let (scheme, token) = header.split_once(' ')?;
    if scheme.eq_ignore_ascii_case("Bearer") {
        Some(token)
    } else {
        None
    }
}

impl HolgerGrpc {
    /// Creates a service with the default [`AuthConfig`] and a no-op audit log.
    pub fn new(routes: FastRoutes) -> Self {
        Self::with_auth(routes, Arc::new(AuthConfig::default()))
    }

    /// Creates a service with an explicit auth configuration.
    ///
    /// The uptime clock starts now and auditing is disabled (`NoopAuditLog`)
    /// until [`HolgerGrpc::with_audit`] is called.
    pub fn with_auth(routes: FastRoutes, auth_config: Arc<AuthConfig>) -> Self {
        Self {
            routes,
            start_time: Instant::now(),
            auth_config,
            audit: Arc::new(NoopAuditLog),
        }
    }

    /// Builder-style setter that replaces the audit sink.
    pub fn with_audit(mut self, audit: Arc<dyn AuditLog>) -> Self {
        self.audit = audit;
        self
    }

    /// Records an audit event on a best-effort basis.
    ///
    /// A failing audit sink is logged as a warning and never fails the RPC
    /// that triggered the event.
    fn record_audit(&self, event: AuditEvent) {
        if let Err(e) = self.audit.record(event) {
            log::warn!("audit record failed: {e}");
        }
    }

    /// Resolves `name` to its backend.
    ///
    /// # Errors
    /// Returns `Status::not_found` when no repository is registered under `name`.
    fn get_repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>, Status> {
        self.routes
            .lookup(name)
            .cloned()
            .ok_or_else(|| Status::not_found(format!("Repository '{}' not found", name)))
    }

    /// Authenticates the caller of a write RPC and, when RBAC is enabled,
    /// checks that the caller's role permits writing.
    ///
    /// Credentials are taken from the `authorization` metadata entry (Bearer
    /// scheme, matched case-insensitively) and from the common name of the
    /// peer's TLS certificate chain, when one is present.
    ///
    /// Returns the validated identity. `Ok(None)` can only be returned when
    /// RBAC is disabled and `auth::validate_request` yields no identity.
    ///
    /// # Errors
    /// * `Status::unauthenticated` if credential validation fails, or if RBAC
    ///   is enabled and no identity was established.
    /// * `Status::permission_denied` if RBAC is enabled and the identity's
    ///   role cannot write.
    async fn authorize_write<T>(
        &self,
        request: &Request<T>,
    ) -> Result<Option<auth::AuthIdentity>, Status> {
        // Metadata values that are not valid visible ASCII are treated as absent.
        let bearer = request
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(bearer_token);

        let client_cn = request.peer_certs().and_then(|certs| {
            let ders: Vec<rustls::pki_types::CertificateDer<'_>> = certs
                .iter()
                .map(|c| rustls::pki_types::CertificateDer::from(c.as_ref().to_vec()))
                .collect();
            crate::exposed::tls::leaf_common_name(&ders)
        });

        // The underlying validation error is deliberately not forwarded to the
        // client; callers only learn that credentials were not accepted.
        let identity = auth::validate_request(&self.auth_config, bearer, client_cn.as_deref())
            .await
            .map_err(|_| Status::unauthenticated("Valid credentials required for write access"))?;

        if self.auth_config.rbac_enabled() {
            match &identity {
                Some(id) => {
                    let role = self.auth_config.role_for(&id.subject);
                    if !role.can_write() {
                        return Err(Status::permission_denied(format!(
                            "identity '{}' has role {:?}; writer or admin required for write access",
                            id.subject, role
                        )));
                    }
                }
                None => {
                    return Err(Status::unauthenticated(
                        "authenticated identity required for write access",
                    ));
                }
            }
        }
        Ok(identity)
    }
}
/// Renders the peer's socket address for audit records.
///
/// Yields an empty string when the request exposes no remote address.
fn peer_addr_string<T>(request: &Request<T>) -> String {
    match request.remote_addr() {
        Some(addr) => addr.to_string(),
        None => String::new(),
    }
}
/// Builds the human-readable artifact label used in audit records:
/// `name@version`, prefixed with `namespace/` when the namespace is non-empty.
fn artifact_label(id: &ArtifactId) -> String {
    let versioned = format!("{}@{}", id.name, id.version);
    match id.namespace.as_str() {
        "" => versioned,
        ns => format!("{ns}/{versioned}"),
    }
}
#[tonic::async_trait]
impl RepositoryService for Arc<HolgerGrpc> {
async fn fetch_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<FetchArtifactResponse>, Status> {
let source_ip = peer_addr_string(&request);
let req = request.into_inner();
let repo_name = req.repository.clone();
let repo = self.get_repo(&repo_name)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let artifact = artifact_label(&id);
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
match repo.fetch(&trait_id) {
Ok(Some(data)) => {
let n = data.len() as u64;
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 200, n,
));
Ok(Response::new(FetchArtifactResponse {
size_bytes: data.len() as i64,
content_type: "application/octet-stream".into(),
data,
}))
}
Ok(None) => {
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 404, 0,
));
Err(Status::not_found("Artifact not found"))
}
Err(e) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
Err(Status::internal(e.to_string()))
}
}
}
async fn list_artifacts(
&self,
request: Request<ListArtifactsRequest>,
) -> Result<Response<ListArtifactsResponse>, Status> {
let source_ip = peer_addr_string(&request);
let req = request.into_inner();
let repo_name = req.repository.clone();
let filter_label = req.name_filter.clone();
let repo = match self.get_repo(&repo_name) {
Ok(r) => r,
Err(status) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &filter_label, &source_ip, 404, 0,
)
.with_detail("repository not found"),
);
return Err(status);
}
};
let name_filter = if req.name_filter.is_empty() {
None
} else {
Some(req.name_filter.as_str())
};
let limit = req.limit.max(0) as usize;
let page_token = if req.page_token.is_empty() {
None
} else {
Some(req.page_token.as_str())
};
let (entries, next_page_token) =
match crate::object::paginate_artifacts(repo.as_ref(), name_filter, limit, page_token) {
Ok(v) => v,
Err(e) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &filter_label, &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
return Err(Status::internal(e.to_string()));
}
};
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &filter_label, &source_ip, 200, entries.len() as u64,
));
let artifacts = entries
.into_iter()
.map(|e| ArtifactEntry {
id: Some(ArtifactId {
namespace: e.id.namespace.unwrap_or_default(),
name: e.id.name,
version: e.id.version,
}),
size_bytes: e.size_bytes,
content_type: e.content_type,
})
.collect();
Ok(Response::new(ListArtifactsResponse {
artifacts,
next_page_token,
}))
}
async fn put_artifact(
&self,
request: Request<PutArtifactRequest>,
) -> Result<Response<PutArtifactResponse>, Status> {
let source_ip = peer_addr_string(&request);
let ident = self
.authorize_write(&request)
.await?
.map(|id| id.subject)
.unwrap_or_else(|| "anonymous".to_string());
let req = request.into_inner();
let repo_name = req.repository.clone();
let repo = self.get_repo(&repo_name)?;
if !repo.is_writable() {
self.record_audit(
AuditEvent::new(&ident, AuditAction::Upload, &repo_name, "", &source_ip, 403, 0)
.with_detail("repository is read-only"),
);
return Err(Status::permission_denied("Repository is read-only"));
}
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let artifact = artifact_label(&id);
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let nbytes = req.data.len() as u64;
match repo.put(&trait_id, &req.data) {
Ok(()) => {
self.record_audit(AuditEvent::new(
&ident, AuditAction::Upload, &repo_name, &artifact, &source_ip, 200, nbytes,
));
Ok(Response::new(PutArtifactResponse {
success: true,
message: "Artifact stored".into(),
}))
}
Err(e) => {
self.record_audit(
AuditEvent::new(
&ident, AuditAction::Upload, &repo_name, &artifact, &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
Err(Status::internal(e.to_string()))
}
}
}
type StreamArtifactStream = ReceiverStream<Result<ArtifactChunk, Status>>;
async fn stream_artifact(
&self,
request: Request<FetchArtifactRequest>,
) -> Result<Response<Self::StreamArtifactStream>, Status> {
let source_ip = peer_addr_string(&request);
let req = request.into_inner();
let repo_name = req.repository.clone();
let repo = self.get_repo(&repo_name)?;
let id = req.id.ok_or_else(|| Status::invalid_argument("Missing artifact ID"))?;
let artifact = artifact_label(&id);
let trait_id = traits::ArtifactId {
namespace: if id.namespace.is_empty() { None } else { Some(id.namespace) },
name: id.name,
version: id.version,
};
let data = match repo.fetch(&trait_id) {
Ok(Some(data)) => {
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 200, data.len() as u64,
));
data
}
Ok(None) => {
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 404, 0,
));
return Err(Status::not_found("Artifact not found"));
}
Err(e) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::Download, &repo_name, &artifact, &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
return Err(Status::internal(e.to_string()));
}
};
let (tx, rx) = tokio::sync::mpsc::channel(16);
const CHUNK_SIZE: usize = 64 * 1024;
tokio::spawn(async move {
for chunk in data.chunks(CHUNK_SIZE) {
if tx.send(Ok(ArtifactChunk { data: chunk.to_vec() })).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tonic::async_trait]
impl ArchiveService for Arc<HolgerGrpc> {
async fn list_archive_files(
&self,
request: Request<ListArchiveFilesRequest>,
) -> Result<Response<ListArchiveFilesResponse>, Status> {
let source_ip = peer_addr_string(&request);
let req = request.into_inner();
let repo_name = req.repository.clone();
let prefix_label = req.prefix.clone();
let repo = match self.get_repo(&repo_name) {
Ok(r) => r,
Err(status) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &prefix_label, &source_ip, 404, 0,
)
.with_detail("repository not found"),
);
return Err(status);
}
};
let prefix = if req.prefix.is_empty() {
None
} else {
Some(req.prefix.as_str())
};
let paths = match repo.archive_files(prefix) {
Ok(p) => p,
Err(e) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &prefix_label, &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
return Err(Status::internal(e.to_string()));
}
};
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, &prefix_label, &source_ip, 200, paths.len() as u64,
));
Ok(Response::new(ListArchiveFilesResponse {
total_files: paths.len() as i64,
paths,
}))
}
async fn archive_info(
&self,
request: Request<ArchiveInfoRequest>,
) -> Result<Response<ArchiveInfoResponse>, Status> {
let source_ip = peer_addr_string(&request);
let req = request.into_inner();
let repo_name = req.repository.clone();
let repo = match self.get_repo(&repo_name) {
Ok(r) => r,
Err(status) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, "", &source_ip, 404, 0,
)
.with_detail("repository not found"),
);
return Err(status);
}
};
let info = match repo.archive_info() {
Ok(i) => i,
Err(e) => {
self.record_audit(
AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, "", &source_ip, 500, 0,
)
.with_detail(e.to_string()),
);
return Err(Status::internal(e.to_string()));
}
};
self.record_audit(AuditEvent::new(
"anonymous", AuditAction::List, &repo_name, "", &source_ip, 200, info.file_count,
));
Ok(Response::new(ArchiveInfoResponse {
file_count: info.file_count as i64,
total_uncompressed_bytes: info.total_uncompressed_bytes as i64,
archive_path: info.archive_path,
}))
}
}
/// Operational RPCs: liveness/version reporting and repository enumeration.
#[tonic::async_trait]
impl AdminService for Arc<HolgerGrpc> {
    /// Reports a static `"ok"` status, the crate version and the whole seconds
    /// elapsed since this service was constructed. Not audited.
    async fn health(
        &self,
        _request: Request<HealthRequest>,
    ) -> Result<Response<HealthResponse>, Status> {
        let uptime = self.start_time.elapsed();
        let reply = HealthResponse {
            status: "ok".into(),
            version: env!("CARGO_PKG_VERSION").into(),
            uptime_seconds: uptime.as_secs() as i64,
        };
        Ok(Response::new(reply))
    }

    /// Lists every registered repository with its format and capabilities.
    ///
    /// Audited as a `List` action with empty repository/artifact fields; the
    /// byte field carries the number of repositories.
    async fn list_repositories(
        &self,
        request: Request<ListRepositoriesRequest>,
    ) -> Result<Response<ListRepositoriesResponse>, Status> {
        let source_ip = peer_addr_string(&request);
        let repos = self.routes.all_repos();
        let repo_count = repos.len();

        self.record_audit(AuditEvent::new(
            "anonymous", AuditAction::List, "", "", &source_ip, 200, repo_count as u64,
        ));

        let mut repositories = Vec::with_capacity(repo_count);
        for (name, repo) in repos.iter() {
            repositories.push(RepositoryInfo {
                name: name.clone(),
                repo_type: format!("{:?}", repo.format()),
                writable: repo.is_writable(),
                has_archive: repo.has_archive(),
            });
        }

        Ok(Response::new(ListRepositoriesResponse { repositories }))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::MemoryAuditLog;
    use std::sync::Arc;
    use traits::{ArchiveInfo, ArtifactFormat, RepositoryBackendTrait};

    /// Read-only in-memory backend: `fetch` always answers with a copy of
    /// `blob`, and the archive endpoints return fixed data.
    struct MockRepo {
        name: String,
        blob: Option<Vec<u8>>,
    }

    impl RepositoryBackendTrait for MockRepo {
        fn name(&self) -> &str {
            &self.name
        }

        fn format(&self) -> ArtifactFormat {
            ArtifactFormat::Rust
        }

        fn is_writable(&self) -> bool {
            false
        }

        fn fetch(&self, _id: &traits::ArtifactId) -> anyhow::Result<Option<Vec<u8>>> {
            Ok(self.blob.clone())
        }

        fn put(&self, _id: &traits::ArtifactId, _data: &[u8]) -> anyhow::Result<()> {
            anyhow::bail!("read-only")
        }

        fn archive_files(&self, _prefix: Option<&str>) -> anyhow::Result<Vec<String>> {
            Ok(vec!["a.rs".into(), "b.rs".into()])
        }

        fn archive_info(&self) -> anyhow::Result<ArchiveInfo> {
            Ok(ArchiveInfo {
                file_count: 2,
                total_uncompressed_bytes: 99,
                archive_path: "mock".into(),
            })
        }

        fn handle_http2_request(
            &self,
            _method: &str,
            _suburl: &str,
            _body: &[u8],
        ) -> anyhow::Result<(u16, Vec<(String, String)>, Vec<u8>)> {
            Ok((200, vec![], Vec::new()))
        }
    }

    /// Builds a service exposing a single `crates` repository backed by
    /// `MockRepo`, wired to an in-memory audit log that is returned alongside.
    fn grpc_with(blob: Option<Vec<u8>>) -> (Arc<HolgerGrpc>, Arc<MemoryAuditLog>) {
        let mock = MockRepo { name: "crates".into(), blob };
        let backend: Arc<dyn RepositoryBackendTrait> = Arc::new(mock);
        let routes = FastRoutes::new(vec![("crates".to_string(), backend)]);
        let audit = Arc::new(MemoryAuditLog::new());
        let service = HolgerGrpc::new(routes).with_audit(audit.clone());
        (Arc::new(service), audit)
    }

    /// The un-namespaced artifact `serde@1.0`.
    fn artifact_id() -> ArtifactId {
        ArtifactId {
            namespace: String::new(),
            name: "serde".into(),
            version: "1.0".into(),
        }
    }

    /// A download request for `serde@1.0` in the `crates` repository.
    fn fetch_request() -> Request<FetchArtifactRequest> {
        Request::new(FetchArtifactRequest {
            repository: "crates".into(),
            id: Some(artifact_id()),
        })
    }

    #[tokio::test]
    async fn stream_artifact_audits_download_hit_and_miss() {
        // Hit: the backend holds a 5-byte blob.
        let (svc, audit) = grpc_with(Some(b"hello".to_vec()));
        svc.stream_artifact(fetch_request()).await.expect("stream ok");
        let events = audit.events();
        assert_eq!(events.len(), 1);
        let hit = &events[0];
        assert_eq!(hit.action, AuditAction::Download);
        assert_eq!(hit.ident, "anonymous");
        assert_eq!(hit.repo, "crates");
        assert_eq!(hit.artifact, "serde@1.0");
        assert_eq!(hit.status, 200);
        assert_eq!(hit.bytes, 5);

        // Miss: the backend has nothing to return.
        let (svc, audit) = grpc_with(None);
        assert!(svc.stream_artifact(fetch_request()).await.is_err());
        let events = audit.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].action, AuditAction::Download);
        assert_eq!(events[0].status, 404);
    }

    #[tokio::test]
    async fn list_artifacts_audits_list() {
        let (svc, audit) = grpc_with(None);
        let request = Request::new(ListArtifactsRequest {
            repository: "crates".into(),
            name_filter: "ser".into(),
            limit: 10,
            page_token: String::new(),
        });
        RepositoryService::list_artifacts(&svc, request).await.expect("list ok");

        let events = audit.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].action, AuditAction::List);
        assert_eq!(events[0].repo, "crates");
        assert_eq!(events[0].artifact, "ser");
        assert_eq!(events[0].status, 200);
    }

    #[tokio::test]
    async fn list_artifacts_unknown_repo_audits_404() {
        let (svc, audit) = grpc_with(None);
        let request = Request::new(ListArtifactsRequest {
            repository: "ghost".into(),
            name_filter: String::new(),
            limit: 10,
            page_token: String::new(),
        });
        assert!(RepositoryService::list_artifacts(&svc, request).await.is_err());

        let events = audit.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].action, AuditAction::List);
        assert_eq!(events[0].repo, "ghost");
        assert_eq!(events[0].status, 404);
    }

    #[tokio::test]
    async fn archive_endpoints_audit_list() {
        let (svc, audit) = grpc_with(None);

        let files_request = Request::new(ListArchiveFilesRequest {
            repository: "crates".into(),
            prefix: "src/".into(),
        });
        svc.list_archive_files(files_request).await.expect("archive files ok");

        let info_request = Request::new(ArchiveInfoRequest { repository: "crates".into() });
        svc.archive_info(info_request).await.expect("archive info ok");

        let events = audit.events();
        assert_eq!(events.len(), 2);
        assert!(events.iter().all(|e| e.action == AuditAction::List && e.status == 200));
        assert_eq!(events[0].artifact, "src/");
        assert_eq!(events[1].bytes, 2);
    }

    #[tokio::test]
    async fn list_repositories_audits_list() {
        let (svc, audit) = grpc_with(None);
        let request = Request::new(ListRepositoriesRequest {});
        AdminService::list_repositories(&svc, request).await.expect("list repos ok");

        let events = audit.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].action, AuditAction::List);
        assert_eq!(events[0].bytes, 1);
    }
}