Skip to main content

zart_api/
server.rs

1//! HTTP server setup and lifecycle management.
2
3use crate::routes;
4use crate::state::AppState;
5use axum::Router;
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8use tower_http::cors::CorsLayer;
9use tower_http::trace::TraceLayer;
10use tracing::info;
11use zart::DurableApi;
12
13/// The Zart API server.
14///
15/// Wraps an Axum router and exposes durable execution management over HTTP.
16pub struct ApiServer {
17    /// TCP address to bind, e.g. `"0.0.0.0:8080"`.
18    addr: String,
19    /// The durable execution backend.
20    durable: Arc<dyn DurableApi>,
21    /// CancellationToken for graceful shutdown.
22    cancellation: Option<CancellationToken>,
23}
24
25impl ApiServer {
26    /// Create a new API server bound to `addr`.
27    pub fn new(addr: impl Into<String>, durable: Arc<dyn DurableApi>) -> Self {
28        Self {
29            addr: addr.into(),
30            durable,
31            cancellation: None,
32        }
33    }
34
35    /// Create a new API server with a cancellation token for graceful shutdown.
36    pub fn with_cancellation(
37        addr: impl Into<String>,
38        durable: Arc<dyn DurableApi>,
39        cancellation: CancellationToken,
40    ) -> Self {
41        Self {
42            addr: addr.into(),
43            durable,
44            cancellation: Some(cancellation),
45        }
46    }
47
48    /// Build the Axum router with all API routes and middleware.
49    pub fn router(&self) -> Router {
50        let state = AppState::new(self.durable.clone());
51        routes::api_router(state)
52            .layer(TraceLayer::new_for_http())
53            .layer(CorsLayer::permissive())
54    }
55
56    /// Start listening and serving requests.
57    ///
58    /// Blocks until the server is shut down.
59    pub async fn serve(self) -> Result<(), std::io::Error> {
60        info!(addr = %self.addr, "Zart API server starting");
61        let router = self.router();
62        let listener = tokio::net::TcpListener::bind(&self.addr).await?;
63
64        // Move cancellation token into a variable we can control
65        let cancellation = self.cancellation;
66
67        if let Some(cancellation) = cancellation {
68            info!("Zart API server configured with graceful shutdown");
69            // Create the shutdown signal future and keep cancellation alive
70            let shutdown_signal = async move {
71                cancellation.cancelled().await;
72            };
73            axum::serve(listener, router)
74                .with_graceful_shutdown(shutdown_signal)
75                .await
76        } else {
77            axum::serve(listener, router).await
78        }
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85    use async_trait::async_trait;
86    use std::time::Duration;
87    use zart::error::SchedulerError;
88    use zart_scheduler::ListExecutionsParams;
89    use zart_scheduler::{ExecutionRecord, ExecutionStats, ScheduleResult};
90
91    struct NullApi;
92
93    #[async_trait]
94    impl DurableApi for NullApi {
95        async fn start(
96            &self,
97            _: &str,
98            _: &str,
99            _: serde_json::Value,
100        ) -> Result<ScheduleResult, SchedulerError> {
101            unimplemented!()
102        }
103        async fn cancel(&self, _: &str) -> Result<bool, SchedulerError> {
104            unimplemented!()
105        }
106        async fn status(&self, _: &str) -> Result<ExecutionRecord, SchedulerError> {
107            unimplemented!()
108        }
109        async fn wait(
110            &self,
111            _: &str,
112            _: Duration,
113            _: Option<Duration>,
114        ) -> Result<ExecutionRecord, SchedulerError> {
115            unimplemented!()
116        }
117        async fn offer_event(
118            &self,
119            _: &str,
120            _: &str,
121            _: serde_json::Value,
122        ) -> Result<(), SchedulerError> {
123            unimplemented!()
124        }
125        async fn list_executions(
126            &self,
127            _: ListExecutionsParams,
128        ) -> Result<Vec<ExecutionRecord>, SchedulerError> {
129            unimplemented!()
130        }
131        async fn stats(&self) -> Result<ExecutionStats, SchedulerError> {
132            Ok(ExecutionStats::default())
133        }
134    }
135
136    #[test]
137    fn server_builds_router() {
138        let server = ApiServer::new("0.0.0.0:8080", Arc::new(NullApi));
139        let _ = server.router();
140    }
141}