Skip to main content

scp_node/
http.rs

1//! HTTP routing for [`ApplicationNode`].
2//!
3//! Provides axum routers for `.well-known/scp` and `/scp/v1` WebSocket
4//! upgrade, plus the `serve()` method that merges application routes with
5//! SCP routes on a single HTTPS listener.
6//!
7//! See spec section 18.6.2 for the SDK surface.
8
9use std::collections::HashMap;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use axum::Router;
15use axum::extract::State;
16use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
17use axum::http::StatusCode;
18use axum::response::IntoResponse;
19use axum::routing::get;
20use futures::{SinkExt, StreamExt};
21use tokio::sync::{RwLock, Semaphore};
22use tokio_util::sync::CancellationToken;
23use tower_http::cors::{AllowOrigin, CorsLayer};
24use zeroize::Zeroizing;
25
26use scp_identity::document::DidDocument;
27use scp_platform::traits::Storage;
28use scp_transport::native::server::RelayConfig as TransportRelayConfig;
29use scp_transport::native::storage::BlobStorageBackend;
30use scp_transport::relay::rate_limit::{ConnectionTracker, PublishRateLimiter};
31use scp_transport::relay::subscription::SubscriptionRegistry;
32
33use crate::tls;
34
35use crate::projection::ProjectedContext;
36use crate::well_known::well_known_handler;
37use crate::{ApplicationNode, NodeError};
38
39// ---------------------------------------------------------------------------
40// Shared state
41// ---------------------------------------------------------------------------
42
43/// Registered broadcast context for `.well-known/scp` generation.
44///
45/// Stored in [`NodeState`] and read on each `.well-known/scp` request.
46#[derive(Debug, Clone)]
47pub struct BroadcastContext {
48    /// Context ID (hex-encoded).
49    pub id: String,
50    /// Human-readable name (advisory).
51    pub name: Option<String>,
52}
53
54/// Shared state accessible by axum handlers.
55///
56/// Contains the node's identity information, registered broadcast
57/// contexts, dev API configuration, and blob storage reference.
58/// Read on every `.well-known/scp` request to generate the response
59/// dynamically (spec section 18.6.4).
60///
61/// Blob storage uses [`BlobStorageBackend`] (enum dispatch), shared between
62/// the relay server and projection handlers via `Arc` (spec section 18.11.5).
63pub struct NodeState {
64    /// The operator's DID string.
65    pub(crate) did: String,
66    /// The relay URL (e.g., `wss://example.com/scp/v1`).
67    pub(crate) relay_url: String,
68    /// Registered broadcast contexts, keyed by lowercase hex context ID.
69    /// Modified via [`ApplicationNode::register_broadcast_context`].
70    pub(crate) broadcast_contexts: RwLock<HashMap<String, BroadcastContext>>,
71    /// The relay server's bound address for WebSocket bridge connections.
72    pub(crate) relay_addr: SocketAddr,
73    /// Shared secret for authenticating internal bridge connections.
74    ///
75    /// Generated at startup and included as an `Authorization: Bearer`
76    /// header when the axum handler connects to the internal relay. The
77    /// relay validates this token during the WebSocket handshake
78    /// (defense-in-depth, #85). Moved from query parameter to header to
79    /// prevent leakage via server logs or error messages (#225).
80    /// Wrapped in `Zeroizing` so the secret is zeroed on drop.
81    pub(crate) bridge_secret: Zeroizing<[u8; 32]>,
82    /// Bearer token for the dev API (`scp_local_token_<32 hex chars>`).
83    ///
84    /// `Some` when `local_api()` was called on the builder, `None` otherwise.
85    /// See spec section 18.10.2.
86    pub(crate) dev_token: Option<String>,
87    /// Bind address for the dev API server.
88    ///
89    /// `Some` when `local_api()` was called on the builder, `None` otherwise.
90    /// See spec section 18.10.2.
91    pub(crate) dev_bind_addr: Option<SocketAddr>,
92    /// Registry of broadcast contexts whose messages are projected (decrypted
93    /// and served) by this node's HTTP endpoints. Keyed by routing ID.
94    ///
95    /// See spec section 18.11.5.
96    pub(crate) projected_contexts: RwLock<HashMap<[u8; 32], ProjectedContext>>,
97    /// Shared blob storage instance, used by both the relay server and
98    /// projection handlers to read stored blobs.
99    ///
100    /// See spec section 18.11.5.
101    pub(crate) blob_storage: Arc<BlobStorageBackend>,
102    /// Relay operational parameters exposed in `.well-known/scp`
103    /// `relay_config` (spec section 18.3.3).
104    pub(crate) relay_config: TransportRelayConfig,
105    /// The instant the node was started, used to compute uptime for the
106    /// dev API health endpoint (spec section 18.10.3).
107    pub(crate) start_time: Instant,
108    /// Bind address for the public HTTP server used by [`ApplicationNode::serve`].
109    ///
110    /// Separate from `relay_addr` (the relay's internal listener) to avoid
111    /// double-binding the same port (#224). Defaults to `0.0.0.0:8443`.
112    pub(crate) http_bind_addr: SocketAddr,
113    /// Shared cancellation token for graceful shutdown of both the public
114    /// HTTPS listener and the dev API listener. Cancelled by
115    /// [`ApplicationNode::shutdown`].
116    ///
117    /// See SCP-245 action item: "Ensure graceful shutdown of dev API
118    /// listener alongside main server."
119    pub(crate) shutdown_token: CancellationToken,
120    /// CORS allowed origins for public endpoints (`.well-known/scp`,
121    /// broadcast projection). `None` means permissive (`*`); `Some(list)`
122    /// restricts to exactly those origins.
123    ///
124    /// See issue #231.
125    pub(crate) cors_origins: Option<Vec<String>>,
126    /// Per-IP token-bucket rate limiter for broadcast projection endpoints.
127    ///
128    /// Limits the request rate from each source IP to prevent abuse of the
129    /// public, unauthenticated projection endpoints that perform crypto
130    /// decryption and blob reads per request. Returns HTTP 429 when exceeded.
131    ///
132    /// Configurable via `SCP_NODE_PROJECTION_RATE_LIMIT` (default 60 req/s).
133    /// See spec section 18.11.6.
134    pub(crate) projection_rate_limiter: PublishRateLimiter,
135    /// TLS configuration for the public HTTPS listener.
136    ///
137    /// When `Some`, [`ApplicationNode::serve`] terminates TLS using
138    /// [`tokio_rustls::TlsAcceptor`] before passing connections to axum.
139    /// When `None` (no-domain mode or explicit opt-out), the listener serves
140    /// plain HTTP/WS (spec section 10.12.8).
141    ///
142    /// Built from [`tls::CertificateData`] during [`ApplicationNodeBuilder::build`]
143    /// via [`tls::build_reloadable_tls_config`] (spec section 18.6.3).
144    pub(crate) tls_config: Option<Arc<rustls::ServerConfig>>,
145    /// The TLS certificate resolver for ACME hot-reload.
146    ///
147    /// When `Some`, the ACME renewal loop can call [`CertResolver::update`]
148    /// to hot-swap certificates without restarting the server.
149    /// When `None` (no-domain mode), ACME is not active.
150    ///
151    /// See spec section 18.6.3 (auto-renewal).
152    pub(crate) cert_resolver: Option<Arc<crate::tls::CertResolver>>,
153    /// The operator's DID document, populated at build time.
154    ///
155    /// Used by the dev API identity endpoint to return the full document
156    /// (spec section 18.10.3).
157    pub(crate) did_document: DidDocument,
158    /// Shared connection tracker from the relay server.
159    ///
160    /// Tracks active connections per IP address across all transports.
161    /// Used by the dev API health and relay status endpoints to report
162    /// real connection counts (spec section 18.10.3).
163    pub(crate) connection_tracker: ConnectionTracker,
164    /// Shared subscription registry from the relay server.
165    ///
166    /// Maps routing IDs to subscriber entries. Used by the dev API
167    /// context endpoint to report real subscriber counts (spec section 18.10.3).
168    pub(crate) subscription_registry: SubscriptionRegistry,
169    /// Shared ACME challenge map (token → key authorization).
170    ///
171    /// Mounted in [`serve()`](crate::ApplicationNode::serve) at
172    /// `GET /.well-known/acme-challenge/{token}` so that ACME renewal
173    /// challenges can be served without restarting the server (issue #305).
174    /// When `None` (no-domain or self-signed mode), no challenge router is
175    /// mounted.
176    pub(crate) acme_challenges: Option<Arc<RwLock<HashMap<String, String>>>>,
177
178    /// Shared state for bridge shadow operations.
179    ///
180    /// Holds per-context shadow registries and sender key stores for the
181    /// bridge shadow creation endpoint (`POST /v1/scp/bridge/shadow`).
182    /// See SCP-BCH-002.
183    pub(crate) bridge_state: Arc<crate::bridge_handlers::BridgeState>,
184}
185
186// ---------------------------------------------------------------------------
187// CORS layer construction
188// ---------------------------------------------------------------------------
189
190/// Constructs a [`CorsLayer`] from the node's CORS configuration.
191///
192/// - `None` origins: permissive (`Access-Control-Allow-Origin: *`).
193/// - `Some(list)`: restricts to exactly the listed origins.
194///
195/// Applied to public endpoints (`.well-known/scp`, broadcast projection)
196/// so browser-based JavaScript / WASM clients can read responses cross-origin
197/// (issue #231). Not applied to the WebSocket relay endpoint (WebSocket
198/// upgrades have their own origin mechanism) or the dev API (localhost-only).
199pub fn build_cors_layer(origins: &Option<Vec<String>>) -> CorsLayer {
200    let allow_origin = origins.as_ref().map_or_else(AllowOrigin::any, |list| {
201        let parsed: Vec<axum::http::HeaderValue> = list
202            .iter()
203            .filter_map(|o| {
204                o.parse().map_or_else(
205                    |_| {
206                        tracing::warn!(
207                            origin = %o,
208                            "ignoring invalid CORS origin; \
209                             this may make endpoints more permissive than intended"
210                        );
211                        None
212                    },
213                    Some,
214                )
215            })
216            .collect();
217        AllowOrigin::list(parsed)
218    });
219    CorsLayer::new()
220        .allow_origin(allow_origin)
221        .allow_methods([axum::http::Method::GET, axum::http::Method::OPTIONS])
222        .allow_headers([
223            axum::http::header::CONTENT_TYPE,
224            axum::http::header::IF_NONE_MATCH,
225        ])
226}
227
228// ---------------------------------------------------------------------------
229// Constants
230// ---------------------------------------------------------------------------
231
232/// Idle timeout for bridge WebSocket connections (5 minutes).
233///
234/// If no data flows in either direction for this duration, the bridge
235/// connection is closed. This prevents stale connections from holding
236/// resources indefinitely (#229).
237const BRIDGE_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
238
239// ---------------------------------------------------------------------------
240// Router constructors
241// ---------------------------------------------------------------------------
242
243/// Returns an axum [`Router`] serving `GET /.well-known/scp`.
244///
245/// The handler dynamically generates the `.well-known/scp` JSON document
246/// from the provided [`NodeState`]. See spec section 18.3.
247pub fn well_known_router(state: Arc<NodeState>) -> Router {
248    Router::new()
249        .route("/.well-known/scp", get(well_known_handler))
250        .with_state(state)
251}
252
253/// Returns an axum [`Router`] handling WebSocket upgrade at `/scp/v1`.
254///
255/// Incoming WebSocket connections are bridged to the node's internal
256/// relay server. The axum handler upgrades the HTTP connection to
257/// WebSocket, then connects to the relay on localhost and forwards
258/// frames bidirectionally.
259///
260/// An `Arc<Semaphore>` caps concurrent bridge connections to
261/// `max_total_connections` from the relay config. The permit is acquired
262/// before the HTTP 101 upgrade and held inside the `on_upgrade` closure
263/// for the entire WebSocket connection lifetime — ensuring the semaphore
264/// accurately tracks active connections, not just in-flight upgrade
265/// requests (#229).
266pub fn relay_router(state: Arc<NodeState>) -> Router {
267    let bridge_semaphore = Arc::new(Semaphore::new(state.relay_config.max_total_connections));
268    Router::new()
269        .route("/scp/v1", get(ws_upgrade_handler))
270        .with_state((state, bridge_semaphore))
271}
272
273/// Axum handler for WebSocket upgrade at `/scp/v1`.
274///
275/// Acquires a semaphore permit before upgrading; the permit is moved into
276/// the `on_upgrade` closure so it is held for the entire WebSocket
277/// connection lifetime. If the HTTP 101 upgrade never completes, axum
278/// drops the response — which drops the closure and releases the permit.
279/// Returns 503 Service Unavailable when the bridge is at capacity.
280async fn ws_upgrade_handler(
281    ws: WebSocketUpgrade,
282    State((state, sem)): State<(Arc<NodeState>, Arc<Semaphore>)>,
283) -> impl IntoResponse {
284    let Ok(permit) = sem.try_acquire_owned() else {
285        return StatusCode::SERVICE_UNAVAILABLE.into_response();
286    };
287    let relay_addr = state.relay_addr;
288    // Clone the Zeroizing wrapper so the bridge task's copy is also zeroed
289    // on drop. Avoids leaving bare secret bytes on the stack/heap.
290    let bridge_secret = state.bridge_secret.clone();
291    // Move the permit into the closure — it is released when the closure
292    // is dropped, either after the bridge ends or if the upgrade fails.
293    ws.on_upgrade(move |socket| async move {
294        let _permit = permit; // bind to ensure drop at end of scope
295        relay_bridge(socket, relay_addr, bridge_secret).await;
296    })
297    .into_response()
298}
299
300/// Bridges an axum WebSocket to the internal relay server.
301///
302/// Connects to the relay at `relay_addr` with the bridge secret included
303/// as an `Authorization: Bearer <hex>` header, then forwards frames in
304/// both directions until either side closes or the connection is idle for
305/// [`BRIDGE_IDLE_TIMEOUT`]. Sends explicit WebSocket close frames on
306/// both sides when the bridge terminates.
307///
308/// The secret is transmitted via HTTP header rather than query parameter
309/// to prevent leakage through server logs, error messages, or debug
310/// output (#225).
311///
312/// The idle timeout resets only on data frames (Text/Binary), not on
313/// Ping/Pong control frames. This prevents an attacker from keeping
314/// connections alive indefinitely by sending pings without real data
315/// (#229).
316async fn relay_bridge(
317    axum_ws: WebSocket,
318    relay_addr: SocketAddr,
319    bridge_secret: Zeroizing<[u8; 32]>,
320) {
321    use tokio_tungstenite::tungstenite::client::IntoClientRequest;
322    let token_hex = scp_transport::native::server::hex_encode_32(&bridge_secret);
323    let url = format!("ws://{relay_addr}/");
324    let mut request = match url.into_client_request() {
325        Ok(r) => r,
326        Err(e) => {
327            tracing::error!(
328                addr = %relay_addr,
329                error = %e,
330                "failed to build WebSocket request for internal relay bridge"
331            );
332            return;
333        }
334    };
335    // Safety: the token is a 64-char lowercase hex string — always valid
336    // as an HTTP header value. `parse()` only fails on non-visible ASCII
337    // or control characters, which hex digits never contain.
338    let Ok(header_value) = format!("Bearer {token_hex}").parse() else {
339        tracing::error!("bridge token produced invalid HTTP header value");
340        return;
341    };
342    request.headers_mut().insert("Authorization", header_value);
343    let relay_conn = tokio_tungstenite::connect_async(request).await;
344
345    let Ok((relay_ws, _)) = relay_conn else {
346        tracing::error!(
347            addr = %relay_addr,
348            "failed to connect to internal relay for WebSocket bridge"
349        );
350        return;
351    };
352
353    let (mut relay_sink, mut relay_source) = relay_ws.split();
354    let (mut axum_sink, mut axum_source) = axum_ws.split();
355
356    // Idle timeout: resets only on data frames (Text/Binary), not control frames.
357    let idle_timeout = tokio::time::sleep(BRIDGE_IDLE_TIMEOUT);
358    tokio::pin!(idle_timeout);
359
360    loop {
361        tokio::select! {
362            msg = StreamExt::next(&mut axum_source) => {
363                match msg {
364                    Some(Ok(Message::Close(_)) | Err(_)) | None => break,
365                    Some(Ok(msg)) => {
366                        let relay_msg = match msg {
367                            Message::Text(t) => {
368                                idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
369                                tokio_tungstenite::tungstenite::Message::Text(t.to_string())
370                            }
371                            Message::Binary(b) => {
372                                idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
373                                tokio_tungstenite::tungstenite::Message::Binary(b.to_vec())
374                            }
375                            Message::Ping(p) => tokio_tungstenite::tungstenite::Message::Ping(p.to_vec()),
376                            Message::Pong(p) => tokio_tungstenite::tungstenite::Message::Pong(p.to_vec()),
377                            Message::Close(_) => break,
378                        };
379                        if let Err(e) = SinkExt::send(&mut relay_sink, relay_msg).await {
380                            tracing::debug!(
381                                direction = "client->relay",
382                                error = %e,
383                                "bridge forwarding failed"
384                            );
385                            break;
386                        }
387                    }
388                }
389            }
390            msg = StreamExt::next(&mut relay_source) => {
391                match msg {
392                    Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_)) | Err(_)) | None => break,
393                    Some(Ok(msg)) => {
394                        let axum_msg = match msg {
395                            tokio_tungstenite::tungstenite::Message::Text(t) => {
396                                idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
397                                Message::Text(t.into())
398                            }
399                            tokio_tungstenite::tungstenite::Message::Binary(b) => {
400                                idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
401                                Message::Binary(b.into())
402                            }
403                            tokio_tungstenite::tungstenite::Message::Ping(p) => Message::Ping(p.into()),
404                            tokio_tungstenite::tungstenite::Message::Pong(p) => Message::Pong(p.into()),
405                            tokio_tungstenite::tungstenite::Message::Close(_) => break,
406                            tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
407                        };
408                        if let Err(e) = SinkExt::send(&mut axum_sink, axum_msg).await {
409                            tracing::debug!(
410                                direction = "relay->client",
411                                error = %e,
412                                "bridge forwarding failed"
413                            );
414                            break;
415                        }
416                    }
417                }
418            }
419            () = &mut idle_timeout => {
420                tracing::debug!("bridge connection idle timeout reached");
421                break;
422            }
423        }
424    }
425
426    // Send explicit close frames (best-effort, ignore errors on close).
427    // SplitSink::drop() does NOT send close frames in tokio-tungstenite 0.24.
428    let _ = SinkExt::close(&mut relay_sink).await;
429    let _ = SinkExt::close(&mut axum_sink).await;
430}
431
432// ---------------------------------------------------------------------------
433// ApplicationNode HTTP methods
434// ---------------------------------------------------------------------------
435
436impl<S: Storage + Send + Sync + 'static> ApplicationNode<S> {
437    /// Returns an axum [`Router`] serving `GET /.well-known/scp`.
438    ///
439    /// The response is dynamically generated from the node's current state:
440    /// DID, relay URL, and registered broadcast contexts. Content-Type is
441    /// `application/json` (provided by axum's `Json` extractor).
442    ///
443    /// See spec section 18.3.
444    #[must_use = "returns the well-known router, which must be mounted into an axum application"]
445    pub fn well_known_router(&self) -> Router {
446        well_known_router(Arc::clone(&self.state))
447    }
448
449    /// Returns an axum [`Router`] handling WebSocket upgrade at `/scp/v1`.
450    ///
451    /// Incoming connections are bridged to the node's internal relay server.
452    ///
453    /// See spec section 18.6.2.
454    #[must_use = "returns the relay router, which must be mounted into an axum application"]
455    pub fn relay_router(&self) -> Router {
456        relay_router(Arc::clone(&self.state))
457    }
458
459    /// Returns an axum [`Router`] serving broadcast projection endpoints.
460    ///
461    /// Includes:
462    /// - `GET /scp/broadcast/<routing_id>/feed` -- recent messages feed
463    /// - `GET /scp/broadcast/<routing_id>/messages/<blob_id>` -- single message
464    ///
465    /// These are **public endpoints** with no authentication middleware --
466    /// broadcast content is intended for broad distribution (spec section
467    /// 18.11.6).
468    ///
469    /// Served on the public HTTPS port alongside `.well-known/scp` and
470    /// `/scp/v1`.
471    ///
472    /// See spec section 18.11.8.
473    #[must_use = "returns the projection router, which must be mounted into an axum application"]
474    pub fn broadcast_projection_router(&self) -> Router {
475        crate::projection::broadcast_projection_router(Arc::clone(&self.state))
476    }
477
478    /// Returns an axum [`Router`] serving bridge endpoints.
479    ///
480    /// Includes `POST /v1/scp/bridge/shadow` for shadow identity creation.
481    /// Requires bridge authentication middleware to be applied by the caller.
482    ///
483    /// See SCP-BCH-002 and spec section 12.10.
484    #[must_use = "returns the bridge router, which must be mounted into an axum application"]
485    pub fn bridge_router(&self) -> Router {
486        crate::bridge_handlers::bridge_router(Arc::clone(&self.state.bridge_state))
487    }
488
489    /// Returns the dev API router if the dev API is enabled.
490    ///
491    /// Returns `Some(Router)` when [`ApplicationNodeBuilder::local_api`] was
492    /// called (i.e., a dev token was generated), `None` otherwise. The
493    /// returned router includes all `/scp/dev/v1/*` routes with bearer token
494    /// middleware applied.
495    ///
496    /// See spec section 18.10.5.
497    #[must_use = "returns the dev API router, which must be served on a separate listener"]
498    pub fn dev_router(&self) -> Option<Router> {
499        let token = self.state.dev_token.clone()?;
500        Some(crate::dev_api::dev_router(Arc::clone(&self.state), token))
501    }
502
503    /// Takes ownership of the node and starts serving HTTP traffic.
504    ///
505    /// Binds HTTPS (or plain HTTP when TLS is not configured) on the
506    /// configured address, merging:
507    ///
508    /// 1. Application-provided routes (`app_router`)
509    /// 2. `.well-known/scp` route
510    /// 3. `/scp/v1` WebSocket upgrade route
511    /// 4. `/scp/broadcast/*` broadcast projection routes
512    ///
513    /// SCP routes take precedence for `/.well-known/scp`, `/scp/v1`, and
514    /// `/scp/broadcast/*`. All other paths route to `app_router`.
515    ///
516    /// ## TLS termination
517    ///
518    /// When a TLS configuration was provisioned during
519    /// [`ApplicationNodeBuilder::build`] (domain mode with successful ACME or
520    /// injected TLS provider), `serve()` terminates TLS using
521    /// [`tokio_rustls::TlsAcceptor`] and serves HTTPS/WSS. When no TLS
522    /// configuration is present (no-domain mode, or TLS provisioning opted
523    /// out), `serve()` falls back to plain HTTP/WS. See spec section 18.6.3.
524    ///
525    /// ## Dev API
526    ///
527    /// When the dev API is configured (via [`ApplicationNodeBuilder::local_api`]),
528    /// a separate tokio task is spawned to serve the dev API on the configured
529    /// address. The dev API listener runs concurrently with the public
530    /// listener. When the dev API is not configured, `serve()` behaves exactly
531    /// as before -- no additional listener is spawned. The dev API always
532    /// uses plain HTTP (it is bound to loopback only).
533    ///
534    /// ## HTTP/3
535    ///
536    /// When the `http3` feature is enabled and an [`Http3Config`] is provided
537    /// via [`ApplicationNodeBuilder::http3`], an HTTP/3 listener is started
538    /// on a separate QUIC endpoint. All HTTP/1.1 and HTTP/2 responses include
539    /// an `Alt-Svc` header advertising the HTTP/3 endpoint (spec section
540    /// 10.15.1).
541    ///
542    /// ## Graceful shutdown
543    ///
544    /// The `shutdown` future is awaited as a graceful shutdown signal: when
545    /// it completes, the server stops accepting new connections, drains
546    /// in-flight requests, cancels the internal shutdown token (stopping
547    /// the dev API listener if running), and shuts down the relay server.
548    /// The node is consumed -- callers do not need to call
549    /// [`ApplicationNode::shutdown`] separately.
550    ///
551    /// If the dev API task exits early (e.g., bind failure), the shutdown
552    /// token is cancelled and the error is propagated. Likewise, if the
553    /// main server exits first, the dev API task is cancelled via the
554    /// shutdown token and aborted.
555    ///
556    /// See spec sections 18.6.3, 18.10.5, and 18.11.8.
557    ///
558    /// # Errors
559    ///
560    /// Returns [`NodeError::Serve`] if either server cannot bind or
561    /// encounters a fatal I/O error.
562    pub async fn serve(
563        self,
564        app_router: Router,
565        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
566    ) -> Result<(), NodeError> {
567        spawn_projection_rate_limit_cleanup(
568            self.state.projection_rate_limiter.clone(),
569            self.state.shutdown_token.clone(),
570        );
571
572        let cors = build_cors_layer(&self.state.cors_origins);
573
574        // Apply CORS to public endpoints only. The WebSocket relay endpoint
575        // uses its own origin mechanism; the dev API is localhost-only.
576        let well_known = well_known_router(Arc::clone(&self.state)).layer(cors.clone());
577        let relay_rt = relay_router(Arc::clone(&self.state));
578        let projection =
579            crate::projection::broadcast_projection_router(Arc::clone(&self.state)).layer(cors);
580
581        let dev_router = self
582            .state
583            .dev_token
584            .clone()
585            .map(|t| crate::dev_api::dev_router(Arc::clone(&self.state), t));
586        let dev_bind_addr = self.state.dev_bind_addr;
587        let tls_config = self.state.tls_config.clone();
588        #[cfg(feature = "http3")]
589        let http3_config = self.http3_config;
590
591        let bridge = crate::bridge_handlers::bridge_router(Arc::clone(&self.state.bridge_state));
592        let relay = self.relay;
593        let state = self.state;
594
595        // SCP routes take precedence: merge them last so they override
596        // any conflicting paths in app_router. ACME challenge router is
597        // included for renewal support (issue #305).
598        let merged = build_merged_router(
599            app_router,
600            well_known,
601            relay_rt,
602            projection,
603            bridge,
604            state.acme_challenges.as_ref(),
605        );
606
607        let dev_api_handle = spawn_dev_api(dev_router, dev_bind_addr, state.shutdown_token.clone());
608
609        // Spawn HTTP/3 listener when configured (spec section 10.15.1).
610        #[cfg(feature = "http3")]
611        if let Some(http3_config) = http3_config {
612            spawn_http3_listener(http3_config, &state);
613        }
614
615        let listener = tokio::net::TcpListener::bind(state.http_bind_addr)
616            .await
617            .map_err(|e| NodeError::Serve(e.to_string()))?;
618        let local_addr = listener
619            .local_addr()
620            .map_err(|e| NodeError::Serve(e.to_string()))?;
621        let shutdown_token = state.shutdown_token.clone();
622        let token = shutdown_token.clone();
623        tokio::spawn(async move {
624            shutdown.await;
625            token.cancel();
626        });
627
628        let main_server = build_main_server(
629            listener,
630            merged,
631            tls_config,
632            shutdown_token.clone(),
633            local_addr,
634        );
635
636        // If a dev API task is running, select! on both: if either exits
637        // early we propagate the result. This ensures a dev API bind
638        // failure doesn't go unnoticed while the main server keeps running.
639        let result = match dev_api_handle {
640            Some(handle) => {
641                tokio::pin!(handle);
642                tokio::select! {
643                    result = main_server => {
644                        // Main server exited — cancel shutdown token so the
645                        // dev API task also drains, then abort its handle.
646                        state.shutdown_token.cancel();
647                        handle.abort();
648                        result
649                    }
650                    result = &mut handle => {
651                        // Dev API exited early — cancel shutdown token so the
652                        // main server also drains.
653                        state.shutdown_token.cancel();
654                        // JoinError (task panic/cancel) or NodeError from inner.
655                        match result {
656                            Ok(inner) => inner,
657                            Err(join_err) => {
658                                Err(NodeError::Serve(
659                                    format!("dev API task failed: {join_err}")
660                                ))
661                            }
662                        }
663                    }
664                }
665            }
666            None => main_server.await,
667        };
668
669        // Graceful shutdown: cancel the shutdown token (idempotent if
670        // already cancelled above) and stop the relay server. This ensures
671        // callers don't need to call shutdown() separately.
672        state.shutdown_token.cancel();
673        relay.shutdown_handle.shutdown();
674        tracing::info!("application node shut down");
675
676        result
677    }
678}
679
680// ---------------------------------------------------------------------------
681// Dev API spawning (extracted for clippy::too_many_lines)
682// ---------------------------------------------------------------------------
683
684/// Builds the merged axum router for `serve()`, combining SCP protocol
685/// routes (well-known, relay, projection, ACME challenges) with the
686/// application router. Extracted from `serve()` for clippy line limits.
687fn build_merged_router(
688    app_router: Router,
689    well_known: Router,
690    relay_rt: Router,
691    projection: Router,
692    bridge: Router,
693    acme_challenges: Option<&Arc<RwLock<HashMap<String, String>>>>,
694) -> Router {
695    let merged = app_router
696        .merge(well_known)
697        .merge(relay_rt)
698        .merge(projection)
699        .merge(bridge);
700
701    // Mount ACME challenge router for renewal challenges (issue #305).
702    // Serves `GET /.well-known/acme-challenge/{token}` so the ACME CA can
703    // validate domain ownership during certificate renewal.
704    if let Some(challenges) = acme_challenges {
705        merged.merge(tls::acme_challenge_router(Arc::clone(challenges)))
706    } else {
707        merged
708    }
709}
710
711/// Spawns the dev API listener if configured.
712///
713/// Returns a `JoinHandle` so the caller can detect early exit (e.g., bind
714/// failure) and propagate the error. Dev API always uses plain HTTP
715/// (loopback-only, spec section 18.10.5).
716fn spawn_dev_api(
717    dev_router: Option<Router>,
718    dev_bind_addr: Option<SocketAddr>,
719    shutdown_token: CancellationToken,
720) -> Option<tokio::task::JoinHandle<Result<(), NodeError>>> {
721    let (Some(dev_router), Some(dev_addr)) = (dev_router, dev_bind_addr) else {
722        return None;
723    };
724    Some(tokio::spawn(async move {
725        let dev_listener = tokio::net::TcpListener::bind(dev_addr).await.map_err(|e| {
726            NodeError::Serve(format!("failed to bind dev API server on {dev_addr}: {e}"))
727        })?;
728        let local_addr = dev_listener.local_addr().unwrap_or(dev_addr);
729        tracing::info!(addr = %local_addr, "dev API server started");
730
731        axum::serve(dev_listener, dev_router)
732            .with_graceful_shutdown(shutdown_token.cancelled_owned())
733            .await
734            .map_err(|e| NodeError::Serve(format!("dev API server error: {e}")))
735    }))
736}
737
738// ---------------------------------------------------------------------------
739// Main server construction (extracted for clippy::too_many_lines)
740// ---------------------------------------------------------------------------
741
742/// Builds the main server future, branching on TLS configuration.
743///
744/// Returns a boxed future that resolves when the server shuts down.
745/// Extracted from `serve()` for clippy line limits.
746fn build_main_server(
747    listener: tokio::net::TcpListener,
748    merged: Router,
749    tls_config: Option<Arc<rustls::ServerConfig>>,
750    shutdown_token: CancellationToken,
751    local_addr: SocketAddr,
752) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NodeError>> + Send>> {
753    if let Some(tls_cfg) = tls_config {
754        tracing::info!(
755            addr = %local_addr, scheme = "HTTPS",
756            "application node server started (TLS active)"
757        );
758        Box::pin(tls::serve_tls(listener, tls_cfg, merged, shutdown_token))
759    } else {
760        tracing::info!(
761            addr = %local_addr, scheme = "HTTP",
762            "application node server started (plain HTTP, broadcast projection endpoints active)"
763        );
764        Box::pin(async move {
765            axum::serve(listener, merged)
766                .with_graceful_shutdown(shutdown_token.cancelled_owned())
767                .await
768                .map_err(|e| NodeError::Serve(e.to_string()))
769        })
770    }
771}
772
773// ---------------------------------------------------------------------------
774// Projection rate limiter cleanup (extracted for clippy::too_many_lines)
775// ---------------------------------------------------------------------------
776
777/// Spawns the background cleanup loop for the projection rate limiter.
778///
779/// Evicts stale per-IP token buckets every 60 seconds (buckets idle for more
780/// than 300 seconds). Runs until `shutdown_token` is cancelled.
781fn spawn_projection_rate_limit_cleanup(
782    limiter: PublishRateLimiter,
783    shutdown_token: CancellationToken,
784) {
785    tokio::spawn(async move {
786        limiter
787            .cleanup_loop(
788                Duration::from_secs(60),
789                Duration::from_secs(300),
790                shutdown_token,
791            )
792            .await;
793    });
794}
795
796// ---------------------------------------------------------------------------
797// Tests
798// ---------------------------------------------------------------------------
799
800#[cfg(test)]
801#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
802mod tests {
803    use std::collections::HashMap;
804    use std::net::SocketAddr;
805    use std::time::Instant;
806
807    use axum::body::Body;
808    use axum::http::{Method, Request, StatusCode};
809    use tokio::sync::RwLock;
810    use tower::ServiceExt;
811
812    use scp_transport::native::storage::BlobStorageBackend;
813
814    use super::*;
815
816    /// Creates a minimal `NodeState` for CORS tests.
817    fn test_state(cors_origins: Option<Vec<String>>) -> Arc<NodeState> {
818        Arc::new(NodeState {
819            did: "did:dht:cors_test".to_owned(),
820            relay_url: "wss://localhost/scp/v1".to_owned(),
821            broadcast_contexts: RwLock::new(HashMap::new()),
822            relay_addr: "127.0.0.1:9000".parse::<SocketAddr>().unwrap(),
823            bridge_secret: Zeroizing::new([0u8; 32]),
824            dev_token: None,
825            dev_bind_addr: None,
826            projected_contexts: RwLock::new(HashMap::new()),
827            blob_storage: Arc::new(BlobStorageBackend::default()),
828            relay_config: scp_transport::native::server::RelayConfig::default(),
829            start_time: Instant::now(),
830            http_bind_addr: SocketAddr::from(([0, 0, 0, 0], 8443)),
831            shutdown_token: CancellationToken::new(),
832            cors_origins,
833            projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
834                1000,
835            ),
836            tls_config: None,
837            cert_resolver: None,
838            did_document: scp_identity::document::DidDocument {
839                context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
840                id: "did:dht:cors_test".to_owned(),
841                verification_method: vec![],
842                authentication: vec![],
843                assertion_method: vec![],
844                also_known_as: vec![],
845                service: vec![],
846            },
847            connection_tracker: scp_transport::relay::rate_limit::new_connection_tracker(),
848            subscription_registry: scp_transport::relay::subscription::new_registry(),
849            acme_challenges: None,
850            bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
851        })
852    }
853
854    #[tokio::test]
855    async fn cors_permissive_well_known_returns_wildcard_origin() {
856        let state = test_state(None);
857        let cors = build_cors_layer(&state.cors_origins);
858        let router = well_known_router(state).layer(cors);
859
860        let req = Request::builder()
861            .uri("/.well-known/scp")
862            .header("Origin", "https://example.com")
863            .body(Body::empty())
864            .unwrap();
865
866        let resp = router.oneshot(req).await.unwrap();
867        assert_eq!(resp.status(), StatusCode::OK);
868
869        let acao = resp
870            .headers()
871            .get("access-control-allow-origin")
872            .expect("should have ACAO header")
873            .to_str()
874            .unwrap();
875        assert_eq!(acao, "*", "permissive mode should return wildcard origin");
876    }
877
878    #[tokio::test]
879    async fn cors_restricted_well_known_allows_matching_origin() {
880        let origins = Some(vec!["https://allowed.example".to_owned()]);
881        let state = test_state(origins);
882        let cors = build_cors_layer(&state.cors_origins);
883        let router = well_known_router(state).layer(cors);
884
885        let req = Request::builder()
886            .uri("/.well-known/scp")
887            .header("Origin", "https://allowed.example")
888            .body(Body::empty())
889            .unwrap();
890
891        let resp = router.oneshot(req).await.unwrap();
892        assert_eq!(resp.status(), StatusCode::OK);
893
894        let acao = resp
895            .headers()
896            .get("access-control-allow-origin")
897            .expect("should have ACAO header for allowed origin")
898            .to_str()
899            .unwrap();
900        assert_eq!(acao, "https://allowed.example");
901    }
902
903    #[tokio::test]
904    async fn cors_restricted_well_known_rejects_non_matching_origin() {
905        let origins = Some(vec!["https://allowed.example".to_owned()]);
906        let state = test_state(origins);
907        let cors = build_cors_layer(&state.cors_origins);
908        let router = well_known_router(state).layer(cors);
909
910        let req = Request::builder()
911            .uri("/.well-known/scp")
912            .header("Origin", "https://evil.example")
913            .body(Body::empty())
914            .unwrap();
915
916        let resp = router.oneshot(req).await.unwrap();
917        // The response should succeed (CORS doesn't block the response,
918        // it omits the ACAO header so the browser rejects it client-side).
919        assert_eq!(resp.status(), StatusCode::OK);
920        assert!(
921            resp.headers().get("access-control-allow-origin").is_none(),
922            "non-matching origin should NOT receive ACAO header"
923        );
924    }
925
926    #[tokio::test]
927    async fn cors_preflight_options_returns_200() {
928        let state = test_state(None);
929        let cors = build_cors_layer(&state.cors_origins);
930        let router = well_known_router(state).layer(cors);
931
932        let req = Request::builder()
933            .method(Method::OPTIONS)
934            .uri("/.well-known/scp")
935            .header("Origin", "https://example.com")
936            .header("Access-Control-Request-Method", "GET")
937            .body(Body::empty())
938            .unwrap();
939
940        let resp = router.oneshot(req).await.unwrap();
941        assert_eq!(resp.status(), StatusCode::OK);
942
943        let acao = resp
944            .headers()
945            .get("access-control-allow-origin")
946            .expect("preflight should include ACAO")
947            .to_str()
948            .unwrap();
949        assert_eq!(acao, "*");
950
951        let methods = resp
952            .headers()
953            .get("access-control-allow-methods")
954            .expect("preflight should include allow-methods")
955            .to_str()
956            .unwrap();
957        assert!(methods.contains("GET"), "should allow GET method");
958    }
959}
960
961/// Spawns the HTTP/3 listener in a background task (spec §10.15.1).
962///
963/// Extracted from [`ApplicationNode::serve`] to keep the `serve()` method
964/// within the clippy line limit. Creates a `RequestHandler` that
965/// serves the full `.well-known/scp` document (same as the Axum handler)
966/// over HTTP/3 and returns 404 for all other paths.
967///
968/// The handler holds an `Arc<NodeState>` so it can build the same
969/// complete document that the HTTP/1.1+HTTP/2 handler serves, including
970/// relay_config, contexts, handles, and transports (SCP-264).
971#[cfg(feature = "http3")]
972fn spawn_http3_listener(http3_config: scp_transport::http3::Http3Config, state: &Arc<NodeState>) {
973    use scp_transport::http3::Http3Server;
974    use scp_transport::http3::adapter::RequestHandler;
975
976    struct H3RequestHandler {
977        state: Arc<NodeState>,
978        rt: tokio::runtime::Handle,
979    }
980
981    impl RequestHandler for H3RequestHandler {
982        fn handle(
983            &self,
984            method: &str,
985            uri: &str,
986            _headers: &[(String, String)],
987        ) -> axum::http::Response<Vec<u8>> {
988            if method == "GET" && uri == "/.well-known/scp" {
989                // Build the full WellKnownScp document using the same
990                // function as the Axum handler, ensuring identical
991                // responses across transports.
992                let doc = self
993                    .rt
994                    .block_on(crate::well_known::build_well_known_scp(&self.state));
995                let body_bytes = serde_json::to_vec(&doc).unwrap_or_default();
996                axum::http::Response::builder()
997                    .status(200)
998                    .header("content-type", "application/json")
999                    .body(body_bytes)
1000                    .unwrap_or_else(|_| axum::http::Response::new(b"internal error".to_vec()))
1001            } else {
1002                axum::http::Response::builder()
1003                    .status(404)
1004                    .body(b"not found".to_vec())
1005                    .unwrap_or_else(|_| axum::http::Response::new(Vec::new()))
1006            }
1007        }
1008    }
1009
1010    let handler: Arc<dyn RequestHandler> = Arc::new(H3RequestHandler {
1011        state: Arc::clone(state),
1012        rt: tokio::runtime::Handle::current(),
1013    });
1014
1015    tokio::spawn(async move {
1016        let mut server = Http3Server::new(http3_config, handler);
1017        match server.bind() {
1018            Ok(addr) => {
1019                tracing::info!(addr = %addr, "HTTP/3 server started");
1020                if let Err(e) = server.serve().await {
1021                    tracing::error!(error = %e, "HTTP/3 server exited with error");
1022                }
1023            }
1024            Err(e) => {
1025                tracing::error!(error = %e, "failed to bind HTTP/3 server");
1026            }
1027        }
1028    });
1029}