use std::sync::Arc;
use std::time::Instant;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use traits::{
ArchiveInfo, ArtifactEntry, ArtifactId, Health, HolgerObject, RepositoryBackendTrait,
RepositoryInfo,
};
use crate::exposed::fast_routes::FastRoutes;
/// Returns one page of a repository's artifacts in a deterministic order.
///
/// The complete listing is obtained with `repo.list(name_filter, 0)`, sorted by
/// `(namespace, name, version)` and then cut down to the requested window.
///
/// * `name_filter` – forwarded unchanged to `repo.list`.
/// * `page_size` – maximum number of entries in the page; `0` means "everything from
///   the offset to the end of the listing".
/// * `page_token` – decimal offset into the sorted listing. A missing or unparsable
///   token is treated as offset `0`.
///
/// Returns the page and the token of the following page. The token is the empty
/// string when the page reaches the end of the listing (or the offset is already
/// past it).
///
/// # Errors
///
/// Propagates any error returned by `repo.list`.
pub(crate) fn paginate_artifacts(
    repo: &dyn RepositoryBackendTrait,
    name_filter: Option<&str>,
    page_size: usize,
    page_token: Option<&str>,
) -> Result<(Vec<ArtifactEntry>, String)> {
    // Best-effort token handling: anything that is not a valid `usize` restarts at 0.
    let offset: usize = page_token.and_then(|t| t.parse().ok()).unwrap_or(0);

    // NOTE(review): the second argument is presumably "no limit" — confirm against
    // `RepositoryBackendTrait::list`.
    let mut all = repo.list(name_filter, 0)?;

    // Stable sort: entries with identical keys keep the order the backend returned them in.
    all.sort_by(|a, b| {
        a.id
            .namespace
            .cmp(&b.id.namespace)
            .then_with(|| a.id.name.cmp(&b.id.name))
            .then_with(|| a.id.version.cmp(&b.id.version))
    });

    let total = all.len();
    if offset >= total {
        return Ok((Vec::new(), String::new()));
    }

    let end = if page_size == 0 {
        total
    } else {
        offset.saturating_add(page_size).min(total)
    };
    let next = if end < total {
        end.to_string()
    } else {
        String::new()
    };

    // Move the selected entries out of the owned listing instead of cloning them.
    // `end > offset` always holds here, so the subtraction cannot underflow.
    let page: Vec<ArtifactEntry> = all.into_iter().skip(offset).take(end - offset).collect();
    Ok((page, next))
}
/// In-process [`HolgerObject`] implementation that resolves repositories through a
/// [`FastRoutes`] table.
pub struct LocalHolger {
/// Route table queried by repository name (`lookup`) and enumerated (`all_repos`).
routes: FastRoutes,
/// Instant captured in [`LocalHolger::new`]; `health` reports the time elapsed since then.
start: Instant,
}
impl LocalHolger {
    /// Creates a local facade over `routes` and records the current instant as its
    /// start time.
    pub fn new(routes: FastRoutes) -> Self {
        let start = Instant::now();
        Self { routes, start }
    }

    /// Looks up the backend registered under `name`.
    ///
    /// Returns a cloned handle to the backend, or an error naming the repository
    /// when the route table has no entry for it.
    fn repo(&self, name: &str) -> Result<Arc<dyn RepositoryBackendTrait>> {
        match self.routes.lookup(name) {
            Some(backend) => Ok(backend.clone()),
            None => Err(anyhow!("Repository '{}' not found", name)),
        }
    }
}
#[async_trait]
impl HolgerObject for LocalHolger {
async fn fetch(&self, repository: &str, id: &ArtifactId) -> Result<Option<Vec<u8>>> {
self.repo(repository)?.fetch(id)
}
async fn put(&self, repository: &str, id: &ArtifactId, data: &[u8]) -> Result<()> {
let repo = self.repo(repository)?;
if !repo.is_writable() {
anyhow::bail!("Repository '{}' is read-only", repository);
}
repo.put(id, data)
}
async fn list_repositories(&self) -> Result<Vec<RepositoryInfo>> {
Ok(self
.routes
.all_repos()
.into_iter()
.map(|(name, repo)| RepositoryInfo {
name,
repo_type: format!("{:?}", repo.format()),
writable: repo.is_writable(),
has_archive: true,
})
.collect())
}
async fn list_artifacts(
&self,
repository: &str,
name_filter: Option<String>,
limit: u32,
page_token: Option<String>,
) -> Result<(Vec<ArtifactEntry>, String)> {
let repo = self.repo(repository)?;
paginate_artifacts(
repo.as_ref(),
name_filter.as_deref(),
limit as usize,
page_token.as_deref(),
)
}
async fn list_archive_files(
&self,
repository: &str,
prefix: Option<String>,
) -> Result<Vec<String>> {
self.repo(repository)?.archive_files(prefix.as_deref())
}
async fn archive_info(&self, repository: &str) -> Result<ArchiveInfo> {
self.repo(repository)?.archive_info()
}
async fn health(&self) -> Result<Health> {
Ok(Health {
status: "ok".into(),
version: env!("CARGO_PKG_VERSION").into(),
uptime_seconds: self.start.elapsed().as_secs() as i64,
})
}
}
use crate::grpc::holger_proto::{
admin_service_client::AdminServiceClient,
archive_service_client::ArchiveServiceClient,
repository_service_client::RepositoryServiceClient,
ArchiveInfoRequest, ArtifactId as ProtoArtifactId, FetchArtifactRequest, HealthRequest,
ListArchiveFilesRequest, ListArtifactsRequest, ListRepositoriesRequest, PutArtifactRequest,
};
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
use tonic::Code;
use tonic::metadata::{Ascii, MetadataValue};
use tonic::service::interceptor::InterceptedService;
use tonic::service::Interceptor;
use tonic::{Request, Status};
/// Interceptor state carrying an optional, pre-built `authorization` metadata value.
///
/// When the inner value is `Some`, the `Interceptor` implementation inserts a clone of
/// it under the `authorization` key of each request it is called with; when it is
/// `None`, requests pass through unchanged.
#[derive(Clone)]
pub struct BearerAuth(Option<MetadataValue<Ascii>>);
impl Interceptor for BearerAuth {
    /// Adds the stored value (if any) as the `authorization` metadata entry and
    /// hands the request back; this interceptor never rejects a request.
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        if let Some(header) = self.0.clone() {
            req.metadata_mut().insert("authorization", header);
        }
        Ok(req)
    }
}
/// [`HolgerObject`] implementation that forwards every call to a remote server over gRPC.
pub struct RemoteHolger {
/// Transport channel; cloned for each service client that is created.
channel: Channel,
/// Interceptor attached to every service client; cloned alongside the channel.
auth: BearerAuth,
}
impl RemoteHolger {
    /// Connects to `endpoint` without TLS configuration and without credentials.
    ///
    /// Fails when the endpoint string is not accepted by `Channel::from_shared`
    /// or the connection cannot be established.
    pub async fn connect(endpoint: impl Into<String>) -> Result<Self> {
        let target = Channel::from_shared(endpoint.into())?;
        let channel = target.connect().await?;
        Ok(Self::from_channel(channel))
    }

    /// Connects to `endpoint` and attaches `token` as a bearer credential to
    /// every request.
    ///
    /// The connection is established first; the token is validated afterwards.
    pub async fn connect_with_token(
        endpoint: impl Into<String>,
        token: impl AsRef<str>,
    ) -> Result<Self> {
        let target = Channel::from_shared(endpoint.into())?;
        let channel = target.connect().await?;
        let auth = bearer(Some(token.as_ref()))?;
        Ok(Self { channel, auth })
    }

    /// Connects to `endpoint` over TLS.
    ///
    /// * `ca_pem` – optional PEM CA certificate added to the TLS configuration.
    /// * `client_identity` – optional `(certificate, key)` PEM pair used as the
    ///   client identity.
    /// * `token` – optional bearer token attached to every request.
    pub async fn connect_with_tls(
        endpoint: impl Into<String>,
        ca_pem: Option<Vec<u8>>,
        client_identity: Option<(Vec<u8>, Vec<u8>)>,
        token: Option<String>,
    ) -> Result<Self> {
        let tls = match ca_pem {
            Some(ca) => ClientTlsConfig::new().ca_certificate(Certificate::from_pem(ca)),
            None => ClientTlsConfig::new(),
        };
        let tls = match client_identity {
            Some((cert, key)) => tls.identity(Identity::from_pem(cert, key)),
            None => tls,
        };

        let target = Channel::from_shared(endpoint.into())?.tls_config(tls)?;
        let channel = target.connect().await?;
        let auth = bearer(token.as_deref())?;
        Ok(Self { channel, auth })
    }

    /// Wraps an already established channel; no credentials are attached.
    pub fn from_channel(channel: Channel) -> Self {
        let auth = BearerAuth(None);
        Self { channel, auth }
    }

    /// Builds a repository-service client on a clone of the channel and interceptor.
    fn repo_client(&self) -> RepositoryServiceClient<InterceptedService<Channel, BearerAuth>> {
        let channel = self.channel.clone();
        let auth = self.auth.clone();
        RepositoryServiceClient::with_interceptor(channel, auth)
    }

    /// Builds an admin-service client on a clone of the channel and interceptor.
    fn admin_client(&self) -> AdminServiceClient<InterceptedService<Channel, BearerAuth>> {
        let channel = self.channel.clone();
        let auth = self.auth.clone();
        AdminServiceClient::with_interceptor(channel, auth)
    }

    /// Builds an archive-service client on a clone of the channel and interceptor.
    fn archive_client(&self) -> ArchiveServiceClient<InterceptedService<Channel, BearerAuth>> {
        let channel = self.channel.clone();
        let auth = self.auth.clone();
        ArchiveServiceClient::with_interceptor(channel, auth)
    }
}
/// Builds the [`BearerAuth`] interceptor for an optional bearer token.
///
/// `None` yields an interceptor that adds no header. `Some(t)` yields one that
/// carries the value `Bearer <t>`, flagged as sensitive.
///
/// # Errors
///
/// Returns an error when `Bearer <t>` cannot be parsed as an ASCII metadata value.
fn bearer(token: Option<&str>) -> Result<BearerAuth> {
    match token {
        None => Ok(BearerAuth(None)),
        Some(t) => {
            let mut value: MetadataValue<Ascii> = format!("Bearer {t}")
                .parse()
                .map_err(|e| anyhow!("invalid bearer token: {e}"))?;
            // The value is a credential: flag it so that components honouring the
            // sensitivity bit treat it with care. The flag is part of the value and
            // therefore survives the per-request clone.
            value.set_sensitive(true);
            Ok(BearerAuth(Some(value)))
        }
    }
}
/// Converts a domain [`ArtifactId`] into its protobuf counterpart.
///
/// A missing namespace is encoded as the empty string; name and version are copied.
fn to_proto_id(id: &ArtifactId) -> ProtoArtifactId {
    let namespace = match &id.namespace {
        Some(ns) => ns.clone(),
        None => String::new(),
    };
    let name = id.name.clone();
    let version = id.version.clone();
    ProtoArtifactId {
        namespace,
        name,
        version,
    }
}
#[async_trait]
impl HolgerObject for RemoteHolger {
    /// Fetches artifact `id` from `repository` on the remote server.
    ///
    /// Returns `Ok(None)` when the server answers with `NotFound`; every other
    /// gRPC failure is turned into an error.
    async fn fetch(&self, repository: &str, id: &ArtifactId) -> Result<Option<Vec<u8>>> {
        let mut client = self.repo_client();
        let request = FetchArtifactRequest {
            repository: repository.to_string(),
            id: Some(to_proto_id(id)),
        };
        match client.fetch_artifact(request).await {
            Ok(resp) => Ok(Some(resp.into_inner().data)),
            Err(status) if status.code() == Code::NotFound => Ok(None),
            Err(status) => Err(anyhow!("gRPC fetch failed: {}", status)),
        }
    }

    /// Uploads `data` as artifact `id` into `repository` on the remote server.
    ///
    /// Fails when the RPC itself fails or when the response reports
    /// `success == false`; in the latter case the server's message is included.
    async fn put(&self, repository: &str, id: &ArtifactId, data: &[u8]) -> Result<()> {
        let mut client = self.repo_client();
        let request = PutArtifactRequest {
            repository: repository.to_string(),
            id: Some(to_proto_id(id)),
            data: data.to_vec(),
        };
        let resp = client
            .put_artifact(request)
            .await
            .map_err(|s| anyhow!("gRPC put failed: {}", s))?
            .into_inner();
        if resp.success {
            Ok(())
        } else {
            Err(anyhow!("put rejected: {}", resp.message))
        }
    }

    /// Lists the repositories reported by the remote admin service.
    async fn list_repositories(&self) -> Result<Vec<RepositoryInfo>> {
        let mut client = self.admin_client();
        let resp = client
            .list_repositories(ListRepositoriesRequest {})
            .await
            .map_err(|s| anyhow!("gRPC list_repositories failed: {}", s))?
            .into_inner();
        Ok(resp
            .repositories
            .into_iter()
            .map(|r| RepositoryInfo {
                name: r.name,
                repo_type: r.repo_type,
                writable: r.writable,
                has_archive: r.has_archive,
            })
            .collect())
    }

    /// Requests one page of artifacts from the remote repository service.
    ///
    /// A missing `name_filter` or `page_token` is sent as the empty string.
    /// `limit` is clamped to `i32::MAX` because the wire field is an `i32`.
    /// Returns the converted entries and the server's next-page token.
    async fn list_artifacts(
        &self,
        repository: &str,
        name_filter: Option<String>,
        limit: u32,
        page_token: Option<String>,
    ) -> Result<(Vec<ArtifactEntry>, String)> {
        let mut client = self.repo_client();
        // A plain `limit as i32` would wrap values above `i32::MAX` into negative
        // numbers; clamp first so the cast is lossless.
        let wire_limit = limit.min(i32::MAX as u32) as i32;
        let request = ListArtifactsRequest {
            repository: repository.to_string(),
            name_filter: name_filter.unwrap_or_default(),
            limit: wire_limit,
            page_token: page_token.unwrap_or_default(),
        };
        let resp = client
            .list_artifacts(request)
            .await
            .map_err(|s| anyhow!("gRPC list_artifacts failed: {}", s))?
            .into_inner();
        let entries = resp
            .artifacts
            .into_iter()
            .map(|a| {
                // An entry without an id is converted using the default (empty) id.
                let id = a.id.unwrap_or_default();
                ArtifactEntry {
                    id: ArtifactId {
                        // Inverse of `to_proto_id`: an empty namespace means "none".
                        namespace: if id.namespace.is_empty() {
                            None
                        } else {
                            Some(id.namespace)
                        },
                        name: id.name,
                        version: id.version,
                    },
                    size_bytes: a.size_bytes,
                    content_type: a.content_type,
                }
            })
            .collect();
        Ok((entries, resp.next_page_token))
    }

    /// Lists archive file paths of `repository`; a missing `prefix` is sent as the
    /// empty string.
    async fn list_archive_files(
        &self,
        repository: &str,
        prefix: Option<String>,
    ) -> Result<Vec<String>> {
        let mut client = self.archive_client();
        let request = ListArchiveFilesRequest {
            repository: repository.to_string(),
            prefix: prefix.unwrap_or_default(),
        };
        let resp = client
            .list_archive_files(request)
            .await
            .map_err(|s| anyhow!("gRPC list_archive_files failed: {}", s))?
            .into_inner();
        Ok(resp.paths)
    }

    /// Fetches the archive summary of `repository` from the remote archive service.
    async fn archive_info(&self, repository: &str) -> Result<ArchiveInfo> {
        let mut client = self.archive_client();
        let request = ArchiveInfoRequest {
            repository: repository.to_string(),
        };
        let resp = client
            .archive_info(request)
            .await
            .map_err(|s| anyhow!("gRPC archive_info failed: {}", s))?
            .into_inner();
        // NOTE(review): if the proto fields are signed, `as u64` turns a negative
        // value into a huge count — confirm the generated field types.
        Ok(ArchiveInfo {
            file_count: resp.file_count as u64,
            total_uncompressed_bytes: resp.total_uncompressed_bytes as u64,
            archive_path: resp.archive_path,
        })
    }

    /// Returns the health report of the remote admin service unchanged.
    async fn health(&self) -> Result<Health> {
        let mut client = self.admin_client();
        let resp = client
            .health(HealthRequest {})
            .await
            .map_err(|s| anyhow!("gRPC health failed: {}", s))?
            .into_inner();
        Ok(Health {
            status: resp.status,
            version: resp.version,
            uptime_seconds: resp.uptime_seconds,
        })
    }
}