Skip to main content

toolkit_zero/socket/
server.rs

1//! Typed, fluent HTTP server construction.
2//!
3//! This module provides a builder-oriented API for declaring HTTP routes and
4//! serving them with a built-in hyper-based HTTP engine.  The entry point for every
5//! route is [`ServerMechanism`], which pairs an HTTP method with a URL path and
6//! supports incremental enrichment — attaching a JSON body expectation, URL
7//! query parameter deserialisation, or shared state — before being finalised
8//! into a [`SocketType`] route handle via `onconnect`.
9//!
//! Completed routes are registered on a [`Server`] and served via one of the
//! `serve*` methods, each of which returns a [`ServerFuture`].  A `ServerFuture`
//! can be `.await`ed to run the server inline, **or** you can call `.background()`
//! on it to spawn a background Tokio task and get a [`tokio::task::JoinHandle`] back.
14//! Graceful shutdown is available via
15//! [`Server::serve_with_graceful_shutdown`] and [`Server::serve_from_listener`].
16//!
17//! For **runtime address migration**, use [`Server::serve_managed`]: it starts
18//! the server immediately and returns a [`BackgroundServer`] handle that supports
19//! [`BackgroundServer::rebind`] (graceful shutdown + restart on a new address,
20//! all existing routes preserved) and [`BackgroundServer::stop`].
21//!
22//! # Hot-reloading routes
23//!
24//! [`BackgroundServer::mechanism`] pushes a new route into the running
25//! server's shared route table **without any restart or port gap**.  Because
26//! routes are stored in an `Arc<RwLock<Vec<SocketType>>>` shared between the
27//! caller and the server loop, the new route becomes visible to the next
28//! incoming request immediately.
29//!
30//! # Builder chains at a glance
31//!
32//! | Chain | Handler receives |
33//! |---|---|
34//! | `ServerMechanism::method(path).onconnect(f)` | nothing |
35//! | `.json::<T>().onconnect(f)` | `T: DeserializeOwned` |
36//! | `.query::<T>().onconnect(f)` | `T: DeserializeOwned` |
37//! | `.encryption::<T>(key).onconnect(f)` | `T: bincode::Decode<()>` (decrypted body) |
38//! | `.encrypted_query::<T>(key).onconnect(f)` | `T: bincode::Decode<()>` (decrypted query) |
39//! | `.state(s).onconnect(f)` | `S: Clone + Send + Sync` |
40//! | `.state(s).json::<T>().onconnect(f)` | `(S, T)` |
41//! | `.state(s).query::<T>().onconnect(f)` | `(S, T)` |
42//! | `.state(s).encryption::<T>(key).onconnect(f)` | `(S, T)` — decrypted body |
43//! | `.state(s).encrypted_query::<T>(key).onconnect(f)` | `(S, T)` — decrypted query |
44//!
45//! For blocking handlers (not recommended in production) every finaliser also
46//! has an unsafe `onconnect_sync` counterpart.
47//!
48//! # `#[mechanism]` attribute macro
49//!
50//! As an alternative to spelling out the builder chain by hand, the
51//! [`mechanism`] attribute macro collapses the entire
52//! `server.mechanism(ServerMechanism::method(path) … .onconnect(handler))` call
53//! into a single decorated `async fn`.
54//!
55//! # Response helpers
56//!
57//! Use the [`reply!`] macro as the most concise way to build a response:
58//!
59//! ```rust,no_run
60//! # use toolkit_zero::socket::server::*;
61//! # use serde::Serialize;
62//! # #[derive(Serialize)] struct Item { id: u32 }
63//! # let item = Item { id: 1 };
64//! // 200 OK, empty body
65//! // reply!()
66//! // 200 OK, JSON body
67//! // reply!(json => item)
68//! // 201 Created, JSON body
69//! // reply!(json => item, status => Status::Created)
70//! ```
71
72use std::{
73    future::Future,
74    net::SocketAddr,
75    pin::Pin,
76    sync::Arc,
77};
78use serde::{de::DeserializeOwned, Serialize};
79
80pub use super::SerializationKey;
81pub use toolkit_zero_macros::mechanism;
82
83// ─── Internal future / handler types ─────────────────────────────────────────
84
85type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
86type HandlerFn = Arc<dyn Fn(IncomingRequest) -> BoxFuture<http::Response<bytes::Bytes>> + Send + Sync>;
87
88/// Raw data extracted from an incoming HTTP request, passed to every route handler.
89pub(crate) struct IncomingRequest {
90    /// Raw body bytes (empty for GET / HEAD / etc.).
91    body: bytes::Bytes,
92    /// Raw query string — everything after `?`; empty string when absent.
93    query: String,
94    /// Request headers.
95    #[allow(dead_code)]
96    headers: http::HeaderMap,
97}
98
99// ─── Rejection ────────────────────────────────────────────────────────────────
100
101/// An HTTP-level error value returned from route handlers.
102///
103/// Returning `Err(rejection)` from an `onconnect` closure sends the corresponding
104/// HTTP status code with an empty body to the client.
105///
106/// The most common source is [`forbidden()`] (produces `403 Forbidden`).
107pub struct Rejection {
108    status: http::StatusCode,
109}
110
111impl Rejection {
112    fn new(status: http::StatusCode) -> Self {
113        Self { status }
114    }
115
116    /// Builds a `403 Forbidden` rejection.
117    pub fn forbidden() -> Self {
118        Self::new(http::StatusCode::FORBIDDEN)
119    }
120
121    /// Builds a `400 Bad Request` rejection.
122    pub fn bad_request() -> Self {
123        Self::new(http::StatusCode::BAD_REQUEST)
124    }
125
126    /// Builds a `500 Internal Server Error` rejection.
127    pub fn internal() -> Self {
128        Self::new(http::StatusCode::INTERNAL_SERVER_ERROR)
129    }
130
131    fn into_response(self) -> http::Response<bytes::Bytes> {
132        http::Response::builder()
133            .status(self.status)
134            .body(bytes::Bytes::new())
135            .unwrap()
136    }
137}
138
139// ─── Reply ────────────────────────────────────────────────────────────────────
140
141/// A value that can be converted into a fully-formed HTTP response.
142///
143/// Implemented for all types returned by the [`reply!`] macro and the standalone
144/// reply helpers.  You may also implement it for your own types.
145pub trait Reply: Send {
146    /// Consumes `self` and returns a complete HTTP response.
147    fn into_response(self) -> http::Response<bytes::Bytes>;
148}
149
150/// An empty `200 OK` reply — returned by `reply!()`.
151pub struct EmptyReply;
152
153impl Reply for EmptyReply {
154    fn into_response(self) -> http::Response<bytes::Bytes> {
155        http::Response::builder()
156            .status(http::StatusCode::OK)
157            .body(bytes::Bytes::new())
158            .unwrap()
159    }
160}
161
162/// An HTML body reply — returned by [`html_reply`].
163pub struct HtmlReply {
164    body: String,
165}
166
167impl Reply for HtmlReply {
168    fn into_response(self) -> http::Response<bytes::Bytes> {
169        http::Response::builder()
170            .status(http::StatusCode::OK)
171            .header(http::header::CONTENT_TYPE, "text/html; charset=utf-8")
172            .body(bytes::Bytes::from(self.body.into_bytes()))
173            .unwrap()
174    }
175}
176
177/// Wraps `content` into a `200 OK` HTML response.
178pub fn html_reply(content: impl Into<String>) -> HtmlReply {
179    HtmlReply { body: content.into() }
180}
181
182// Internal JSON reply — used by reply_with_json / reply_with_status_and_json.
183struct JsonReply {
184    body: bytes::Bytes,
185    status: http::StatusCode,
186}
187
188impl Reply for JsonReply {
189    fn into_response(self) -> http::Response<bytes::Bytes> {
190        http::Response::builder()
191            .status(self.status)
192            .header(http::header::CONTENT_TYPE, "application/json")
193            .body(self.body)
194            .unwrap()
195    }
196}
197
198// Passthrough — allows `http::Response<bytes::Bytes>` to satisfy the Reply bound directly.
199impl Reply for http::Response<bytes::Bytes> {
200    fn into_response(self) -> http::Response<bytes::Bytes> {
201        self
202    }
203}
204
205// ─── SocketType ───────────────────────────────────────────────────────────────
206
207/// A fully assembled, type-erased HTTP route ready to be registered on a [`Server`].
208///
209/// This is the final product of every builder chain.  Pass it to [`Server::mechanism`]
210/// to mount it.  Internally stores the HTTP method, the path pattern, and a
211/// reference-counted async handler closure — cloning a `SocketType` is cheap
212/// (just an `Arc` clone of the handler).
213pub struct SocketType {
214    pub(crate) method: http::Method,
215    pub(crate) path:   String,
216    pub(crate) handler: HandlerFn,
217}
218
219impl Clone for SocketType {
220    fn clone(&self) -> Self {
221        Self {
222            method:  self.method.clone(),
223            path:    self.path.clone(),
224            handler: Arc::clone(&self.handler),
225        }
226    }
227}
228
229// ─── HttpMethod ───────────────────────────────────────────────────────────────
230
/// The HTTP methods a route can be declared for.
#[derive(Clone, Copy, Debug)]
enum HttpMethod {
    /// HTTP `GET`.
    Get,
    /// HTTP `POST`.
    Post,
    /// HTTP `PUT`.
    Put,
    /// HTTP `DELETE`.
    Delete,
    /// HTTP `PATCH`.
    Patch,
    /// HTTP `HEAD`.
    Head,
    /// HTTP `OPTIONS`.
    Options,
}
235
236impl HttpMethod {
237    fn to_http(self) -> http::Method {
238        match self {
239            HttpMethod::Get     => http::Method::GET,
240            HttpMethod::Post    => http::Method::POST,
241            HttpMethod::Put     => http::Method::PUT,
242            HttpMethod::Delete  => http::Method::DELETE,
243            HttpMethod::Patch   => http::Method::PATCH,
244            HttpMethod::Head    => http::Method::HEAD,
245            HttpMethod::Options => http::Method::OPTIONS,
246        }
247    }
248}
249
250// ─── Path matching ────────────────────────────────────────────────────────────
251
/// Reports whether `actual_path` addresses the route registered under `pattern`.
///
/// Both paths are split on `/` and compared segment by segment, ignoring empty
/// segments.  Leading, trailing and repeated slashes therefore do not affect the
/// result: `"/items/"`, `"items"` and `"//items"` all match one another.  Segments
/// are compared literally and case-sensitively; no wildcard or parameter syntax is
/// interpreted.
///
/// The comparison is lazy and allocation-free, and it stops at the first differing
/// segment.
fn path_matches(pattern: &str, actual_path: &str) -> bool {
    // Dropping empty segments already discards leading/trailing slashes, so no
    // separate trimming pass is needed.
    let wanted = pattern.split('/').filter(|segment| !segment.is_empty());
    let given = actual_path.split('/').filter(|segment| !segment.is_empty());
    wanted.eq(given)
}
265
266// ─── ServerMechanism ──────────────────────────────────────────────────────────
267
268/// Entry point for building an HTTP route.
269///
270/// Pairs an HTTP method with a URL path and acts as the root of a fluent builder chain.
271/// Optionally attach shared state, a JSON body expectation, or URL query parameter
272/// deserialisation — then finalise with [`onconnect`](ServerMechanism::onconnect) (async)
273/// or [`onconnect_sync`](ServerMechanism::onconnect_sync) (sync) to produce a
274/// [`SocketType`] ready to be mounted on a [`Server`].
275///
276/// # Example
277/// ```rust,no_run
278/// # use toolkit_zero::socket::server::{ServerMechanism, Status};
279/// # use toolkit_zero::socket::server::reply;
280/// # use serde::{Deserialize, Serialize};
281/// # use std::sync::{Arc, Mutex};
282/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
283/// # #[derive(Deserialize)] struct CreateItem { name: String }
284/// # #[derive(Deserialize)] struct SearchQuery { q: String }
285///
286/// // Plain GET — no body, no state
287/// let health = ServerMechanism::get("/health")
288///     .onconnect(|| async { reply!() });
289///
290/// // POST — JSON body deserialised into `CreateItem`
291/// let create = ServerMechanism::post("/items")
292///     .json::<CreateItem>()
293///     .onconnect(|body| async move {
294///         let item = Item { id: 1, name: body.name };
295///         reply!(json => item, status => Status::Created)
296///     });
297///
298/// // GET — shared counter state injected on every request
299/// let counter: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
300/// let count_route = ServerMechanism::get("/count")
301///     .state(counter.clone())
302///     .onconnect(|state| async move {
303///         let n = *state.lock().unwrap();
304///         reply!(json => n)
305///     });
306/// ```
307pub struct ServerMechanism {
308    method: HttpMethod,
309    path: String,
310}
311
312impl ServerMechanism {
313    fn instance(method: HttpMethod, path: impl Into<String>) -> Self {
314        let path = path.into();
315        log::debug!("Creating {:?} route at '{}'", method, path);
316        Self { method, path }
317    }
318
319    /// Creates a route matching HTTP `GET` requests at `path`.
320    pub fn get(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Get, path) }
321
322    /// Creates a route matching HTTP `POST` requests at `path`.
323    pub fn post(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Post, path) }
324
325    /// Creates a route matching HTTP `PUT` requests at `path`.
326    pub fn put(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Put, path) }
327
328    /// Creates a route matching HTTP `DELETE` requests at `path`.
329    pub fn delete(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Delete, path) }
330
331    /// Creates a route matching HTTP `PATCH` requests at `path`.
332    pub fn patch(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Patch, path) }
333
334    /// Creates a route matching HTTP `HEAD` requests at `path`.
335    pub fn head(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Head, path) }
336
337    /// Creates a route matching HTTP `OPTIONS` requests at `path`.
338    pub fn options(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Options, path) }
339
340    /// Attaches shared state `S` to this route, transitioning to [`StatefulSocketBuilder`].
341    ///
342    /// A fresh clone of `S` is injected into the handler on every request.
343    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulSocketBuilder<S> {
344        log::trace!("Attaching state to {:?} route at '{}'", self.method, self.path);
345        StatefulSocketBuilder { base: self, state }
346    }
347
348    /// Declares that this route expects a JSON-encoded request body, transitioning to
349    /// [`JsonSocketBuilder`].
350    pub fn json<T: DeserializeOwned + Send>(self) -> JsonSocketBuilder<T> {
351        log::trace!("Attaching JSON body expectation to {:?} route at '{}'", self.method, self.path);
352        JsonSocketBuilder { base: self, _phantom: std::marker::PhantomData }
353    }
354
355    /// Declares that this route extracts its input from URL query parameters, transitioning
356    /// to [`QuerySocketBuilder`].
357    pub fn query<T: DeserializeOwned + Send>(self) -> QuerySocketBuilder<T> {
358        log::trace!("Attaching query parameter expectation to {:?} route at '{}'", self.method, self.path);
359        QuerySocketBuilder { base: self, _phantom: std::marker::PhantomData }
360    }
361
362    /// Declares that this route expects an authenticated-encrypted request body
363    /// (ChaCha20-Poly1305), transitioning to [`EncryptedBodyBuilder`].
364    pub fn encryption<T>(self, key: SerializationKey) -> EncryptedBodyBuilder<T> {
365        log::trace!("Attaching encrypted body to {:?} route at '{}'", self.method, self.path);
366        EncryptedBodyBuilder { base: self, key, _phantom: std::marker::PhantomData }
367    }
368
369    /// Declares that this route expects authenticated-encrypted URL query parameters
370    /// (ChaCha20-Poly1305), transitioning to [`EncryptedQueryBuilder`].
371    pub fn encrypted_query<T>(self, key: SerializationKey) -> EncryptedQueryBuilder<T> {
372        log::trace!("Attaching encrypted query to {:?} route at '{}'", self.method, self.path);
373        EncryptedQueryBuilder { base: self, key, _phantom: std::marker::PhantomData }
374    }
375
376    /// Finalises this route with an async handler that receives no arguments.
377    ///
378    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
379    ///
380    /// # Example
381    /// ```rust,no_run
382    /// # use toolkit_zero::socket::server::ServerMechanism;
383    /// # use toolkit_zero::socket::server::reply;
384    /// # use serde::Serialize;
385    /// # #[derive(Serialize)] struct Pong { ok: bool }
386    ///
387    /// let route = ServerMechanism::get("/ping")
388    ///     .onconnect(|| async {
389    ///         reply!(json => Pong { ok: true })
390    ///     });
391    /// ```
392    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
393    where
394        F: Fn() -> Fut + Clone + Send + Sync + 'static,
395        Fut: Future<Output = Result<Re, Rejection>> + Send,
396        Re: Reply + Send,
397    {
398        log::debug!("Finalising async {:?} route at '{}' (no args)", self.method, self.path);
399        let method = self.method.to_http();
400        let path   = self.path.clone();
401        SocketType {
402            method,
403            path,
404            handler: Arc::new(move |_req: IncomingRequest| {
405                let h = handler.clone();
406                Box::pin(async move {
407                    match h().await {
408                        Ok(r)  => r.into_response(),
409                        Err(e) => e.into_response(),
410                    }
411                })
412            }),
413        }
414    }
415
416    /// Finalises this route with a **synchronous** handler that receives no arguments.
417    ///
418    /// # Safety
419    ///
420    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
421    /// The pool caps the number of live OS threads (default 512), but the queue of waiting
422    /// tasks is unbounded — under a traffic surge, tasks accumulate without limit, consuming
423    /// unbounded memory and causing severe latency spikes or OOM crashes. Additionally, any
424    /// panic inside the handler is silently converted into a 500 response, masking runtime
425    /// errors. Callers must ensure the handler completes quickly and that adequate
426    /// backpressure or rate limiting is applied externally.
427    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
428    where
429        F: Fn() -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
430        Re: Reply + Send + 'static,
431    {
432        log::warn!(
433            "Registering sync handler on {:?} '{}' — ensure rate-limiting is applied externally",
434            self.method, self.path
435        );
436        let method = self.method.to_http();
437        let path   = self.path.clone();
438        SocketType {
439            method,
440            path,
441            handler: Arc::new(move |_req: IncomingRequest| {
442                let h = handler.clone();
443                Box::pin(async move {
444                    match tokio::task::spawn_blocking(move || h()).await {
445                        Ok(Ok(r))  => r.into_response(),
446                        Ok(Err(e)) => e.into_response(),
447                        Err(_) => {
448                            log::warn!("Sync handler panicked; returning 500");
449                            Rejection::internal().into_response()
450                        }
451                    }
452                })
453            }),
454        }
455    }
456}
457
458// ─── JsonSocketBuilder ────────────────────────────────────────────────────────
459
460/// Route builder that expects and deserialises a JSON request body of type `T`.
461///
462/// Obtained from [`ServerMechanism::json`]. Optionally attach shared state via
463/// [`state`](JsonSocketBuilder::state), or finalise immediately with
464/// [`onconnect`](JsonSocketBuilder::onconnect) /
465/// [`onconnect_sync`](JsonSocketBuilder::onconnect_sync).
466pub struct JsonSocketBuilder<T> {
467    base: ServerMechanism,
468    _phantom: std::marker::PhantomData<T>,
469}
470
471impl<T: DeserializeOwned + Send + 'static> JsonSocketBuilder<T> {
472    /// Finalises this route with an async handler that receives the deserialised JSON body.
473    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
474    where
475        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
476        Fut: Future<Output = Result<Re, Rejection>> + Send,
477        Re: Reply + Send,
478    {
479        log::debug!(
480            "Finalising async {:?} route at '{}' (JSON body)",
481            self.base.method, self.base.path
482        );
483        let method = self.base.method.to_http();
484        let path   = self.base.path.clone();
485        SocketType {
486            method,
487            path,
488            handler: Arc::new(move |req: IncomingRequest| {
489                let h = handler.clone();
490                Box::pin(async move {
491                    let body: T = match serde_json::from_slice(&req.body) {
492                        Ok(v)  => v,
493                        Err(e) => {
494                            log::debug!("JSON body parse failed: {}", e);
495                            return Rejection::bad_request().into_response();
496                        }
497                    };
498                    match h(body).await {
499                        Ok(r)  => r.into_response(),
500                        Err(e) => e.into_response(),
501                    }
502                })
503            }),
504        }
505    }
506
507    /// Finalises this route with a **synchronous** handler that receives the deserialised
508    /// JSON body.
509    ///
510    /// # Safety
511    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
512    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
513    where
514        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
515        Re: Reply + Send + 'static,
516    {
517        log::warn!(
518            "Registering sync handler on {:?} '{}' (JSON body) — ensure rate-limiting is applied externally",
519            self.base.method, self.base.path
520        );
521        let method = self.base.method.to_http();
522        let path   = self.base.path.clone();
523        SocketType {
524            method,
525            path,
526            handler: Arc::new(move |req: IncomingRequest| {
527                let h = handler.clone();
528                Box::pin(async move {
529                    let body: T = match serde_json::from_slice(&req.body) {
530                        Ok(v)  => v,
531                        Err(e) => {
532                            log::debug!("JSON body parse failed (sync): {}", e);
533                            return Rejection::bad_request().into_response();
534                        }
535                    };
536                    match tokio::task::spawn_blocking(move || h(body)).await {
537                        Ok(Ok(r))  => r.into_response(),
538                        Ok(Err(e)) => e.into_response(),
539                        Err(_) => {
540                            log::warn!("Sync handler (JSON body) panicked; returning 500");
541                            Rejection::internal().into_response()
542                        }
543                    }
544                })
545            }),
546        }
547    }
548
549    /// Attaches shared state `S`, transitioning to [`StatefulJsonSocketBuilder`].
550    pub fn state<S: Clone + Send + Sync + 'static>(
551        self, state: S,
552    ) -> StatefulJsonSocketBuilder<T, S> {
553        StatefulJsonSocketBuilder {
554            base: self.base,
555            state,
556            _phantom: std::marker::PhantomData,
557        }
558    }
559}
560
561// ─── QuerySocketBuilder ───────────────────────────────────────────────────────
562
563/// Route builder that expects and deserialises URL query parameters of type `T`.
564///
565/// Obtained from [`ServerMechanism::query`]. Optionally attach shared state via
566/// [`state`](QuerySocketBuilder::state), or finalise immediately with
567/// [`onconnect`](QuerySocketBuilder::onconnect) /
568/// [`onconnect_sync`](QuerySocketBuilder::onconnect_sync).
569pub struct QuerySocketBuilder<T> {
570    base: ServerMechanism,
571    _phantom: std::marker::PhantomData<T>,
572}
573
574impl<T: DeserializeOwned + Send + 'static> QuerySocketBuilder<T> {
575    /// Finalises this route with an async handler that receives the deserialised query
576    /// parameters.
577    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
578    where
579        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
580        Fut: Future<Output = Result<Re, Rejection>> + Send,
581        Re: Reply + Send,
582    {
583        log::debug!(
584            "Finalising async {:?} route at '{}' (query params)",
585            self.base.method, self.base.path
586        );
587        let method = self.base.method.to_http();
588        let path   = self.base.path.clone();
589        SocketType {
590            method,
591            path,
592            handler: Arc::new(move |req: IncomingRequest| {
593                let h = handler.clone();
594                Box::pin(async move {
595                    let params: T = match serde_urlencoded::from_str(&req.query) {
596                        Ok(v)  => v,
597                        Err(e) => {
598                            log::debug!("query param parse failed: {}", e);
599                            return Rejection::bad_request().into_response();
600                        }
601                    };
602                    match h(params).await {
603                        Ok(r)  => r.into_response(),
604                        Err(e) => e.into_response(),
605                    }
606                })
607            }),
608        }
609    }
610
611    /// Finalises this route with a **synchronous** handler that receives the deserialised
612    /// query parameters.
613    ///
614    /// # Safety
615    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
616    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
617    where
618        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
619        Re: Reply + Send + 'static,
620    {
621        log::warn!(
622            "Registering sync handler on {:?} '{}' (query params) — ensure rate-limiting is applied externally",
623            self.base.method, self.base.path
624        );
625        let method = self.base.method.to_http();
626        let path   = self.base.path.clone();
627        SocketType {
628            method,
629            path,
630            handler: Arc::new(move |req: IncomingRequest| {
631                let h = handler.clone();
632                Box::pin(async move {
633                    let params: T = match serde_urlencoded::from_str(&req.query) {
634                        Ok(v)  => v,
635                        Err(e) => {
636                            log::debug!("query param parse failed (sync): {}", e);
637                            return Rejection::bad_request().into_response();
638                        }
639                    };
640                    match tokio::task::spawn_blocking(move || h(params)).await {
641                        Ok(Ok(r))  => r.into_response(),
642                        Ok(Err(e)) => e.into_response(),
643                        Err(_) => {
644                            log::warn!("Sync handler (query params) panicked; returning 500");
645                            Rejection::internal().into_response()
646                        }
647                    }
648                })
649            }),
650        }
651    }
652
653    /// Attaches shared state `S`, transitioning to [`StatefulQuerySocketBuilder`].
654    pub fn state<S: Clone + Send + Sync + 'static>(
655        self, state: S,
656    ) -> StatefulQuerySocketBuilder<T, S> {
657        StatefulQuerySocketBuilder {
658            base: self.base,
659            state,
660            _phantom: std::marker::PhantomData,
661        }
662    }
663}
664
665// ─── StatefulSocketBuilder ────────────────────────────────────────────────────
666
667/// Route builder that carries shared state `S` with no body or query expectation.
668///
669/// Obtained from [`ServerMechanism::state`]. `S` must be `Clone + Send + Sync + 'static`.
670pub struct StatefulSocketBuilder<S> {
671    base: ServerMechanism,
672    state: S,
673}
674
675impl<S: Clone + Send + Sync + 'static> StatefulSocketBuilder<S> {
676    /// Adds a JSON body expectation, transitioning to [`StatefulJsonSocketBuilder`].
677    pub fn json<T: DeserializeOwned + Send>(self) -> StatefulJsonSocketBuilder<T, S> {
678        StatefulJsonSocketBuilder {
679            base: self.base,
680            state: self.state,
681            _phantom: std::marker::PhantomData,
682        }
683    }
684
685    /// Adds a query parameter expectation, transitioning to [`StatefulQuerySocketBuilder`].
686    pub fn query<T: DeserializeOwned + Send>(self) -> StatefulQuerySocketBuilder<T, S> {
687        StatefulQuerySocketBuilder {
688            base: self.base,
689            state: self.state,
690            _phantom: std::marker::PhantomData,
691        }
692    }
693
694    /// Adds an encrypted body expectation, transitioning to [`StatefulEncryptedBodyBuilder`].
695    pub fn encryption<T>(self, key: SerializationKey) -> StatefulEncryptedBodyBuilder<T, S> {
696        StatefulEncryptedBodyBuilder {
697            base: self.base,
698            key,
699            state: self.state,
700            _phantom: std::marker::PhantomData,
701        }
702    }
703
704    /// Adds an encrypted query expectation, transitioning to [`StatefulEncryptedQueryBuilder`].
705    pub fn encrypted_query<T>(self, key: SerializationKey) -> StatefulEncryptedQueryBuilder<T, S> {
706        StatefulEncryptedQueryBuilder {
707            base: self.base,
708            key,
709            state: self.state,
710            _phantom: std::marker::PhantomData,
711        }
712    }
713
714    /// Finalises this route with an async handler that receives only the shared state.
715    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
716    where
717        F: Fn(S) -> Fut + Clone + Send + Sync + 'static,
718        Fut: Future<Output = Result<Re, Rejection>> + Send,
719        Re: Reply + Send,
720    {
721        log::debug!(
722            "Finalising async {:?} route at '{}' (state)",
723            self.base.method, self.base.path
724        );
725        let method = self.base.method.to_http();
726        let path   = self.base.path.clone();
727        let state  = self.state;
728        SocketType {
729            method,
730            path,
731            handler: Arc::new(move |_req: IncomingRequest| {
732                let h = handler.clone();
733                let s = state.clone();
734                Box::pin(async move {
735                    match h(s).await {
736                        Ok(r)  => r.into_response(),
737                        Err(e) => e.into_response(),
738                    }
739                })
740            }),
741        }
742    }
743
744    /// Finalises this route with a **synchronous** handler that receives only the shared
745    /// state.
746    ///
747    /// # Safety
748    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
749    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
750    where
751        F: Fn(S) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
752        Re: Reply + Send + 'static,
753    {
754        log::warn!(
755            "Registering sync handler on {:?} '{}' (state) — ensure rate-limiting and lock-free state are in place",
756            self.base.method, self.base.path
757        );
758        let method = self.base.method.to_http();
759        let path   = self.base.path.clone();
760        let state  = self.state;
761        SocketType {
762            method,
763            path,
764            handler: Arc::new(move |_req: IncomingRequest| {
765                let h = handler.clone();
766                let s = state.clone();
767                Box::pin(async move {
768                    match tokio::task::spawn_blocking(move || h(s)).await {
769                        Ok(Ok(r))  => r.into_response(),
770                        Ok(Err(e)) => e.into_response(),
771                        Err(_) => {
772                            log::warn!("Sync handler (state) panicked; returning 500");
773                            Rejection::internal().into_response()
774                        }
775                    }
776                })
777            }),
778        }
779    }
780}
781
782// ─── StatefulJsonSocketBuilder ────────────────────────────────────────────────
783
784/// Route builder that carries shared state `S` and expects a JSON body of type `T`.
785///
786/// Obtained from [`JsonSocketBuilder::state`] or [`StatefulSocketBuilder::json`].
787pub struct StatefulJsonSocketBuilder<T, S> {
788    base:     ServerMechanism,
789    state:    S,
790    _phantom: std::marker::PhantomData<T>,
791}
792
793impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
794    StatefulJsonSocketBuilder<T, S>
795{
796    /// Finalises this route with an async handler that receives `(state: S, body: T)`.
797    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
798    where
799        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
800        Fut: Future<Output = Result<Re, Rejection>> + Send,
801        Re: Reply + Send,
802    {
803        log::debug!(
804            "Finalising async {:?} route at '{}' (state + JSON body)",
805            self.base.method, self.base.path
806        );
807        let method = self.base.method.to_http();
808        let path   = self.base.path.clone();
809        let state  = self.state;
810        SocketType {
811            method,
812            path,
813            handler: Arc::new(move |req: IncomingRequest| {
814                let h = handler.clone();
815                let s = state.clone();
816                Box::pin(async move {
817                    let body: T = match serde_json::from_slice(&req.body) {
818                        Ok(v)  => v,
819                        Err(e) => {
820                            log::debug!("JSON body parse failed (state): {}", e);
821                            return Rejection::bad_request().into_response();
822                        }
823                    };
824                    match h(s, body).await {
825                        Ok(r)  => r.into_response(),
826                        Err(e) => e.into_response(),
827                    }
828                })
829            }),
830        }
831    }
832
833    /// Finalises this route with a **synchronous** handler that receives `(state: S, body: T)`.
834    ///
835    /// # Safety
836    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
837    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
838    where
839        F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
840        Re: Reply + Send + 'static,
841    {
842        log::warn!(
843            "Registering sync handler on {:?} '{}' (state + JSON body) — ensure rate-limiting and lock-free state are in place",
844            self.base.method, self.base.path
845        );
846        let method = self.base.method.to_http();
847        let path   = self.base.path.clone();
848        let state  = self.state;
849        SocketType {
850            method,
851            path,
852            handler: Arc::new(move |req: IncomingRequest| {
853                let h = handler.clone();
854                let s = state.clone();
855                Box::pin(async move {
856                    let body: T = match serde_json::from_slice(&req.body) {
857                        Ok(v)  => v,
858                        Err(e) => {
859                            log::debug!("JSON body parse failed (state+sync): {}", e);
860                            return Rejection::bad_request().into_response();
861                        }
862                    };
863                    match tokio::task::spawn_blocking(move || h(s, body)).await {
864                        Ok(Ok(r))  => r.into_response(),
865                        Ok(Err(e)) => e.into_response(),
866                        Err(_) => {
867                            log::warn!("Sync handler (state + JSON body) panicked; returning 500");
868                            Rejection::internal().into_response()
869                        }
870                    }
871                })
872            }),
873        }
874    }
875}
876
877// ─── StatefulQuerySocketBuilder ───────────────────────────────────────────────
878
879/// Route builder that carries shared state `S` and expects URL query parameters of type `T`.
880///
881/// Obtained from [`QuerySocketBuilder::state`] or [`StatefulSocketBuilder::query`].
882pub struct StatefulQuerySocketBuilder<T, S> {
883    base:     ServerMechanism,
884    state:    S,
885    _phantom: std::marker::PhantomData<T>,
886}
887
888impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
889    StatefulQuerySocketBuilder<T, S>
890{
891    /// Finalises this route with an async handler that receives `(state: S, query: T)`.
892    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
893    where
894        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
895        Fut: Future<Output = Result<Re, Rejection>> + Send,
896        Re: Reply + Send,
897    {
898        log::debug!(
899            "Finalising async {:?} route at '{}' (state + query params)",
900            self.base.method, self.base.path
901        );
902        let method = self.base.method.to_http();
903        let path   = self.base.path.clone();
904        let state  = self.state;
905        SocketType {
906            method,
907            path,
908            handler: Arc::new(move |req: IncomingRequest| {
909                let h = handler.clone();
910                let s = state.clone();
911                Box::pin(async move {
912                    let params: T = match serde_urlencoded::from_str(&req.query) {
913                        Ok(v)  => v,
914                        Err(e) => {
915                            log::debug!("query param parse failed (state): {}", e);
916                            return Rejection::bad_request().into_response();
917                        }
918                    };
919                    match h(s, params).await {
920                        Ok(r)  => r.into_response(),
921                        Err(e) => e.into_response(),
922                    }
923                })
924            }),
925        }
926    }
927
928    /// Finalises this route with a **synchronous** handler that receives `(state: S, query: T)`.
929    ///
930    /// # Safety
931    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
932    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
933    where
934        F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
935        Re: Reply + Send + 'static,
936    {
937        log::warn!(
938            "Registering sync handler on {:?} '{}' (state + query params) — ensure rate-limiting and lock-free state are in place",
939            self.base.method, self.base.path
940        );
941        let method = self.base.method.to_http();
942        let path   = self.base.path.clone();
943        let state  = self.state;
944        SocketType {
945            method,
946            path,
947            handler: Arc::new(move |req: IncomingRequest| {
948                let h = handler.clone();
949                let s = state.clone();
950                Box::pin(async move {
951                    let params: T = match serde_urlencoded::from_str(&req.query) {
952                        Ok(v)  => v,
953                        Err(e) => {
954                            log::debug!("query param parse failed (state+sync): {}", e);
955                            return Rejection::bad_request().into_response();
956                        }
957                    };
958                    match tokio::task::spawn_blocking(move || h(s, params)).await {
959                        Ok(Ok(r))  => r.into_response(),
960                        Ok(Err(e)) => e.into_response(),
961                        Err(_) => {
962                            log::warn!("Sync handler (state + query params) panicked; returning 500");
963                            Rejection::internal().into_response()
964                        }
965                    }
966                })
967            }),
968        }
969    }
970}
971
972// ─── EncryptedBodyBuilder ────────────────────────────────────────────────────
973
974/// Route builder that expects an authenticated-encrypted request body of type `T`
975/// (ChaCha20-Poly1305).
976///
977/// Obtained from [`ServerMechanism::encryption`].  On each matching request the raw
978/// body bytes are decrypted using the [`SerializationKey`] supplied there.  If
979/// decryption fails the route immediately returns `403 Forbidden`.
980///
981/// Optionally attach shared state via [`state`](EncryptedBodyBuilder::state) before
982/// finalising with [`onconnect`](EncryptedBodyBuilder::onconnect) /
983/// [`onconnect_sync`](EncryptedBodyBuilder::onconnect_sync).
984pub struct EncryptedBodyBuilder<T> {
985    base:     ServerMechanism,
986    key:      SerializationKey,
987    _phantom: std::marker::PhantomData<T>,
988}
989
990impl<T> EncryptedBodyBuilder<T>
991where
992    T: bincode::Decode<()> + Send + 'static,
993{
994    /// Finalises this route with an async handler that receives the decrypted body as `T`.
995    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
996    where
997        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
998        Fut: Future<Output = Result<Re, Rejection>> + Send,
999        Re: Reply + Send,
1000    {
1001        log::debug!(
1002            "Finalising async {:?} route at '{}' (encrypted body)",
1003            self.base.method, self.base.path
1004        );
1005        let method = self.base.method.to_http();
1006        let path   = self.base.path.clone();
1007        let key    = self.key;
1008        SocketType {
1009            method,
1010            path,
1011            handler: Arc::new(move |req: IncomingRequest| {
1012                let h   = handler.clone();
1013                let key = key.clone();
1014                Box::pin(async move {
1015                    let value: T = match decode_body(&req.body, &key) {
1016                        Ok(v)  => v,
1017                        Err(e) => return e.into_response(),
1018                    };
1019                    match h(value).await {
1020                        Ok(r)  => r.into_response(),
1021                        Err(e) => e.into_response(),
1022                    }
1023                })
1024            }),
1025        }
1026    }
1027
1028    /// Finalises this route with a **synchronous** handler that receives the decrypted
1029    /// body as `T`.
1030    ///
1031    /// # Safety
1032    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
1033    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
1034    where
1035        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
1036        Re: Reply + Send + 'static,
1037    {
1038        log::warn!(
1039            "Registering sync handler on {:?} '{}' (encrypted body) — ensure rate-limiting is applied externally",
1040            self.base.method, self.base.path
1041        );
1042        let method = self.base.method.to_http();
1043        let path   = self.base.path.clone();
1044        let key    = self.key;
1045        SocketType {
1046            method,
1047            path,
1048            handler: Arc::new(move |req: IncomingRequest| {
1049                let h   = handler.clone();
1050                let key = key.clone();
1051                Box::pin(async move {
1052                    let value: T = match decode_body(&req.body, &key) {
1053                        Ok(v)  => v,
1054                        Err(e) => return e.into_response(),
1055                    };
1056                    match tokio::task::spawn_blocking(move || h(value)).await {
1057                        Ok(Ok(r))  => r.into_response(),
1058                        Ok(Err(e)) => e.into_response(),
1059                        Err(_) => {
1060                            log::warn!("Sync encrypted handler panicked; returning 500");
1061                            Rejection::internal().into_response()
1062                        }
1063                    }
1064                })
1065            }),
1066        }
1067    }
1068
1069    /// Attaches shared state `S`, transitioning to [`StatefulEncryptedBodyBuilder`].
1070    pub fn state<S: Clone + Send + Sync + 'static>(
1071        self, state: S,
1072    ) -> StatefulEncryptedBodyBuilder<T, S> {
1073        StatefulEncryptedBodyBuilder {
1074            base: self.base,
1075            key: self.key,
1076            state,
1077            _phantom: std::marker::PhantomData,
1078        }
1079    }
1080}
1081
1082// ─── EncryptedQueryBuilder ────────────────────────────────────────────────────
1083
1084/// Route builder that expects authenticated-encrypted URL query parameters of type `T`
1085/// (ChaCha20-Poly1305).
1086///
1087/// Obtained from [`ServerMechanism::encrypted_query`].  The client must send a single
1088/// `?data=<base64url>` query parameter.  Any failure returns `403 Forbidden`.
1089///
1090/// Optionally attach shared state via [`state`](EncryptedQueryBuilder::state) before
1091/// finalising with [`onconnect`](EncryptedQueryBuilder::onconnect).
1092pub struct EncryptedQueryBuilder<T> {
1093    base:     ServerMechanism,
1094    key:      SerializationKey,
1095    _phantom: std::marker::PhantomData<T>,
1096}
1097
1098impl<T> EncryptedQueryBuilder<T>
1099where
1100    T: bincode::Decode<()> + Send + 'static,
1101{
1102    /// Finalises this route with an async handler that receives the decrypted query
1103    /// parameters as `T`.
1104    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1105    where
1106        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
1107        Fut: Future<Output = Result<Re, Rejection>> + Send,
1108        Re: Reply + Send,
1109    {
1110        log::debug!(
1111            "Finalising async {:?} route at '{}' (encrypted query)",
1112            self.base.method, self.base.path
1113        );
1114        let method = self.base.method.to_http();
1115        let path   = self.base.path.clone();
1116        let key    = self.key;
1117        SocketType {
1118            method,
1119            path,
1120            handler: Arc::new(move |req: IncomingRequest| {
1121                let h   = handler.clone();
1122                let key = key.clone();
1123                Box::pin(async move {
1124                    let value: T = match decode_query(&req.query, &key) {
1125                        Ok(v)  => v,
1126                        Err(e) => return e.into_response(),
1127                    };
1128                    match h(value).await {
1129                        Ok(r)  => r.into_response(),
1130                        Err(e) => e.into_response(),
1131                    }
1132                })
1133            }),
1134        }
1135    }
1136
1137    /// Attaches shared state `S`, transitioning to [`StatefulEncryptedQueryBuilder`].
1138    pub fn state<S: Clone + Send + Sync + 'static>(
1139        self, state: S,
1140    ) -> StatefulEncryptedQueryBuilder<T, S> {
1141        StatefulEncryptedQueryBuilder {
1142            base: self.base,
1143            key: self.key,
1144            state,
1145            _phantom: std::marker::PhantomData,
1146        }
1147    }
1148}
1149
1150// ─── StatefulEncryptedBodyBuilder ────────────────────────────────────────────
1151
1152/// Route builder carrying shared state `S` and an authenticated-encrypted request body
1153/// of type `T` (ChaCha20-Poly1305).
1154///
1155/// Obtained from [`EncryptedBodyBuilder::state`] or [`StatefulSocketBuilder::encryption`].
1156pub struct StatefulEncryptedBodyBuilder<T, S> {
1157    base:     ServerMechanism,
1158    key:      SerializationKey,
1159    state:    S,
1160    _phantom: std::marker::PhantomData<T>,
1161}
1162
1163impl<T, S> StatefulEncryptedBodyBuilder<T, S>
1164where
1165    T: bincode::Decode<()> + Send + 'static,
1166    S: Clone + Send + Sync + 'static,
1167{
1168    /// Finalises this route with an async handler that receives `(state: S, body: T)`.
1169    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1170    where
1171        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1172        Fut: Future<Output = Result<Re, Rejection>> + Send,
1173        Re: Reply + Send,
1174    {
1175        log::debug!(
1176            "Finalising async {:?} route at '{}' (state + encrypted body)",
1177            self.base.method, self.base.path
1178        );
1179        let method = self.base.method.to_http();
1180        let path   = self.base.path.clone();
1181        let key    = self.key;
1182        let state  = self.state;
1183        SocketType {
1184            method,
1185            path,
1186            handler: Arc::new(move |req: IncomingRequest| {
1187                let h   = handler.clone();
1188                let key = key.clone();
1189                let s   = state.clone();
1190                Box::pin(async move {
1191                    let value: T = match decode_body(&req.body, &key) {
1192                        Ok(v)  => v,
1193                        Err(e) => return e.into_response(),
1194                    };
1195                    match h(s, value).await {
1196                        Ok(r)  => r.into_response(),
1197                        Err(e) => e.into_response(),
1198                    }
1199                })
1200            }),
1201        }
1202    }
1203}
1204
1205// ─── StatefulEncryptedQueryBuilder ───────────────────────────────────────────
1206
1207/// Route builder carrying shared state `S` and authenticated-encrypted query parameters
1208/// of type `T` (ChaCha20-Poly1305).
1209///
1210/// Obtained from [`EncryptedQueryBuilder::state`] or
1211/// [`StatefulSocketBuilder::encrypted_query`].
1212pub struct StatefulEncryptedQueryBuilder<T, S> {
1213    base:     ServerMechanism,
1214    key:      SerializationKey,
1215    state:    S,
1216    _phantom: std::marker::PhantomData<T>,
1217}
1218
1219impl<T, S> StatefulEncryptedQueryBuilder<T, S>
1220where
1221    T: bincode::Decode<()> + Send + 'static,
1222    S: Clone + Send + Sync + 'static,
1223{
1224    /// Finalises this route with an async handler that receives `(state: S, query: T)`.
1225    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1226    where
1227        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1228        Fut: Future<Output = Result<Re, Rejection>> + Send,
1229        Re: Reply + Send,
1230    {
1231        log::debug!(
1232            "Finalising async {:?} route at '{}' (state + encrypted query)",
1233            self.base.method, self.base.path
1234        );
1235        let method = self.base.method.to_http();
1236        let path   = self.base.path.clone();
1237        let key    = self.key;
1238        let state  = self.state;
1239        SocketType {
1240            method,
1241            path,
1242            handler: Arc::new(move |req: IncomingRequest| {
1243                let h   = handler.clone();
1244                let key = key.clone();
1245                let s   = state.clone();
1246                Box::pin(async move {
1247                    let value: T = match decode_query(&req.query, &key) {
1248                        Ok(v)  => v,
1249                        Err(e) => return e.into_response(),
1250                    };
1251                    match h(s, value).await {
1252                        Ok(r)  => r.into_response(),
1253                        Err(e) => e.into_response(),
1254                    }
1255                })
1256            }),
1257        }
1258    }
1259}
1260
1261// ─── Internal decode helpers ─────────────────────────────────────────────────
1262
1263/// Decrypt an authenticated-encrypted request body into `T`, returning 403 on failure.
1264fn decode_body<T: bincode::Decode<()>>(
1265    raw: &bytes::Bytes,
1266    key: &SerializationKey,
1267) -> Result<T, Rejection> {
1268    crate::serialization::open(raw, key.veil_key()).map_err(|e| {
1269        log::debug!("body decryption failed (key mismatch or corrupt body): {}", e);
1270        Rejection::forbidden()
1271    })
1272}
1273
1274/// Decrypt an authenticated-encrypted `?data=<base64url>` query into `T`, returning 403 on failure.
1275fn decode_query<T: bincode::Decode<()>>(
1276    raw_query: &str,
1277    key: &SerializationKey,
1278) -> Result<T, Rejection> {
1279    use base64::Engine;
1280
1281    #[derive(serde::Deserialize)]
1282    struct DataParam { data: String }
1283
1284    let q: DataParam = serde_urlencoded::from_str(raw_query).map_err(|_| {
1285        log::debug!("encrypted query missing `data` parameter");
1286        Rejection::forbidden()
1287    })?;
1288
1289    let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
1290        .decode(&q.data)
1291        .map_err(|e| {
1292            log::debug!("base64url decode failed: {}", e);
1293            Rejection::forbidden()
1294        })?;
1295
1296    crate::serialization::open(&bytes, key.veil_key()).map_err(|e| {
1297        log::debug!("query decryption failed: {}", e);
1298        Rejection::forbidden()
1299    })
1300}
1301
1302/// Returns a `403 Forbidden` rejection.
1303///
1304/// Use in route handlers to deny access:
1305/// ```rust,no_run
1306/// # use toolkit_zero::socket::server::*;
1307/// # let _ = ServerMechanism::post("/secure")
1308/// #     .onconnect(|| async { Err::<EmptyReply, _>(forbidden()) });
1309/// ```
1310pub fn forbidden() -> Rejection {
1311    Rejection::forbidden()
1312}
1313
1314// ─── Status ───────────────────────────────────────────────────────────────────
1315
/// A collection of common HTTP status codes used with the reply helpers.
///
/// Converts into `http::StatusCode` via [`From`] and is accepted by
/// [`reply_with_status`] and [`reply_with_status_and_json`].  Also usable directly
/// in the [`reply!`] macro via the `status => Status::X` argument.
///
/// The enum is a plain, field-less value type: besides `Clone`/`Copy`/`Debug` it
/// implements `PartialEq`, `Eq` and `Hash`, so statuses can be compared with `==`,
/// checked with `assert_eq!`, and used as keys in hash-based collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Status {
    // 2xx — success
    Ok,
    Created,
    Accepted,
    NoContent,
    // 3xx — redirection
    MovedPermanently,
    Found,
    NotModified,
    TemporaryRedirect,
    PermanentRedirect,
    // 4xx — client error
    BadRequest,
    Unauthorized,
    Forbidden,
    NotFound,
    MethodNotAllowed,
    Conflict,
    Gone,
    UnprocessableEntity,
    TooManyRequests,
    // 5xx — server error
    InternalServerError,
    NotImplemented,
    BadGateway,
    ServiceUnavailable,
    GatewayTimeout,
}
1351
1352impl From<Status> for http::StatusCode {
1353    fn from(s: Status) -> Self {
1354        match s {
1355            Status::Ok                  => http::StatusCode::OK,
1356            Status::Created             => http::StatusCode::CREATED,
1357            Status::Accepted            => http::StatusCode::ACCEPTED,
1358            Status::NoContent           => http::StatusCode::NO_CONTENT,
1359            Status::MovedPermanently    => http::StatusCode::MOVED_PERMANENTLY,
1360            Status::Found               => http::StatusCode::FOUND,
1361            Status::NotModified         => http::StatusCode::NOT_MODIFIED,
1362            Status::TemporaryRedirect   => http::StatusCode::TEMPORARY_REDIRECT,
1363            Status::PermanentRedirect   => http::StatusCode::PERMANENT_REDIRECT,
1364            Status::BadRequest          => http::StatusCode::BAD_REQUEST,
1365            Status::Unauthorized        => http::StatusCode::UNAUTHORIZED,
1366            Status::Forbidden           => http::StatusCode::FORBIDDEN,
1367            Status::NotFound            => http::StatusCode::NOT_FOUND,
1368            Status::MethodNotAllowed    => http::StatusCode::METHOD_NOT_ALLOWED,
1369            Status::Conflict            => http::StatusCode::CONFLICT,
1370            Status::Gone                => http::StatusCode::GONE,
1371            Status::UnprocessableEntity => http::StatusCode::UNPROCESSABLE_ENTITY,
1372            Status::TooManyRequests     => http::StatusCode::TOO_MANY_REQUESTS,
1373            Status::InternalServerError => http::StatusCode::INTERNAL_SERVER_ERROR,
1374            Status::NotImplemented      => http::StatusCode::NOT_IMPLEMENTED,
1375            Status::BadGateway          => http::StatusCode::BAD_GATEWAY,
1376            Status::ServiceUnavailable  => http::StatusCode::SERVICE_UNAVAILABLE,
1377            Status::GatewayTimeout      => http::StatusCode::GATEWAY_TIMEOUT,
1378        }
1379    }
1380}
1381
1382// ─── Server ───────────────────────────────────────────────────────────────────
1383
1384/// The HTTP server that owns and dispatches a collection of [`SocketType`] routes.
1385///
1386/// Build routes through the [`ServerMechanism`] builder chain, register each with
1387/// [`mechanism`](Server::mechanism), then start the server with [`serve`](Server::serve).
1388///
1389/// # Example
1390/// ```rust,no_run
1391/// # use toolkit_zero::socket::server::{Server, ServerMechanism, Status};
1392/// # use toolkit_zero::socket::server::reply;
1393/// # use serde::Serialize;
1394/// # #[derive(Serialize)] struct Pong { ok: bool }
1395///
1396/// let mut server = Server::default();
1397///
1398/// server
1399///     .mechanism(
1400///         ServerMechanism::get("/ping")
1401///             .onconnect(|| async { reply!(json => Pong { ok: true }) })
1402///     )
1403///     .mechanism(
1404///         ServerMechanism::delete("/session")
1405///             .onconnect(|| async { reply!() })
1406///     );
1407///
1408/// // Blocks forever — call only to actually run the server:
1409/// // server.serve(([0, 0, 0, 0], 8080)).await;
1410/// ```
1411///
1412/// # Caution
1413/// Calling [`serve`](Server::serve) with no routes registered will **panic**.
1414pub struct Server {
1415    mechanisms: Vec<SocketType>,
1416    /// Default bind address, set by [`rebind`](Server::rebind).
1417    bind_addr: Option<std::net::SocketAddr>,
1418}
1419
1420impl Default for Server {
1421    fn default() -> Self {
1422        Self::new()
1423    }
1424}
1425
1426impl Server {
1427    fn new() -> Self {
1428        Self { mechanisms: Vec::new(), bind_addr: None }
1429    }
1430
1431    /// Registers a [`SocketType`] route on this server.
1432    ///
1433    /// Routes are evaluated in registration order.  Returns `&mut Self` for chaining.
1434    pub fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1435        self.mechanisms.push(mech);
1436        log::debug!("Route registered (total: {})", self.mechanisms.len());
1437        self
1438    }
1439
1440    /// Binds to `addr` and starts serving all registered routes.
1441    ///
1442    /// Returns a [`ServerFuture`] that can be:
1443    /// - **`.await`'d** — runs the server in the current task (infinite loop)
1444    /// - **`.background()`'d** — spawns the server as a Tokio background task
1445    ///
1446    /// # Panics
1447    /// Panics if no routes have been registered or if the address cannot be bound.
1448    pub fn serve(self, addr: impl Into<SocketAddr>) -> ServerFuture {
1449        let addr   = addr.into();
1450        let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1451        ServerFuture::new(async move {
1452            log::info!("Server binding to {}", addr);
1453            run_hyper_server(routes, addr, std::future::pending::<()>()).await;
1454        })
1455    }
1456
1457    /// Binds to `addr`, serves all registered routes, and shuts down gracefully when
1458    /// `shutdown` resolves.
1459    ///
1460    /// Returns a [`ServerFuture`] that can be `.await`'d or `.background()`'d.
1461    ///
1462    /// # Example
1463    ///
1464    /// ```rust,no_run
1465    /// use tokio::sync::oneshot;
1466    ///
1467    /// # #[tokio::main]
1468    /// # async fn main() {
1469    /// let (tx, rx) = oneshot::channel::<()>();
1470    /// # let mut server = toolkit_zero::socket::server::Server::default();
1471    ///
1472    /// let handle = server.serve_with_graceful_shutdown(
1473    ///     ([127, 0, 0, 1], 8080),
1474    ///     async move { rx.await.ok(); },
1475    /// ).background();
1476    /// tx.send(()).ok();
1477    /// handle.await.ok();
1478    /// # }
1479    /// ```
1480    pub fn serve_with_graceful_shutdown(
1481        self,
1482        addr: impl Into<std::net::SocketAddr>,
1483        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1484    ) -> ServerFuture {
1485        let addr   = addr.into();
1486        let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1487        ServerFuture::new(async move {
1488            log::info!("Server binding to {} (graceful shutdown enabled)", addr);
1489            run_hyper_server(routes, addr, shutdown).await;
1490        })
1491    }
1492
1493    /// Serves all registered routes from an already-bound `listener`, shutting down
1494    /// gracefully when `shutdown` resolves.
1495    ///
1496    /// Returns a [`ServerFuture`] that can be `.await`'d or `.background()`'d.
1497    ///
1498    /// Use this when port `0` is passed to `TcpListener::bind` and you need to know
1499    /// the actual OS-assigned port before the server starts.
1500    ///
1501    /// # Example
1502    ///
1503    /// ```rust,no_run
1504    /// use tokio::net::TcpListener;
1505    /// use tokio::sync::oneshot;
1506    ///
1507    /// # #[tokio::main]
1508    /// # async fn main() -> std::io::Result<()> {
1509    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
1510    /// let port = listener.local_addr()?.port();
1511    ///
1512    /// let (tx, rx) = oneshot::channel::<()>();
1513    /// # let mut server = toolkit_zero::socket::server::Server::default();
1514    ///
1515    /// let handle = server
1516    ///     .serve_from_listener(listener, async move { rx.await.ok(); })
1517    ///     .background();
1518    /// tx.send(()).ok();
1519    /// handle.await.ok();
1520    /// # Ok(())
1521    /// # }
1522    /// ```
1523    pub fn serve_from_listener(
1524        self,
1525        listener: tokio::net::TcpListener,
1526        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1527    ) -> ServerFuture {
1528        let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1529        ServerFuture::new(async move {
1530            log::info!(
1531                "Server running on {} (graceful shutdown enabled)",
1532                listener.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "?".into())
1533            );
1534            run_hyper_server_inner(routes, listener, shutdown).await;
1535        })
1536    }
1537
1538    /// Stores `addr` as this server's default bind address.
1539    ///
1540    /// This is a pre-serve convenience setter.  Call it before
1541    /// [`serve_managed`](Server::serve_managed) or any other `serve*` variant to
1542    /// record the initial address without starting the server.
1543    ///
1544    /// Returns `&mut Self` for method chaining.
1545    pub fn rebind(&mut self, addr: impl Into<std::net::SocketAddr>) -> &mut Self {
1546        self.bind_addr = Some(addr.into());
1547        log::debug!("Default bind address updated to {:?}", self.bind_addr);
1548        self
1549    }
1550
1551    /// Starts all registered routes in a background Tokio task and returns a
1552    /// [`BackgroundServer`] handle.
1553    ///
1554    /// Unlike `serve*` + `.background()`, this method keeps a live route table
1555    /// inside the handle, enabling:
1556    ///
1557    /// - [`BackgroundServer::rebind`]     — graceful stop + restart on a new address
1558    /// - [`BackgroundServer::mechanism`]   — add routes **without restarting**
1559    /// - [`BackgroundServer::addr`]        — query the current bind address
1560    /// - [`BackgroundServer::stop`]        — shut down and await completion
1561    ///
1562    /// # Panics
1563    /// Panics if no routes have been registered.
1564    ///
1565    /// # Example
1566    /// ```rust,no_run
1567    /// # use toolkit_zero::socket::server::Server;
1568    /// # use serde::Serialize;
1569    /// # use toolkit_zero::socket::server::reply;
1570    /// # #[derive(Serialize)] struct Pong { ok: bool }
1571    /// # #[tokio::main]
1572    /// # async fn main() {
1573    /// let mut server = Server::default();
1574    /// server.mechanism(
1575    ///     toolkit_zero::socket::server::ServerMechanism::get("/ping")
1576    ///         .onconnect(|| async { reply!(json => Pong { ok: true }) })
1577    /// );
1578    ///
1579    /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1580    /// println!("Running on {}", bg.addr());
1581    ///
1582    /// bg.rebind(([127, 0, 0, 1], 9090)).await;
1583    /// println!("Rebound to {}", bg.addr());
1584    ///
1585    /// bg.stop().await;
1586    /// # }
1587    /// ```
1588    pub fn serve_managed(self, addr: impl Into<std::net::SocketAddr>) -> BackgroundServer {
1589        let addr   = addr.into();
1590        let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1591        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1592        let routes_ref = Arc::clone(&routes);
1593        let handle = tokio::spawn(run_hyper_server(
1594            routes_ref,
1595            addr,
1596            async { rx.await.ok(); },
1597        ));
1598        BackgroundServer {
1599            routes,
1600            addr,
1601            shutdown_tx: Some(tx),
1602            handle: Some(handle),
1603        }
1604    }
1605}
1606
1607// ─── ServerFuture ────────────────────────────────────────────────────────────
1608
/// Opaque future returned by [`Server::serve`], [`Server::serve_with_graceful_shutdown`],
/// and [`Server::serve_from_listener`].
///
/// The wrapped future is boxed and pinned so that every `serve*` variant can
/// return the same concrete type.  It is lazy: nothing runs until it is driven
/// in one of two ways:
///
/// - **`.await`** — drives the server inline in the current task.
/// - **`.background()`** — spawns the server on a new Tokio task and returns a
///   [`tokio::task::JoinHandle<()>`] immediately.
///
/// Dropping a `ServerFuture` without doing either means the server never
/// starts, hence the `#[must_use]` lint on the type.
#[must_use = "a `ServerFuture` does nothing unless `.await`ed or started with `.background()`"]
pub struct ServerFuture(Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
1618
1619impl ServerFuture {
1620    fn new(fut: impl Future<Output = ()> + Send + 'static) -> Self {
1621        Self(Box::pin(fut))
1622    }
1623
1624    /// Spawns the server on a new Tokio background task and returns a `JoinHandle<()>`.
1625    ///
1626    /// # Panics
1627    /// Panics if called outside a Tokio runtime.
1628    pub fn background(self) -> tokio::task::JoinHandle<()> {
1629        tokio::spawn(self.0)
1630    }
1631}
1632
1633impl std::future::IntoFuture for ServerFuture {
1634    type Output     = ();
1635    type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
1636    fn into_future(self) -> Self::IntoFuture {
1637        self.0
1638    }
1639}
1640
1641// ─── Internal: dispatch + server loop ────────────────────────────────────────
1642
1643/// Dispatch a single hyper request to the matching route handler.
1644async fn dispatch(
1645    routes: &Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1646    req: hyper::Request<hyper::body::Incoming>,
1647) -> http::Response<bytes::Bytes> {
1648    use http_body_util::BodyExt;
1649
1650    let (parts, body) = req.into_parts();
1651    let path    = parts.uri.path().to_owned();
1652    let query   = parts.uri.query().unwrap_or("").to_owned();
1653    let method  = parts.method.clone();
1654    let headers = parts.headers.clone();
1655
1656    // Collect body bytes before acquiring the route lock.
1657    let body_bytes = match body.collect().await {
1658        Ok(c) => c.to_bytes(),
1659        Err(e) => {
1660            log::debug!("failed to read request body: {}", e);
1661            return http::Response::builder()
1662                .status(http::StatusCode::BAD_REQUEST)
1663                .body(bytes::Bytes::new())
1664                .unwrap();
1665        }
1666    };
1667
1668    // Hold lock only long enough to clone the matching handler Arc.
1669    let handler = {
1670        let guard = routes.read().await;
1671        guard
1672            .iter()
1673            .find(|s| s.method == method && path_matches(&s.path, &path))
1674            .map(|s| Arc::clone(&s.handler))
1675    };
1676
1677    match handler {
1678        Some(h) => {
1679            h(IncomingRequest { body: body_bytes, query, headers }).await
1680        }
1681        None => {
1682            log::debug!("No route matched {} {}", method, path);
1683            http::Response::builder()
1684                .status(http::StatusCode::NOT_FOUND)
1685                .body(bytes::Bytes::new())
1686                .unwrap()
1687        }
1688    }
1689}
1690
1691/// Core server loop — drives an already-bound listener with graceful shutdown.
1692///
1693/// Uses `hyper::server::conn::http1::Builder` (HTTP/1.1) whose `Connection<IO, S>`
1694/// has no lifetime tied to the builder, making it `'static` and compatible with
1695/// `tokio::spawn` and `GracefulShutdown::watch`.
1696async fn run_hyper_server_inner(
1697    routes:   Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1698    listener: tokio::net::TcpListener,
1699    shutdown: impl Future<Output = ()> + Send + 'static,
1700) {
1701    use hyper_util::server::graceful::GracefulShutdown;
1702    use hyper_util::rt::TokioIo;
1703    use hyper::server::conn::http1;
1704
1705    let graceful = GracefulShutdown::new();
1706    let mut shutdown = std::pin::pin!(shutdown);
1707
1708    loop {
1709        tokio::select! {
1710            result = listener.accept() => {
1711                let (stream, remote) = match result {
1712                    Ok(pair) => pair,
1713                    Err(e) => {
1714                        log::warn!("accept error: {}", e);
1715                        continue;
1716                    }
1717                };
1718                log::trace!("accepted connection from {}", remote);
1719
1720                let routes_ref = Arc::clone(&routes);
1721                // http1::Builder::serve_connection returns Connection<IO, S> with no
1722                // builder-lifetime, so it is 'static when IO + S are 'static.
1723                let conn = http1::Builder::new().serve_connection(
1724                    TokioIo::new(stream),
1725                    hyper::service::service_fn(move |req| {
1726                        let r = Arc::clone(&routes_ref);
1727                        async move {
1728                            let resp = dispatch(&r, req).await;
1729                            let (parts, body) = resp.into_parts();
1730                            Ok::<_, std::convert::Infallible>(
1731                                http::Response::from_parts(
1732                                    parts,
1733                                    http_body_util::Full::new(body),
1734                                )
1735                            )
1736                        }
1737                    }),
1738                );
1739                let fut = graceful.watch(conn);
1740                tokio::spawn(async move {
1741                    if let Err(e) = fut.await {
1742                        log::debug!("connection error: {}", e);
1743                    }
1744                });
1745            }
1746            _ = &mut shutdown => {
1747                log::info!("shutdown signal received — draining in-flight connections");
1748                break;
1749            }
1750        }
1751    }
1752
1753    // Free the TCP port so a rebound server can bind immediately.
1754    drop(listener);
1755
1756    // Block until every in-flight request completes.
1757    graceful.shutdown().await;
1758    log::info!("all connections drained");
1759}
1760
1761/// Bind to `addr` then delegate to [`run_hyper_server_inner`].
1762async fn run_hyper_server(
1763    routes:   Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1764    addr:     SocketAddr,
1765    shutdown: impl Future<Output = ()> + Send + 'static,
1766) {
1767    let listener = match tokio::net::TcpListener::bind(addr).await {
1768        Ok(l) => {
1769            log::info!("server bound to {}", addr);
1770            l
1771        }
1772        Err(e) => {
1773            log::error!("failed to bind {}: {}", addr, e);
1774            panic!("server bind failed: {}", e);
1775        }
1776    };
1777    run_hyper_server_inner(routes, listener, shutdown).await;
1778}
1779
1780// ─── BackgroundServer ─────────────────────────────────────────────────────────
1781
/// A managed, background HTTP server returned by [`Server::serve_managed`].
///
/// The server task is spawned as soon as `serve_managed` is called.  Use this
/// handle to:
///
/// - [`addr`](BackgroundServer::addr) — read the address recorded for the server
/// - [`rebind`](BackgroundServer::rebind) — gracefully migrate to a new address
/// - [`mechanism`](BackgroundServer::mechanism) — add a route live, **no restart**
/// - [`stop`](BackgroundServer::stop) — shut down and await the task
///
/// # Routes are preserved across `rebind`
/// All routes registered before [`serve_managed`](Server::serve_managed), plus any
/// added via [`mechanism`](BackgroundServer::mechanism), are automatically carried over when
/// [`rebind`](BackgroundServer::rebind) restarts the server, because the restarted
/// task is given a clone of the same shared route table.
///
/// # Ownership
/// Dropping a `BackgroundServer` without calling [`stop`](BackgroundServer::stop)
/// does not abort the background task, but it does drop the shutdown sender.
/// The task's shutdown future waits on the matching receiver and ignores its
/// result, so it completes when the sender is dropped: the server stops
/// accepting connections and drains in-flight requests on its own, with
/// nothing awaiting the outcome.  Call [`stop`](BackgroundServer::stop) to wait
/// for the drain to finish.
///
/// # Example
/// ```rust,no_run
/// # use toolkit_zero::socket::server::Server;
/// # use serde::Serialize;
/// # use toolkit_zero::socket::server::reply;
/// # #[derive(Serialize)] struct Pong { ok: bool }
/// # #[tokio::main]
/// # async fn main() {
/// let mut server = Server::default();
/// server.mechanism(
///     toolkit_zero::socket::server::ServerMechanism::get("/ping")
///         .onconnect(|| async { reply!(json => Pong { ok: true }) })
/// );
/// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
/// assert_eq!(bg.addr().port(), 8080);
///
/// bg.rebind(([127, 0, 0, 1], 9090)).await;
/// assert_eq!(bg.addr().port(), 9090);
///
/// bg.stop().await;
/// # }
/// ```
pub struct BackgroundServer {
    /// Shared mutable route table — written by [`mechanism`](BackgroundServer::mechanism), read by the server loop.
    routes:      Arc<tokio::sync::RwLock<Vec<SocketType>>>,
    /// Address most recently requested via `serve_managed` or `rebind`.
    addr:        std::net::SocketAddr,
    /// Sender half of the shutdown channel; `None` once the signal has been sent.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Join handle of the running server task; `None` once it has been awaited.
    handle:      Option<tokio::task::JoinHandle<()>>,
}
1829
1830impl BackgroundServer {
1831    /// Returns the address the server is currently bound to.
1832    pub fn addr(&self) -> std::net::SocketAddr {
1833        self.addr
1834    }
1835
1836    /// Shuts the server down gracefully and awaits the background task.
1837    ///
1838    /// In-flight requests complete before the server stops.
1839    pub async fn stop(mut self) {
1840        if let Some(tx) = self.shutdown_tx.take() {
1841            let _ = tx.send(());
1842        }
1843        if let Some(h) = self.handle.take() {
1844            let _ = h.await;
1845        }
1846    }
1847
1848    /// Migrates the server to `addr` with zero route loss:
1849    ///
1850    /// 1. Sends a graceful shutdown signal to the current instance.
1851    /// 2. Waits for all in-flight requests to complete.
1852    /// 3. Spawns a fresh server task on the new address with the same routes.
1853    ///
1854    /// After this method returns, [`addr`](BackgroundServer::addr) reflects the
1855    /// new address and the server is accepting connections.
1856    ///
1857    /// # Example
1858    /// ```rust,no_run
1859    /// # use toolkit_zero::socket::server::Server;
1860    /// # use serde::Serialize;
1861    /// # use toolkit_zero::socket::server::reply;
1862    /// # #[derive(Serialize)] struct Pong { ok: bool }
1863    /// # #[tokio::main]
1864    /// # async fn main() {
1865    /// # let mut server = Server::default();
1866    /// # server.mechanism(
1867    /// #     toolkit_zero::socket::server::ServerMechanism::get("/ping")
1868    /// #         .onconnect(|| async { reply!(json => Pong { ok: true }) })
1869    /// # );
1870    /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1871    ///
1872    /// bg.rebind(([127, 0, 0, 1], 9090)).await;
1873    /// assert_eq!(bg.addr().port(), 9090);
1874    ///
1875    /// bg.stop().await;
1876    /// # }
1877    /// ```
1878    pub async fn rebind(&mut self, addr: impl Into<std::net::SocketAddr>) {
1879        // 1. Graceful shutdown of the current server.
1880        if let Some(tx) = self.shutdown_tx.take() {
1881            let _ = tx.send(());
1882        }
1883        // 2. Wait for all in-flight requests to drain.
1884        if let Some(h) = self.handle.take() {
1885            let _ = h.await;
1886        }
1887        // 3. Start on the new address, sharing the existing route table.
1888        let new_addr = addr.into();
1889        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1890        self.shutdown_tx = Some(tx);
1891        self.addr        = new_addr;
1892        let routes = Arc::clone(&self.routes);
1893        self.handle = Some(tokio::spawn(run_hyper_server(
1894            routes,
1895            new_addr,
1896            async { rx.await.ok(); },
1897        )));
1898        log::info!("Server rebound to {}", new_addr);
1899    }
1900
1901    /// Registers a new route on the **running** server without any restart.
1902    ///
1903    /// Because routes are stored in an `Arc<RwLock<Vec<SocketType>>>` shared between
1904    /// this handle and the server's dispatch loop, writing through the lock makes the
1905    /// new route visible to the next incoming request immediately — no TCP port gap,
1906    /// no in-flight request interruption.
1907    ///
1908    /// Returns `&mut Self` for chaining.
1909    ///
1910    /// # Example
1911    ///
1912    /// ```rust,no_run
1913    /// use toolkit_zero::socket::server::{Server, ServerMechanism, reply};
1914    /// use serde::Serialize;
1915    ///
1916    /// #[derive(Serialize)] struct Pong   { ok:  bool   }
1917    /// #[derive(Serialize)] struct Status { msg: String }
1918    ///
1919    /// # #[tokio::main]
1920    /// # async fn main() {
1921    /// let mut server = Server::default();
1922    /// server.mechanism(
1923    ///     ServerMechanism::get("/ping")
1924    ///         .onconnect(|| async { reply!(json => Pong { ok: true }) })
1925    /// );
1926    ///
1927    /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1928    ///
1929    /// bg.mechanism(
1930    ///     ServerMechanism::get("/status")
1931    ///         .onconnect(|| async { reply!(json => Status { msg: "enabled".into() }) })
1932    /// ).await;
1933    ///
1934    /// // /status is now live alongside /ping — no restart.
1935    /// bg.stop().await;
1936    /// # }
1937    /// ```
1938    pub async fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1939        self.routes.write().await.push(mech);
1940        log::debug!(
1941            "mechanism: live route added (total = {})",
1942            self.routes.read().await.len()
1943        );
1944        self
1945    }
1946}
1947
1948// ─── Reply helpers ────────────────────────────────────────────────────────────
1949
1950/// Wraps `reply` with the given HTTP `status` code and returns it as a result.
1951///
1952/// Pairs with the [`reply!`] macro form `reply!(message => ..., status => ...)`.
1953pub fn reply_with_status(
1954    status: Status,
1955    reply: impl Reply,
1956) -> Result<http::Response<bytes::Bytes>, Rejection> {
1957    let mut resp = reply.into_response();
1958    *resp.status_mut() = status.into();
1959    Ok(resp)
1960}
1961
1962/// Returns an empty `200 OK` reply result.
1963///
1964/// Equivalent to `reply!()`.
1965pub fn reply() -> Result<impl Reply, Rejection> {
1966    Ok::<_, Rejection>(EmptyReply)
1967}
1968
1969/// Serialises `json` as a JSON body and returns it as a `200 OK` reply result.
1970///
1971/// `T` must implement `serde::Serialize`.  Equivalent to `reply!(json => ...)`.
1972pub fn reply_with_json<T: Serialize>(
1973    json: &T,
1974) -> Result<impl Reply + use<T>, Rejection> {
1975    let bytes = serde_json::to_vec(json).map_err(|_| Rejection::internal())?;
1976    Ok::<_, Rejection>(JsonReply {
1977        body:   bytes::Bytes::from(bytes),
1978        status: http::StatusCode::OK,
1979    })
1980}
1981
1982/// Serialises `json` as a JSON body, attaches the given HTTP `status`, and returns a result.
1983///
1984/// `T` must implement `serde::Serialize`.  Equivalent to `reply!(json => ..., status => ...)`.
1985pub fn reply_with_status_and_json<T: Serialize>(
1986    status: Status,
1987    json: &T,
1988) -> Result<impl Reply + use<T>, Rejection> {
1989    let bytes = serde_json::to_vec(json).map_err(|_| Rejection::internal())?;
1990    Ok::<_, Rejection>(JsonReply {
1991        body:   bytes::Bytes::from(bytes),
1992        status: status.into(),
1993    })
1994}
1995
1996/// Seals `value` with `key` and returns it as an `application/octet-stream` response (`200 OK`).
1997///
1998/// `T` must implement `bincode::Encode`.
1999/// Equivalent to `reply!(sealed => value, key => key)`.
2000pub fn reply_sealed<T: bincode::Encode>(
2001    value: &T,
2002    key: SerializationKey,
2003) -> Result<http::Response<bytes::Bytes>, Rejection> {
2004    sealed_response(value, key, None)
2005}
2006
2007/// Seals `value` with `key`, attaches the given HTTP `status`, and returns it as a result.
2008///
2009/// Equivalent to `reply!(sealed => value, key => key, status => Status::X)`.
2010pub fn reply_sealed_with_status<T: bincode::Encode>(
2011    value: &T,
2012    key: SerializationKey,
2013    status: Status,
2014) -> Result<http::Response<bytes::Bytes>, Rejection> {
2015    sealed_response(value, key, Some(status))
2016}
2017
2018fn sealed_response<T: bincode::Encode>(
2019    value: &T,
2020    key: SerializationKey,
2021    status: Option<Status>,
2022) -> Result<http::Response<bytes::Bytes>, Rejection> {
2023    let code: http::StatusCode = status.map(Into::into).unwrap_or(http::StatusCode::OK);
2024    let sealed = crate::serialization::seal(value, key.veil_key())
2025        .map_err(|_| Rejection::internal())?;
2026    Ok(http::Response::builder()
2027        .status(code)
2028        .header(http::header::CONTENT_TYPE, "application/octet-stream")
2029        .body(bytes::Bytes::from(sealed))
2030        .unwrap())
2031}
2032
2033// ─── reply! macro ─────────────────────────────────────────────────────────────
2034
/// Convenience macro for constructing reply results inside route handlers.
///
/// Every form expands to a call to one of the `reply*` functions in
/// `$crate::socket::server`.  Expressions given for `json` and `sealed` are
/// passed by reference (`&expr`) in the expansion; `message`, `key` and
/// `status` expressions are passed by value.
///
/// | Syntax | Equivalent | Description |
/// |---|---|---|
/// | `reply!()` | [`reply()`] | Empty `200 OK` response. |
/// | `reply!(message => expr, status => Status::X)` | [`reply_with_status`] | Any `Reply` with a status code. |
/// | `reply!(json => expr)` | [`reply_with_json`] | JSON body with `200 OK`. |
/// | `reply!(json => expr, status => Status::X)` | [`reply_with_status_and_json`] | JSON body with a status code. |
/// | `reply!(sealed => expr, key => key)` | [`reply_sealed`] | Encrypted body, `200 OK`. |
/// | `reply!(sealed => expr, key => key, status => Status::X)` | [`reply_sealed_with_status`] | Encrypted body with status. |
///
/// The keywords and their order are fixed: only the exact forms listed above
/// are accepted.
///
/// # Example
/// ```rust,no_run
/// # use toolkit_zero::socket::server::{ServerMechanism, Status};
/// # use toolkit_zero::socket::server::reply;
/// # use serde::{Deserialize, Serialize};
/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
///
/// // Empty 200 OK
/// let ping = ServerMechanism::get("/ping")
///     .onconnect(|| async { reply!() });
///
/// // JSON body, 200 OK
/// let list = ServerMechanism::get("/items")
///     .onconnect(|| async {
///         let items: Vec<Item> = vec![];
///         reply!(json => items)
///     });
///
/// // JSON body with a custom status
/// let create = ServerMechanism::post("/items")
///     .json::<Item>()
///     .onconnect(|item| async move {
///         reply!(json => item, status => Status::Created)
///     });
/// ```
#[doc(hidden)]
#[macro_export]
macro_rules! reply {
    // `reply!()` — empty reply.
    () => {{
        $crate::socket::server::reply()
    }};

    // Any `Reply` value with an explicit status; note the function takes the
    // status first, the macro takes it last.
    (message => $message: expr, status => $status: expr) => {{
        $crate::socket::server::reply_with_status($status, $message)
    }};

    // JSON body, `200 OK`; the value is borrowed.
    (json => $json: expr) => {{
        $crate::socket::server::reply_with_json(&$json)
    }};

    // JSON body with an explicit status; the value is borrowed.
    (json => $json: expr, status => $status: expr) => {{
        $crate::socket::server::reply_with_status_and_json($status, &$json)
    }};

    // Sealed body, `200 OK`; the value is borrowed, the key is moved.
    (sealed => $val: expr, key => $key: expr) => {{
        $crate::socket::server::reply_sealed(&$val, $key)
    }};

    // Sealed body with an explicit status.
    (sealed => $val: expr, key => $key: expr, status => $status: expr) => {{
        $crate::socket::server::reply_sealed_with_status(&$val, $key, $status)
    }};
}
2098
2099/// Re-export the [`reply!`] macro so it is accessible as
2100/// `toolkit_zero::socket::server::reply` and included in glob imports.
2101pub use crate::reply;