warg_server/
lib.rs

1use crate::{api::create_router, datastore::MemoryDataStore};
2use anyhow::{Context, Result};
3use axum::Router;
4use datastore::DataStore;
5use futures::Future;
6use policy::{content::ContentPolicy, record::RecordPolicy};
7use services::CoreService;
8use std::{fs, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
9use tokio::{net::TcpListener, task::JoinHandle};
10use url::Url;
11use warg_crypto::signing::PrivateKey;
12use warg_protocol::operator;
13
14pub mod api;
15pub mod args;
16pub mod datastore;
17pub mod policy;
18pub mod services;
19
/// Address the server binds to when no address was set on the [`Config`].
const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0:8090";
/// Checkpoint interval handed to the core service when none was set on the [`Config`].
const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);

/// A pinned, boxed future with no output; when it completes, the server
/// begins a graceful shutdown.
type ShutdownFut = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
24
/// The server configuration.
///
/// Built with [`Config::new`] and refined through the `with_*` builder
/// methods. Every `Option` field left as `None` is replaced by a default
/// when the server is initialized.
pub struct Config {
    /// Operator signing key handed to the core service at startup.
    /// Never printed by the `Debug` implementation.
    operator_key: PrivateKey,
    /// Optional namespace definitions handed to the core service at startup.
    namespaces: Option<Vec<(String, operator::NamespaceState)>>,
    /// Address to listen on; `DEFAULT_BIND_ADDRESS` is used when `None`.
    addr: Option<SocketAddr>,
    /// Backing data store; an in-memory `MemoryDataStore` is used when `None`.
    data_store: Option<Box<dyn DataStore>>,
    /// Base directory for content; `tmp` and `files` subdirectories are
    /// created beneath it during initialization.
    content_dir: PathBuf,
    /// Base URL for content; derived from the bound address
    /// (`http://{addr}`) when `None`.
    content_base_url: Option<Url>,
    /// Future whose completion triggers a graceful shutdown; the server runs
    /// without a shutdown signal when `None`.
    shutdown: Option<ShutdownFut>,
    /// Checkpoint interval; `DEFAULT_CHECKPOINT_INTERVAL` is used when `None`.
    checkpoint_interval: Option<Duration>,
    /// Optional content policy passed to the API router.
    content_policy: Option<Arc<dyn ContentPolicy>>,
    /// Optional record policy passed to the API router.
    record_policy: Option<Arc<dyn RecordPolicy>>,
}
38
39impl std::fmt::Debug for Config {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        f.debug_struct("Config")
42            .field("operator_key", &"<redacted>")
43            .field("namespaces", &self.namespaces)
44            .field("addr", &self.addr)
45            .field(
46                "data_store",
47                &self.data_store.as_ref().map(|_| "dyn DataStore"),
48            )
49            .field("content_dir", &self.content_dir)
50            .field("shutdown", &self.shutdown.as_ref().map(|_| "dyn Future"))
51            .field("checkpoint_interval", &self.checkpoint_interval)
52            .field(
53                "content_policy",
54                &self.content_policy.as_ref().map(|_| "dyn ContentPolicy"),
55            )
56            .field(
57                "record_policy",
58                &self.record_policy.as_ref().map(|_| "dyn RecordPolicy"),
59            )
60            .finish()
61    }
62}
63
64impl Config {
65    /// Creates a new server configuration.
66    pub fn new(
67        operator_key: PrivateKey,
68        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
69        content_dir: PathBuf,
70    ) -> Self {
71        Self {
72            operator_key,
73            namespaces,
74            addr: None,
75            data_store: None,
76            content_dir,
77            content_base_url: None,
78            shutdown: None,
79            checkpoint_interval: None,
80            content_policy: None,
81            record_policy: None,
82        }
83    }
84
85    /// Specify the address for the server to listen on.
86    pub fn with_addr(mut self, addr: impl Into<SocketAddr>) -> Self {
87        self.addr = Some(addr.into());
88        self
89    }
90
91    /// Specify the content base URL to use.
92    ///
93    /// If not set, the content base URL will be derived from the server address.
94    pub fn with_content_base_url(mut self, url: Url) -> Self {
95        self.content_base_url = Some(url);
96        self
97    }
98
99    /// Specify the data store to use.
100    ///
101    /// If this is not specified, the server will use an in-memory data store.
102    pub fn with_data_store(mut self, store: impl DataStore + 'static) -> Self {
103        self.data_store = Some(Box::new(store));
104        self
105    }
106
107    /// Specify the data store to use via a boxed data store.
108    ///
109    /// If this is not specified, the server will use an in-memory data store.
110    pub fn with_boxed_data_store(mut self, store: Box<dyn DataStore>) -> Self {
111        self.data_store = Some(store);
112        self
113    }
114
115    /// Specifies the future to wait on to shutdown the server.
116    ///
117    /// If the future completes, the server will initiate a graceful shutdown.
118    pub fn with_shutdown(
119        mut self,
120        shutdown: impl Future<Output = ()> + Send + Sync + 'static,
121    ) -> Self {
122        self.shutdown = Some(Box::pin(shutdown));
123        self
124    }
125
126    /// Sets the checkpoint interval to use for the server.
127    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
128        self.checkpoint_interval = Some(interval);
129        self
130    }
131
132    /// Sets the content policy to use for the server.
133    pub fn with_content_policy(mut self, policy: impl ContentPolicy + 'static) -> Self {
134        self.content_policy = Some(Arc::new(policy));
135        self
136    }
137
138    /// Sets the record policy to use for the server.
139    pub fn with_record_policy(mut self, policy: impl RecordPolicy + 'static) -> Self {
140        self.record_policy = Some(Arc::new(policy));
141        self
142    }
143}
144
/// Represents the warg registry server.
///
/// A `Server` only holds its configuration; no socket is bound and no
/// background work is started until [`Server::initialize`] or
/// [`Server::run`] is called.
pub struct Server {
    /// Configuration consumed when the server is initialized.
    config: Config,
}
149
150impl Server {
151    /// Creates a new server with the given configuration.
152    pub fn new(config: Config) -> Self {
153        Self { config }
154    }
155
156    /// Initializes the server and starts serving.
157    ///
158    /// Equivalent to calling [`Server::initialize`] followed by
159    /// [`InitializedServer::serve`].
160    pub async fn run(self) -> Result<()> {
161        self.initialize().await?.serve().await
162    }
163
164    /// Initializes the server's internal state, background task(s), and
165    /// listening socket, returning an [`InitializedServer`]. To actually begin
166    /// serving, call [`InitializedServer::serve`].
167    ///
168    /// Useful for tests that need full initialization before running.
169    pub async fn initialize(self) -> Result<InitializedServer> {
170        let addr = self
171            .config
172            .addr
173            .unwrap_or_else(|| DEFAULT_BIND_ADDRESS.parse().unwrap());
174
175        tracing::debug!("binding server to address `{addr}`");
176        let listener = TcpListener::bind(addr)
177            .await
178            .with_context(|| format!("failed to bind to address `{addr}`"))?;
179        let addr = listener.local_addr()?;
180
181        tracing::debug!(
182            "using server configuration: {config:?}",
183            config = self.config
184        );
185
186        let store = self
187            .config
188            .data_store
189            .unwrap_or_else(|| Box::<MemoryDataStore>::default());
190        let (core, core_handle) = CoreService::start(
191            self.config.operator_key,
192            self.config.namespaces,
193            store,
194            self.config
195                .checkpoint_interval
196                .unwrap_or(DEFAULT_CHECKPOINT_INTERVAL),
197        )
198        .await?;
199
200        let temp_dir = self.config.content_dir.join("tmp");
201        fs::create_dir_all(&temp_dir).with_context(|| {
202            format!(
203                "failed to create content temp directory `{path}`",
204                path = temp_dir.display()
205            )
206        })?;
207
208        let files_dir = self.config.content_dir.join("files");
209        fs::create_dir_all(&files_dir).with_context(|| {
210            format!(
211                "failed to create content files directory `{path}`",
212                path = files_dir.display()
213            )
214        })?;
215
216        let content_base_url = self
217            .config
218            .content_base_url
219            .unwrap_or_else(|| Url::parse(&format!("http://{addr}")).unwrap());
220
221        let router = create_router(
222            content_base_url,
223            core,
224            temp_dir,
225            files_dir,
226            self.config.content_policy,
227            self.config.record_policy,
228        );
229
230        Ok(InitializedServer {
231            listener,
232            router,
233            core_handle,
234            shutdown: self.config.shutdown,
235        })
236    }
237}
238
/// Represents an initialized warg registry server.
///
/// Produced by `Server::initialize`; the socket is already bound and the
/// core service already started, but no requests are served until
/// [`InitializedServer::serve`] is called.
pub struct InitializedServer {
    /// The already-bound listening socket.
    listener: TcpListener,
    /// The API router served on `listener`.
    router: Router,
    /// Join handle of the core service task; awaited after serving stops.
    core_handle: JoinHandle<()>,
    /// Optional future whose completion triggers a graceful shutdown.
    shutdown: Option<ShutdownFut>,
}
246
247impl InitializedServer {
248    /// Returns the listening address of the server. If a random listening
249    /// port was requested (i.e. `:0`), this returns the actual bound port.
250    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
251        self.listener.local_addr()
252    }
253
254    /// Serves the server's services. On server shutdown, awaits completion of
255    /// background task(s) before returning.
256    pub async fn serve(self) -> Result<()> {
257        let addr = self.local_addr()?;
258
259        let server = axum::serve::serve(self.listener, self.router.into_make_service());
260
261        tracing::info!("listening on {addr}");
262
263        if let Some(shutdown) = self.shutdown {
264            tracing::debug!("server is running with a shutdown signal");
265            server.with_graceful_shutdown(shutdown).await?;
266        } else {
267            tracing::debug!("server is running without a shutdown signal");
268            server.await?;
269        }
270
271        tracing::info!("waiting for core service to stop");
272        self.core_handle.await?;
273
274        tracing::info!("server shutdown complete");
275        Ok(())
276    }
277}