scp_node/http.rs
1//! HTTP routing for [`ApplicationNode`].
2//!
3//! Provides axum routers for `.well-known/scp` and `/scp/v1` WebSocket
4//! upgrade, plus the `serve()` method that merges application routes with
5//! SCP routes on a single HTTPS listener.
6//!
7//! See spec section 18.6.2 for the SDK surface.
8
9use std::collections::HashMap;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use axum::Router;
15use axum::extract::State;
16use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
17use axum::http::StatusCode;
18use axum::response::IntoResponse;
19use axum::routing::get;
20use futures::{SinkExt, StreamExt};
21use tokio::sync::{RwLock, Semaphore};
22use tokio_util::sync::CancellationToken;
23use tower_http::cors::{AllowOrigin, CorsLayer};
24use zeroize::Zeroizing;
25
26use scp_identity::document::DidDocument;
27use scp_platform::traits::Storage;
28use scp_transport::native::server::RelayConfig as TransportRelayConfig;
29use scp_transport::native::storage::BlobStorageBackend;
30use scp_transport::relay::rate_limit::{ConnectionTracker, PublishRateLimiter};
31use scp_transport::relay::subscription::SubscriptionRegistry;
32
33use crate::tls;
34
35use crate::projection::ProjectedContext;
36use crate::well_known::well_known_handler;
37use crate::{ApplicationNode, NodeError};
38
39// ---------------------------------------------------------------------------
40// Shared state
41// ---------------------------------------------------------------------------
42
43/// Registered broadcast context for `.well-known/scp` generation.
44///
45/// Stored in [`NodeState`] and read on each `.well-known/scp` request.
46#[derive(Debug, Clone)]
47pub struct BroadcastContext {
48 /// Context ID (hex-encoded).
49 pub id: String,
50 /// Human-readable name (advisory).
51 pub name: Option<String>,
52}
53
54/// Shared state accessible by axum handlers.
55///
56/// Contains the node's identity information, registered broadcast
57/// contexts, dev API configuration, and blob storage reference.
58/// Read on every `.well-known/scp` request to generate the response
59/// dynamically (spec section 18.6.4).
60///
61/// Blob storage uses [`BlobStorageBackend`] (enum dispatch), shared between
62/// the relay server and projection handlers via `Arc` (spec section 18.11.5).
63pub struct NodeState {
64 /// The operator's DID string.
65 pub(crate) did: String,
66 /// The relay URL (e.g., `wss://example.com/scp/v1`).
67 pub(crate) relay_url: String,
68 /// Registered broadcast contexts, keyed by lowercase hex context ID.
69 /// Modified via [`ApplicationNode::register_broadcast_context`].
70 pub(crate) broadcast_contexts: RwLock<HashMap<String, BroadcastContext>>,
71 /// The relay server's bound address for WebSocket bridge connections.
72 pub(crate) relay_addr: SocketAddr,
73 /// Shared secret for authenticating internal bridge connections.
74 ///
75 /// Generated at startup and included as an `Authorization: Bearer`
76 /// header when the axum handler connects to the internal relay. The
77 /// relay validates this token during the WebSocket handshake
78 /// (defense-in-depth, #85). Moved from query parameter to header to
79 /// prevent leakage via server logs or error messages (#225).
80 /// Wrapped in `Zeroizing` so the secret is zeroed on drop.
81 pub(crate) bridge_secret: Zeroizing<[u8; 32]>,
82 /// Bearer token for the dev API (`scp_local_token_<32 hex chars>`).
83 ///
84 /// `Some` when `local_api()` was called on the builder, `None` otherwise.
85 /// See spec section 18.10.2.
86 pub(crate) dev_token: Option<String>,
87 /// Bind address for the dev API server.
88 ///
89 /// `Some` when `local_api()` was called on the builder, `None` otherwise.
90 /// See spec section 18.10.2.
91 pub(crate) dev_bind_addr: Option<SocketAddr>,
92 /// Registry of broadcast contexts whose messages are projected (decrypted
93 /// and served) by this node's HTTP endpoints. Keyed by routing ID.
94 ///
95 /// See spec section 18.11.5.
96 pub(crate) projected_contexts: RwLock<HashMap<[u8; 32], ProjectedContext>>,
97 /// Shared blob storage instance, used by both the relay server and
98 /// projection handlers to read stored blobs.
99 ///
100 /// See spec section 18.11.5.
101 pub(crate) blob_storage: Arc<BlobStorageBackend>,
102 /// Relay operational parameters exposed in `.well-known/scp`
103 /// `relay_config` (spec section 18.3.3).
104 pub(crate) relay_config: TransportRelayConfig,
105 /// The instant the node was started, used to compute uptime for the
106 /// dev API health endpoint (spec section 18.10.3).
107 pub(crate) start_time: Instant,
108 /// Bind address for the public HTTP server used by [`ApplicationNode::serve`].
109 ///
110 /// Separate from `relay_addr` (the relay's internal listener) to avoid
111 /// double-binding the same port (#224). Defaults to `0.0.0.0:8443`.
112 pub(crate) http_bind_addr: SocketAddr,
113 /// Shared cancellation token for graceful shutdown of both the public
114 /// HTTPS listener and the dev API listener. Cancelled by
115 /// [`ApplicationNode::shutdown`].
116 ///
117 /// See SCP-245 action item: "Ensure graceful shutdown of dev API
118 /// listener alongside main server."
119 pub(crate) shutdown_token: CancellationToken,
120 /// CORS allowed origins for public endpoints (`.well-known/scp`,
121 /// broadcast projection). `None` means permissive (`*`); `Some(list)`
122 /// restricts to exactly those origins.
123 ///
124 /// See issue #231.
125 pub(crate) cors_origins: Option<Vec<String>>,
126 /// Per-IP token-bucket rate limiter for broadcast projection endpoints.
127 ///
128 /// Limits the request rate from each source IP to prevent abuse of the
129 /// public, unauthenticated projection endpoints that perform crypto
130 /// decryption and blob reads per request. Returns HTTP 429 when exceeded.
131 ///
132 /// Configurable via `SCP_NODE_PROJECTION_RATE_LIMIT` (default 60 req/s).
133 /// See spec section 18.11.6.
134 pub(crate) projection_rate_limiter: PublishRateLimiter,
135 /// TLS configuration for the public HTTPS listener.
136 ///
137 /// When `Some`, [`ApplicationNode::serve`] terminates TLS using
138 /// [`tokio_rustls::TlsAcceptor`] before passing connections to axum.
139 /// When `None` (no-domain mode or explicit opt-out), the listener serves
140 /// plain HTTP/WS (spec section 10.12.8).
141 ///
142 /// Built from [`tls::CertificateData`] during [`ApplicationNodeBuilder::build`]
143 /// via [`tls::build_reloadable_tls_config`] (spec section 18.6.3).
144 pub(crate) tls_config: Option<Arc<rustls::ServerConfig>>,
145 /// The TLS certificate resolver for ACME hot-reload.
146 ///
147 /// When `Some`, the ACME renewal loop can call [`CertResolver::update`]
148 /// to hot-swap certificates without restarting the server.
149 /// When `None` (no-domain mode), ACME is not active.
150 ///
151 /// See spec section 18.6.3 (auto-renewal).
152 pub(crate) cert_resolver: Option<Arc<crate::tls::CertResolver>>,
153 /// The operator's DID document, populated at build time.
154 ///
155 /// Used by the dev API identity endpoint to return the full document
156 /// (spec section 18.10.3).
157 pub(crate) did_document: DidDocument,
158 /// Shared connection tracker from the relay server.
159 ///
160 /// Tracks active connections per IP address across all transports.
161 /// Used by the dev API health and relay status endpoints to report
162 /// real connection counts (spec section 18.10.3).
163 pub(crate) connection_tracker: ConnectionTracker,
164 /// Shared subscription registry from the relay server.
165 ///
166 /// Maps routing IDs to subscriber entries. Used by the dev API
167 /// context endpoint to report real subscriber counts (spec section 18.10.3).
168 pub(crate) subscription_registry: SubscriptionRegistry,
169 /// Shared ACME challenge map (token → key authorization).
170 ///
171 /// Mounted in [`serve()`](crate::ApplicationNode::serve) at
172 /// `GET /.well-known/acme-challenge/{token}` so that ACME renewal
173 /// challenges can be served without restarting the server (issue #305).
174 /// When `None` (no-domain or self-signed mode), no challenge router is
175 /// mounted.
176 pub(crate) acme_challenges: Option<Arc<RwLock<HashMap<String, String>>>>,
177
178 /// Shared state for bridge shadow operations.
179 ///
180 /// Holds per-context shadow registries and sender key stores for the
181 /// bridge shadow creation endpoint (`POST /v1/scp/bridge/shadow`).
182 /// See SCP-BCH-002.
183 pub(crate) bridge_state: Arc<crate::bridge_handlers::BridgeState>,
184}
185
186// ---------------------------------------------------------------------------
187// CORS layer construction
188// ---------------------------------------------------------------------------
189
190/// Constructs a [`CorsLayer`] from the node's CORS configuration.
191///
192/// - `None` origins: permissive (`Access-Control-Allow-Origin: *`).
193/// - `Some(list)`: restricts to exactly the listed origins.
194///
195/// Applied to public endpoints (`.well-known/scp`, broadcast projection)
196/// so browser-based JavaScript / WASM clients can read responses cross-origin
197/// (issue #231). Not applied to the WebSocket relay endpoint (WebSocket
198/// upgrades have their own origin mechanism) or the dev API (localhost-only).
199pub fn build_cors_layer(origins: &Option<Vec<String>>) -> CorsLayer {
200 let allow_origin = origins.as_ref().map_or_else(AllowOrigin::any, |list| {
201 let parsed: Vec<axum::http::HeaderValue> = list
202 .iter()
203 .filter_map(|o| {
204 o.parse().map_or_else(
205 |_| {
206 tracing::warn!(
207 origin = %o,
208 "ignoring invalid CORS origin; \
209 this may make endpoints more permissive than intended"
210 );
211 None
212 },
213 Some,
214 )
215 })
216 .collect();
217 AllowOrigin::list(parsed)
218 });
219 CorsLayer::new()
220 .allow_origin(allow_origin)
221 .allow_methods([axum::http::Method::GET, axum::http::Method::OPTIONS])
222 .allow_headers([
223 axum::http::header::CONTENT_TYPE,
224 axum::http::header::IF_NONE_MATCH,
225 ])
226}
227
228// ---------------------------------------------------------------------------
229// Constants
230// ---------------------------------------------------------------------------
231
232/// Idle timeout for bridge WebSocket connections (5 minutes).
233///
234/// If no data flows in either direction for this duration, the bridge
235/// connection is closed. This prevents stale connections from holding
236/// resources indefinitely (#229).
237const BRIDGE_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
238
239// ---------------------------------------------------------------------------
240// Router constructors
241// ---------------------------------------------------------------------------
242
243/// Returns an axum [`Router`] serving `GET /.well-known/scp`.
244///
245/// The handler dynamically generates the `.well-known/scp` JSON document
246/// from the provided [`NodeState`]. See spec section 18.3.
247pub fn well_known_router(state: Arc<NodeState>) -> Router {
248 Router::new()
249 .route("/.well-known/scp", get(well_known_handler))
250 .with_state(state)
251}
252
253/// Returns an axum [`Router`] handling WebSocket upgrade at `/scp/v1`.
254///
255/// Incoming WebSocket connections are bridged to the node's internal
256/// relay server. The axum handler upgrades the HTTP connection to
257/// WebSocket, then connects to the relay on localhost and forwards
258/// frames bidirectionally.
259///
260/// An `Arc<Semaphore>` caps concurrent bridge connections to
261/// `max_total_connections` from the relay config. The permit is acquired
262/// before the HTTP 101 upgrade and held inside the `on_upgrade` closure
263/// for the entire WebSocket connection lifetime — ensuring the semaphore
264/// accurately tracks active connections, not just in-flight upgrade
265/// requests (#229).
266pub fn relay_router(state: Arc<NodeState>) -> Router {
267 let bridge_semaphore = Arc::new(Semaphore::new(state.relay_config.max_total_connections));
268 Router::new()
269 .route("/scp/v1", get(ws_upgrade_handler))
270 .with_state((state, bridge_semaphore))
271}
272
273/// Axum handler for WebSocket upgrade at `/scp/v1`.
274///
275/// Acquires a semaphore permit before upgrading; the permit is moved into
276/// the `on_upgrade` closure so it is held for the entire WebSocket
277/// connection lifetime. If the HTTP 101 upgrade never completes, axum
278/// drops the response — which drops the closure and releases the permit.
279/// Returns 503 Service Unavailable when the bridge is at capacity.
280async fn ws_upgrade_handler(
281 ws: WebSocketUpgrade,
282 State((state, sem)): State<(Arc<NodeState>, Arc<Semaphore>)>,
283) -> impl IntoResponse {
284 let Ok(permit) = sem.try_acquire_owned() else {
285 return StatusCode::SERVICE_UNAVAILABLE.into_response();
286 };
287 let relay_addr = state.relay_addr;
288 // Clone the Zeroizing wrapper so the bridge task's copy is also zeroed
289 // on drop. Avoids leaving bare secret bytes on the stack/heap.
290 let bridge_secret = state.bridge_secret.clone();
291 // Move the permit into the closure — it is released when the closure
292 // is dropped, either after the bridge ends or if the upgrade fails.
293 ws.on_upgrade(move |socket| async move {
294 let _permit = permit; // bind to ensure drop at end of scope
295 relay_bridge(socket, relay_addr, bridge_secret).await;
296 })
297 .into_response()
298}
299
300/// Bridges an axum WebSocket to the internal relay server.
301///
302/// Connects to the relay at `relay_addr` with the bridge secret included
303/// as an `Authorization: Bearer <hex>` header, then forwards frames in
304/// both directions until either side closes or the connection is idle for
305/// [`BRIDGE_IDLE_TIMEOUT`]. Sends explicit WebSocket close frames on
306/// both sides when the bridge terminates.
307///
308/// The secret is transmitted via HTTP header rather than query parameter
309/// to prevent leakage through server logs, error messages, or debug
310/// output (#225).
311///
312/// The idle timeout resets only on data frames (Text/Binary), not on
313/// Ping/Pong control frames. This prevents an attacker from keeping
314/// connections alive indefinitely by sending pings without real data
315/// (#229).
316async fn relay_bridge(
317 axum_ws: WebSocket,
318 relay_addr: SocketAddr,
319 bridge_secret: Zeroizing<[u8; 32]>,
320) {
321 use tokio_tungstenite::tungstenite::client::IntoClientRequest;
322 let token_hex = scp_transport::native::server::hex_encode_32(&bridge_secret);
323 let url = format!("ws://{relay_addr}/");
324 let mut request = match url.into_client_request() {
325 Ok(r) => r,
326 Err(e) => {
327 tracing::error!(
328 addr = %relay_addr,
329 error = %e,
330 "failed to build WebSocket request for internal relay bridge"
331 );
332 return;
333 }
334 };
335 // Safety: the token is a 64-char lowercase hex string — always valid
336 // as an HTTP header value. `parse()` only fails on non-visible ASCII
337 // or control characters, which hex digits never contain.
338 let Ok(header_value) = format!("Bearer {token_hex}").parse() else {
339 tracing::error!("bridge token produced invalid HTTP header value");
340 return;
341 };
342 request.headers_mut().insert("Authorization", header_value);
343 let relay_conn = tokio_tungstenite::connect_async(request).await;
344
345 let Ok((relay_ws, _)) = relay_conn else {
346 tracing::error!(
347 addr = %relay_addr,
348 "failed to connect to internal relay for WebSocket bridge"
349 );
350 return;
351 };
352
353 let (mut relay_sink, mut relay_source) = relay_ws.split();
354 let (mut axum_sink, mut axum_source) = axum_ws.split();
355
356 // Idle timeout: resets only on data frames (Text/Binary), not control frames.
357 let idle_timeout = tokio::time::sleep(BRIDGE_IDLE_TIMEOUT);
358 tokio::pin!(idle_timeout);
359
360 loop {
361 tokio::select! {
362 msg = StreamExt::next(&mut axum_source) => {
363 match msg {
364 Some(Ok(Message::Close(_)) | Err(_)) | None => break,
365 Some(Ok(msg)) => {
366 let relay_msg = match msg {
367 Message::Text(t) => {
368 idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
369 tokio_tungstenite::tungstenite::Message::Text(t.to_string())
370 }
371 Message::Binary(b) => {
372 idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
373 tokio_tungstenite::tungstenite::Message::Binary(b.to_vec())
374 }
375 Message::Ping(p) => tokio_tungstenite::tungstenite::Message::Ping(p.to_vec()),
376 Message::Pong(p) => tokio_tungstenite::tungstenite::Message::Pong(p.to_vec()),
377 Message::Close(_) => break,
378 };
379 if let Err(e) = SinkExt::send(&mut relay_sink, relay_msg).await {
380 tracing::debug!(
381 direction = "client->relay",
382 error = %e,
383 "bridge forwarding failed"
384 );
385 break;
386 }
387 }
388 }
389 }
390 msg = StreamExt::next(&mut relay_source) => {
391 match msg {
392 Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_)) | Err(_)) | None => break,
393 Some(Ok(msg)) => {
394 let axum_msg = match msg {
395 tokio_tungstenite::tungstenite::Message::Text(t) => {
396 idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
397 Message::Text(t.into())
398 }
399 tokio_tungstenite::tungstenite::Message::Binary(b) => {
400 idle_timeout.as_mut().reset(tokio::time::Instant::now() + BRIDGE_IDLE_TIMEOUT);
401 Message::Binary(b.into())
402 }
403 tokio_tungstenite::tungstenite::Message::Ping(p) => Message::Ping(p.into()),
404 tokio_tungstenite::tungstenite::Message::Pong(p) => Message::Pong(p.into()),
405 tokio_tungstenite::tungstenite::Message::Close(_) => break,
406 tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
407 };
408 if let Err(e) = SinkExt::send(&mut axum_sink, axum_msg).await {
409 tracing::debug!(
410 direction = "relay->client",
411 error = %e,
412 "bridge forwarding failed"
413 );
414 break;
415 }
416 }
417 }
418 }
419 () = &mut idle_timeout => {
420 tracing::debug!("bridge connection idle timeout reached");
421 break;
422 }
423 }
424 }
425
426 // Send explicit close frames (best-effort, ignore errors on close).
427 // SplitSink::drop() does NOT send close frames in tokio-tungstenite 0.24.
428 let _ = SinkExt::close(&mut relay_sink).await;
429 let _ = SinkExt::close(&mut axum_sink).await;
430}
431
432// ---------------------------------------------------------------------------
433// ApplicationNode HTTP methods
434// ---------------------------------------------------------------------------
435
436impl<S: Storage + Send + Sync + 'static> ApplicationNode<S> {
437 /// Returns an axum [`Router`] serving `GET /.well-known/scp`.
438 ///
439 /// The response is dynamically generated from the node's current state:
440 /// DID, relay URL, and registered broadcast contexts. Content-Type is
441 /// `application/json` (provided by axum's `Json` extractor).
442 ///
443 /// See spec section 18.3.
444 #[must_use = "returns the well-known router, which must be mounted into an axum application"]
445 pub fn well_known_router(&self) -> Router {
446 well_known_router(Arc::clone(&self.state))
447 }
448
449 /// Returns an axum [`Router`] handling WebSocket upgrade at `/scp/v1`.
450 ///
451 /// Incoming connections are bridged to the node's internal relay server.
452 ///
453 /// See spec section 18.6.2.
454 #[must_use = "returns the relay router, which must be mounted into an axum application"]
455 pub fn relay_router(&self) -> Router {
456 relay_router(Arc::clone(&self.state))
457 }
458
459 /// Returns an axum [`Router`] serving broadcast projection endpoints.
460 ///
461 /// Includes:
462 /// - `GET /scp/broadcast/<routing_id>/feed` -- recent messages feed
463 /// - `GET /scp/broadcast/<routing_id>/messages/<blob_id>` -- single message
464 ///
465 /// These are **public endpoints** with no authentication middleware --
466 /// broadcast content is intended for broad distribution (spec section
467 /// 18.11.6).
468 ///
469 /// Served on the public HTTPS port alongside `.well-known/scp` and
470 /// `/scp/v1`.
471 ///
472 /// See spec section 18.11.8.
473 #[must_use = "returns the projection router, which must be mounted into an axum application"]
474 pub fn broadcast_projection_router(&self) -> Router {
475 crate::projection::broadcast_projection_router(Arc::clone(&self.state))
476 }
477
478 /// Returns an axum [`Router`] serving bridge endpoints.
479 ///
480 /// Includes `POST /v1/scp/bridge/shadow` for shadow identity creation.
481 /// Requires bridge authentication middleware to be applied by the caller.
482 ///
483 /// See SCP-BCH-002 and spec section 12.10.
484 #[must_use = "returns the bridge router, which must be mounted into an axum application"]
485 pub fn bridge_router(&self) -> Router {
486 crate::bridge_handlers::bridge_router(Arc::clone(&self.state.bridge_state))
487 }
488
489 /// Returns the dev API router if the dev API is enabled.
490 ///
491 /// Returns `Some(Router)` when [`ApplicationNodeBuilder::local_api`] was
492 /// called (i.e., a dev token was generated), `None` otherwise. The
493 /// returned router includes all `/scp/dev/v1/*` routes with bearer token
494 /// middleware applied.
495 ///
496 /// See spec section 18.10.5.
497 #[must_use = "returns the dev API router, which must be served on a separate listener"]
498 pub fn dev_router(&self) -> Option<Router> {
499 let token = self.state.dev_token.clone()?;
500 Some(crate::dev_api::dev_router(Arc::clone(&self.state), token))
501 }
502
503 /// Takes ownership of the node and starts serving HTTP traffic.
504 ///
505 /// Binds HTTPS (or plain HTTP when TLS is not configured) on the
506 /// configured address, merging:
507 ///
508 /// 1. Application-provided routes (`app_router`)
509 /// 2. `.well-known/scp` route
510 /// 3. `/scp/v1` WebSocket upgrade route
511 /// 4. `/scp/broadcast/*` broadcast projection routes
512 ///
513 /// SCP routes take precedence for `/.well-known/scp`, `/scp/v1`, and
514 /// `/scp/broadcast/*`. All other paths route to `app_router`.
515 ///
516 /// ## TLS termination
517 ///
518 /// When a TLS configuration was provisioned during
519 /// [`ApplicationNodeBuilder::build`] (domain mode with successful ACME or
520 /// injected TLS provider), `serve()` terminates TLS using
521 /// [`tokio_rustls::TlsAcceptor`] and serves HTTPS/WSS. When no TLS
522 /// configuration is present (no-domain mode, or TLS provisioning opted
523 /// out), `serve()` falls back to plain HTTP/WS. See spec section 18.6.3.
524 ///
525 /// ## Dev API
526 ///
527 /// When the dev API is configured (via [`ApplicationNodeBuilder::local_api`]),
528 /// a separate tokio task is spawned to serve the dev API on the configured
529 /// address. The dev API listener runs concurrently with the public
530 /// listener. When the dev API is not configured, `serve()` behaves exactly
531 /// as before -- no additional listener is spawned. The dev API always
532 /// uses plain HTTP (it is bound to loopback only).
533 ///
534 /// ## HTTP/3
535 ///
536 /// When the `http3` feature is enabled and an [`Http3Config`] is provided
537 /// via [`ApplicationNodeBuilder::http3`], an HTTP/3 listener is started
538 /// on a separate QUIC endpoint. All HTTP/1.1 and HTTP/2 responses include
539 /// an `Alt-Svc` header advertising the HTTP/3 endpoint (spec section
540 /// 10.15.1).
541 ///
542 /// ## Graceful shutdown
543 ///
544 /// The `shutdown` future is awaited as a graceful shutdown signal: when
545 /// it completes, the server stops accepting new connections, drains
546 /// in-flight requests, cancels the internal shutdown token (stopping
547 /// the dev API listener if running), and shuts down the relay server.
548 /// The node is consumed -- callers do not need to call
549 /// [`ApplicationNode::shutdown`] separately.
550 ///
551 /// If the dev API task exits early (e.g., bind failure), the shutdown
552 /// token is cancelled and the error is propagated. Likewise, if the
553 /// main server exits first, the dev API task is cancelled via the
554 /// shutdown token and aborted.
555 ///
556 /// See spec sections 18.6.3, 18.10.5, and 18.11.8.
557 ///
558 /// # Errors
559 ///
560 /// Returns [`NodeError::Serve`] if either server cannot bind or
561 /// encounters a fatal I/O error.
562 pub async fn serve(
563 self,
564 app_router: Router,
565 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
566 ) -> Result<(), NodeError> {
567 spawn_projection_rate_limit_cleanup(
568 self.state.projection_rate_limiter.clone(),
569 self.state.shutdown_token.clone(),
570 );
571
572 let cors = build_cors_layer(&self.state.cors_origins);
573
574 // Apply CORS to public endpoints only. The WebSocket relay endpoint
575 // uses its own origin mechanism; the dev API is localhost-only.
576 let well_known = well_known_router(Arc::clone(&self.state)).layer(cors.clone());
577 let relay_rt = relay_router(Arc::clone(&self.state));
578 let projection =
579 crate::projection::broadcast_projection_router(Arc::clone(&self.state)).layer(cors);
580
581 let dev_router = self
582 .state
583 .dev_token
584 .clone()
585 .map(|t| crate::dev_api::dev_router(Arc::clone(&self.state), t));
586 let dev_bind_addr = self.state.dev_bind_addr;
587 let tls_config = self.state.tls_config.clone();
588 #[cfg(feature = "http3")]
589 let http3_config = self.http3_config;
590
591 let bridge = crate::bridge_handlers::bridge_router(Arc::clone(&self.state.bridge_state));
592 let relay = self.relay;
593 let state = self.state;
594
595 // SCP routes take precedence: merge them last so they override
596 // any conflicting paths in app_router. ACME challenge router is
597 // included for renewal support (issue #305).
598 let merged = build_merged_router(
599 app_router,
600 well_known,
601 relay_rt,
602 projection,
603 bridge,
604 state.acme_challenges.as_ref(),
605 );
606
607 let dev_api_handle = spawn_dev_api(dev_router, dev_bind_addr, state.shutdown_token.clone());
608
609 // Spawn HTTP/3 listener when configured (spec section 10.15.1).
610 #[cfg(feature = "http3")]
611 if let Some(http3_config) = http3_config {
612 spawn_http3_listener(http3_config, &state);
613 }
614
615 let listener = tokio::net::TcpListener::bind(state.http_bind_addr)
616 .await
617 .map_err(|e| NodeError::Serve(e.to_string()))?;
618 let local_addr = listener
619 .local_addr()
620 .map_err(|e| NodeError::Serve(e.to_string()))?;
621 let shutdown_token = state.shutdown_token.clone();
622 let token = shutdown_token.clone();
623 tokio::spawn(async move {
624 shutdown.await;
625 token.cancel();
626 });
627
628 let main_server = build_main_server(
629 listener,
630 merged,
631 tls_config,
632 shutdown_token.clone(),
633 local_addr,
634 );
635
636 // If a dev API task is running, select! on both: if either exits
637 // early we propagate the result. This ensures a dev API bind
638 // failure doesn't go unnoticed while the main server keeps running.
639 let result = match dev_api_handle {
640 Some(handle) => {
641 tokio::pin!(handle);
642 tokio::select! {
643 result = main_server => {
644 // Main server exited — cancel shutdown token so the
645 // dev API task also drains, then abort its handle.
646 state.shutdown_token.cancel();
647 handle.abort();
648 result
649 }
650 result = &mut handle => {
651 // Dev API exited early — cancel shutdown token so the
652 // main server also drains.
653 state.shutdown_token.cancel();
654 // JoinError (task panic/cancel) or NodeError from inner.
655 match result {
656 Ok(inner) => inner,
657 Err(join_err) => {
658 Err(NodeError::Serve(
659 format!("dev API task failed: {join_err}")
660 ))
661 }
662 }
663 }
664 }
665 }
666 None => main_server.await,
667 };
668
669 // Graceful shutdown: cancel the shutdown token (idempotent if
670 // already cancelled above) and stop the relay server. This ensures
671 // callers don't need to call shutdown() separately.
672 state.shutdown_token.cancel();
673 relay.shutdown_handle.shutdown();
674 tracing::info!("application node shut down");
675
676 result
677 }
678}
679
680// ---------------------------------------------------------------------------
681// Dev API spawning (extracted for clippy::too_many_lines)
682// ---------------------------------------------------------------------------
683
684/// Builds the merged axum router for `serve()`, combining SCP protocol
685/// routes (well-known, relay, projection, ACME challenges) with the
686/// application router. Extracted from `serve()` for clippy line limits.
687fn build_merged_router(
688 app_router: Router,
689 well_known: Router,
690 relay_rt: Router,
691 projection: Router,
692 bridge: Router,
693 acme_challenges: Option<&Arc<RwLock<HashMap<String, String>>>>,
694) -> Router {
695 let merged = app_router
696 .merge(well_known)
697 .merge(relay_rt)
698 .merge(projection)
699 .merge(bridge);
700
701 // Mount ACME challenge router for renewal challenges (issue #305).
702 // Serves `GET /.well-known/acme-challenge/{token}` so the ACME CA can
703 // validate domain ownership during certificate renewal.
704 if let Some(challenges) = acme_challenges {
705 merged.merge(tls::acme_challenge_router(Arc::clone(challenges)))
706 } else {
707 merged
708 }
709}
710
711/// Spawns the dev API listener if configured.
712///
713/// Returns a `JoinHandle` so the caller can detect early exit (e.g., bind
714/// failure) and propagate the error. Dev API always uses plain HTTP
715/// (loopback-only, spec section 18.10.5).
716fn spawn_dev_api(
717 dev_router: Option<Router>,
718 dev_bind_addr: Option<SocketAddr>,
719 shutdown_token: CancellationToken,
720) -> Option<tokio::task::JoinHandle<Result<(), NodeError>>> {
721 let (Some(dev_router), Some(dev_addr)) = (dev_router, dev_bind_addr) else {
722 return None;
723 };
724 Some(tokio::spawn(async move {
725 let dev_listener = tokio::net::TcpListener::bind(dev_addr).await.map_err(|e| {
726 NodeError::Serve(format!("failed to bind dev API server on {dev_addr}: {e}"))
727 })?;
728 let local_addr = dev_listener.local_addr().unwrap_or(dev_addr);
729 tracing::info!(addr = %local_addr, "dev API server started");
730
731 axum::serve(dev_listener, dev_router)
732 .with_graceful_shutdown(shutdown_token.cancelled_owned())
733 .await
734 .map_err(|e| NodeError::Serve(format!("dev API server error: {e}")))
735 }))
736}
737
738// ---------------------------------------------------------------------------
739// Main server construction (extracted for clippy::too_many_lines)
740// ---------------------------------------------------------------------------
741
742/// Builds the main server future, branching on TLS configuration.
743///
744/// Returns a boxed future that resolves when the server shuts down.
745/// Extracted from `serve()` for clippy line limits.
746fn build_main_server(
747 listener: tokio::net::TcpListener,
748 merged: Router,
749 tls_config: Option<Arc<rustls::ServerConfig>>,
750 shutdown_token: CancellationToken,
751 local_addr: SocketAddr,
752) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), NodeError>> + Send>> {
753 if let Some(tls_cfg) = tls_config {
754 tracing::info!(
755 addr = %local_addr, scheme = "HTTPS",
756 "application node server started (TLS active)"
757 );
758 Box::pin(tls::serve_tls(listener, tls_cfg, merged, shutdown_token))
759 } else {
760 tracing::info!(
761 addr = %local_addr, scheme = "HTTP",
762 "application node server started (plain HTTP, broadcast projection endpoints active)"
763 );
764 Box::pin(async move {
765 axum::serve(listener, merged)
766 .with_graceful_shutdown(shutdown_token.cancelled_owned())
767 .await
768 .map_err(|e| NodeError::Serve(e.to_string()))
769 })
770 }
771}
772
773// ---------------------------------------------------------------------------
774// Projection rate limiter cleanup (extracted for clippy::too_many_lines)
775// ---------------------------------------------------------------------------
776
777/// Spawns the background cleanup loop for the projection rate limiter.
778///
779/// Evicts stale per-IP token buckets every 60 seconds (buckets idle for more
780/// than 300 seconds). Runs until `shutdown_token` is cancelled.
781fn spawn_projection_rate_limit_cleanup(
782 limiter: PublishRateLimiter,
783 shutdown_token: CancellationToken,
784) {
785 tokio::spawn(async move {
786 limiter
787 .cleanup_loop(
788 Duration::from_secs(60),
789 Duration::from_secs(300),
790 shutdown_token,
791 )
792 .await;
793 });
794}
795
796// ---------------------------------------------------------------------------
797// Tests
798// ---------------------------------------------------------------------------
799
800#[cfg(test)]
801#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
802mod tests {
803 use std::collections::HashMap;
804 use std::net::SocketAddr;
805 use std::time::Instant;
806
807 use axum::body::Body;
808 use axum::http::{Method, Request, StatusCode};
809 use tokio::sync::RwLock;
810 use tower::ServiceExt;
811
812 use scp_transport::native::storage::BlobStorageBackend;
813
814 use super::*;
815
816 /// Creates a minimal `NodeState` for CORS tests.
817 fn test_state(cors_origins: Option<Vec<String>>) -> Arc<NodeState> {
818 Arc::new(NodeState {
819 did: "did:dht:cors_test".to_owned(),
820 relay_url: "wss://localhost/scp/v1".to_owned(),
821 broadcast_contexts: RwLock::new(HashMap::new()),
822 relay_addr: "127.0.0.1:9000".parse::<SocketAddr>().unwrap(),
823 bridge_secret: Zeroizing::new([0u8; 32]),
824 dev_token: None,
825 dev_bind_addr: None,
826 projected_contexts: RwLock::new(HashMap::new()),
827 blob_storage: Arc::new(BlobStorageBackend::default()),
828 relay_config: scp_transport::native::server::RelayConfig::default(),
829 start_time: Instant::now(),
830 http_bind_addr: SocketAddr::from(([0, 0, 0, 0], 8443)),
831 shutdown_token: CancellationToken::new(),
832 cors_origins,
833 projection_rate_limiter: scp_transport::relay::rate_limit::PublishRateLimiter::new(
834 1000,
835 ),
836 tls_config: None,
837 cert_resolver: None,
838 did_document: scp_identity::document::DidDocument {
839 context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
840 id: "did:dht:cors_test".to_owned(),
841 verification_method: vec![],
842 authentication: vec![],
843 assertion_method: vec![],
844 also_known_as: vec![],
845 service: vec![],
846 },
847 connection_tracker: scp_transport::relay::rate_limit::new_connection_tracker(),
848 subscription_registry: scp_transport::relay::subscription::new_registry(),
849 acme_challenges: None,
850 bridge_state: Arc::new(crate::bridge_handlers::BridgeState::new()),
851 })
852 }
853
854 #[tokio::test]
855 async fn cors_permissive_well_known_returns_wildcard_origin() {
856 let state = test_state(None);
857 let cors = build_cors_layer(&state.cors_origins);
858 let router = well_known_router(state).layer(cors);
859
860 let req = Request::builder()
861 .uri("/.well-known/scp")
862 .header("Origin", "https://example.com")
863 .body(Body::empty())
864 .unwrap();
865
866 let resp = router.oneshot(req).await.unwrap();
867 assert_eq!(resp.status(), StatusCode::OK);
868
869 let acao = resp
870 .headers()
871 .get("access-control-allow-origin")
872 .expect("should have ACAO header")
873 .to_str()
874 .unwrap();
875 assert_eq!(acao, "*", "permissive mode should return wildcard origin");
876 }
877
878 #[tokio::test]
879 async fn cors_restricted_well_known_allows_matching_origin() {
880 let origins = Some(vec!["https://allowed.example".to_owned()]);
881 let state = test_state(origins);
882 let cors = build_cors_layer(&state.cors_origins);
883 let router = well_known_router(state).layer(cors);
884
885 let req = Request::builder()
886 .uri("/.well-known/scp")
887 .header("Origin", "https://allowed.example")
888 .body(Body::empty())
889 .unwrap();
890
891 let resp = router.oneshot(req).await.unwrap();
892 assert_eq!(resp.status(), StatusCode::OK);
893
894 let acao = resp
895 .headers()
896 .get("access-control-allow-origin")
897 .expect("should have ACAO header for allowed origin")
898 .to_str()
899 .unwrap();
900 assert_eq!(acao, "https://allowed.example");
901 }
902
903 #[tokio::test]
904 async fn cors_restricted_well_known_rejects_non_matching_origin() {
905 let origins = Some(vec!["https://allowed.example".to_owned()]);
906 let state = test_state(origins);
907 let cors = build_cors_layer(&state.cors_origins);
908 let router = well_known_router(state).layer(cors);
909
910 let req = Request::builder()
911 .uri("/.well-known/scp")
912 .header("Origin", "https://evil.example")
913 .body(Body::empty())
914 .unwrap();
915
916 let resp = router.oneshot(req).await.unwrap();
917 // The response should succeed (CORS doesn't block the response,
918 // it omits the ACAO header so the browser rejects it client-side).
919 assert_eq!(resp.status(), StatusCode::OK);
920 assert!(
921 resp.headers().get("access-control-allow-origin").is_none(),
922 "non-matching origin should NOT receive ACAO header"
923 );
924 }
925
926 #[tokio::test]
927 async fn cors_preflight_options_returns_200() {
928 let state = test_state(None);
929 let cors = build_cors_layer(&state.cors_origins);
930 let router = well_known_router(state).layer(cors);
931
932 let req = Request::builder()
933 .method(Method::OPTIONS)
934 .uri("/.well-known/scp")
935 .header("Origin", "https://example.com")
936 .header("Access-Control-Request-Method", "GET")
937 .body(Body::empty())
938 .unwrap();
939
940 let resp = router.oneshot(req).await.unwrap();
941 assert_eq!(resp.status(), StatusCode::OK);
942
943 let acao = resp
944 .headers()
945 .get("access-control-allow-origin")
946 .expect("preflight should include ACAO")
947 .to_str()
948 .unwrap();
949 assert_eq!(acao, "*");
950
951 let methods = resp
952 .headers()
953 .get("access-control-allow-methods")
954 .expect("preflight should include allow-methods")
955 .to_str()
956 .unwrap();
957 assert!(methods.contains("GET"), "should allow GET method");
958 }
959}
960
961/// Spawns the HTTP/3 listener in a background task (spec §10.15.1).
962///
963/// Extracted from [`ApplicationNode::serve`] to keep the `serve()` method
964/// within the clippy line limit. Creates a `RequestHandler` that
965/// serves the full `.well-known/scp` document (same as the Axum handler)
966/// over HTTP/3 and returns 404 for all other paths.
967///
968/// The handler holds an `Arc<NodeState>` so it can build the same
969/// complete document that the HTTP/1.1+HTTP/2 handler serves, including
970/// relay_config, contexts, handles, and transports (SCP-264).
971#[cfg(feature = "http3")]
972fn spawn_http3_listener(http3_config: scp_transport::http3::Http3Config, state: &Arc<NodeState>) {
973 use scp_transport::http3::Http3Server;
974 use scp_transport::http3::adapter::RequestHandler;
975
976 struct H3RequestHandler {
977 state: Arc<NodeState>,
978 rt: tokio::runtime::Handle,
979 }
980
981 impl RequestHandler for H3RequestHandler {
982 fn handle(
983 &self,
984 method: &str,
985 uri: &str,
986 _headers: &[(String, String)],
987 ) -> axum::http::Response<Vec<u8>> {
988 if method == "GET" && uri == "/.well-known/scp" {
989 // Build the full WellKnownScp document using the same
990 // function as the Axum handler, ensuring identical
991 // responses across transports.
992 let doc = self
993 .rt
994 .block_on(crate::well_known::build_well_known_scp(&self.state));
995 let body_bytes = serde_json::to_vec(&doc).unwrap_or_default();
996 axum::http::Response::builder()
997 .status(200)
998 .header("content-type", "application/json")
999 .body(body_bytes)
1000 .unwrap_or_else(|_| axum::http::Response::new(b"internal error".to_vec()))
1001 } else {
1002 axum::http::Response::builder()
1003 .status(404)
1004 .body(b"not found".to_vec())
1005 .unwrap_or_else(|_| axum::http::Response::new(Vec::new()))
1006 }
1007 }
1008 }
1009
1010 let handler: Arc<dyn RequestHandler> = Arc::new(H3RequestHandler {
1011 state: Arc::clone(state),
1012 rt: tokio::runtime::Handle::current(),
1013 });
1014
1015 tokio::spawn(async move {
1016 let mut server = Http3Server::new(http3_config, handler);
1017 match server.bind() {
1018 Ok(addr) => {
1019 tracing::info!(addr = %addr, "HTTP/3 server started");
1020 if let Err(e) = server.serve().await {
1021 tracing::error!(error = %e, "HTTP/3 server exited with error");
1022 }
1023 }
1024 Err(e) => {
1025 tracing::error!(error = %e, "failed to bind HTTP/3 server");
1026 }
1027 }
1028 });
1029}