use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::BufReader,
path::Path,
};
use anyhow::Result;
use ron::de::from_reader;
use std::collections::HashMap;
use std::sync::Arc;
use traits::RepositoryBackendTrait;
use crate::exposed::ExposedEndpoint;
use crate::exposed::fast_routes::FastRoutes;
pub use crate::repository::Repository;
pub use crate::storage::StorageEndpoint;
pub mod auth;
pub mod exposed;
pub mod grpc;
pub mod object;
pub mod proxy;
pub mod repository;
pub mod storage;
pub use crate::object::{LocalHolger, RemoteHolger};
pub use traits::HolgerObject;
/// Resolves the name-based references of a [`Holger`] configuration into
/// raw-pointer links and builds the route table of every exposed endpoint.
///
/// The pass works in five steps:
///
/// 1. Index repositories, exposed endpoints and storage endpoints by their
///    `ron_name`. If two entries share a name, the later one wins.
/// 2. For every repository, resolve `ron_upstreams` into `wired_upstreams`
///    and, for `ron_in` / `ron_out`, resolve the storage and exposed endpoint
///    names into `wired_storage` / `wired_exposed`.
/// 3. Give every exposed endpoint the repositories whose `ron_out` names it.
/// 4. Give every storage endpoint the repositories whose `ron_in` / `ron_out`
///    name it.
/// 5. For every exposed endpoint, build `aggregated_routes`: one route per
///    outgoing repository that has a backend. A repository with upstreams is
///    wrapped in a `ProxyBackend` together with the backends of those
///    upstreams. Repositories (and upstreams) without a backend are skipped.
///    An endpoint that ends up with no routes gets `None`.
///
/// All wired pointer lists are cleared before they are refilled, so calling
/// this function again rebuilds the wiring instead of appending duplicate
/// (and possibly stale) pointers to the previous result.
///
/// # Errors
///
/// Returns an error when a repository names an upstream repository, a storage
/// endpoint or an exposed endpoint that does not exist. The configuration may
/// be partially wired at that point.
///
/// # Pointer validity
///
/// The stored pointers address elements of `holger.repositories`,
/// `holger.exposed_endpoints` and `holger.storage_endpoints`. They are only
/// meaningful while those vectors are not resized, reordered or dropped;
/// after any such change this function must be run again before the pointers
/// are used.
pub fn wire_holger(holger: &mut Holger) -> Result<()> {
    // Name -> element address lookup tables.
    let repo_map: HashMap<String, *const Repository> = holger
        .repositories
        .iter()
        .map(|repo| (repo.ron_name.clone(), repo as *const Repository))
        .collect();
    let exposed_map: HashMap<String, *const ExposedEndpoint> = holger
        .exposed_endpoints
        .iter()
        .map(|exp| (exp.ron_name.clone(), exp as *const ExposedEndpoint))
        .collect();
    let storage_map: HashMap<String, *const StorageEndpoint> = holger
        .storage_endpoints
        .iter()
        .map(|st| (st.ron_name.clone(), st as *const StorageEndpoint))
        .collect();

    // Step 2: resolve the references held by each repository.
    for repo in &mut holger.repositories {
        // Start from a clean slate so a repeated wiring pass does not
        // accumulate duplicates or keep pointers from an earlier pass.
        repo.wired_upstreams.clear();
        for name in &repo.ron_upstreams {
            let ptr = repo_map
                .get(name)
                .ok_or_else(|| anyhow::anyhow!("Missing upstream repo: {}", name))?;
            repo.wired_upstreams.push(*ptr);
        }
        if let Some(io) = &mut repo.ron_in {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| {
                    anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint)
                })?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| {
                    anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint)
                })?;
        }
        if let Some(io) = &mut repo.ron_out {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| {
                    anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint)
                })?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| {
                    anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint)
                })?;
        }
    }

    // Step 3: back-links from exposed endpoints to the repositories they serve.
    for exp in &mut holger.exposed_endpoints {
        exp.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_out {
                if io.ron_exposed_endpoint == exp.ron_name {
                    exp.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Step 4: back-links from storage endpoints to the repositories using them.
    for st in &mut holger.storage_endpoints {
        st.wired_in_repositories.clear();
        st.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_in {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_in_repositories.push(repo as *const Repository);
                }
            }
            if let Some(io) = &repo.ron_out {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Step 5: build the route table of every exposed endpoint.
    for exp in &mut holger.exposed_endpoints {
        let mut routes: Vec<(String, Arc<dyn RepositoryBackendTrait>)> = Vec::new();
        for &repo_ptr in &exp.wired_out_repositories {
            if repo_ptr.is_null() {
                continue;
            }
            // SAFETY: `wired_out_repositories` was cleared and refilled in step 3
            // of this call with addresses of elements of `holger.repositories`,
            // and that vector has not been resized or dropped since, so the
            // pointer refers to a live `Repository`.
            let repo: &Repository = unsafe { &*repo_ptr };
            // A repository without a backend cannot serve requests; no route.
            let Some(backend_arc) = &repo.backend_repository else { continue };
            let backend: Arc<dyn RepositoryBackendTrait> = if repo.wired_upstreams.is_empty() {
                backend_arc.clone()
            } else {
                let upstream_backends: Vec<Arc<dyn RepositoryBackendTrait>> = repo
                    .wired_upstreams
                    .iter()
                    .filter_map(|&ptr| {
                        if ptr.is_null() {
                            return None;
                        }
                        // SAFETY: `wired_upstreams` was cleared and refilled in
                        // step 2 of this call from `repo_map`, whose values are
                        // addresses of elements of `holger.repositories`; the
                        // vector has not been resized or dropped since.
                        // NOTE(review): these addresses were taken from shared
                        // references before the repositories were iterated
                        // mutably in step 2 — worth confirming under Miri that
                        // the aliasing model accepts this.
                        let upstream: &Repository = unsafe { &*ptr };
                        upstream.backend_repository.clone()
                    })
                    .collect();
                Arc::new(crate::proxy::ProxyBackend::new(
                    backend_arc.clone(),
                    upstream_backends,
                ))
            };
            routes.push((repo.ron_name.clone(), backend));
        }
        exp.aggregated_routes = if routes.is_empty() {
            None
        } else {
            Some(FastRoutes::new(routes))
        };
    }
    Ok(())
}
/// Top-level Holger configuration and runtime state: the repositories, the
/// endpoints that expose them, the storage endpoints they use, and optional
/// authentication settings.
///
/// The type is (de)serializable with serde; `read_ron_config` loads it from a
/// RON file.
#[derive(Serialize, Deserialize)]
pub struct Holger {
    /// All configured repositories; they are looked up by their `ron_name`.
    pub repositories: Vec<Repository>,
    /// Endpoints through which repositories are served; `Holger::start`
    /// launches the servers for those that have routes.
    pub exposed_endpoints: Vec<ExposedEndpoint>,
    /// Storage endpoints referenced by name from the repositories.
    pub storage_endpoints: Vec<StorageEndpoint>,
    /// Optional authentication settings. The field may be omitted from the
    /// serialized config (`#[serde(default)]`); `Holger::start` falls back to
    /// `AuthConfig::default()` when it is `None`.
    #[serde(default)]
    pub auth: Option<auth::AuthConfig>,
}
pub fn read_ron_config<P: AsRef<Path>>(path: P) -> Result<Holger> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let holger: Holger = from_reader(reader)?;
Ok(holger)
}
impl Holger {
pub fn fetch(&self, repository: &str, id: &traits::ArtifactId) -> Result<Option<Vec<u8>>> {
let repo = self.get_repo(repository)?;
repo.fetch(id).map_err(Into::into)
}
pub fn put(&self, repository: &str, id: &traits::ArtifactId, data: &[u8]) -> Result<()> {
let repo = self.get_repo(repository)?;
if !repo.is_writable() {
anyhow::bail!("Repository '{}' is read-only", repository);
}
repo.put(id, data).map_err(Into::into)
}
pub fn list_repositories(&self) -> Vec<&str> {
self.repositories.iter().map(|r| r.ron_name.as_str()).collect()
}
fn get_repo(&self, name: &str) -> Result<&Arc<dyn RepositoryBackendTrait>> {
for repo in &self.repositories {
if repo.ron_name == name {
return repo.backend_repository.as_ref()
.ok_or_else(|| anyhow::anyhow!("Repository '{}' has no backend", name));
}
}
anyhow::bail!("Repository '{}' not found", name)
}
pub fn start(&self) -> Result<()> {
crate::exposed::tls::ensure_crypto_provider();
let auth_config = Arc::new(self.auth.clone().unwrap_or_default());
for ep in &self.exposed_endpoints {
if let Some(routes) = &ep.aggregated_routes {
self.start_grpc_endpoint(&ep.ron_url, routes.clone(), auth_config.clone(), ep.ron_tls.as_ref())?;
if let Some(http_url) = &ep.ron_http_url {
let addr: std::net::SocketAddr = http_url.parse()
.map_err(|e| anyhow::anyhow!("Invalid HTTP address '{}': {}", http_url, e))?;
let tls = match &ep.ron_tls {
Some(t) => Some(t.rustls_gateway_config()?),
None => None,
};
let settings = Arc::new(crate::exposed::http::GatewaySettings {
routes: routes.clone(),
auth_config: auth_config.clone(),
tls,
max_body_bytes: ep.ron_max_body_bytes.unwrap_or(1024 * 1024 * 1024),
});
crate::exposed::http::start_http_gateway(addr, settings)?;
}
}
}
Ok(())
}
fn start_grpc_endpoint(
&self,
addr: &str,
routes: FastRoutes,
auth_config: Arc<auth::AuthConfig>,
tls: Option<&crate::exposed::tls::TlsSettings>,
) -> Result<()> {
use crate::grpc::holger_proto::{
repository_service_server::RepositoryServiceServer,
archive_service_server::ArchiveServiceServer,
admin_service_server::AdminServiceServer,
};
use crate::grpc::HolgerGrpc;
let grpc_state = Arc::new(HolgerGrpc::with_auth(routes, auth_config));
let listen_addr: std::net::SocketAddr = addr.parse()
.map_err(|e| anyhow::anyhow!("Invalid gRPC address '{}': {}", addr, e))?;
let tls_was_set = tls.is_some();
let mut builder = tonic::transport::Server::builder();
match tls {
Some(t) => {
builder = builder.tls_config(t.tonic_config()?)
.map_err(|e| anyhow::anyhow!("gRPC TLS config: {}", e))?;
}
None => log::warn!(
"SECURITY: gRPC server on {} runs WITHOUT TLS (cleartext h2c). \
Set ron_tls for any non-loopback use.",
listen_addr
),
}
tokio::spawn(async move {
let scheme = if tls_was_set { "https/h2" } else { "h2c (cleartext)" };
println!("gRPC server listening on {} [{}]", listen_addr, scheme);
if let Err(e) = builder
.add_service(RepositoryServiceServer::new(grpc_state.clone()))
.add_service(ArchiveServiceServer::new(grpc_state.clone()))
.add_service(AdminServiceServer::new(grpc_state.clone()))
.serve(listen_addr)
.await
{
eprintln!("gRPC server error: {}", e);
}
});
Ok(())
}
pub fn instantiate_backends(&mut self) -> Result<()> {
for se in &mut self.storage_endpoints {
se.backend_from_config()?;
}
for repo in &mut self.repositories {
repo.backend_from_config()?;
}
Ok(())
}
}