Skip to main content

toolkit_zero/socket/
server.rs

1
2//! Typed, fluent HTTP server construction.
3//!
4//! This module provides a builder-oriented API for declaring HTTP routes and
5//! serving them with [`warp`] under the hood. The entry point for every route
6//! is [`ServerMechanism`], which pairs an HTTP method with a URL path and
7//! supports incremental enrichment — attaching a JSON body expectation, URL
8//! query parameter deserialisation, or shared state — before being finalised
9//! into a [`SocketType`] route handle via `onconnect`.
10//!
11//! Completed routes are registered on a [`Server`] and served with a single
12//! `.await`. Graceful shutdown is available via
13//! [`Server::serve_with_graceful_shutdown`] and [`Server::serve_from_listener`].
14//!
15//! # Builder chains at a glance
16//!
17//! | Chain | Handler receives |
18//! |---|---|
19//! | `mechanism(method, path).onconnect(f)` | nothing |
20//! | `.json::<T>().onconnect(f)` | `T: DeserializeOwned` |
21//! | `.query::<T>().onconnect(f)` | `T: DeserializeOwned` |
22//! | `.state(s).onconnect(f)` | `S: Clone + Send + Sync` |
23//! | `.state(s).json::<T>().onconnect(f)` | `(S, T)` |
24//! | `.state(s).query::<T>().onconnect(f)` | `(S, T)` |
25//!
26//! For blocking handlers (not recommended in production) every finaliser also
27//! has an unsafe `onconnect_sync` counterpart.
28//!
29//! # Response helpers
30//!
31//! Use the [`reply!`] macro as the most concise way to build a response:
32//!
33//! ```rust,no_run
34//! # use toolkit_zero::socket::server::*;
35//! # use serde::Serialize;
36//! # #[derive(Serialize)] struct Item { id: u32 }
37//! # let item = Item { id: 1 };
38//! // 200 OK, empty body
39//! // reply!()
40//!
41//! // 200 OK, JSON body
42//! // reply!(json => item)
43//!
44//! // 201 Created, JSON body
45//! // reply!(json => item, status => Status::Created)
46//! ```
47
48use std::{future::Future, net::SocketAddr};
49use serde::{de::DeserializeOwned, Serialize};
50use warp::{Filter, Rejection, Reply, filters::BoxedFilter};
51
52pub use super::SerializationKey;
53
54/// A fully assembled, type-erased HTTP route ready to be registered on a [`Server`].
55///
56/// This is the final product of every builder chain. Pass it to [`Server::mechanism`] to mount it.
57pub type SocketType = BoxedFilter<(warp::reply::Response, )>;
58
59#[derive(Clone, Copy, Debug)]
60enum HttpMethod {
61    Get,
62    Post,
63    Put,
64    Delete,
65    Patch,
66    Head,
67    Options,
68}
69
70impl HttpMethod {
71    fn filter(&self) -> BoxedFilter<()> {
72        match self {
73            HttpMethod::Get     => warp::get().boxed(),
74            HttpMethod::Post    => warp::post().boxed(),
75            HttpMethod::Put     => warp::put().boxed(),
76            HttpMethod::Delete  => warp::delete().boxed(),
77            HttpMethod::Patch   => warp::patch().boxed(),
78            HttpMethod::Head    => warp::head().boxed(),
79            HttpMethod::Options => warp::options().boxed(),
80        }
81    }
82}
83
84
85fn path_filter(path: &str) -> BoxedFilter<()> {
86    log::trace!("Building path filter for: '{}'", path);
87    let segs: Vec<&'static str> = path
88        .trim_matches('/')
89        .split('/')
90        .filter(|s| !s.is_empty())
91        .map(|s| -> &'static str { Box::leak(s.to_owned().into_boxed_str()) })
92        .collect();
93
94    if segs.is_empty() {
95        return warp::path::end().boxed();
96    }
97
98    // Start with the first segment, then and-chain the rest.
99    let mut f: BoxedFilter<()> = warp::path(segs[0]).boxed();
100    for seg in &segs[1..] {
101        f = f.and(warp::path(*seg)).boxed();
102    }
103    f.and(warp::path::end()).boxed()
104}
105
106/// Entry point for building an HTTP route.
107///
108/// Pairs an HTTP method with a URL path and acts as the root of a fluent builder chain.
109/// Optionally attach shared state, a JSON body expectation, or URL query parameter
110/// deserialisation — then finalise with [`onconnect`](ServerMechanism::onconnect) (async) or
111/// [`onconnect_sync`](ServerMechanism::onconnect_sync) (sync) to produce a [`SocketType`]
112/// ready to be mounted on a [`Server`].
113///
114/// # Example
115/// ```rust,no_run
116/// # use serde::{Deserialize, Serialize};
117/// # use std::sync::{Arc, Mutex};
118/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
119/// # #[derive(Deserialize)] struct CreateItem { name: String }
120/// # #[derive(Deserialize)] struct SearchQuery { q: String }
121///
122/// // Plain GET — no body, no state
123/// let health = ServerMechanism::get("/health")
124///     .onconnect(|| async { reply!() });
125///
126/// // POST — JSON body deserialised into `CreateItem`
127/// let create = ServerMechanism::post("/items")
128///     .json::<CreateItem>()
129///     .onconnect(|body| async move {
130///         let item = Item { id: 1, name: body.name };
131///         reply!(json => item, status => Status::Created)
132///     });
133///
134/// // GET — URL query params deserialised into `SearchQuery`
135/// let search = ServerMechanism::get("/search")
136///     .query::<SearchQuery>()
137///     .onconnect(|params| async move {
138///         let _q = params.q;
139///         reply!()
140///     });
141///
142/// // GET — shared counter state injected on every request
143/// let counter: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
144/// let count_route = ServerMechanism::get("/count")
145///     .state(counter.clone())
146///     .onconnect(|state| async move {
147///         let n = *state.lock().unwrap();
148///         reply!(json => n)
149///     });
150/// ```
151pub struct ServerMechanism {
152    method: HttpMethod,
153    path: String,
154}
155
156impl ServerMechanism {
157
158    fn instance(method: HttpMethod, path: impl Into<String>) -> Self {
159        let path = path.into();
160        log::debug!("Creating {:?} route at '{}'", method, path);
161        Self { method, path }
162    }
163
164    /// Creates a route matching HTTP `GET` requests at `path`.
165    pub fn get(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Get, path) }
166
167    /// Creates a route matching HTTP `POST` requests at `path`.
168    pub fn post(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Post, path) }
169
170    /// Creates a route matching HTTP `PUT` requests at `path`.
171    pub fn put(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Put, path) }
172
173    /// Creates a route matching HTTP `DELETE` requests at `path`.
174    pub fn delete(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Delete, path) }
175
176    /// Creates a route matching HTTP `PATCH` requests at `path`.
177    pub fn patch(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Patch, path) }
178
179    /// Creates a route matching HTTP `HEAD` requests at `path`.
180    pub fn head(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Head, path) }
181
182    /// Creates a route matching HTTP `OPTIONS` requests at `path`.
183    pub fn options(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Options, path) }
184
185    /// Attaches shared state `S` to this route, transitioning to [`StatefulSocketBuilder`].
186    ///
187    /// A fresh clone of `S` is injected into the handler on every request.  For mutable
188    /// shared state, wrap the inner value in `Arc<Mutex<_>>` or `Arc<RwLock<_>>` before
189    /// passing it here — only the outer `Arc` is cloned per request; the inner data stays
190    /// shared across all requests.
191    ///
192    /// `S` must be `Clone + Send + Sync + 'static`.
193    ///
194    /// From [`StatefulSocketBuilder`] you can further add a JSON body (`.json`), query
195    /// parameters (`.query`), or encryption (`.encryption` / `.encrypted_query`) before
196    /// finalising with `onconnect`.
197    ///
198    /// # Example
199    /// ```rust,no_run
200    /// use std::sync::{Arc, Mutex};
201    ///
202    /// let db: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
203    ///
204    /// let route = ServerMechanism::get("/list")
205    ///     .state(db.clone())
206    ///     .onconnect(|state| async move {
207    ///         let items = state.lock().unwrap().clone();
208    ///         reply!(json => items)
209    ///     });
210    /// ```
211    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulSocketBuilder<S> {
212        log::trace!("Attaching state to {:?} route at '{}'", self.method, self.path);
213        StatefulSocketBuilder { base: self, state }
214    }
215
216    /// Declares that this route expects a JSON-encoded request body, transitioning to
217    /// [`JsonSocketBuilder`].
218    ///
219    /// On each incoming request the body is parsed as `Content-Type: application/json`
220    /// and deserialised into `T`.  If the body is absent, malformed, or fails to
221    /// deserialise, the request is rejected before the handler is ever called.
222    /// When you subsequently call [`onconnect`](JsonSocketBuilder::onconnect), the handler
223    /// receives a fully-deserialised, ready-to-use `T`.
224    ///
225    /// `T` must implement `serde::de::DeserializeOwned`.
226    ///
227    /// # Example
228    /// ```rust,no_run
229    /// # use serde::Deserialize;
230    /// # #[derive(Deserialize)] struct Payload { value: i32 }
231    ///
232    /// let route = ServerMechanism::post("/submit")
233    ///     .json::<Payload>()
234    ///     .onconnect(|body| async move {
235    ///         reply!(json => body.value)
236    ///     });
237    /// ```
238    pub fn json<T: DeserializeOwned + Send>(self) -> JsonSocketBuilder<T> {
239        log::trace!("Attaching JSON body expectation to {:?} route at '{}'", self.method, self.path);
240        JsonSocketBuilder { base: self, _phantom: std::marker::PhantomData }
241    }
242
243    /// Declares that this route extracts its input from URL query parameters, transitioning
244    /// to [`QuerySocketBuilder`].
245    ///
246    /// On each incoming request the query string (`?field=value&...`) is deserialised into
247    /// `T`.  A missing or malformed query string is rejected before the handler is called.
248    /// When you subsequently call [`onconnect`](QuerySocketBuilder::onconnect), the handler
249    /// receives a fully-deserialised, ready-to-use `T`.
250    ///
251    /// `T` must implement `serde::de::DeserializeOwned`.
252    ///
253    /// # Example
254    /// ```rust,no_run
255    /// # use serde::Deserialize;
256    /// # #[derive(Deserialize)] struct Filter { page: u32, per_page: u32 }
257    ///
258    /// let route = ServerMechanism::get("/items")
259    ///     .query::<Filter>()
260    ///     .onconnect(|filter| async move {
261    ///         let _ = (filter.page, filter.per_page);
262    ///         reply!()
263    ///     });
264    /// ```
265    pub fn query<T: DeserializeOwned + Send>(self) -> QuerySocketBuilder<T> {
266        log::trace!("Attaching query parameter expectation to {:?} route at '{}'", self.method, self.path);
267        QuerySocketBuilder { base: self, _phantom: std::marker::PhantomData }
268    }
269
270    /// Declares that this route expects a VEIL-encrypted request body, transitioning to
271    /// [`EncryptedBodyBuilder`].
272    ///
273    /// On each incoming request the raw body bytes are decrypted using the provided
274    /// [`SerializationKey`] before the handler is called.  If the key does not match or
275    /// the body is corrupt, the route responds with `403 Forbidden` and the handler is
276    /// never invoked — meaning the `T` your handler receives is always a legitimate,
277    /// trusted, fully-decrypted value.
278    ///
279    /// Use `SerializationKey::Default` when both client and server share the built-in key,
280    /// or `SerializationKey::Value("your-key")` for a custom shared secret.
281    /// For plain-JSON routes (no encryption) use [`.json::<T>()`](ServerMechanism::json)
282    /// instead.
283    ///
284    /// `T` must implement `bincode::Decode<()>`.
285    ///
286    /// # Example
287    /// ```rust,no_run
288    /// # use bincode::{Encode, Decode};
289    /// # #[derive(Encode, Decode)] struct Payload { value: i32 }
290    ///
291    /// let route = ServerMechanism::post("/submit")
292    ///     .encryption::<Payload>(SerializationKey::Default)
293    ///     .onconnect(|body| async move {
294    ///         // `body` is already decrypted and deserialised — use it directly.
295    ///         reply!(json => body.value)
296    ///     });
297    /// ```
298    pub fn encryption<T>(self, key: SerializationKey) -> EncryptedBodyBuilder<T> {
299        log::trace!("Attaching encrypted body to {:?} route at '{}'", self.method, self.path);
300        EncryptedBodyBuilder { base: self, key, _phantom: std::marker::PhantomData }
301    }
302
303    /// Declares that this route expects VEIL-encrypted URL query parameters, transitioning
304    /// to [`EncryptedQueryBuilder`].
305    ///
306    /// The client must send a single `?data=<base64url>` query parameter whose value is
307    /// the URL-safe base64 encoding of the VEIL-encrypted struct bytes.  On each request
308    /// the server base64-decodes then decrypts the payload using the provided
309    /// [`SerializationKey`].  If the `data` parameter is missing, the encoding is invalid,
310    /// the key does not match, or the bytes are corrupt, the route responds with
311    /// `403 Forbidden` and the handler is never invoked — meaning the `T` your handler
312    /// receives is always a legitimate, trusted, fully-decrypted value.
313    ///
314    /// Use `SerializationKey::Default` or `SerializationKey::Value("your-key")`.  For
315    /// plain query-string routes (no encryption) use
316    /// [`.query::<T>()`](ServerMechanism::query) instead.
317    ///
318    /// `T` must implement `bincode::Decode<()>`.
319    pub fn encrypted_query<T>(self, key: SerializationKey) -> EncryptedQueryBuilder<T> {
320        log::trace!("Attaching encrypted query to {:?} route at '{}'", self.method, self.path);
321        EncryptedQueryBuilder { base: self, key, _phantom: std::marker::PhantomData }
322    }
323
324    /// Finalises this route with an async handler that receives no arguments.
325    ///
326    /// Neither a request body nor query parameters are read.  The handler runs on every
327    /// matching request and must return `Result<impl Reply, Rejection>`.  Use the
328    /// [`reply!`] macro or the standalone reply helpers ([`reply_with_json`],
329    /// [`reply_with_status`], etc.) to construct a response.
330    ///
331    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
332    ///
333    /// # Example
334    /// ```rust,no_run
335    /// # use serde::Serialize;
336    /// # #[derive(Serialize)] struct Pong { ok: bool }
337    ///
338    /// let route = ServerMechanism::get("/ping")
339    ///     .onconnect(|| async {
340    ///         reply!(json => Pong { ok: true })
341    ///     });
342    /// ```
343    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
344    where
345        F: Fn() -> Fut + Clone + Send + Sync + 'static,
346        Fut: Future<Output = Result<Re, Rejection>> + Send,
347        Re: Reply + Send,
348    {
349        log::debug!("Finalising async {:?} route at '{}' (no args)", self.method, self.path);
350        let m = self.method.filter();
351        let p = path_filter(&self.path);
352        m.and(p)
353            .and_then(handler)
354            .map(|r: Re| r.into_response())
355            .boxed()
356    }
357
358    /// Finalises this route with a **synchronous** handler that receives no arguments.
359    ///
360    /// Behaviour and contract are identical to the async variant — neither a body nor
361    /// query parameters are read — but the closure may block.  Each request is dispatched
362    /// to the blocking thread pool, so the handler must complete quickly to avoid starving
363    /// other requests.
364    ///
365    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
366    ///
367    /// # Safety
368    ///
369    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
370    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
371    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
372    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
373    /// gets a chance to run. Additionally, any panic inside the handler is silently converted
374    /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
375    /// quickly and that adequate backpressure or rate limiting is applied externally.
376    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
377    where
378        F: Fn() -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
379        Re: Reply + Send + 'static,
380    {
381        log::warn!("Registering sync handler on {:?} '{}' — ensure rate-limiting is applied externally", self.method, self.path);
382        let m = self.method.filter();
383        let p = path_filter(&self.path);
384        m.and(p)
385            .and_then(move || {
386                let handler = handler.clone();
387                async move {
388                    tokio::task::spawn_blocking(move || handler())
389                        .await
390                        .unwrap_or_else(|_| {
391                            log::warn!("Sync handler panicked; converting to Rejection");
392                            Err(warp::reject())
393                        })
394                }
395            })
396            .map(|r: Re| r.into_response())
397            .boxed()
398    }
399}
400
401
402/// Route builder that expects and deserialises a JSON request body of type `T`.
403///
404/// Obtained from [`ServerMechanism::json`]. Optionally attach shared state via
405/// [`state`](JsonSocketBuilder::state), or finalise immediately with
406/// [`onconnect`](JsonSocketBuilder::onconnect) /
407/// [`onconnect_sync`](JsonSocketBuilder::onconnect_sync).
408pub struct JsonSocketBuilder<T> {
409    base: ServerMechanism,
410    _phantom: std::marker::PhantomData<T>,
411}
412
413impl<T: DeserializeOwned + Send + 'static> JsonSocketBuilder<T> {
414
415    /// Finalises this route with an async handler that receives the deserialised JSON body.
416    ///
417    /// Before the handler is called, the incoming request body is parsed as
418    /// `Content-Type: application/json` and deserialised into `T`.  The handler receives
419    /// a ready-to-use `T` — no manual parsing is needed.  If the body is absent or cannot
420    /// be decoded the request is rejected before the handler is ever invoked.
421    ///
422    /// The handler must return `Result<impl Reply, Rejection>`.
423    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
424    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
425    where 
426        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
427        Fut: Future<Output = Result<Re, Rejection>> + Send,
428        Re: Reply + Send,
429    {
430        log::debug!("Finalising async {:?} route at '{}' (JSON body)", self.base.method, self.base.path);
431        let m = self.base.method.filter();
432        let p = path_filter(&self.base.path);
433        m.and(p)
434            .and(warp::body::json::<T>())
435            .and_then(handler)
436            .map(|r: Re| r.into_response())
437            .boxed()
438    }
439
440    /// Finalises this route with a **synchronous** handler that receives the deserialised
441    /// JSON body.
442    ///
443    /// The body is decoded into `T` before the handler is dispatched, identical to the
444    /// async variant.  The closure may block but must complete quickly to avoid exhausting
445    /// the blocking thread pool.
446    ///
447    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
448    ///
449    /// # Safety
450    ///
451    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
452    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
453    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
454    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
455    /// gets a chance to run. Additionally, any panic inside the handler is silently converted
456    /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
457    /// quickly and that adequate backpressure or rate limiting is applied externally.
458    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
459    where
460        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
461        Re: Reply + Send + 'static,
462    {
463        log::warn!("Registering sync handler on {:?} '{}' (JSON body) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
464        let m = self.base.method.filter();
465        let p = path_filter(&self.base.path);
466        m.and(p)
467            .and(warp::body::json::<T>())
468            .and_then(move |body: T| {
469                let handler = handler.clone();
470                async move {
471                    tokio::task::spawn_blocking(move || handler(body))
472                        .await
473                        .unwrap_or_else(|_| {
474                            log::warn!("Sync handler (JSON body) panicked; converting to Rejection");
475                            Err(warp::reject())
476                        })
477                }
478            })
479            .map(|r: Re| r.into_response())
480            .boxed()
481    }
482
483    /// Attaches shared state `S`, transitioning to [`StatefulJsonSocketBuilder`].
484    ///
485    /// Alternative ordering to `.state(s).json::<T>()` — both produce the same route.
486    /// A fresh clone of `S` is injected alongside the JSON-decoded `T` on every request.
487    /// The handler will receive `(state: S, body: T)`.
488    ///
489    /// `S` must be `Clone + Send + Sync + 'static`.
490    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulJsonSocketBuilder<T, S> {
491        StatefulJsonSocketBuilder { base: self.base, state, _phantom: std::marker::PhantomData }
492    }
493
494}
495
496/// Route builder that expects and deserialises URL query parameters of type `T`.
497///
498/// Obtained from [`ServerMechanism::query`]. Optionally attach shared state via
499/// [`state`](QuerySocketBuilder::state), or finalise immediately with
500/// [`onconnect`](QuerySocketBuilder::onconnect) /
501/// [`onconnect_sync`](QuerySocketBuilder::onconnect_sync).
502pub struct QuerySocketBuilder<T> {
503    base: ServerMechanism,
504    _phantom: std::marker::PhantomData<T>,
505}
506
507impl<T: DeserializeOwned + Send + 'static> QuerySocketBuilder<T> {
508    /// Finalises this route with an async handler that receives the deserialised query
509    /// parameters.
510    ///
511    /// Before the handler is called, the URL query string (`?field=value&...`) is parsed
512    /// and deserialised into `T`.  The handler receives a ready-to-use `T` — no manual
513    /// parsing is needed.  A missing or malformed query string is rejected before the
514    /// handler is ever invoked.
515    ///
516    /// The handler must return `Result<impl Reply, Rejection>`.
517    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
518    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
519    where
520        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
521        Fut: Future<Output = Result<Re, Rejection>> + Send,
522        Re: Reply + Send,
523    {
524        log::debug!("Finalising async {:?} route at '{}' (query params)", self.base.method, self.base.path);
525        let m = self.base.method.filter();
526        let p = path_filter(&self.base.path);
527        m.and(p)
528            .and(warp::filters::query::query::<T>())
529            .and_then(handler)
530            .map(|r: Re| r.into_response())
531            .boxed()
532    }
533
534    /// Finalises this route with a **synchronous** handler that receives the deserialised
535    /// query parameters.
536    ///
537    /// The query string is decoded into `T` before the handler is dispatched, identical to
538    /// the async variant.  The closure may block but must complete quickly.
539    ///
540    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
541    ///
542    /// # Safety
543    ///
544    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
545    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
546    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
547    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
548    /// gets a chance to run. Additionally, any panic inside the handler is silently converted
549    /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
550    /// quickly and that adequate backpressure or rate limiting is applied externally.
551    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
552    where
553        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
554        Re: Reply + Send + 'static,
555    {
556        log::warn!("Registering sync handler on {:?} '{}' (query params) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
557        let m = self.base.method.filter();
558        let p = path_filter(&self.base.path);
559        m.and(p)
560            .and(warp::filters::query::query::<T>())
561            .and_then(move |query: T| {
562                let handler = handler.clone();
563                async move {
564                    tokio::task::spawn_blocking(move || handler(query))
565                        .await
566                        .unwrap_or_else(|_| {
567                            log::warn!("Sync handler (query params) panicked; converting to Rejection");
568                            Err(warp::reject())
569                        })
570                }
571            })
572            .map(|r: Re| r.into_response())
573            .boxed()
574    }
575
576    /// Attaches shared state `S`, transitioning to [`StatefulQuerySocketBuilder`].
577    ///
578    /// Alternative ordering to `.state(s).query::<T>()` — both produce the same route.
579    /// A fresh clone of `S` is injected alongside the query-decoded `T` on every request.
580    /// The handler will receive `(state: S, query: T)`.
581    ///
582    /// `S` must be `Clone + Send + Sync + 'static`.
583    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulQuerySocketBuilder<T, S> {
584        StatefulQuerySocketBuilder { base: self.base, state, _phantom: std::marker::PhantomData }
585    }
586
587}
588
589
590/// Route builder that carries shared state `S` with no body or query expectation.
591///
592/// Obtained from [`ServerMechanism::state`]. `S` must be `Clone + Send + Sync + 'static`.
593/// For mutable shared state, wrap it in `Arc<Mutex<_>>` or `Arc<RwLock<_>>` before passing
594/// it here. Finalise with [`onconnect`](StatefulSocketBuilder::onconnect) /
595/// [`onconnect_sync`](StatefulSocketBuilder::onconnect_sync).
596pub struct StatefulSocketBuilder<S> {
597    base: ServerMechanism,
598    state: S,
599}
600
601
602impl<S: Clone + Send + Sync + 'static> StatefulSocketBuilder<S> {
603    /// Adds a JSON body expectation, transitioning to [`StatefulJsonSocketBuilder`].
604    ///
605    /// Alternative ordering to `.json::<T>().state(s)` — both produce the same route.
606    /// On each request the incoming JSON body is deserialised into `T` and a fresh clone
607    /// of `S` is prepared; both are handed to the handler together as `(state: S, body: T)`.
608    ///
609    /// Allows `.state(s).json::<T>()` as an alternative ordering to `.json::<T>().state(s)`.
610    pub fn json<T: DeserializeOwned + Send>(self) -> StatefulJsonSocketBuilder<T, S> {
611        log::trace!("Attaching JSON body expectation (after state) to {:?} route at '{}'", self.base.method, self.base.path);
612        StatefulJsonSocketBuilder { base: self.base, state: self.state, _phantom: std::marker::PhantomData }
613    }
614
615    /// Adds a query parameter expectation, transitioning to [`StatefulQuerySocketBuilder`].
616    ///
617    /// Alternative ordering to `.query::<T>().state(s)` — both produce the same route.
618    /// On each request the URL query string is deserialised into `T` and a fresh clone
619    /// of `S` is prepared; both are handed to the handler together as `(state: S, query: T)`.
620    ///
621    /// Allows `.state(s).query::<T>()` as an alternative ordering to `.query::<T>().state(s)`.
622    pub fn query<T: DeserializeOwned + Send>(self) -> StatefulQuerySocketBuilder<T, S> {
623        log::trace!("Attaching query param expectation (after state) to {:?} route at '{}'", self.base.method, self.base.path);
624        StatefulQuerySocketBuilder { base: self.base, state: self.state, _phantom: std::marker::PhantomData }
625    }
626
627    /// Adds an encrypted body expectation, transitioning to [`StatefulEncryptedBodyBuilder`].
628    ///
629    /// Alternative ordering to `.encryption::<T>(key).state(s)` — both produce the same
630    /// route.  On each request the raw body bytes are VEIL-decrypted using `key` before
631    /// the handler runs; a wrong key or corrupt body returns `403 Forbidden` without
632    /// invoking the handler.  The handler will receive `(state: S, body: T)` where `T` is
633    /// always a trusted, fully-decrypted value.
634    ///
635    /// Allows `.state(s).encryption::<T>(key)` as an alternative ordering to
636    /// `.encryption::<T>(key).state(s)`.
637    pub fn encryption<T>(self, key: SerializationKey) -> StatefulEncryptedBodyBuilder<T, S> {
638        log::trace!("Attaching encrypted body (after state) to {:?} route at '{}'", self.base.method, self.base.path);
639        StatefulEncryptedBodyBuilder { base: self.base, key, state: self.state, _phantom: std::marker::PhantomData }
640    }
641
642    /// Adds an encrypted query expectation, transitioning to [`StatefulEncryptedQueryBuilder`].
643    ///
644    /// Alternative ordering to `.encrypted_query::<T>(key).state(s)` — both produce the
645    /// same route.  On each request the `?data=<base64url>` query parameter is
646    /// base64-decoded then VEIL-decrypted using `key`; a missing, malformed, or
647    /// undecryptable payload returns `403 Forbidden` without invoking the handler.  The
648    /// handler will receive `(state: S, query: T)` where `T` is always a trusted,
649    /// fully-decrypted value.
650    ///
651    /// Allows `.state(s).encrypted_query::<T>(key)` as an alternative ordering to
652    /// `.encrypted_query::<T>(key).state(s)`.
653    pub fn encrypted_query<T>(self, key: SerializationKey) -> StatefulEncryptedQueryBuilder<T, S> {
654        log::trace!("Attaching encrypted query (after state) to {:?} route at '{}'", self.base.method, self.base.path);
655        StatefulEncryptedQueryBuilder { base: self.base, key, state: self.state, _phantom: std::marker::PhantomData }
656    }
657
658    /// Finalises this route with an async handler that receives only the shared state.
659    ///
660    /// On each request a fresh clone of `S` is injected into the handler.  No request body
661    /// or query parameters are read.  The handler must return `Result<impl Reply, Rejection>`.
662    ///
663    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
664    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
665    where
666        F: Fn(S) -> Fut + Clone + Send + Sync + 'static,
667        Fut: Future<Output = Result<Re, Rejection>> + Send,
668        Re: Reply + Send,
669    {
670        log::debug!("Finalising async {:?} route at '{}' (state)", self.base.method, self.base.path);
671        let m = self.base.method.filter();
672        let p = path_filter(&self.base.path);
673        let state = self.state;
674        let state_filter = warp::any().map(move || state.clone());
675        m.and(p)
676            .and(state_filter)
677            .and_then(handler)
678            .map(|r: Re| r.into_response())
679            .boxed()
680    }
681
682    /// Finalises this route with a **synchronous** handler that receives only the shared
683    /// state.
684    ///
685    /// On each request a fresh clone of `S` is passed to the handler.  If `S` wraps a mutex
686    /// or lock, contention across concurrent requests can stall the thread pool — ensure the
687    /// lock is held only briefly.  The closure may block but must complete quickly.
688    ///
689    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
690    ///
691    /// # Safety
692    ///
693    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
694    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
695    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
696    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
697    /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
698    /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
699    /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
700    /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
701    /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
702    /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
703    /// applied externally.
704    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
705    where
706        F: Fn(S) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
707        Re: Reply + Send + 'static,
708    {
709        log::warn!("Registering sync handler on {:?} '{}' (state) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
710        let m = self.base.method.filter();
711        let p = path_filter(&self.base.path);
712        let state = self.state;
713        let state_filter = warp::any().map(move || state.clone());
714        m.and(p)
715            .and(state_filter)
716            .and_then(move |s: S| {
717                let handler = handler.clone();
718                async move {
719                    tokio::task::spawn_blocking(move || handler(s))
720                        .await
721                        .unwrap_or_else(|_| {
722                            log::warn!("Sync handler (state) panicked; converting to Rejection");
723                            Err(warp::reject())
724                        })
725                }
726            })
727            .map(|r: Re| r.into_response())
728            .boxed()
729    }
730}
731
732
733/// Route builder that carries shared state `S` and expects a JSON body of type `T`.
734///
735/// Obtained from [`JsonSocketBuilder::state`]. `S` must be `Clone + Send + Sync + 'static`.
736/// `T` must implement `serde::de::DeserializeOwned`. Finalise with
737/// [`onconnect`](StatefulJsonSocketBuilder::onconnect) /
738/// [`onconnect_sync`](StatefulJsonSocketBuilder::onconnect_sync).
739pub struct StatefulJsonSocketBuilder<T, S> {
740    base: ServerMechanism,
741    state: S,
742    _phantom: std::marker::PhantomData<T>,
743}
744
745impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
746    StatefulJsonSocketBuilder<T, S>
747{
748    /// Finalises this route with an async handler that receives `(state: S, body: T)`.
749    ///
750    /// On each request the incoming JSON body is deserialised into `T` and a fresh clone
751    /// of `S` is prepared — both are handed to the handler together.  The handler is only
752    /// called when the body can be decoded; a missing or malformed body is rejected first.
753    ///
754    /// The handler must return `Result<impl Reply, Rejection>`.
755    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
756    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
757    where
758        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
759        Fut: Future<Output = Result<Re, Rejection>> + Send,
760        Re: Reply + Send,
761    {
762        log::debug!("Finalising async {:?} route at '{}' (state + JSON body)", self.base.method, self.base.path);
763        let m = self.base.method.filter();
764        let p = path_filter(&self.base.path);
765        let state = self.state;
766        let state_filter = warp::any().map(move || state.clone());
767        m.and(p)
768            .and(state_filter)
769            .and(warp::body::json::<T>())
770            .and_then(handler)
771            .map(|r: Re| r.into_response())
772            .boxed()
773    }
774
775    /// Finalises this route with a **synchronous** handler that receives
776    /// `(state: S, body: T)`.
777    ///
778    /// The body is decoded into `T` and a clone of `S` is prepared before the blocking
779    /// handler is dispatched.  If `S` wraps a lock, keep it held only briefly.  See
780    /// [`ServerMechanism::onconnect_sync`] for the full thread-pool safety notes.
781    ///
782    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
783    ///
784    /// # Safety
785    ///
786    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
787    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
788    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
789    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
790    /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
791    /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
792    /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
793    /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
794    /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
795    /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
796    /// applied externally.
797    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
798    where
799        F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
800        Re: Reply + Send + 'static,
801    {
802        log::warn!("Registering sync handler on {:?} '{}' (state + JSON body) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
803        let m = self.base.method.filter();
804        let p = path_filter(&self.base.path);
805        let state = self.state;
806        let state_filter = warp::any().map(move || state.clone());
807        m.and(p)
808            .and(state_filter)
809            .and(warp::body::json::<T>())
810            .and_then(move |s: S, body: T| {
811                let handler = handler.clone();
812                async move {
813                    tokio::task::spawn_blocking(move || handler(s, body))
814                        .await
815                        .unwrap_or_else(|_| {
816                            log::warn!("Sync handler (state + JSON body) panicked; converting to Rejection");
817                            Err(warp::reject())
818                        })
819                }
820            })
821            .map(|r: Re| r.into_response())
822            .boxed()
823    }
824}
825
826/// Route builder that carries shared state `S` and expects URL query parameters of type `T`.
827///
828/// Obtained from [`QuerySocketBuilder::state`]. `S` must be `Clone + Send + Sync + 'static`.
829/// `T` must implement `serde::de::DeserializeOwned`. Finalise with
830/// [`onconnect`](StatefulQuerySocketBuilder::onconnect) /
831/// [`onconnect_sync`](StatefulQuerySocketBuilder::onconnect_sync).
832pub struct StatefulQuerySocketBuilder<T, S> {
833    base: ServerMechanism,
834    state: S,
835    _phantom: std::marker::PhantomData<T>,
836}
837
838impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
839    StatefulQuerySocketBuilder<T, S>
840{
841    /// Finalises this route with an async handler that receives `(state: S, query: T)`.
842    ///
843    /// On each request the URL query string is deserialised into `T` and a fresh clone of
844    /// `S` is prepared — both are handed to the handler together.  A missing or malformed
845    /// query string is rejected before the handler is ever invoked.
846    ///
847    /// The handler must return `Result<impl Reply, Rejection>`.
848    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
849    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
850    where
851        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
852        Fut: Future<Output = Result<Re, Rejection>> + Send,
853        Re: Reply + Send,
854    {
855        log::debug!("Finalising async {:?} route at '{}' (state + query params)", self.base.method, self.base.path);
856        let m = self.base.method.filter();
857        let p = path_filter(&self.base.path);
858        let state = self.state;
859        let state_filter = warp::any().map(move || state.clone());
860        m.and(p)
861            .and(state_filter)
862            .and(warp::filters::query::query::<T>())
863            .and_then(handler)
864            .map(|r: Re| r.into_response())
865            .boxed()
866    }
867
868    /// Finalises this route with a **synchronous** handler that receives
869    /// `(state: S, query: T)`.
870    ///
871    /// The query string is decoded into `T` and a clone of `S` is prepared before the
872    /// blocking handler is dispatched.  If `S` wraps a lock, keep it held only briefly.
873    /// See [`ServerMechanism::onconnect_sync`] for the full thread-pool safety notes.
874    ///
875    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
876    ///
877    /// # Safety
878    ///
879    /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
880    /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
881    /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
882    /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
883    /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
884    /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
885    /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
886    /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
887    /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
888    /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
889    /// applied externally.
890    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
891    where
892        F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
893        Re: Reply + Send + 'static,
894    {
895        log::warn!("Registering sync handler on {:?} '{}' (state + query params) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
896        let m = self.base.method.filter();
897        let p = path_filter(&self.base.path);
898        let state = self.state;
899        let state_filter = warp::any().map(move || state.clone());
900        m.and(p)
901            .and(state_filter)
902            .and(warp::filters::query::query::<T>())
903            .and_then(move |s: S, query: T| {
904                let handler = handler.clone();
905                async move {
906                    tokio::task::spawn_blocking(move || handler(s, query))
907                        .await
908                        .unwrap_or_else(|_| {
909                            log::warn!("Sync handler (state + query params) panicked; converting to Rejection");
910                            Err(warp::reject())
911                        })
912                }
913            })
914            .map(|r: Re| r.into_response())
915            .boxed()
916    }
917}
918
919// ─── EncryptedBodyBuilder ────────────────────────────────────────────────────
920
921/// Route builder that expects a VEIL-encrypted request body of type `T`.
922///
923/// Obtained from [`ServerMechanism::encryption`].  On each matching request the raw
924/// body bytes are decrypted using the [`SerializationKey`] supplied there.  If
925/// decryption fails for any reason (wrong key, mismatched secret, corrupted payload)
926/// the route immediately returns `403 Forbidden` — the handler is never invoked.
927///
928/// Optionally attach shared state via [`state`](EncryptedBodyBuilder::state) before
929/// finalising with [`onconnect`](EncryptedBodyBuilder::onconnect) /
930/// [`onconnect_sync`](EncryptedBodyBuilder::onconnect_sync).
931pub struct EncryptedBodyBuilder<T> {
932    base: ServerMechanism,
933    key: SerializationKey,
934    _phantom: std::marker::PhantomData<T>,
935}
936
937impl<T> EncryptedBodyBuilder<T>
938where
939    T: bincode::Decode<()> + Send + 'static,
940{
941    /// Finalises this route with an async handler that receives the decrypted body as `T`.
942    ///
943    /// On each request the raw body bytes are VEIL-decrypted using the [`SerializationKey`]
944    /// configured on this builder.  The handler is only invoked when decryption succeeds —
945    /// a wrong key, mismatched secret, or corrupted body causes the route to return
946    /// `403 Forbidden` without ever reaching the handler.  The `T` the closure receives is
947    /// therefore always a trusted, fully-decrypted value ready to use.
948    ///
949    /// The handler must return `Result<impl Reply, Rejection>`.
950    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
951    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
952    where
953        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
954        Fut: Future<Output = Result<Re, Rejection>> + Send,
955        Re: Reply + Send,
956    {
957        log::debug!("Finalising async {:?} route at '{}' (encrypted body)", self.base.method, self.base.path);
958        let m = self.base.method.filter();
959        let p = path_filter(&self.base.path);
960        let key = self.key;
961        m.and(p)
962            .and(warp::body::bytes())
963            .and_then(move |raw: bytes::Bytes| {
964                let key = key.clone();
965                let handler = handler.clone();
966                async move {
967                    let value: T = decode_body(&raw, &key)?;
968                    handler(value).await.map(|r| r.into_response())
969                }
970            })
971            .boxed()
972    }
973
974    /// Finalises this route with a **synchronous** handler that receives the decrypted
975    /// body as `T`.
976    ///
977    /// Decryption happens before the handler is dispatched to the thread pool: if the key
978    /// is wrong or the body is corrupt the request is rejected with `403 Forbidden` and
979    /// the thread pool is not touched at all.  The `T` handed to the closure is always a
980    /// trusted, fully-decrypted value.  The closure may block but must complete quickly.
981    ///
982    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
983    ///
984    /// # Safety
985    /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
986    pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
987    where
988        F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
989        Re: Reply + Send + 'static,
990    {
991        log::warn!("Registering sync handler on {:?} '{}' (encrypted body) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
992        let m = self.base.method.filter();
993        let p = path_filter(&self.base.path);
994        let key = self.key;
995        m.and(p)
996            .and(warp::body::bytes())
997            .and_then(move |raw: bytes::Bytes| {
998                let key = key.clone();
999                let handler = handler.clone();
1000                async move {
1001                    let value: T = decode_body(&raw, &key)?;
1002                    tokio::task::spawn_blocking(move || handler(value))
1003                        .await
1004                        .unwrap_or_else(|_| {
1005                            log::warn!("Sync encrypted handler panicked; converting to Rejection");
1006                            Err(warp::reject())
1007                        })
1008                        .map(|r| r.into_response())
1009                }
1010            })
1011            .boxed()
1012    }
1013
1014    /// Attaches shared state `S`, transitioning to [`StatefulEncryptedBodyBuilder`].
1015    ///
1016    /// A fresh clone of `S` is injected alongside the decrypted `T` on every request.
1017    /// The handler will receive `(state: S, body: T)`.
1018    ///
1019    /// `S` must be `Clone + Send + Sync + 'static`.
1020    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulEncryptedBodyBuilder<T, S> {
1021        StatefulEncryptedBodyBuilder { base: self.base, key: self.key, state, _phantom: std::marker::PhantomData }
1022    }
1023}
1024
1025// ─── EncryptedQueryBuilder ────────────────────────────────────────────────────
1026
1027/// Route builder that expects VEIL-encrypted URL query parameters of type `T`.
1028///
1029/// Obtained from [`ServerMechanism::encrypted_query`].  The client must send a single
1030/// `?data=<base64url>` query parameter whose value is the URL-safe base64 encoding of
1031/// the VEIL-encrypted struct bytes.  On each matching request the server base64-decodes
1032/// then decrypts the value using the configured [`SerializationKey`].  If any step fails
1033/// (missing `data` parameter, invalid base64, wrong key, corrupt bytes) the route
1034/// returns `403 Forbidden` — the handler is never invoked.
1035///
1036/// Optionally attach shared state via [`state`](EncryptedQueryBuilder::state) before
1037/// finalising with [`onconnect`](EncryptedQueryBuilder::onconnect).
1038pub struct EncryptedQueryBuilder<T> {
1039    base: ServerMechanism,
1040    key: SerializationKey,
1041    _phantom: std::marker::PhantomData<T>,
1042}
1043
1044impl<T> EncryptedQueryBuilder<T>
1045where
1046    T: bincode::Decode<()> + Send + 'static,
1047{
1048    /// Finalises this route with an async handler that receives the decrypted query
1049    /// parameters as `T`.
1050    ///
1051    /// On each request the `?data=<base64url>` query parameter is base64-decoded then
1052    /// VEIL-decrypted using the [`SerializationKey`] on this builder.  The handler is only
1053    /// invoked when every step succeeds — a missing `data` parameter, invalid base64,
1054    /// wrong key, or corrupt payload causes the route to return `403 Forbidden` without
1055    /// ever reaching the handler.  The `T` the closure receives is therefore always a
1056    /// trusted, fully-decrypted value ready to use.
1057    ///
1058    /// The handler must return `Result<impl Reply, Rejection>`.
1059    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1060    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1061    where
1062        F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
1063        Fut: Future<Output = Result<Re, Rejection>> + Send,
1064        Re: Reply + Send,
1065    {
1066        log::debug!("Finalising async {:?} route at '{}' (encrypted query)", self.base.method, self.base.path);
1067        let m = self.base.method.filter();
1068        let p = path_filter(&self.base.path);
1069        let key = self.key;
1070        m.and(p)
1071            .and(warp::filters::query::raw())
1072            .and_then(move |raw_query: String| {
1073                let key = key.clone();
1074                let handler = handler.clone();
1075                async move {
1076                    let value: T = decode_query(&raw_query, &key)?;
1077                    handler(value).await.map(|r| r.into_response())
1078                }
1079            })
1080            .boxed()
1081    }
1082
1083    /// Attaches shared state `S`, transitioning to [`StatefulEncryptedQueryBuilder`].
1084    ///
1085    /// A fresh clone of `S` is injected alongside the decrypted `T` on every request.
1086    /// The handler will receive `(state: S, query: T)`.
1087    ///
1088    /// `S` must be `Clone + Send + Sync + 'static`.
1089    pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulEncryptedQueryBuilder<T, S> {
1090        StatefulEncryptedQueryBuilder { base: self.base, key: self.key, state, _phantom: std::marker::PhantomData }
1091    }
1092}
1093
1094// ─── StatefulEncryptedBodyBuilder ────────────────────────────────────────────
1095
1096/// Route builder carrying shared state `S` and a VEIL-encrypted request body of type `T`.
1097///
1098/// Obtained from [`EncryptedBodyBuilder::state`] or [`StatefulSocketBuilder::encryption`].
1099/// On each matching request the body is VEIL-decrypted and a fresh clone of `S` is
1100/// prepared before the handler is called.  A wrong key or corrupt body returns
1101/// `403 Forbidden` without ever invoking the handler.
1102///
1103/// Finalise with [`onconnect`](StatefulEncryptedBodyBuilder::onconnect).
1104pub struct StatefulEncryptedBodyBuilder<T, S> {
1105    base: ServerMechanism,
1106    key: SerializationKey,
1107    state: S,
1108    _phantom: std::marker::PhantomData<T>,
1109}
1110
1111impl<T, S> StatefulEncryptedBodyBuilder<T, S>
1112where
1113    T: bincode::Decode<()> + Send + 'static,
1114    S: Clone + Send + Sync + 'static,
1115{
1116    /// Finalises this route with an async handler that receives `(state: S, body: T)`.
1117    ///
1118    /// On each request the raw body bytes are VEIL-decrypted using the configured
1119    /// [`SerializationKey`] and a fresh clone of `S` is prepared — both are handed to the
1120    /// handler together.  If decryption fails (wrong key or corrupt body) the route returns
1121    /// `403 Forbidden` and the handler is never invoked.  The `T` the closure receives is
1122    /// always a trusted, fully-decrypted value ready to use.
1123    ///
1124    /// The handler must return `Result<impl Reply, Rejection>`.
1125    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1126    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1127    where
1128        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1129        Fut: Future<Output = Result<Re, Rejection>> + Send,
1130        Re: Reply + Send,
1131    {
1132        log::debug!("Finalising async {:?} route at '{}' (state + encrypted body)", self.base.method, self.base.path);
1133        let m = self.base.method.filter();
1134        let p = path_filter(&self.base.path);
1135        let key = self.key;
1136        let state = self.state;
1137        let state_filter = warp::any().map(move || state.clone());
1138        m.and(p)
1139            .and(state_filter)
1140            .and(warp::body::bytes())
1141            .and_then(move |s: S, raw: bytes::Bytes| {
1142                let key = key.clone();
1143                let handler = handler.clone();
1144                async move {
1145                    let value: T = decode_body(&raw, &key)?;
1146                    handler(s, value).await.map(|r| r.into_response())
1147                }
1148            })
1149            .boxed()
1150    }
1151}
1152
1153// ─── StatefulEncryptedQueryBuilder ───────────────────────────────────────────
1154
1155/// Route builder carrying shared state `S` and VEIL-encrypted query parameters of
1156/// type `T`.
1157///
1158/// Obtained from [`EncryptedQueryBuilder::state`] or
1159/// [`StatefulSocketBuilder::encrypted_query`].  On each matching request the
1160/// `?data=<base64url>` parameter is base64-decoded then VEIL-decrypted and a fresh
1161/// clone of `S` is prepared before the handler is called.  Any decode or decryption
1162/// failure returns `403 Forbidden` without ever invoking the handler.
1163///
1164/// Finalise with [`onconnect`](StatefulEncryptedQueryBuilder::onconnect).
1165pub struct StatefulEncryptedQueryBuilder<T, S> {
1166    base: ServerMechanism,
1167    key: SerializationKey,
1168    state: S,
1169    _phantom: std::marker::PhantomData<T>,
1170}
1171
1172impl<T, S> StatefulEncryptedQueryBuilder<T, S>
1173where
1174    T: bincode::Decode<()> + Send + 'static,
1175    S: Clone + Send + Sync + 'static,
1176{
1177    /// Finalises this route with an async handler that receives `(state: S, query: T)`.
1178    ///
1179    /// On each request the `?data=<base64url>` query parameter is base64-decoded then
1180    /// VEIL-decrypted using the configured [`SerializationKey`] and a fresh clone of `S`
1181    /// is prepared — both are handed to the handler together.  If any step fails (missing
1182    /// parameter, bad encoding, wrong key, or corrupt data) the route returns
1183    /// `403 Forbidden` and the handler is never invoked.  The `T` the closure receives is
1184    /// always a trusted, fully-decrypted value ready to use.
1185    ///
1186    /// The handler must return `Result<impl Reply, Rejection>`.
1187    /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1188    pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1189    where
1190        F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1191        Fut: Future<Output = Result<Re, Rejection>> + Send,
1192        Re: Reply + Send,
1193    {
1194        log::debug!("Finalising async {:?} route at '{}' (state + encrypted query)", self.base.method, self.base.path);
1195        let m = self.base.method.filter();
1196        let p = path_filter(&self.base.path);
1197        let key = self.key;
1198        let state = self.state;
1199        let state_filter = warp::any().map(move || state.clone());
1200        m.and(p)
1201            .and(state_filter)
1202            .and(warp::filters::query::raw())
1203            .and_then(move |s: S, raw_query: String| {
1204                let key = key.clone();
1205                let handler = handler.clone();
1206                async move {
1207                    let value: T = decode_query(&raw_query, &key)?;
1208                    handler(s, value).await.map(|r| r.into_response())
1209                }
1210            })
1211            .boxed()
1212    }
1213}
1214
1215// ─── Internal decode helpers ─────────────────────────────────────────────────
1216
1217/// Decode a VEIL-sealed request body into `T`, returning `403 Forbidden` on any failure.
1218fn decode_body<T: bincode::Decode<()>>(raw: &bytes::Bytes, key: &SerializationKey) -> Result<T, Rejection> {
1219    crate::serialization::open(raw, key.veil_key()).map_err(|e| {
1220        log::debug!("VEIL open failed (key mismatch or corrupt body): {e}");
1221        forbidden()
1222    })
1223}
1224
1225/// Decode a VEIL-sealed query parameter (`?data=<base64url>`) into `T`,
1226/// returning `403 Forbidden` on any failure.
1227fn decode_query<T: bincode::Decode<()>>(raw_query: &str, key: &SerializationKey) -> Result<T, Rejection> {
1228    // Extract the `data` parameter value from the raw query string.
1229    let data_value = serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(raw_query)
1230        .ok()
1231        .and_then(|mut m| m.remove("data"));
1232
1233    let b64 = data_value.ok_or_else(|| {
1234        log::debug!("Encrypted query missing `data` parameter");
1235        forbidden()
1236    })?;
1237
1238    let bytes = base64::engine::Engine::decode(
1239        &base64::engine::general_purpose::URL_SAFE_NO_PAD,
1240        b64.trim_end_matches('='),
1241    )
1242    .map_err(|e| {
1243        log::debug!("base64url decode failed: {e}");
1244        forbidden()
1245    })?;
1246
1247    crate::serialization::open(&bytes, key.veil_key()).map_err(|e| {
1248        log::debug!("VEIL open (query) failed: {e}");
1249        forbidden()
1250    })
1251}
1252
1253/// Builds a `403 Forbidden` rejection response.
1254fn forbidden() -> Rejection {
1255    // Warp doesn't expose a built-in 403 rejection, so we use a custom one.
1256    warp::reject::custom(ForbiddenError)
1257}
1258
1259#[derive(Debug)]
1260struct ForbiddenError;
1261
1262impl warp::reject::Reject for ForbiddenError {}
1263
1264/// A collection of common HTTP status codes used with the reply helpers.
1265///
1266/// Covers 2xx success, 3xx redirect, 4xx client error, and 5xx server error ranges.
1267/// Converts into `warp::http::StatusCode` via [`From`] and is accepted by
1268/// [`reply_with_status`] and [`reply_with_status_and_json`]. Also usable directly in the
1269/// [`reply!`] macro via the `status => Status::X` argument.
1270#[derive(Clone, Copy, Debug)]
1271pub enum Status {
1272    // 2xx
1273    Ok,
1274    Created,
1275    Accepted,
1276    NoContent,
1277    // 3xx
1278    MovedPermanently,
1279    Found,
1280    NotModified,
1281    TemporaryRedirect,
1282    PermanentRedirect,
1283    // 4xx
1284    BadRequest,
1285    Unauthorized,
1286    Forbidden,
1287    NotFound,
1288    MethodNotAllowed,
1289    Conflict,
1290    Gone,
1291    UnprocessableEntity,
1292    TooManyRequests,
1293    // 5xx
1294    InternalServerError,
1295    NotImplemented,
1296    BadGateway,
1297    ServiceUnavailable,
1298    GatewayTimeout,
1299}
1300
1301impl From<Status> for warp::http::StatusCode {
1302    fn from(s: Status) -> Self {
1303        match s {
1304            Status::Ok                  => warp::http::StatusCode::OK,
1305            Status::Created             => warp::http::StatusCode::CREATED,
1306            Status::Accepted            => warp::http::StatusCode::ACCEPTED,
1307            Status::NoContent           => warp::http::StatusCode::NO_CONTENT,
1308            Status::MovedPermanently    => warp::http::StatusCode::MOVED_PERMANENTLY,
1309            Status::Found               => warp::http::StatusCode::FOUND,
1310            Status::NotModified         => warp::http::StatusCode::NOT_MODIFIED,
1311            Status::TemporaryRedirect   => warp::http::StatusCode::TEMPORARY_REDIRECT,
1312            Status::PermanentRedirect   => warp::http::StatusCode::PERMANENT_REDIRECT,
1313            Status::BadRequest          => warp::http::StatusCode::BAD_REQUEST,
1314            Status::Unauthorized        => warp::http::StatusCode::UNAUTHORIZED,
1315            Status::Forbidden           => warp::http::StatusCode::FORBIDDEN,
1316            Status::NotFound            => warp::http::StatusCode::NOT_FOUND,
1317            Status::MethodNotAllowed    => warp::http::StatusCode::METHOD_NOT_ALLOWED,
1318            Status::Conflict            => warp::http::StatusCode::CONFLICT,
1319            Status::Gone                => warp::http::StatusCode::GONE,
1320            Status::UnprocessableEntity => warp::http::StatusCode::UNPROCESSABLE_ENTITY,
1321            Status::TooManyRequests     => warp::http::StatusCode::TOO_MANY_REQUESTS,
1322            Status::InternalServerError => warp::http::StatusCode::INTERNAL_SERVER_ERROR,
1323            Status::NotImplemented      => warp::http::StatusCode::NOT_IMPLEMENTED,
1324            Status::BadGateway          => warp::http::StatusCode::BAD_GATEWAY,
1325            Status::ServiceUnavailable  => warp::http::StatusCode::SERVICE_UNAVAILABLE,
1326            Status::GatewayTimeout      => warp::http::StatusCode::GATEWAY_TIMEOUT,
1327        }
1328    }
1329}
1330
1331/// The HTTP server that owns and dispatches a collection of [`SocketType`] routes.
1332///
1333/// Build routes through the [`ServerMechanism`] builder chain, register each with
1334/// [`mechanism`](Server::mechanism), then start the server with [`serve`](Server::serve).
1335///
1336/// # Example
1337/// ```rust,no_run
1338/// # use serde::Serialize;
1339/// # #[derive(Serialize)] struct Pong { ok: bool }
1340///
1341/// let mut server = Server::default();
1342///
1343/// server
1344///     .mechanism(
1345///         ServerMechanism::get("/ping")
1346///             .onconnect(|| async { reply!(json => Pong { ok: true }) })
1347///     )
1348///     .mechanism(
1349///         ServerMechanism::delete("/session")
1350///             .onconnect(|| async { reply!(message => warp::reply(), status => Status::NoContent) })
1351///     );
1352///
1353/// // Blocks forever — call only to actually run the server:
1354/// // server.serve(([0, 0, 0, 0], 8080)).await;
1355/// ```
1356///
1357/// # Caution
1358/// Calling [`serve`](Server::serve) with no routes registered will **panic**.
1359pub struct Server {
1360    mechanisms: Vec<SocketType>,
1361}
1362
1363impl Default for Server {
1364    fn default() -> Self {
1365        Self::new()
1366    }
1367}
1368
1369impl Server {
1370    fn new() -> Self {
1371        Self { mechanisms: Vec::new() }
1372    }
1373
1374    /// Registers a [`SocketType`] route on this server.
1375    ///
1376    /// Routes are evaluated in registration order. Returns `&mut Self` for method chaining.
1377    pub fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1378        self.mechanisms.push(mech);
1379        log::debug!("Route registered (total: {})", self.mechanisms.len());
1380        self
1381    }
1382
1383    /// Binds to `addr` and starts serving all registered routes.
1384    ///
1385    /// This is an infinite async loop — it never returns under normal operation.
1386    ///
1387    /// # Panics
1388    /// Panics immediately if no routes have been registered via [`mechanism`](Server::mechanism).
1389    pub async fn serve(self, addr: impl Into<SocketAddr>) {
1390        let addr = addr.into();
1391        log::info!("Server binding to {}", addr);
1392        let mut iter = self.mechanisms.into_iter();
1393        let first = iter.next().unwrap_or_else(|| {
1394            log::trace!("No mechanisms are defined on the server, this will result in a panic!");
1395            log::error!("The server contains no mechanisms to follow through");
1396            panic!();
1397        });
1398
1399        let combined = iter.fold(first.boxed(), |acc, next| {
1400            acc.or(next).unify().boxed()
1401        });
1402
1403        log::info!("Server running on {}", addr);
1404        warp::serve(combined).run(addr).await;
1405    }
1406
1407    /// Binds to `addr`, serves all registered routes, and shuts down gracefully when
1408    /// `shutdown` resolves.
1409    ///
1410    /// Equivalent to calling [`serve_from_listener`](Server::serve_from_listener) with an
1411    /// address instead of a pre-bound listener.
1412    ///
1413    /// # Example
1414    ///
1415    /// ```rust,no_run
1416    /// use tokio::sync::oneshot;
1417    ///
1418    /// # async fn run() {
1419    /// let (tx, rx) = oneshot::channel::<()>();
1420    /// // ... build server and mount routes ...
1421    /// // trigger shutdown later by calling tx.send(())
1422    /// server.serve_with_graceful_shutdown(([127, 0, 0, 1], 8080), async move {
1423    ///     rx.await.ok();
1424    /// }).await;
1425    /// # }
1426    /// ```
1427    pub async fn serve_with_graceful_shutdown(
1428        self,
1429        addr: impl Into<std::net::SocketAddr>,
1430        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1431    ) {
1432        let addr = addr.into();
1433        log::info!("Server binding to {} (graceful shutdown enabled)", addr);
1434        let mut iter = self.mechanisms.into_iter();
1435        let first = iter.next().unwrap_or_else(|| {
1436            log::error!("The server contains no mechanisms to follow through");
1437            panic!("serve_with_graceful_shutdown called with no routes registered");
1438        });
1439        let combined = iter.fold(first.boxed(), |acc, next| acc.or(next).unify().boxed());
1440        log::info!("Server running on {} (graceful shutdown enabled)", addr);
1441        warp::serve(combined)
1442            .bind(addr)
1443            .await
1444            .graceful(shutdown)
1445            .run()
1446            .await;
1447    }
1448
1449    /// Serves all registered routes from an already-bound `listener`, shutting down gracefully
1450    /// when `shutdown` resolves.
1451    ///
1452    /// Use this when port `0` is passed to `TcpListener::bind` and you need to know the actual
1453    /// OS-assigned port before the server starts (e.g. to open a browser to the correct URL).
1454    ///
1455    /// # Example
1456    ///
1457    /// ```rust,no_run
1458    /// use tokio::net::TcpListener;
1459    /// use tokio::sync::oneshot;
1460    ///
1461    /// # async fn run() -> std::io::Result<()> {
1462    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
1463    /// let port = listener.local_addr()?.port();
1464    /// println!("Will listen on port {port}");
1465    ///
1466    /// let (tx, rx) = oneshot::channel::<()>();
1467    /// // ... build server and mount routes ...
1468    /// server.serve_from_listener(listener, async move { rx.await.ok(); }).await;
1469    /// # Ok(())
1470    /// # }
1471    /// ```
1472    pub async fn serve_from_listener(
1473        self,
1474        listener: tokio::net::TcpListener,
1475        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1476    ) {
1477        log::info!("Server running on {} (graceful shutdown enabled)",
1478            listener.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "?".into()));
1479        let mut iter = self.mechanisms.into_iter();
1480        let first = iter.next().unwrap_or_else(|| {
1481            log::error!("The server contains no mechanisms to follow through");
1482            panic!("serve_from_listener called with no routes registered");
1483        });
1484        let combined = iter.fold(first.boxed(), |acc, next| acc.or(next).unify().boxed());
1485        warp::serve(combined)
1486            .incoming(listener)
1487            .graceful(shutdown)
1488            .run()
1489            .await;
1490    }
1491}
1492
1493/// Wraps `reply` with the given HTTP `status` code and returns it as a warp result.
1494///
1495/// Use when an endpoint needs to respond with a specific status alongside a plain message.
1496/// Pairs with the [`reply!`] macro form `reply!(message => ..., status => ...)`.
1497pub fn reply_with_status(status: Status, reply: impl Reply) -> Result<impl Reply, Rejection> {
1498    Ok::<_, Rejection>(warp::reply::with_status(reply, status.into()))
1499}
1500
1501/// Returns an empty `200 OK` warp reply result.
1502///
1503/// Useful for endpoints that need only to acknowledge a request with no body.
1504/// Equivalent to `reply!()`.
1505pub fn reply() -> Result<impl Reply, Rejection> {
1506    Ok::<_, Rejection>(warp::reply())
1507}
1508
1509/// Serialises `json` as a JSON body and returns it as a `200 OK` warp reply result.
1510///
1511/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ...)`.
1512pub fn reply_with_json<T: Serialize>(json: &T) -> Result<impl Reply + use<T>, Rejection> {
1513    Ok::<_, Rejection>(warp::reply::json(json))
1514}
1515
1516/// Serialises `json` as a JSON body, attaches the given HTTP `status`, and returns a warp result.
1517///
1518/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ..., status => ...)`.
1519pub fn reply_with_status_and_json<T: Serialize>(status: Status, json: &T) -> Result<impl Reply + use<T>, Rejection> {
1520    Ok::<_, Rejection>(warp::reply::with_status(warp::reply::json(json), status.into()))
1521}
1522
1523/// Seals `value` with `key` and returns it as an `application/octet-stream` response (`200 OK`).
1524///
1525/// `T` must implement `bincode::Encode`.
1526/// Equivalent to `reply!(sealed => value, key => key)`.
1527pub fn reply_sealed<T: bincode::Encode>(
1528    value: &T,
1529    key: SerializationKey,
1530) -> Result<warp::reply::Response, Rejection> {
1531    sealed_response(value, key, None)
1532}
1533
1534/// Seals `value` with `key`, attaches the given HTTP `status`, and returns it as a warp result.
1535///
1536/// Equivalent to `reply!(sealed => value, key => key, status => Status::X)`.
1537pub fn reply_sealed_with_status<T: bincode::Encode>(
1538    value: &T,
1539    key: SerializationKey,
1540    status: Status,
1541) -> Result<warp::reply::Response, Rejection> {
1542    sealed_response(value, key, Some(status))
1543}
1544
1545fn sealed_response<T: bincode::Encode>(
1546    value: &T,
1547    key: SerializationKey,
1548    status: Option<Status>,
1549) -> Result<warp::reply::Response, Rejection> {
1550    use warp::http::StatusCode;
1551    use warp::Reply;
1552    let code: StatusCode = status.map(Into::into).unwrap_or(StatusCode::OK);
1553    let sealed = crate::serialization::seal(value, key.veil_key()).map_err(|_| warp::reject())?;
1554    Ok(warp::reply::with_status(sealed, code).into_response())
1555}
1556
1557/// Convenience macro for constructing warp reply results inside route handlers.
1558///
1559/// | Syntax | Equivalent | Description |
1560/// |---|---|---|
1561/// | `reply!()` | [`reply()`] | Empty `200 OK` response. |
1562/// | `reply!(message => expr, status => Status::X)` | [`reply_with_status`] | Plain reply with a status code. |
1563/// | `reply!(json => expr)` | [`reply_with_json`] | JSON body with `200 OK`. |
1564/// | `reply!(json => expr, status => Status::X)` | [`reply_with_status_and_json`] | JSON body with a status code. |
1565/// | `reply!(sealed => expr, key => key)` | [`reply_sealed`] | VEIL-sealed (or JSON for `Plain`) body, `200 OK`. |
1566/// | `reply!(sealed => expr, key => key, status => Status::X)` | [`reply_sealed_with_status`] | VEIL-sealed (or JSON for `Plain`) body with status. |
1567///
1568/// # Example
1569/// ```rust,no_run
1570/// # use serde::{Deserialize, Serialize};
1571/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
1572///
1573/// // Empty 200 OK
1574/// let ping = ServerMechanism::get("/ping")
1575///     .onconnect(|| async { reply!() });
1576///
1577/// // Plain reply with a custom status
1578/// let gone = ServerMechanism::delete("/v1")
1579///     .onconnect(|| async {
1580///         reply!(message => warp::reply::html("endpoint deprecated"), status => Status::Gone)
1581///     });
1582///
1583/// // JSON body, 200 OK
1584/// let list = ServerMechanism::get("/items")
1585///     .onconnect(|| async {
1586///         let items: Vec<Item> = vec![];
1587///         reply!(json => items)
1588///     });
1589///
1590/// // JSON body with a custom status
1591/// let create = ServerMechanism::post("/items")
1592///     .json::<Item>()
1593///     .onconnect(|item| async move {
1594///         reply!(json => item, status => Status::Created)
1595///     });
1596/// ```
1597#[macro_export]
1598macro_rules! reply {
1599    () => {{
1600        $crate::socket::server::reply()
1601    }};
1602
1603    (message => $message: expr, status => $status: expr) => {{
1604        $crate::socket::server::reply_with_status($status, $message)
1605    }};
1606
1607    (json => $json: expr) => {{
1608        $crate::socket::server::reply_with_json(&$json)
1609    }};
1610
1611    (json => $json: expr, status => $status: expr) => {{
1612        $crate::socket::server::reply_with_status_and_json($status, &$json)
1613    }};
1614
1615    (sealed => $val: expr, key => $key: expr) => {{
1616        $crate::socket::server::reply_sealed(&$val, $key)
1617    }};
1618
1619    (sealed => $val: expr, key => $key: expr, status => $status: expr) => {{
1620        $crate::socket::server::reply_sealed_with_status(&$val, $key, $status)
1621    }};
1622}
1623