1use crate::{api::create_router, datastore::MemoryDataStore};
2use anyhow::{Context, Result};
3use axum::Router;
4use datastore::DataStore;
5use futures::Future;
6use policy::{content::ContentPolicy, record::RecordPolicy};
7use services::CoreService;
8use std::{fs, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
9use tokio::{net::TcpListener, task::JoinHandle};
10use url::Url;
11use warg_crypto::signing::PrivateKey;
12use warg_protocol::operator;
13
14pub mod api;
15pub mod args;
16pub mod datastore;
17pub mod policy;
18pub mod services;
19
20const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0:8090";
21const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);
22
23type ShutdownFut = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
24
25pub struct Config {
27 operator_key: PrivateKey,
28 namespaces: Option<Vec<(String, operator::NamespaceState)>>,
29 addr: Option<SocketAddr>,
30 data_store: Option<Box<dyn DataStore>>,
31 content_dir: PathBuf,
32 content_base_url: Option<Url>,
33 shutdown: Option<ShutdownFut>,
34 checkpoint_interval: Option<Duration>,
35 content_policy: Option<Arc<dyn ContentPolicy>>,
36 record_policy: Option<Arc<dyn RecordPolicy>>,
37}
38
39impl std::fmt::Debug for Config {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 f.debug_struct("Config")
42 .field("operator_key", &"<redacted>")
43 .field("namespaces", &self.namespaces)
44 .field("addr", &self.addr)
45 .field(
46 "data_store",
47 &self.data_store.as_ref().map(|_| "dyn DataStore"),
48 )
49 .field("content_dir", &self.content_dir)
50 .field("shutdown", &self.shutdown.as_ref().map(|_| "dyn Future"))
51 .field("checkpoint_interval", &self.checkpoint_interval)
52 .field(
53 "content_policy",
54 &self.content_policy.as_ref().map(|_| "dyn ContentPolicy"),
55 )
56 .field(
57 "record_policy",
58 &self.record_policy.as_ref().map(|_| "dyn RecordPolicy"),
59 )
60 .finish()
61 }
62}
63
64impl Config {
65 pub fn new(
67 operator_key: PrivateKey,
68 namespaces: Option<Vec<(String, operator::NamespaceState)>>,
69 content_dir: PathBuf,
70 ) -> Self {
71 Self {
72 operator_key,
73 namespaces,
74 addr: None,
75 data_store: None,
76 content_dir,
77 content_base_url: None,
78 shutdown: None,
79 checkpoint_interval: None,
80 content_policy: None,
81 record_policy: None,
82 }
83 }
84
85 pub fn with_addr(mut self, addr: impl Into<SocketAddr>) -> Self {
87 self.addr = Some(addr.into());
88 self
89 }
90
91 pub fn with_content_base_url(mut self, url: Url) -> Self {
95 self.content_base_url = Some(url);
96 self
97 }
98
99 pub fn with_data_store(mut self, store: impl DataStore + 'static) -> Self {
103 self.data_store = Some(Box::new(store));
104 self
105 }
106
107 pub fn with_boxed_data_store(mut self, store: Box<dyn DataStore>) -> Self {
111 self.data_store = Some(store);
112 self
113 }
114
115 pub fn with_shutdown(
119 mut self,
120 shutdown: impl Future<Output = ()> + Send + Sync + 'static,
121 ) -> Self {
122 self.shutdown = Some(Box::pin(shutdown));
123 self
124 }
125
126 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
128 self.checkpoint_interval = Some(interval);
129 self
130 }
131
132 pub fn with_content_policy(mut self, policy: impl ContentPolicy + 'static) -> Self {
134 self.content_policy = Some(Arc::new(policy));
135 self
136 }
137
138 pub fn with_record_policy(mut self, policy: impl RecordPolicy + 'static) -> Self {
140 self.record_policy = Some(Arc::new(policy));
141 self
142 }
143}
144
145pub struct Server {
147 config: Config,
148}
149
150impl Server {
151 pub fn new(config: Config) -> Self {
153 Self { config }
154 }
155
156 pub async fn run(self) -> Result<()> {
161 self.initialize().await?.serve().await
162 }
163
164 pub async fn initialize(self) -> Result<InitializedServer> {
170 let addr = self
171 .config
172 .addr
173 .unwrap_or_else(|| DEFAULT_BIND_ADDRESS.parse().unwrap());
174
175 tracing::debug!("binding server to address `{addr}`");
176 let listener = TcpListener::bind(addr)
177 .await
178 .with_context(|| format!("failed to bind to address `{addr}`"))?;
179 let addr = listener.local_addr()?;
180
181 tracing::debug!(
182 "using server configuration: {config:?}",
183 config = self.config
184 );
185
186 let store = self
187 .config
188 .data_store
189 .unwrap_or_else(|| Box::<MemoryDataStore>::default());
190 let (core, core_handle) = CoreService::start(
191 self.config.operator_key,
192 self.config.namespaces,
193 store,
194 self.config
195 .checkpoint_interval
196 .unwrap_or(DEFAULT_CHECKPOINT_INTERVAL),
197 )
198 .await?;
199
200 let temp_dir = self.config.content_dir.join("tmp");
201 fs::create_dir_all(&temp_dir).with_context(|| {
202 format!(
203 "failed to create content temp directory `{path}`",
204 path = temp_dir.display()
205 )
206 })?;
207
208 let files_dir = self.config.content_dir.join("files");
209 fs::create_dir_all(&files_dir).with_context(|| {
210 format!(
211 "failed to create content files directory `{path}`",
212 path = files_dir.display()
213 )
214 })?;
215
216 let content_base_url = self
217 .config
218 .content_base_url
219 .unwrap_or_else(|| Url::parse(&format!("http://{addr}")).unwrap());
220
221 let router = create_router(
222 content_base_url,
223 core,
224 temp_dir,
225 files_dir,
226 self.config.content_policy,
227 self.config.record_policy,
228 );
229
230 Ok(InitializedServer {
231 listener,
232 router,
233 core_handle,
234 shutdown: self.config.shutdown,
235 })
236 }
237}
238
239pub struct InitializedServer {
241 listener: TcpListener,
242 router: Router,
243 core_handle: JoinHandle<()>,
244 shutdown: Option<ShutdownFut>,
245}
246
247impl InitializedServer {
248 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
251 self.listener.local_addr()
252 }
253
254 pub async fn serve(self) -> Result<()> {
257 let addr = self.local_addr()?;
258
259 let server = axum::serve::serve(self.listener, self.router.into_make_service());
260
261 tracing::info!("listening on {addr}");
262
263 if let Some(shutdown) = self.shutdown {
264 tracing::debug!("server is running with a shutdown signal");
265 server.with_graceful_shutdown(shutdown).await?;
266 } else {
267 tracing::debug!("server is running without a shutdown signal");
268 server.await?;
269 }
270
271 tracing::info!("waiting for core service to stop");
272 self.core_handle.await?;
273
274 tracing::info!("server shutdown complete");
275 Ok(())
276 }
277}