use crate::{api::create_router, datastore::MemoryDataStore};
use anyhow::{Context, Result};
use axum::Router;
use datastore::DataStore;
use futures::Future;
use policy::{content::ContentPolicy, record::RecordPolicy};
use services::CoreService;
use std::{fs, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
use tokio::{net::TcpListener, task::JoinHandle};
use url::Url;
use warg_crypto::signing::PrivateKey;
use warg_protocol::operator;
/// HTTP API of the server; provides `create_router`, which builds the router served by this crate.
pub mod api;
// NOTE(review): `args` is not referenced in this file; presumably it holds
// command-line argument definitions — confirm before documenting further.
pub mod args;
/// Storage abstraction (`DataStore`) and the in-memory `MemoryDataStore` used as the default store.
pub mod datastore;
/// Content and record policy traits (`ContentPolicy`, `RecordPolicy`).
pub mod policy;
/// Background services, including the `CoreService` started during server initialization.
pub mod services;
/// Address the server binds to when the configuration does not specify one.
const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0:8090";
/// Checkpoint interval handed to the core service when the configuration does not specify one.
const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);
/// Boxed, pinned future used as the server's graceful-shutdown signal.
type ShutdownFut = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
/// Configuration consumed by [`Server`].
///
/// Built with [`Config::new`] and refined through the `with_*` builder methods;
/// every optional field left as `None` falls back to a default during
/// server initialization.
pub struct Config {
/// Operator private key; handed to `CoreService::start` and never printed by `Debug`.
operator_key: PrivateKey,
/// Optional namespace definitions; handed to `CoreService::start`.
namespaces: Option<Vec<(String, operator::NamespaceState)>>,
/// Address to bind to; `DEFAULT_BIND_ADDRESS` is used when `None`.
addr: Option<SocketAddr>,
/// Data store backing the core service; a default `MemoryDataStore` is used when `None`.
data_store: Option<Box<dyn DataStore>>,
/// Root content directory; `tmp` and `files` subdirectories are created beneath it.
content_dir: PathBuf,
/// Base URL for content; derived from the bound address (`http://{addr}`) when `None`.
content_base_url: Option<Url>,
/// Future whose completion triggers a graceful shutdown; the server runs without one when `None`.
shutdown: Option<ShutdownFut>,
/// Checkpoint interval for the core service; `DEFAULT_CHECKPOINT_INTERVAL` is used when `None`.
checkpoint_interval: Option<Duration>,
/// Optional content policy passed to the router.
content_policy: Option<Arc<dyn ContentPolicy>>,
/// Optional record policy passed to the router.
record_policy: Option<Arc<dyn RecordPolicy>>,
}
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config")
.field("operator_key", &"<redacted>")
.field("namespaces", &self.namespaces)
.field("addr", &self.addr)
.field(
"data_store",
&self.data_store.as_ref().map(|_| "dyn DataStore"),
)
.field("content_dir", &self.content_dir)
.field("shutdown", &self.shutdown.as_ref().map(|_| "dyn Future"))
.field("checkpoint_interval", &self.checkpoint_interval)
.field(
"content_policy",
&self.content_policy.as_ref().map(|_| "dyn ContentPolicy"),
)
.field(
"record_policy",
&self.record_policy.as_ref().map(|_| "dyn RecordPolicy"),
)
.finish()
}
}
impl Config {
pub fn new(
operator_key: PrivateKey,
namespaces: Option<Vec<(String, operator::NamespaceState)>>,
content_dir: PathBuf,
) -> Self {
Self {
operator_key,
namespaces,
addr: None,
data_store: None,
content_dir,
content_base_url: None,
shutdown: None,
checkpoint_interval: None,
content_policy: None,
record_policy: None,
}
}
pub fn with_addr(mut self, addr: impl Into<SocketAddr>) -> Self {
self.addr = Some(addr.into());
self
}
pub fn with_content_base_url(mut self, url: Url) -> Self {
self.content_base_url = Some(url);
self
}
pub fn with_data_store(mut self, store: impl DataStore + 'static) -> Self {
self.data_store = Some(Box::new(store));
self
}
pub fn with_boxed_data_store(mut self, store: Box<dyn DataStore>) -> Self {
self.data_store = Some(store);
self
}
pub fn with_shutdown(
mut self,
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
) -> Self {
self.shutdown = Some(Box::pin(shutdown));
self
}
pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
self.checkpoint_interval = Some(interval);
self
}
pub fn with_content_policy(mut self, policy: impl ContentPolicy + 'static) -> Self {
self.content_policy = Some(Arc::new(policy));
self
}
pub fn with_record_policy(mut self, policy: impl RecordPolicy + 'static) -> Self {
self.record_policy = Some(Arc::new(policy));
self
}
}
/// A server that has been configured but not yet initialized.
pub struct Server {
/// Configuration consumed when the server is initialized.
config: Config,
}
impl Server {
    /// Creates a server that will run with the given configuration.
    pub fn new(config: Config) -> Self {
        Self { config }
    }

    /// Initializes the server and then serves requests until it stops.
    ///
    /// Equivalent to calling [`Server::initialize`] followed by
    /// [`InitializedServer::serve`].
    pub async fn run(self) -> Result<()> {
        self.initialize().await?.serve().await
    }

    /// Binds the listener, prepares the content directories, starts the core
    /// service, and builds the router.
    ///
    /// # Errors
    ///
    /// Returns an error if the address cannot be bound, if the content
    /// directories cannot be created, or if the core service fails to start.
    pub async fn initialize(self) -> Result<InitializedServer> {
        let addr = self.config.addr.unwrap_or_else(|| {
            DEFAULT_BIND_ADDRESS
                .parse()
                .expect("DEFAULT_BIND_ADDRESS must be a valid socket address")
        });

        tracing::debug!("binding server to address `{addr}`");
        let listener = TcpListener::bind(addr)
            .await
            .with_context(|| format!("failed to bind to address `{addr}`"))?;

        // Use the address actually bound from here on; it can differ from the
        // requested one (for example when port 0 was requested).
        let addr = listener
            .local_addr()
            .context("failed to determine the bound address of the listener")?;

        tracing::debug!(
            "using server configuration: {config:?}",
            config = self.config
        );

        // Create the content directories *before* starting the core service:
        // a failure here returns early, and starting the service first would
        // drop its join handle and leave the background task running detached.
        // These are blocking filesystem calls, but they run once at startup.
        let temp_dir = self.config.content_dir.join("tmp");
        fs::create_dir_all(&temp_dir).with_context(|| {
            format!(
                "failed to create content temp directory `{path}`",
                path = temp_dir.display()
            )
        })?;

        let files_dir = self.config.content_dir.join("files");
        fs::create_dir_all(&files_dir).with_context(|| {
            format!(
                "failed to create content files directory `{path}`",
                path = files_dir.display()
            )
        })?;

        let store = self
            .config
            .data_store
            .unwrap_or_else(|| Box::<MemoryDataStore>::default());

        let (core, core_handle) = CoreService::start(
            self.config.operator_key,
            self.config.namespaces,
            store,
            self.config
                .checkpoint_interval
                .unwrap_or(DEFAULT_CHECKPOINT_INTERVAL),
        )
        .await?;

        // Without an explicit base URL, content is addressed via the bound address.
        let content_base_url = self.config.content_base_url.unwrap_or_else(|| {
            Url::parse(&format!("http://{addr}"))
                .expect("a socket address must form a valid HTTP URL")
        });

        let router = create_router(
            content_base_url,
            core,
            temp_dir,
            files_dir,
            self.config.content_policy,
            self.config.record_policy,
        );

        Ok(InitializedServer {
            listener,
            router,
            core_handle,
            shutdown: self.config.shutdown,
        })
    }
}
/// A server whose listener is bound and whose core service has been started,
/// ready to serve requests.
pub struct InitializedServer {
/// Bound TCP listener on which requests are accepted.
listener: TcpListener,
/// Router produced by `create_router`.
router: Router,
/// Join handle of the core service task; awaited after the HTTP server stops.
core_handle: JoinHandle<()>,
/// Optional future whose completion triggers a graceful shutdown.
shutdown: Option<ShutdownFut>,
}
impl InitializedServer {
    /// Returns the local address the listener is bound to.
    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
        self.listener.local_addr()
    }

    /// Serves requests until the HTTP server stops, then waits for the core
    /// service task to finish.
    ///
    /// When a shutdown future was configured, the HTTP server shuts down
    /// gracefully once that future completes; otherwise it runs until it
    /// stops on its own.
    pub async fn serve(self) -> Result<()> {
        let bound = self.listener.local_addr()?;
        let make_service = self.router.into_make_service();
        let http = axum::serve::serve(self.listener, make_service);
        tracing::info!("listening on {bound}");

        match self.shutdown {
            Some(signal) => {
                tracing::debug!("server is running with a shutdown signal");
                http.with_graceful_shutdown(signal).await?;
            }
            None => {
                tracing::debug!("server is running without a shutdown signal");
                http.await?;
            }
        }

        tracing::info!("waiting for core service to stop");
        let core_task = self.core_handle;
        core_task.await?;
        tracing::info!("server shutdown complete");
        Ok(())
    }
}