Skip to main content

durable_streams_server/
router.rs

1//! Axum router construction for the Durable Streams HTTP surface.
2//!
3//! [`build_router`] is the main embedding entry point for library consumers.
4
5use crate::config::{Config, LongPollTimeout, SseReconnectInterval};
6use crate::protocol::stream_name::StreamNameLimits;
7use crate::{handlers, middleware, storage::Storage};
8use axum::http::HeaderValue;
9use axum::{Extension, Router, middleware as axum_middleware, routing::get};
10use std::sync::Arc;
11use std::sync::atomic::AtomicBool;
12use tokio_util::sync::CancellationToken;
13use tower_http::cors::{AllowOrigin, CorsLayer};
14
/// Newtype around [`CancellationToken`] so it can be carried as an axum
/// [`Extension`].
///
/// Long-poll and SSE handlers observe this token so they can drain cleanly
/// when the server begins a graceful shutdown. The router attaches it to the
/// protocol routes; [`build_router`] supplies a token that is never
/// cancelled, while [`build_router_with_ready`] uses the caller's token.
#[derive(Clone)]
pub struct ShutdownToken(pub CancellationToken);
21
/// Default mount path for the Durable Streams protocol routes.
///
/// The router itself mounts the protocol routes at
/// `config.stream_base_path`; this constant is the documented default for
/// that setting.
pub const DEFAULT_STREAM_BASE_PATH: &str = "/v1/stream";
24
/// Wrapper around the configured stream route mount path.
///
/// Built from `config.stream_base_path` in [`build_router_with_ready`] and
/// attached to the protocol routes as an axum [`Extension`]. The path is
/// held as an `Arc<str>`, so cloning the wrapper only bumps a reference
/// count rather than copying the string.
#[derive(Clone)]
pub(crate) struct StreamBasePath(pub Arc<str>);
28
29/// Build the application router with storage state.
30///
31/// Routes:
32/// - `GET /healthz`        – Liveness probe (always 200)
33/// - `GET/PUT/... <path>`  – Protocol routes mounted at the configured
34///   `config.stream_base_path` (default [`DEFAULT_STREAM_BASE_PATH`])
35///
36/// Uses a no-op cancellation token (never cancelled). For production
37/// use with graceful shutdown, prefer [`build_router_with_ready`].
38pub fn build_router<S: Storage + 'static>(storage: Arc<S>, config: &Config) -> Router {
39    build_router_with_ready(storage, config, None, CancellationToken::new())
40}
41
42/// Build the router with readiness flag and shutdown token.
43///
44/// When `ready` is `Some`, the `/readyz` endpoint is registered and returns
45/// 200 only after the flag is set to `true`. When `None`, the endpoint is
46/// not registered (backwards-compatible).
47///
48/// The `shutdown` token is propagated to long-poll and SSE handlers so they
49/// can observe server shutdown and drain in-flight connections cleanly.
50pub fn build_router_with_ready<S: Storage + 'static>(
51    storage: Arc<S>,
52    config: &Config,
53    ready: Option<Arc<AtomicBool>>,
54    shutdown: CancellationToken,
55) -> Router {
56    let stream_base_path = Arc::<str>::from(config.stream_base_path.as_str());
57    let mut app = Router::new()
58        .route("/healthz", get(handlers::health::health_check))
59        .nest(
60            stream_base_path.as_ref(),
61            protocol_routes(storage, config, shutdown, Arc::clone(&stream_base_path)),
62        );
63
64    if let Some(flag) = ready {
65        app = app
66            .route("/readyz", get(handlers::health::readiness_check))
67            .layer(Extension(flag));
68    }
69
70    app.layer(axum_middleware::from_fn(
71        middleware::telemetry::track_requests,
72    ))
73    .layer(cors_layer(&config.cors_origins))
74}
75
76/// Build a CORS layer from the configured origins string.
77///
78/// Accepts `"*"` for permissive (any origin) or a comma-separated list of
79/// allowed origins (e.g. `"http://localhost:3000,https://app.example.com"`).
80fn cors_layer(origins: &str) -> CorsLayer {
81    let allow_origin = if origins == "*" {
82        AllowOrigin::any()
83    } else {
84        let values: Vec<HeaderValue> = origins
85            .split(',')
86            .filter_map(|s| s.trim().parse().ok())
87            .collect();
88        AllowOrigin::list(values)
89    };
90
91    CorsLayer::new()
92        .allow_origin(allow_origin)
93        .allow_methods(tower_http::cors::Any)
94        .allow_headers(tower_http::cors::Any)
95        .expose_headers(tower_http::cors::Any)
96}
97
98/// Protocol routes under /v1/stream
99///
100/// All protocol routes have security headers applied via middleware.
101fn protocol_routes<S: Storage + 'static>(
102    storage: Arc<S>,
103    config: &Config,
104    shutdown: CancellationToken,
105    stream_base_path: Arc<str>,
106) -> Router {
107    Router::new()
108        .route(
109            "/{*name}",
110            get(handlers::get::read_stream::<S>)
111                .put(handlers::put::create_stream::<S>)
112                .head(handlers::head::stream_metadata::<S>)
113                .post(handlers::post::append_data::<S>)
114                .delete(handlers::delete::delete_stream::<S>),
115        )
116        .layer(Extension(StreamNameLimits {
117            max_bytes: config.max_stream_name_bytes,
118            max_segments: config.max_stream_name_segments,
119        }))
120        .layer(Extension(ShutdownToken(shutdown)))
121        .layer(Extension(SseReconnectInterval(
122            config.sse_reconnect_interval_secs,
123        )))
124        .layer(Extension(LongPollTimeout(config.long_poll_timeout)))
125        .layer(Extension(StreamBasePath(stream_base_path)))
126        .layer(axum_middleware::from_fn(
127            middleware::security::add_security_headers,
128        ))
129        .with_state(storage)
130}