use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::BufReader,
path::Path,
};
use anyhow::Result;
use ron::de::from_reader;
use std::collections::HashMap;
use std::sync::Arc;
use traits::RepositoryBackendTrait;
use crate::exposed::ExposedEndpoint;
use crate::exposed::fast_routes::FastRoutes;
pub use crate::repository::Repository;
pub use crate::storage::StorageEndpoint;
pub mod auth;
pub mod exposed;
pub mod grpc;
pub mod object;
pub mod proxy;
pub mod repository;
pub mod storage;
pub mod upstream_rust;
pub use crate::object::{LocalHolger, RemoteHolger};
pub use traits::HolgerObject;
/// Resolves every name-based reference in `holger` into raw pointers and builds the
/// per-endpoint route tables.
///
/// Wiring happens in four passes:
/// 1. each repository's `ron_upstreams`, `ron_in` and `ron_out` names are looked up and
///    stored as raw pointers (`wired_upstreams`, `wired_storage`, `wired_exposed`);
/// 2. each exposed endpoint collects the repositories whose `ron_out` names it
///    (`ron_in` is not considered for exposed endpoints);
/// 3. each storage endpoint collects the repositories whose `ron_in` / `ron_out` name it;
/// 4. each exposed endpoint gets `aggregated_routes` built from the backends of its
///    outgoing repositories. A repository with upstreams is wrapped in a `ProxyBackend`;
///    repositories (and upstreams) without a `backend_repository` are skipped.
///
/// Every pointer list is cleared before it is refilled, so calling this function again
/// re-wires the configuration instead of appending duplicate entries (and duplicate routes)
/// or keeping pointers from an earlier call.
///
/// The stored pointers address elements of the three `Vec`s inside `holger`. They dangle as
/// soon as one of those `Vec`s reallocates, shrinks or is dropped, so wire again after any
/// such change.
///
/// When several entries share a `ron_name`, the last one wins in the lookups.
///
/// # Errors
/// Returns an error naming the first upstream repository, storage endpoint or exposed
/// endpoint that is referenced but not defined. `holger` may be left partially wired.
pub fn wire_holger(holger: &mut Holger) -> Result<()> {
    // Name -> element address lookups for the three configuration lists.
    // NOTE(review): these addresses are taken from shared borrows before the mutable pass
    // over the same Vecs below; worth confirming under Miri.
    let repo_map: HashMap<_, _> = holger
        .repositories
        .iter()
        .map(|repo| (repo.ron_name.clone(), repo as *const Repository))
        .collect();
    let exposed_map: HashMap<_, _> = holger
        .exposed_endpoints
        .iter()
        .map(|exp| (exp.ron_name.clone(), exp as *const ExposedEndpoint))
        .collect();
    let storage_map: HashMap<_, _> = holger
        .storage_endpoints
        .iter()
        .map(|st| (st.ron_name.clone(), st as *const StorageEndpoint))
        .collect();

    // Pass 1: resolve the references held by each repository.
    for repo in &mut holger.repositories {
        // Start from a clean slate so that re-wiring does not accumulate duplicates.
        repo.wired_upstreams.clear();
        for name in &repo.ron_upstreams {
            let ptr = repo_map
                .get(name)
                .ok_or_else(|| anyhow::anyhow!("Missing upstream repo: {}", name))?;
            repo.wired_upstreams.push(*ptr);
        }
        if let Some(io) = &mut repo.ron_in {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }
        if let Some(io) = &mut repo.ron_out {
            io.wired_storage = *storage_map
                .get(&io.ron_storage_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing storage endpoint: {}", io.ron_storage_endpoint))?;
            io.wired_exposed = *exposed_map
                .get(&io.ron_exposed_endpoint)
                .ok_or_else(|| anyhow::anyhow!("Missing exposed endpoint: {}", io.ron_exposed_endpoint))?;
        }
    }

    // Pass 2: exposed endpoint -> repositories that publish through it.
    for exp in &mut holger.exposed_endpoints {
        exp.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_out {
                if io.ron_exposed_endpoint == exp.ron_name {
                    exp.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Pass 3: storage endpoint -> repositories that read from / write to it.
    for st in &mut holger.storage_endpoints {
        st.wired_in_repositories.clear();
        st.wired_out_repositories.clear();
        for repo in &holger.repositories {
            if let Some(io) = &repo.ron_in {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_in_repositories.push(repo as *const Repository);
                }
            }
            if let Some(io) = &repo.ron_out {
                if io.ron_storage_endpoint == st.ron_name {
                    st.wired_out_repositories.push(repo as *const Repository);
                }
            }
        }
    }

    // Pass 4: build the route table of every exposed endpoint.
    for exp in &mut holger.exposed_endpoints {
        let mut routes: Vec<(String, Arc<dyn RepositoryBackendTrait>)> = Vec::new();
        for &repo_ptr in &exp.wired_out_repositories {
            if repo_ptr.is_null() {
                continue;
            }
            // SAFETY: the list was cleared and refilled in pass 2 from elements of
            // `holger.repositories`, and that Vec has not been resized or dropped since.
            let repo: &Repository = unsafe { &*repo_ptr };
            let Some(backend_arc) = &repo.backend_repository else { continue };
            let backend: Arc<dyn RepositoryBackendTrait> = if repo.wired_upstreams.is_empty() {
                backend_arc.clone()
            } else {
                let upstream_backends: Vec<Arc<dyn RepositoryBackendTrait>> = repo
                    .wired_upstreams
                    .iter()
                    .filter_map(|&ptr| {
                        if ptr.is_null() {
                            return None;
                        }
                        // SAFETY: `wired_upstreams` was cleared and refilled in pass 1 with
                        // addresses of elements of `holger.repositories`, which has not been
                        // resized or dropped since.
                        let upstream: &Repository = unsafe { &*ptr };
                        upstream.backend_repository.clone()
                    })
                    .collect();
                Arc::new(crate::proxy::ProxyBackend::new(backend_arc.clone(), upstream_backends))
            };
            routes.push((repo.ron_name.clone(), backend));
        }
        // An endpoint without any usable backend gets no route table at all.
        exp.aggregated_routes = if routes.is_empty() {
            None
        } else {
            Some(FastRoutes::new(routes))
        };
    }
    Ok(())
}
/// Root of a Holger configuration, (de)serialized with serde (RON in this file).
///
/// The three lists reference each other by `ron_name`; `wire_holger` resolves those
/// names after deserialization.
#[derive(Serialize, Deserialize)]
pub struct Holger {
/// Repository definitions.
pub repositories: Vec<Repository>,
/// Network endpoints through which repositories are served.
pub exposed_endpoints: Vec<ExposedEndpoint>,
/// Storage locations referenced by the repositories.
pub storage_endpoints: Vec<StorageEndpoint>,
/// Optional authentication settings; `None` when the key is absent from the config.
/// `Holger::start` falls back to `AuthConfig::default()` in that case.
#[serde(default)]
pub auth: Option<auth::AuthConfig>,
}
pub fn read_ron_config<P: AsRef<Path>>(path: P) -> Result<Holger> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let holger: Holger = from_reader(reader)?;
Ok(holger)
}
/// Renders the RON text of a local development configuration.
///
/// The generated config declares:
/// - one exposed endpoint `dev` with `ron_url = grpc_url` and `ron_http_url = http_addr`;
/// - two `znippy` storage endpoints, `cache-store` and `sparring-store`, under `data_dir`;
/// - three repositories: `crates-io` (`rust-remote`, no upstreams) plus `cache` and
///   `sparring` (`rust`, writable archive under `data_dir`, upstream `crates-io`, published
///   through `dev`).
///
/// All three arguments are escaped for use inside RON string literals, so paths containing
/// backslashes (Windows) or double quotes still produce a parseable config.
///
/// `grpc_url` ends up in `ron_url`, which `Holger::start` parses as a socket address.
pub fn dev_pair_ron(data_dir: &Path, grpc_url: &str, http_addr: &str) -> String {
    /// Escapes `\` and `"` so `raw` can sit between the quotes of a RON string literal.
    fn ron_escape(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            if matches!(ch, '\\' | '"') {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }

    let d = ron_escape(&data_dir.display().to_string());
    // Shadow the parameters so the template below picks up the escaped values.
    let grpc_url = ron_escape(grpc_url);
    let http_addr = ron_escape(http_addr);
    format!(
        r#"(
exposed_endpoints: [
( ron_name: "dev", ron_url: "{grpc_url}", ron_http_url: Some("{http_addr}") ),
],
storage_endpoints: [
( ron_name: "cache-store", ron_storage_type: "znippy", ron_path: "{d}/cache/" ),
( ron_name: "sparring-store", ron_storage_type: "znippy", ron_path: "{d}/sparring/" ),
],
repositories: [
( ron_name: "crates-io", ron_repo_type: "rust-remote", ron_upstreams: [], ron_in: None, ron_out: None ),
( ron_name: "cache", ron_repo_type: "rust", ron_writable_archive: Some("{d}/cache.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "cache-store", ron_exposed_endpoint: "dev" )) ),
( ron_name: "sparring", ron_repo_type: "rust", ron_writable_archive: Some("{d}/sparring.znippy"), ron_base_url: Some("http://{http_addr}"), ron_upstreams: ["crates-io"], ron_in: None, ron_out: Some(( ron_storage_endpoint: "sparring-store", ron_exposed_endpoint: "dev" )) ),
],
)
"#
    )
}
impl Holger {
    /// Fetches artifact `id` from the repository named `repository`.
    ///
    /// The result is whatever the repository backend's `fetch` returns.
    ///
    /// # Errors
    /// Fails when the repository is unknown, has no backend, or the backend reports an error.
    pub fn fetch(&self, repository: &str, id: &traits::ArtifactId) -> Result<Option<Vec<u8>>> {
        self.get_repo(repository)?.fetch(id).map_err(Into::into)
    }

    /// Stores `data` as artifact `id` in the repository named `repository`.
    ///
    /// # Errors
    /// Fails when the repository is unknown, has no backend, is not writable, or the
    /// backend reports an error.
    pub fn put(&self, repository: &str, id: &traits::ArtifactId, data: &[u8]) -> Result<()> {
        let repo = self.get_repo(repository)?;
        if !repo.is_writable() {
            anyhow::bail!("Repository '{}' is read-only", repository);
        }
        repo.put(id, data).map_err(Into::into)
    }

    /// Returns the `ron_name` of every configured repository, in configuration order.
    pub fn list_repositories(&self) -> Vec<&str> {
        self.repositories.iter().map(|r| r.ron_name.as_str()).collect()
    }

    /// Looks up the backend of the first repository whose `ron_name` equals `name`.
    fn get_repo(&self, name: &str) -> Result<&Arc<dyn RepositoryBackendTrait>> {
        let Some(repo) = self.repositories.iter().find(|r| r.ron_name == name) else {
            anyhow::bail!("Repository '{}' not found", name);
        };
        repo.backend_repository
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Repository '{}' has no backend", name))
    }

    /// Starts a gRPC server, and an HTTP gateway when `ron_http_url` is set, for every
    /// exposed endpoint that has aggregated routes.
    ///
    /// Endpoints without routes (for example because `wire_holger` has not been called)
    /// cannot serve anything; they are skipped with a warning instead of silently.
    ///
    /// The gRPC servers run on spawned Tokio tasks, so this returns once they are spawned.
    /// A gRPC server that later fails to serve only prints to stderr.
    ///
    /// # Errors
    /// Returns the first invalid address or TLS configuration error. Servers spawned before
    /// that error keep running.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside a Tokio runtime.
    pub fn start(&self) -> Result<()> {
        crate::exposed::tls::ensure_crypto_provider();
        let auth_config = Arc::new(self.auth.clone().unwrap_or_default());
        for ep in &self.exposed_endpoints {
            let Some(routes) = &ep.aggregated_routes else {
                log::warn!(
                    "Exposed endpoint '{}' has no routes (was wire_holger called?); not starting it",
                    ep.ron_name
                );
                continue;
            };
            self.start_grpc_endpoint(&ep.ron_url, routes.clone(), auth_config.clone(), ep.ron_tls.as_ref())?;

            // The HTTP gateway is optional per endpoint.
            let Some(http_url) = &ep.ron_http_url else { continue };
            let addr: std::net::SocketAddr = http_url.parse()
                .map_err(|e| anyhow::anyhow!("Invalid HTTP address '{}': {}", http_url, e))?;
            let tls = ep.ron_tls.as_ref().map(|t| t.rustls_gateway_config()).transpose()?;
            let settings = Arc::new(crate::exposed::http::GatewaySettings {
                routes: routes.clone(),
                auth_config: auth_config.clone(),
                tls,
                // Default request body limit: 1 GiB.
                max_body_bytes: ep.ron_max_body_bytes.unwrap_or(1024 * 1024 * 1024),
            });
            crate::exposed::http::start_http_gateway(addr, settings)?;
        }
        Ok(())
    }

    /// Spawns a tonic gRPC server on `addr` serving the repository, archive and admin
    /// services backed by `routes`.
    ///
    /// With `tls` set the server is configured for TLS; without it a security warning is
    /// logged and the server runs in cleartext.
    ///
    /// # Errors
    /// Fails when `addr` is not a socket address or the TLS configuration is rejected.
    /// Errors raised while serving are printed to stderr by the spawned task instead.
    fn start_grpc_endpoint(
        &self,
        addr: &str,
        routes: FastRoutes,
        auth_config: Arc<auth::AuthConfig>,
        tls: Option<&crate::exposed::tls::TlsSettings>,
    ) -> Result<()> {
        use crate::grpc::holger_proto::{
            repository_service_server::RepositoryServiceServer,
            archive_service_server::ArchiveServiceServer,
            admin_service_server::AdminServiceServer,
        };
        use crate::grpc::HolgerGrpc;

        let grpc_state = Arc::new(HolgerGrpc::with_auth(routes, auth_config));
        let listen_addr: std::net::SocketAddr = addr.parse()
            .map_err(|e| anyhow::anyhow!("Invalid gRPC address '{}': {}", addr, e))?;
        // Captured as a plain bool because the borrowed `tls` cannot move into the task.
        let tls_was_set = tls.is_some();
        let mut builder = tonic::transport::Server::builder();
        match tls {
            Some(t) => {
                builder = builder.tls_config(t.tonic_config()?)
                    .map_err(|e| anyhow::anyhow!("gRPC TLS config: {}", e))?;
            }
            None => log::warn!(
                "SECURITY: gRPC server on {} runs WITHOUT TLS (cleartext h2c). \
                 Set ron_tls for any non-loopback use.",
                listen_addr
            ),
        }
        tokio::spawn(async move {
            let scheme = if tls_was_set { "https/h2" } else { "h2c (cleartext)" };
            // Printed before `serve` is awaited, i.e. before the socket is actually bound.
            println!("gRPC server listening on {} [{}]", listen_addr, scheme);
            if let Err(e) = builder
                .add_service(RepositoryServiceServer::new(grpc_state.clone()))
                .add_service(ArchiveServiceServer::new(grpc_state.clone()))
                .add_service(AdminServiceServer::new(grpc_state.clone()))
                .serve(listen_addr)
                .await
            {
                eprintln!("gRPC server error: {}", e);
            }
        });
        Ok(())
    }

    /// Calls `backend_from_config` on every storage endpoint and then on every repository.
    ///
    /// # Errors
    /// Stops at and returns the first failure.
    pub fn instantiate_backends(&mut self) -> Result<()> {
        for se in &mut self.storage_endpoints {
            se.backend_from_config()?;
        }
        for repo in &mut self.repositories {
            repo.backend_from_config()?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for the generated dev configuration: it must parse, build all backends,
    /// wire up, round-trip an artifact through `sparring` and reject writes to `crates-io`.
    #[test]
    fn dev_pair_ron_parses_instantiates_and_wires() {
        // Everything the backends write lands in a throw-away directory.
        let scratch = tempfile::tempdir().unwrap();
        let config_text = dev_pair_ron(scratch.path(), "https://127.0.0.1:18443", "127.0.0.1:18444");

        let source = std::io::Cursor::new(config_text.as_bytes());
        let mut holger: Holger = from_reader(source).expect("dev-pair RON must parse");

        let expected_names = vec!["crates-io", "cache", "sparring"];
        assert_eq!(holger.list_repositories(), expected_names);

        holger.instantiate_backends().expect("every backend must build");
        wire_holger(&mut holger).expect("upstreams + endpoints must all wire");

        // Write then read back through the writable repository.
        let id = traits::ArtifactId { namespace: None, name: "demo".into(), version: "0.1.0".into() };
        holger.put("sparring", &id, b"crate-bytes").expect("sparring is writable");
        let stored = holger.fetch("sparring", &id).unwrap();
        assert_eq!(stored.as_deref(), Some(b"crate-bytes".as_ref()));

        // The remote upstream must refuse writes.
        let rejected = holger.put("crates-io", &id, b"x");
        assert!(rejected.is_err(), "crates-io upstream is read-only");
    }
}