toolkit_zero/socket/server.rs
1
2//! Typed, fluent HTTP server construction.
3//!
4//! This module provides a builder-oriented API for declaring HTTP routes and
5//! serving them with [`warp`] under the hood. The entry point for every route
6//! is [`ServerMechanism`], which pairs an HTTP method with a URL path and
7//! supports incremental enrichment — attaching a JSON body expectation, URL
8//! query parameter deserialisation, or shared state — before being finalised
9//! into a [`SocketType`] route handle via `onconnect`.
10//!
11//! Completed routes are registered on a [`Server`] and served with a single
12//! `.await`. Graceful shutdown is available via
13//! [`Server::serve_with_graceful_shutdown`] and [`Server::serve_from_listener`].
14//!
15//! # Builder chains at a glance
16//!
17//! | Chain | Handler receives |
18//! |---|---|
19//! | `mechanism(method, path).onconnect(f)` | nothing |
20//! | `.json::<T>().onconnect(f)` | `T: DeserializeOwned` |
21//! | `.query::<T>().onconnect(f)` | `T: DeserializeOwned` |
22//! | `.state(s).onconnect(f)` | `S: Clone + Send + Sync` |
23//! | `.state(s).json::<T>().onconnect(f)` | `(S, T)` |
24//! | `.state(s).query::<T>().onconnect(f)` | `(S, T)` |
25//!
26//! For blocking handlers (not recommended in production) every finaliser also
27//! has an unsafe `onconnect_sync` counterpart.
28//!
29//! # Response helpers
30//!
31//! Use the [`reply!`] macro as the most concise way to build a response:
32//!
33//! ```rust,no_run
34//! # use toolkit_zero::socket::server::*;
35//! # use serde::Serialize;
36//! # #[derive(Serialize)] struct Item { id: u32 }
37//! # let item = Item { id: 1 };
38//! // 200 OK, empty body
39//! // reply!()
40//!
41//! // 200 OK, JSON body
42//! // reply!(json => item)
43//!
44//! // 201 Created, JSON body
45//! // reply!(json => item, status => Status::Created)
46//! ```
47
48use std::{future::Future, net::SocketAddr};
49use serde::{de::DeserializeOwned, Serialize};
50use warp::{Filter, Rejection, Reply, filters::BoxedFilter};
51
52pub use super::SerializationKey;
53
54/// A fully assembled, type-erased HTTP route ready to be registered on a [`Server`].
55///
56/// This is the final product of every builder chain. Pass it to [`Server::mechanism`] to mount it.
57pub type SocketType = BoxedFilter<(warp::reply::Response, )>;
58
59#[derive(Clone, Copy, Debug)]
60enum HttpMethod {
61 Get,
62 Post,
63 Put,
64 Delete,
65 Patch,
66 Head,
67 Options,
68}
69
70impl HttpMethod {
71 fn filter(&self) -> BoxedFilter<()> {
72 match self {
73 HttpMethod::Get => warp::get().boxed(),
74 HttpMethod::Post => warp::post().boxed(),
75 HttpMethod::Put => warp::put().boxed(),
76 HttpMethod::Delete => warp::delete().boxed(),
77 HttpMethod::Patch => warp::patch().boxed(),
78 HttpMethod::Head => warp::head().boxed(),
79 HttpMethod::Options => warp::options().boxed(),
80 }
81 }
82}
83
84
85fn path_filter(path: &str) -> BoxedFilter<()> {
86 log::trace!("Building path filter for: '{}'", path);
87 let segs: Vec<&'static str> = path
88 .trim_matches('/')
89 .split('/')
90 .filter(|s| !s.is_empty())
91 .map(|s| -> &'static str { Box::leak(s.to_owned().into_boxed_str()) })
92 .collect();
93
94 if segs.is_empty() {
95 return warp::path::end().boxed();
96 }
97
98 // Start with the first segment, then and-chain the rest.
99 let mut f: BoxedFilter<()> = warp::path(segs[0]).boxed();
100 for seg in &segs[1..] {
101 f = f.and(warp::path(*seg)).boxed();
102 }
103 f.and(warp::path::end()).boxed()
104}
105
106/// Entry point for building an HTTP route.
107///
108/// Pairs an HTTP method with a URL path and acts as the root of a fluent builder chain.
109/// Optionally attach shared state, a JSON body expectation, or URL query parameter
110/// deserialisation — then finalise with [`onconnect`](ServerMechanism::onconnect) (async) or
111/// [`onconnect_sync`](ServerMechanism::onconnect_sync) (sync) to produce a [`SocketType`]
112/// ready to be mounted on a [`Server`].
113///
114/// # Example
115/// ```rust,no_run
116/// # use serde::{Deserialize, Serialize};
117/// # use std::sync::{Arc, Mutex};
118/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
119/// # #[derive(Deserialize)] struct CreateItem { name: String }
120/// # #[derive(Deserialize)] struct SearchQuery { q: String }
121///
122/// // Plain GET — no body, no state
123/// let health = ServerMechanism::get("/health")
124/// .onconnect(|| async { reply!() });
125///
126/// // POST — JSON body deserialised into `CreateItem`
127/// let create = ServerMechanism::post("/items")
128/// .json::<CreateItem>()
129/// .onconnect(|body| async move {
130/// let item = Item { id: 1, name: body.name };
131/// reply!(json => item, status => Status::Created)
132/// });
133///
134/// // GET — URL query params deserialised into `SearchQuery`
135/// let search = ServerMechanism::get("/search")
136/// .query::<SearchQuery>()
137/// .onconnect(|params| async move {
138/// let _q = params.q;
139/// reply!()
140/// });
141///
142/// // GET — shared counter state injected on every request
143/// let counter: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
144/// let count_route = ServerMechanism::get("/count")
145/// .state(counter.clone())
146/// .onconnect(|state| async move {
147/// let n = *state.lock().unwrap();
148/// reply!(json => n)
149/// });
150/// ```
151pub struct ServerMechanism {
152 method: HttpMethod,
153 path: String,
154}
155
156impl ServerMechanism {
157
158 fn instance(method: HttpMethod, path: impl Into<String>) -> Self {
159 let path = path.into();
160 log::debug!("Creating {:?} route at '{}'", method, path);
161 Self { method, path }
162 }
163
164 /// Creates a route matching HTTP `GET` requests at `path`.
165 pub fn get(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Get, path) }
166
167 /// Creates a route matching HTTP `POST` requests at `path`.
168 pub fn post(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Post, path) }
169
170 /// Creates a route matching HTTP `PUT` requests at `path`.
171 pub fn put(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Put, path) }
172
173 /// Creates a route matching HTTP `DELETE` requests at `path`.
174 pub fn delete(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Delete, path) }
175
176 /// Creates a route matching HTTP `PATCH` requests at `path`.
177 pub fn patch(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Patch, path) }
178
179 /// Creates a route matching HTTP `HEAD` requests at `path`.
180 pub fn head(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Head, path) }
181
182 /// Creates a route matching HTTP `OPTIONS` requests at `path`.
183 pub fn options(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Options, path) }
184
185 /// Attaches shared state `S` to this route, transitioning to [`StatefulSocketBuilder`].
186 ///
187 /// A fresh clone of `S` is injected into the handler on every request. For mutable
188 /// shared state, wrap the inner value in `Arc<Mutex<_>>` or `Arc<RwLock<_>>` before
189 /// passing it here — only the outer `Arc` is cloned per request; the inner data stays
190 /// shared across all requests.
191 ///
192 /// `S` must be `Clone + Send + Sync + 'static`.
193 ///
194 /// From [`StatefulSocketBuilder`] you can further add a JSON body (`.json`), query
195 /// parameters (`.query`), or encryption (`.encryption` / `.encrypted_query`) before
196 /// finalising with `onconnect`.
197 ///
198 /// # Example
199 /// ```rust,no_run
200 /// use std::sync::{Arc, Mutex};
201 ///
202 /// let db: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
203 ///
204 /// let route = ServerMechanism::get("/list")
205 /// .state(db.clone())
206 /// .onconnect(|state| async move {
207 /// let items = state.lock().unwrap().clone();
208 /// reply!(json => items)
209 /// });
210 /// ```
211 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulSocketBuilder<S> {
212 log::trace!("Attaching state to {:?} route at '{}'", self.method, self.path);
213 StatefulSocketBuilder { base: self, state }
214 }
215
216 /// Declares that this route expects a JSON-encoded request body, transitioning to
217 /// [`JsonSocketBuilder`].
218 ///
219 /// On each incoming request the body is parsed as `Content-Type: application/json`
220 /// and deserialised into `T`. If the body is absent, malformed, or fails to
221 /// deserialise, the request is rejected before the handler is ever called.
222 /// When you subsequently call [`onconnect`](JsonSocketBuilder::onconnect), the handler
223 /// receives a fully-deserialised, ready-to-use `T`.
224 ///
225 /// `T` must implement `serde::de::DeserializeOwned`.
226 ///
227 /// # Example
228 /// ```rust,no_run
229 /// # use serde::Deserialize;
230 /// # #[derive(Deserialize)] struct Payload { value: i32 }
231 ///
232 /// let route = ServerMechanism::post("/submit")
233 /// .json::<Payload>()
234 /// .onconnect(|body| async move {
235 /// reply!(json => body.value)
236 /// });
237 /// ```
238 pub fn json<T: DeserializeOwned + Send>(self) -> JsonSocketBuilder<T> {
239 log::trace!("Attaching JSON body expectation to {:?} route at '{}'", self.method, self.path);
240 JsonSocketBuilder { base: self, _phantom: std::marker::PhantomData }
241 }
242
243 /// Declares that this route extracts its input from URL query parameters, transitioning
244 /// to [`QuerySocketBuilder`].
245 ///
246 /// On each incoming request the query string (`?field=value&...`) is deserialised into
247 /// `T`. A missing or malformed query string is rejected before the handler is called.
248 /// When you subsequently call [`onconnect`](QuerySocketBuilder::onconnect), the handler
249 /// receives a fully-deserialised, ready-to-use `T`.
250 ///
251 /// `T` must implement `serde::de::DeserializeOwned`.
252 ///
253 /// # Example
254 /// ```rust,no_run
255 /// # use serde::Deserialize;
256 /// # #[derive(Deserialize)] struct Filter { page: u32, per_page: u32 }
257 ///
258 /// let route = ServerMechanism::get("/items")
259 /// .query::<Filter>()
260 /// .onconnect(|filter| async move {
261 /// let _ = (filter.page, filter.per_page);
262 /// reply!()
263 /// });
264 /// ```
265 pub fn query<T: DeserializeOwned + Send>(self) -> QuerySocketBuilder<T> {
266 log::trace!("Attaching query parameter expectation to {:?} route at '{}'", self.method, self.path);
267 QuerySocketBuilder { base: self, _phantom: std::marker::PhantomData }
268 }
269
270 /// Declares that this route expects a VEIL-encrypted request body, transitioning to
271 /// [`EncryptedBodyBuilder`].
272 ///
273 /// On each incoming request the raw body bytes are decrypted using the provided
274 /// [`SerializationKey`] before the handler is called. If the key does not match or
275 /// the body is corrupt, the route responds with `403 Forbidden` and the handler is
276 /// never invoked — meaning the `T` your handler receives is always a legitimate,
277 /// trusted, fully-decrypted value.
278 ///
279 /// Use `SerializationKey::Default` when both client and server share the built-in key,
280 /// or `SerializationKey::Value("your-key")` for a custom shared secret.
281 /// For plain-JSON routes (no encryption) use [`.json::<T>()`](ServerMechanism::json)
282 /// instead.
283 ///
284 /// `T` must implement `bincode::Decode<()>`.
285 ///
286 /// # Example
287 /// ```rust,no_run
288 /// # use bincode::{Encode, Decode};
289 /// # #[derive(Encode, Decode)] struct Payload { value: i32 }
290 ///
291 /// let route = ServerMechanism::post("/submit")
292 /// .encryption::<Payload>(SerializationKey::Default)
293 /// .onconnect(|body| async move {
294 /// // `body` is already decrypted and deserialised — use it directly.
295 /// reply!(json => body.value)
296 /// });
297 /// ```
298 pub fn encryption<T>(self, key: SerializationKey) -> EncryptedBodyBuilder<T> {
299 log::trace!("Attaching encrypted body to {:?} route at '{}'", self.method, self.path);
300 EncryptedBodyBuilder { base: self, key, _phantom: std::marker::PhantomData }
301 }
302
303 /// Declares that this route expects VEIL-encrypted URL query parameters, transitioning
304 /// to [`EncryptedQueryBuilder`].
305 ///
306 /// The client must send a single `?data=<base64url>` query parameter whose value is
307 /// the URL-safe base64 encoding of the VEIL-encrypted struct bytes. On each request
308 /// the server base64-decodes then decrypts the payload using the provided
309 /// [`SerializationKey`]. If the `data` parameter is missing, the encoding is invalid,
310 /// the key does not match, or the bytes are corrupt, the route responds with
311 /// `403 Forbidden` and the handler is never invoked — meaning the `T` your handler
312 /// receives is always a legitimate, trusted, fully-decrypted value.
313 ///
314 /// Use `SerializationKey::Default` or `SerializationKey::Value("your-key")`. For
315 /// plain query-string routes (no encryption) use
316 /// [`.query::<T>()`](ServerMechanism::query) instead.
317 ///
318 /// `T` must implement `bincode::Decode<()>`.
319 pub fn encrypted_query<T>(self, key: SerializationKey) -> EncryptedQueryBuilder<T> {
320 log::trace!("Attaching encrypted query to {:?} route at '{}'", self.method, self.path);
321 EncryptedQueryBuilder { base: self, key, _phantom: std::marker::PhantomData }
322 }
323
324 /// Finalises this route with an async handler that receives no arguments.
325 ///
326 /// Neither a request body nor query parameters are read. The handler runs on every
327 /// matching request and must return `Result<impl Reply, Rejection>`. Use the
328 /// [`reply!`] macro or the standalone reply helpers ([`reply_with_json`],
329 /// [`reply_with_status`], etc.) to construct a response.
330 ///
331 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
332 ///
333 /// # Example
334 /// ```rust,no_run
335 /// # use serde::Serialize;
336 /// # #[derive(Serialize)] struct Pong { ok: bool }
337 ///
338 /// let route = ServerMechanism::get("/ping")
339 /// .onconnect(|| async {
340 /// reply!(json => Pong { ok: true })
341 /// });
342 /// ```
343 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
344 where
345 F: Fn() -> Fut + Clone + Send + Sync + 'static,
346 Fut: Future<Output = Result<Re, Rejection>> + Send,
347 Re: Reply + Send,
348 {
349 log::debug!("Finalising async {:?} route at '{}' (no args)", self.method, self.path);
350 let m = self.method.filter();
351 let p = path_filter(&self.path);
352 m.and(p)
353 .and_then(handler)
354 .map(|r: Re| r.into_response())
355 .boxed()
356 }
357
358 /// Finalises this route with a **synchronous** handler that receives no arguments.
359 ///
360 /// Behaviour and contract are identical to the async variant — neither a body nor
361 /// query parameters are read — but the closure may block. Each request is dispatched
362 /// to the blocking thread pool, so the handler must complete quickly to avoid starving
363 /// other requests.
364 ///
365 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
366 ///
367 /// # Safety
368 ///
369 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
370 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
371 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
372 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
373 /// gets a chance to run. Additionally, any panic inside the handler is silently converted
374 /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
375 /// quickly and that adequate backpressure or rate limiting is applied externally.
376 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
377 where
378 F: Fn() -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
379 Re: Reply + Send + 'static,
380 {
381 log::warn!("Registering sync handler on {:?} '{}' — ensure rate-limiting is applied externally", self.method, self.path);
382 let m = self.method.filter();
383 let p = path_filter(&self.path);
384 m.and(p)
385 .and_then(move || {
386 let handler = handler.clone();
387 async move {
388 tokio::task::spawn_blocking(move || handler())
389 .await
390 .unwrap_or_else(|_| {
391 log::warn!("Sync handler panicked; converting to Rejection");
392 Err(warp::reject())
393 })
394 }
395 })
396 .map(|r: Re| r.into_response())
397 .boxed()
398 }
399}
400
401
402/// Route builder that expects and deserialises a JSON request body of type `T`.
403///
404/// Obtained from [`ServerMechanism::json`]. Optionally attach shared state via
405/// [`state`](JsonSocketBuilder::state), or finalise immediately with
406/// [`onconnect`](JsonSocketBuilder::onconnect) /
407/// [`onconnect_sync`](JsonSocketBuilder::onconnect_sync).
408pub struct JsonSocketBuilder<T> {
409 base: ServerMechanism,
410 _phantom: std::marker::PhantomData<T>,
411}
412
413impl<T: DeserializeOwned + Send + 'static> JsonSocketBuilder<T> {
414
415 /// Finalises this route with an async handler that receives the deserialised JSON body.
416 ///
417 /// Before the handler is called, the incoming request body is parsed as
418 /// `Content-Type: application/json` and deserialised into `T`. The handler receives
419 /// a ready-to-use `T` — no manual parsing is needed. If the body is absent or cannot
420 /// be decoded the request is rejected before the handler is ever invoked.
421 ///
422 /// The handler must return `Result<impl Reply, Rejection>`.
423 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
424 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
425 where
426 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
427 Fut: Future<Output = Result<Re, Rejection>> + Send,
428 Re: Reply + Send,
429 {
430 log::debug!("Finalising async {:?} route at '{}' (JSON body)", self.base.method, self.base.path);
431 let m = self.base.method.filter();
432 let p = path_filter(&self.base.path);
433 m.and(p)
434 .and(warp::body::json::<T>())
435 .and_then(handler)
436 .map(|r: Re| r.into_response())
437 .boxed()
438 }
439
440 /// Finalises this route with a **synchronous** handler that receives the deserialised
441 /// JSON body.
442 ///
443 /// The body is decoded into `T` before the handler is dispatched, identical to the
444 /// async variant. The closure may block but must complete quickly to avoid exhausting
445 /// the blocking thread pool.
446 ///
447 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
448 ///
449 /// # Safety
450 ///
451 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
452 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
453 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
454 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
455 /// gets a chance to run. Additionally, any panic inside the handler is silently converted
456 /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
457 /// quickly and that adequate backpressure or rate limiting is applied externally.
458 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
459 where
460 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
461 Re: Reply + Send + 'static,
462 {
463 log::warn!("Registering sync handler on {:?} '{}' (JSON body) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
464 let m = self.base.method.filter();
465 let p = path_filter(&self.base.path);
466 m.and(p)
467 .and(warp::body::json::<T>())
468 .and_then(move |body: T| {
469 let handler = handler.clone();
470 async move {
471 tokio::task::spawn_blocking(move || handler(body))
472 .await
473 .unwrap_or_else(|_| {
474 log::warn!("Sync handler (JSON body) panicked; converting to Rejection");
475 Err(warp::reject())
476 })
477 }
478 })
479 .map(|r: Re| r.into_response())
480 .boxed()
481 }
482
483 /// Attaches shared state `S`, transitioning to [`StatefulJsonSocketBuilder`].
484 ///
485 /// Alternative ordering to `.state(s).json::<T>()` — both produce the same route.
486 /// A fresh clone of `S` is injected alongside the JSON-decoded `T` on every request.
487 /// The handler will receive `(state: S, body: T)`.
488 ///
489 /// `S` must be `Clone + Send + Sync + 'static`.
490 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulJsonSocketBuilder<T, S> {
491 StatefulJsonSocketBuilder { base: self.base, state, _phantom: std::marker::PhantomData }
492 }
493
494}
495
496/// Route builder that expects and deserialises URL query parameters of type `T`.
497///
498/// Obtained from [`ServerMechanism::query`]. Optionally attach shared state via
499/// [`state`](QuerySocketBuilder::state), or finalise immediately with
500/// [`onconnect`](QuerySocketBuilder::onconnect) /
501/// [`onconnect_sync`](QuerySocketBuilder::onconnect_sync).
502pub struct QuerySocketBuilder<T> {
503 base: ServerMechanism,
504 _phantom: std::marker::PhantomData<T>,
505}
506
507impl<T: DeserializeOwned + Send + 'static> QuerySocketBuilder<T> {
508 /// Finalises this route with an async handler that receives the deserialised query
509 /// parameters.
510 ///
511 /// Before the handler is called, the URL query string (`?field=value&...`) is parsed
512 /// and deserialised into `T`. The handler receives a ready-to-use `T` — no manual
513 /// parsing is needed. A missing or malformed query string is rejected before the
514 /// handler is ever invoked.
515 ///
516 /// The handler must return `Result<impl Reply, Rejection>`.
517 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
518 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
519 where
520 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
521 Fut: Future<Output = Result<Re, Rejection>> + Send,
522 Re: Reply + Send,
523 {
524 log::debug!("Finalising async {:?} route at '{}' (query params)", self.base.method, self.base.path);
525 let m = self.base.method.filter();
526 let p = path_filter(&self.base.path);
527 m.and(p)
528 .and(warp::filters::query::query::<T>())
529 .and_then(handler)
530 .map(|r: Re| r.into_response())
531 .boxed()
532 }
533
534 /// Finalises this route with a **synchronous** handler that receives the deserialised
535 /// query parameters.
536 ///
537 /// The query string is decoded into `T` before the handler is dispatched, identical to
538 /// the async variant. The closure may block but must complete quickly.
539 ///
540 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
541 ///
542 /// # Safety
543 ///
544 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
545 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
546 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
547 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
548 /// gets a chance to run. Additionally, any panic inside the handler is silently converted
549 /// into a `Rejection`, masking runtime errors. Callers must ensure the handler completes
550 /// quickly and that adequate backpressure or rate limiting is applied externally.
551 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
552 where
553 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
554 Re: Reply + Send + 'static,
555 {
556 log::warn!("Registering sync handler on {:?} '{}' (query params) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
557 let m = self.base.method.filter();
558 let p = path_filter(&self.base.path);
559 m.and(p)
560 .and(warp::filters::query::query::<T>())
561 .and_then(move |query: T| {
562 let handler = handler.clone();
563 async move {
564 tokio::task::spawn_blocking(move || handler(query))
565 .await
566 .unwrap_or_else(|_| {
567 log::warn!("Sync handler (query params) panicked; converting to Rejection");
568 Err(warp::reject())
569 })
570 }
571 })
572 .map(|r: Re| r.into_response())
573 .boxed()
574 }
575
576 /// Attaches shared state `S`, transitioning to [`StatefulQuerySocketBuilder`].
577 ///
578 /// Alternative ordering to `.state(s).query::<T>()` — both produce the same route.
579 /// A fresh clone of `S` is injected alongside the query-decoded `T` on every request.
580 /// The handler will receive `(state: S, query: T)`.
581 ///
582 /// `S` must be `Clone + Send + Sync + 'static`.
583 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulQuerySocketBuilder<T, S> {
584 StatefulQuerySocketBuilder { base: self.base, state, _phantom: std::marker::PhantomData }
585 }
586
587}
588
589
590/// Route builder that carries shared state `S` with no body or query expectation.
591///
592/// Obtained from [`ServerMechanism::state`]. `S` must be `Clone + Send + Sync + 'static`.
593/// For mutable shared state, wrap it in `Arc<Mutex<_>>` or `Arc<RwLock<_>>` before passing
594/// it here. Finalise with [`onconnect`](StatefulSocketBuilder::onconnect) /
595/// [`onconnect_sync`](StatefulSocketBuilder::onconnect_sync).
596pub struct StatefulSocketBuilder<S> {
597 base: ServerMechanism,
598 state: S,
599}
600
601
602impl<S: Clone + Send + Sync + 'static> StatefulSocketBuilder<S> {
603 /// Adds a JSON body expectation, transitioning to [`StatefulJsonSocketBuilder`].
604 ///
605 /// Alternative ordering to `.json::<T>().state(s)` — both produce the same route.
606 /// On each request the incoming JSON body is deserialised into `T` and a fresh clone
607 /// of `S` is prepared; both are handed to the handler together as `(state: S, body: T)`.
608 ///
609 /// Allows `.state(s).json::<T>()` as an alternative ordering to `.json::<T>().state(s)`.
610 pub fn json<T: DeserializeOwned + Send>(self) -> StatefulJsonSocketBuilder<T, S> {
611 log::trace!("Attaching JSON body expectation (after state) to {:?} route at '{}'", self.base.method, self.base.path);
612 StatefulJsonSocketBuilder { base: self.base, state: self.state, _phantom: std::marker::PhantomData }
613 }
614
615 /// Adds a query parameter expectation, transitioning to [`StatefulQuerySocketBuilder`].
616 ///
617 /// Alternative ordering to `.query::<T>().state(s)` — both produce the same route.
618 /// On each request the URL query string is deserialised into `T` and a fresh clone
619 /// of `S` is prepared; both are handed to the handler together as `(state: S, query: T)`.
620 ///
621 /// Allows `.state(s).query::<T>()` as an alternative ordering to `.query::<T>().state(s)`.
622 pub fn query<T: DeserializeOwned + Send>(self) -> StatefulQuerySocketBuilder<T, S> {
623 log::trace!("Attaching query param expectation (after state) to {:?} route at '{}'", self.base.method, self.base.path);
624 StatefulQuerySocketBuilder { base: self.base, state: self.state, _phantom: std::marker::PhantomData }
625 }
626
627 /// Adds an encrypted body expectation, transitioning to [`StatefulEncryptedBodyBuilder`].
628 ///
629 /// Alternative ordering to `.encryption::<T>(key).state(s)` — both produce the same
630 /// route. On each request the raw body bytes are VEIL-decrypted using `key` before
631 /// the handler runs; a wrong key or corrupt body returns `403 Forbidden` without
632 /// invoking the handler. The handler will receive `(state: S, body: T)` where `T` is
633 /// always a trusted, fully-decrypted value.
634 ///
635 /// Allows `.state(s).encryption::<T>(key)` as an alternative ordering to
636 /// `.encryption::<T>(key).state(s)`.
637 pub fn encryption<T>(self, key: SerializationKey) -> StatefulEncryptedBodyBuilder<T, S> {
638 log::trace!("Attaching encrypted body (after state) to {:?} route at '{}'", self.base.method, self.base.path);
639 StatefulEncryptedBodyBuilder { base: self.base, key, state: self.state, _phantom: std::marker::PhantomData }
640 }
641
642 /// Adds an encrypted query expectation, transitioning to [`StatefulEncryptedQueryBuilder`].
643 ///
644 /// Alternative ordering to `.encrypted_query::<T>(key).state(s)` — both produce the
645 /// same route. On each request the `?data=<base64url>` query parameter is
646 /// base64-decoded then VEIL-decrypted using `key`; a missing, malformed, or
647 /// undecryptable payload returns `403 Forbidden` without invoking the handler. The
648 /// handler will receive `(state: S, query: T)` where `T` is always a trusted,
649 /// fully-decrypted value.
650 ///
651 /// Allows `.state(s).encrypted_query::<T>(key)` as an alternative ordering to
652 /// `.encrypted_query::<T>(key).state(s)`.
653 pub fn encrypted_query<T>(self, key: SerializationKey) -> StatefulEncryptedQueryBuilder<T, S> {
654 log::trace!("Attaching encrypted query (after state) to {:?} route at '{}'", self.base.method, self.base.path);
655 StatefulEncryptedQueryBuilder { base: self.base, key, state: self.state, _phantom: std::marker::PhantomData }
656 }
657
658 /// Finalises this route with an async handler that receives only the shared state.
659 ///
660 /// On each request a fresh clone of `S` is injected into the handler. No request body
661 /// or query parameters are read. The handler must return `Result<impl Reply, Rejection>`.
662 ///
663 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
664 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
665 where
666 F: Fn(S) -> Fut + Clone + Send + Sync + 'static,
667 Fut: Future<Output = Result<Re, Rejection>> + Send,
668 Re: Reply + Send,
669 {
670 log::debug!("Finalising async {:?} route at '{}' (state)", self.base.method, self.base.path);
671 let m = self.base.method.filter();
672 let p = path_filter(&self.base.path);
673 let state = self.state;
674 let state_filter = warp::any().map(move || state.clone());
675 m.and(p)
676 .and(state_filter)
677 .and_then(handler)
678 .map(|r: Re| r.into_response())
679 .boxed()
680 }
681
682 /// Finalises this route with a **synchronous** handler that receives only the shared
683 /// state.
684 ///
685 /// On each request a fresh clone of `S` is passed to the handler. If `S` wraps a mutex
686 /// or lock, contention across concurrent requests can stall the thread pool — ensure the
687 /// lock is held only briefly. The closure may block but must complete quickly.
688 ///
689 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
690 ///
691 /// # Safety
692 ///
693 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
694 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
695 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
696 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
697 /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
698 /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
699 /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
700 /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
701 /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
702 /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
703 /// applied externally.
704 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
705 where
706 F: Fn(S) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
707 Re: Reply + Send + 'static,
708 {
709 log::warn!("Registering sync handler on {:?} '{}' (state) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
710 let m = self.base.method.filter();
711 let p = path_filter(&self.base.path);
712 let state = self.state;
713 let state_filter = warp::any().map(move || state.clone());
714 m.and(p)
715 .and(state_filter)
716 .and_then(move |s: S| {
717 let handler = handler.clone();
718 async move {
719 tokio::task::spawn_blocking(move || handler(s))
720 .await
721 .unwrap_or_else(|_| {
722 log::warn!("Sync handler (state) panicked; converting to Rejection");
723 Err(warp::reject())
724 })
725 }
726 })
727 .map(|r: Re| r.into_response())
728 .boxed()
729 }
730}
731
732
733/// Route builder that carries shared state `S` and expects a JSON body of type `T`.
734///
735/// Obtained from [`JsonSocketBuilder::state`]. `S` must be `Clone + Send + Sync + 'static`.
736/// `T` must implement `serde::de::DeserializeOwned`. Finalise with
737/// [`onconnect`](StatefulJsonSocketBuilder::onconnect) /
738/// [`onconnect_sync`](StatefulJsonSocketBuilder::onconnect_sync).
739pub struct StatefulJsonSocketBuilder<T, S> {
740 base: ServerMechanism,
741 state: S,
742 _phantom: std::marker::PhantomData<T>,
743}
744
745impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
746 StatefulJsonSocketBuilder<T, S>
747{
748 /// Finalises this route with an async handler that receives `(state: S, body: T)`.
749 ///
750 /// On each request the incoming JSON body is deserialised into `T` and a fresh clone
751 /// of `S` is prepared — both are handed to the handler together. The handler is only
752 /// called when the body can be decoded; a missing or malformed body is rejected first.
753 ///
754 /// The handler must return `Result<impl Reply, Rejection>`.
755 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
756 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
757 where
758 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
759 Fut: Future<Output = Result<Re, Rejection>> + Send,
760 Re: Reply + Send,
761 {
762 log::debug!("Finalising async {:?} route at '{}' (state + JSON body)", self.base.method, self.base.path);
763 let m = self.base.method.filter();
764 let p = path_filter(&self.base.path);
765 let state = self.state;
766 let state_filter = warp::any().map(move || state.clone());
767 m.and(p)
768 .and(state_filter)
769 .and(warp::body::json::<T>())
770 .and_then(handler)
771 .map(|r: Re| r.into_response())
772 .boxed()
773 }
774
775 /// Finalises this route with a **synchronous** handler that receives
776 /// `(state: S, body: T)`.
777 ///
778 /// The body is decoded into `T` and a clone of `S` is prepared before the blocking
779 /// handler is dispatched. If `S` wraps a lock, keep it held only briefly. See
780 /// [`ServerMechanism::onconnect_sync`] for the full thread-pool safety notes.
781 ///
782 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
783 ///
784 /// # Safety
785 ///
786 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
787 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
788 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
789 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
790 /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
791 /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
792 /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
793 /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
794 /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
795 /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
796 /// applied externally.
797 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
798 where
799 F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
800 Re: Reply + Send + 'static,
801 {
802 log::warn!("Registering sync handler on {:?} '{}' (state + JSON body) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
803 let m = self.base.method.filter();
804 let p = path_filter(&self.base.path);
805 let state = self.state;
806 let state_filter = warp::any().map(move || state.clone());
807 m.and(p)
808 .and(state_filter)
809 .and(warp::body::json::<T>())
810 .and_then(move |s: S, body: T| {
811 let handler = handler.clone();
812 async move {
813 tokio::task::spawn_blocking(move || handler(s, body))
814 .await
815 .unwrap_or_else(|_| {
816 log::warn!("Sync handler (state + JSON body) panicked; converting to Rejection");
817 Err(warp::reject())
818 })
819 }
820 })
821 .map(|r: Re| r.into_response())
822 .boxed()
823 }
824}
825
826/// Route builder that carries shared state `S` and expects URL query parameters of type `T`.
827///
828/// Obtained from [`QuerySocketBuilder::state`]. `S` must be `Clone + Send + Sync + 'static`.
829/// `T` must implement `serde::de::DeserializeOwned`. Finalise with
830/// [`onconnect`](StatefulQuerySocketBuilder::onconnect) /
831/// [`onconnect_sync`](StatefulQuerySocketBuilder::onconnect_sync).
832pub struct StatefulQuerySocketBuilder<T, S> {
833 base: ServerMechanism,
834 state: S,
835 _phantom: std::marker::PhantomData<T>,
836}
837
838impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
839 StatefulQuerySocketBuilder<T, S>
840{
841 /// Finalises this route with an async handler that receives `(state: S, query: T)`.
842 ///
843 /// On each request the URL query string is deserialised into `T` and a fresh clone of
844 /// `S` is prepared — both are handed to the handler together. A missing or malformed
845 /// query string is rejected before the handler is ever invoked.
846 ///
847 /// The handler must return `Result<impl Reply, Rejection>`.
848 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
849 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
850 where
851 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
852 Fut: Future<Output = Result<Re, Rejection>> + Send,
853 Re: Reply + Send,
854 {
855 log::debug!("Finalising async {:?} route at '{}' (state + query params)", self.base.method, self.base.path);
856 let m = self.base.method.filter();
857 let p = path_filter(&self.base.path);
858 let state = self.state;
859 let state_filter = warp::any().map(move || state.clone());
860 m.and(p)
861 .and(state_filter)
862 .and(warp::filters::query::query::<T>())
863 .and_then(handler)
864 .map(|r: Re| r.into_response())
865 .boxed()
866 }
867
868 /// Finalises this route with a **synchronous** handler that receives
869 /// `(state: S, query: T)`.
870 ///
871 /// The query string is decoded into `T` and a clone of `S` is prepared before the
872 /// blocking handler is dispatched. If `S` wraps a lock, keep it held only briefly.
873 /// See [`ServerMechanism::onconnect_sync`] for the full thread-pool safety notes.
874 ///
875 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
876 ///
877 /// # Safety
878 ///
879 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
880 /// The pool caps the number of live OS threads (default 512), but the **queue of waiting
881 /// tasks is unbounded** — under a traffic surge, tasks accumulate without limit, consuming
882 /// unbounded memory and causing severe latency spikes or OOM crashes before any queued task
883 /// gets a chance to run. When the handler acquires a lock on `S` (e.g. `Arc<Mutex<_>>`),
884 /// concurrent blocking tasks contending on the same lock can stall indefinitely, causing the
885 /// thread pool queue to grow without bound and compounding the exhaustion risk. Additionally,
886 /// any panic inside the handler is silently converted into a `Rejection`, masking runtime
887 /// errors. Callers must ensure the handler completes quickly, that lock contention on `S`
888 /// cannot produce indefinite stalls, and that adequate backpressure or rate limiting is
889 /// applied externally.
890 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
891 where
892 F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
893 Re: Reply + Send + 'static,
894 {
895 log::warn!("Registering sync handler on {:?} '{}' (state + query params) — ensure rate-limiting and lock-free state are in place", self.base.method, self.base.path);
896 let m = self.base.method.filter();
897 let p = path_filter(&self.base.path);
898 let state = self.state;
899 let state_filter = warp::any().map(move || state.clone());
900 m.and(p)
901 .and(state_filter)
902 .and(warp::filters::query::query::<T>())
903 .and_then(move |s: S, query: T| {
904 let handler = handler.clone();
905 async move {
906 tokio::task::spawn_blocking(move || handler(s, query))
907 .await
908 .unwrap_or_else(|_| {
909 log::warn!("Sync handler (state + query params) panicked; converting to Rejection");
910 Err(warp::reject())
911 })
912 }
913 })
914 .map(|r: Re| r.into_response())
915 .boxed()
916 }
917}
918
919// ─── EncryptedBodyBuilder ────────────────────────────────────────────────────
920
921/// Route builder that expects a VEIL-encrypted request body of type `T`.
922///
923/// Obtained from [`ServerMechanism::encryption`]. On each matching request the raw
924/// body bytes are decrypted using the [`SerializationKey`] supplied there. If
925/// decryption fails for any reason (wrong key, mismatched secret, corrupted payload)
926/// the route immediately returns `403 Forbidden` — the handler is never invoked.
927///
928/// Optionally attach shared state via [`state`](EncryptedBodyBuilder::state) before
929/// finalising with [`onconnect`](EncryptedBodyBuilder::onconnect) /
930/// [`onconnect_sync`](EncryptedBodyBuilder::onconnect_sync).
931pub struct EncryptedBodyBuilder<T> {
932 base: ServerMechanism,
933 key: SerializationKey,
934 _phantom: std::marker::PhantomData<T>,
935}
936
937impl<T> EncryptedBodyBuilder<T>
938where
939 T: bincode::Decode<()> + Send + 'static,
940{
941 /// Finalises this route with an async handler that receives the decrypted body as `T`.
942 ///
943 /// On each request the raw body bytes are VEIL-decrypted using the [`SerializationKey`]
944 /// configured on this builder. The handler is only invoked when decryption succeeds —
945 /// a wrong key, mismatched secret, or corrupted body causes the route to return
946 /// `403 Forbidden` without ever reaching the handler. The `T` the closure receives is
947 /// therefore always a trusted, fully-decrypted value ready to use.
948 ///
949 /// The handler must return `Result<impl Reply, Rejection>`.
950 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
951 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
952 where
953 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
954 Fut: Future<Output = Result<Re, Rejection>> + Send,
955 Re: Reply + Send,
956 {
957 log::debug!("Finalising async {:?} route at '{}' (encrypted body)", self.base.method, self.base.path);
958 let m = self.base.method.filter();
959 let p = path_filter(&self.base.path);
960 let key = self.key;
961 m.and(p)
962 .and(warp::body::bytes())
963 .and_then(move |raw: bytes::Bytes| {
964 let key = key.clone();
965 let handler = handler.clone();
966 async move {
967 let value: T = decode_body(&raw, &key)?;
968 handler(value).await.map(|r| r.into_response())
969 }
970 })
971 .boxed()
972 }
973
974 /// Finalises this route with a **synchronous** handler that receives the decrypted
975 /// body as `T`.
976 ///
977 /// Decryption happens before the handler is dispatched to the thread pool: if the key
978 /// is wrong or the body is corrupt the request is rejected with `403 Forbidden` and
979 /// the thread pool is not touched at all. The `T` handed to the closure is always a
980 /// trusted, fully-decrypted value. The closure may block but must complete quickly.
981 ///
982 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
983 ///
984 /// # Safety
985 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
986 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
987 where
988 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
989 Re: Reply + Send + 'static,
990 {
991 log::warn!("Registering sync handler on {:?} '{}' (encrypted body) — ensure rate-limiting is applied externally", self.base.method, self.base.path);
992 let m = self.base.method.filter();
993 let p = path_filter(&self.base.path);
994 let key = self.key;
995 m.and(p)
996 .and(warp::body::bytes())
997 .and_then(move |raw: bytes::Bytes| {
998 let key = key.clone();
999 let handler = handler.clone();
1000 async move {
1001 let value: T = decode_body(&raw, &key)?;
1002 tokio::task::spawn_blocking(move || handler(value))
1003 .await
1004 .unwrap_or_else(|_| {
1005 log::warn!("Sync encrypted handler panicked; converting to Rejection");
1006 Err(warp::reject())
1007 })
1008 .map(|r| r.into_response())
1009 }
1010 })
1011 .boxed()
1012 }
1013
1014 /// Attaches shared state `S`, transitioning to [`StatefulEncryptedBodyBuilder`].
1015 ///
1016 /// A fresh clone of `S` is injected alongside the decrypted `T` on every request.
1017 /// The handler will receive `(state: S, body: T)`.
1018 ///
1019 /// `S` must be `Clone + Send + Sync + 'static`.
1020 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulEncryptedBodyBuilder<T, S> {
1021 StatefulEncryptedBodyBuilder { base: self.base, key: self.key, state, _phantom: std::marker::PhantomData }
1022 }
1023}
1024
1025// ─── EncryptedQueryBuilder ────────────────────────────────────────────────────
1026
1027/// Route builder that expects VEIL-encrypted URL query parameters of type `T`.
1028///
1029/// Obtained from [`ServerMechanism::encrypted_query`]. The client must send a single
1030/// `?data=<base64url>` query parameter whose value is the URL-safe base64 encoding of
1031/// the VEIL-encrypted struct bytes. On each matching request the server base64-decodes
1032/// then decrypts the value using the configured [`SerializationKey`]. If any step fails
1033/// (missing `data` parameter, invalid base64, wrong key, corrupt bytes) the route
1034/// returns `403 Forbidden` — the handler is never invoked.
1035///
1036/// Optionally attach shared state via [`state`](EncryptedQueryBuilder::state) before
1037/// finalising with [`onconnect`](EncryptedQueryBuilder::onconnect).
1038pub struct EncryptedQueryBuilder<T> {
1039 base: ServerMechanism,
1040 key: SerializationKey,
1041 _phantom: std::marker::PhantomData<T>,
1042}
1043
1044impl<T> EncryptedQueryBuilder<T>
1045where
1046 T: bincode::Decode<()> + Send + 'static,
1047{
1048 /// Finalises this route with an async handler that receives the decrypted query
1049 /// parameters as `T`.
1050 ///
1051 /// On each request the `?data=<base64url>` query parameter is base64-decoded then
1052 /// VEIL-decrypted using the [`SerializationKey`] on this builder. The handler is only
1053 /// invoked when every step succeeds — a missing `data` parameter, invalid base64,
1054 /// wrong key, or corrupt payload causes the route to return `403 Forbidden` without
1055 /// ever reaching the handler. The `T` the closure receives is therefore always a
1056 /// trusted, fully-decrypted value ready to use.
1057 ///
1058 /// The handler must return `Result<impl Reply, Rejection>`.
1059 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1060 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1061 where
1062 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
1063 Fut: Future<Output = Result<Re, Rejection>> + Send,
1064 Re: Reply + Send,
1065 {
1066 log::debug!("Finalising async {:?} route at '{}' (encrypted query)", self.base.method, self.base.path);
1067 let m = self.base.method.filter();
1068 let p = path_filter(&self.base.path);
1069 let key = self.key;
1070 m.and(p)
1071 .and(warp::filters::query::raw())
1072 .and_then(move |raw_query: String| {
1073 let key = key.clone();
1074 let handler = handler.clone();
1075 async move {
1076 let value: T = decode_query(&raw_query, &key)?;
1077 handler(value).await.map(|r| r.into_response())
1078 }
1079 })
1080 .boxed()
1081 }
1082
1083 /// Attaches shared state `S`, transitioning to [`StatefulEncryptedQueryBuilder`].
1084 ///
1085 /// A fresh clone of `S` is injected alongside the decrypted `T` on every request.
1086 /// The handler will receive `(state: S, query: T)`.
1087 ///
1088 /// `S` must be `Clone + Send + Sync + 'static`.
1089 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulEncryptedQueryBuilder<T, S> {
1090 StatefulEncryptedQueryBuilder { base: self.base, key: self.key, state, _phantom: std::marker::PhantomData }
1091 }
1092}
1093
1094// ─── StatefulEncryptedBodyBuilder ────────────────────────────────────────────
1095
1096/// Route builder carrying shared state `S` and a VEIL-encrypted request body of type `T`.
1097///
1098/// Obtained from [`EncryptedBodyBuilder::state`] or [`StatefulSocketBuilder::encryption`].
1099/// On each matching request the body is VEIL-decrypted and a fresh clone of `S` is
1100/// prepared before the handler is called. A wrong key or corrupt body returns
1101/// `403 Forbidden` without ever invoking the handler.
1102///
1103/// Finalise with [`onconnect`](StatefulEncryptedBodyBuilder::onconnect).
1104pub struct StatefulEncryptedBodyBuilder<T, S> {
1105 base: ServerMechanism,
1106 key: SerializationKey,
1107 state: S,
1108 _phantom: std::marker::PhantomData<T>,
1109}
1110
1111impl<T, S> StatefulEncryptedBodyBuilder<T, S>
1112where
1113 T: bincode::Decode<()> + Send + 'static,
1114 S: Clone + Send + Sync + 'static,
1115{
1116 /// Finalises this route with an async handler that receives `(state: S, body: T)`.
1117 ///
1118 /// On each request the raw body bytes are VEIL-decrypted using the configured
1119 /// [`SerializationKey`] and a fresh clone of `S` is prepared — both are handed to the
1120 /// handler together. If decryption fails (wrong key or corrupt body) the route returns
1121 /// `403 Forbidden` and the handler is never invoked. The `T` the closure receives is
1122 /// always a trusted, fully-decrypted value ready to use.
1123 ///
1124 /// The handler must return `Result<impl Reply, Rejection>`.
1125 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1126 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1127 where
1128 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1129 Fut: Future<Output = Result<Re, Rejection>> + Send,
1130 Re: Reply + Send,
1131 {
1132 log::debug!("Finalising async {:?} route at '{}' (state + encrypted body)", self.base.method, self.base.path);
1133 let m = self.base.method.filter();
1134 let p = path_filter(&self.base.path);
1135 let key = self.key;
1136 let state = self.state;
1137 let state_filter = warp::any().map(move || state.clone());
1138 m.and(p)
1139 .and(state_filter)
1140 .and(warp::body::bytes())
1141 .and_then(move |s: S, raw: bytes::Bytes| {
1142 let key = key.clone();
1143 let handler = handler.clone();
1144 async move {
1145 let value: T = decode_body(&raw, &key)?;
1146 handler(s, value).await.map(|r| r.into_response())
1147 }
1148 })
1149 .boxed()
1150 }
1151}
1152
1153// ─── StatefulEncryptedQueryBuilder ───────────────────────────────────────────
1154
1155/// Route builder carrying shared state `S` and VEIL-encrypted query parameters of
1156/// type `T`.
1157///
1158/// Obtained from [`EncryptedQueryBuilder::state`] or
1159/// [`StatefulSocketBuilder::encrypted_query`]. On each matching request the
1160/// `?data=<base64url>` parameter is base64-decoded then VEIL-decrypted and a fresh
1161/// clone of `S` is prepared before the handler is called. Any decode or decryption
1162/// failure returns `403 Forbidden` without ever invoking the handler.
1163///
1164/// Finalise with [`onconnect`](StatefulEncryptedQueryBuilder::onconnect).
1165pub struct StatefulEncryptedQueryBuilder<T, S> {
1166 base: ServerMechanism,
1167 key: SerializationKey,
1168 state: S,
1169 _phantom: std::marker::PhantomData<T>,
1170}
1171
1172impl<T, S> StatefulEncryptedQueryBuilder<T, S>
1173where
1174 T: bincode::Decode<()> + Send + 'static,
1175 S: Clone + Send + Sync + 'static,
1176{
1177 /// Finalises this route with an async handler that receives `(state: S, query: T)`.
1178 ///
1179 /// On each request the `?data=<base64url>` query parameter is base64-decoded then
1180 /// VEIL-decrypted using the configured [`SerializationKey`] and a fresh clone of `S`
1181 /// is prepared — both are handed to the handler together. If any step fails (missing
1182 /// parameter, bad encoding, wrong key, or corrupt data) the route returns
1183 /// `403 Forbidden` and the handler is never invoked. The `T` the closure receives is
1184 /// always a trusted, fully-decrypted value ready to use.
1185 ///
1186 /// The handler must return `Result<impl Reply, Rejection>`.
1187 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
1188 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1189 where
1190 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1191 Fut: Future<Output = Result<Re, Rejection>> + Send,
1192 Re: Reply + Send,
1193 {
1194 log::debug!("Finalising async {:?} route at '{}' (state + encrypted query)", self.base.method, self.base.path);
1195 let m = self.base.method.filter();
1196 let p = path_filter(&self.base.path);
1197 let key = self.key;
1198 let state = self.state;
1199 let state_filter = warp::any().map(move || state.clone());
1200 m.and(p)
1201 .and(state_filter)
1202 .and(warp::filters::query::raw())
1203 .and_then(move |s: S, raw_query: String| {
1204 let key = key.clone();
1205 let handler = handler.clone();
1206 async move {
1207 let value: T = decode_query(&raw_query, &key)?;
1208 handler(s, value).await.map(|r| r.into_response())
1209 }
1210 })
1211 .boxed()
1212 }
1213}
1214
1215// ─── Internal decode helpers ─────────────────────────────────────────────────
1216
1217/// Decode a VEIL-sealed request body into `T`, returning `403 Forbidden` on any failure.
1218fn decode_body<T: bincode::Decode<()>>(raw: &bytes::Bytes, key: &SerializationKey) -> Result<T, Rejection> {
1219 crate::serialization::open(raw, key.veil_key()).map_err(|e| {
1220 log::debug!("VEIL open failed (key mismatch or corrupt body): {e}");
1221 forbidden()
1222 })
1223}
1224
1225/// Decode a VEIL-sealed query parameter (`?data=<base64url>`) into `T`,
1226/// returning `403 Forbidden` on any failure.
1227fn decode_query<T: bincode::Decode<()>>(raw_query: &str, key: &SerializationKey) -> Result<T, Rejection> {
1228 // Extract the `data` parameter value from the raw query string.
1229 let data_value = serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(raw_query)
1230 .ok()
1231 .and_then(|mut m| m.remove("data"));
1232
1233 let b64 = data_value.ok_or_else(|| {
1234 log::debug!("Encrypted query missing `data` parameter");
1235 forbidden()
1236 })?;
1237
1238 let bytes = base64::engine::Engine::decode(
1239 &base64::engine::general_purpose::URL_SAFE_NO_PAD,
1240 b64.trim_end_matches('='),
1241 )
1242 .map_err(|e| {
1243 log::debug!("base64url decode failed: {e}");
1244 forbidden()
1245 })?;
1246
1247 crate::serialization::open(&bytes, key.veil_key()).map_err(|e| {
1248 log::debug!("VEIL open (query) failed: {e}");
1249 forbidden()
1250 })
1251}
1252
1253/// Builds a `403 Forbidden` rejection response.
1254fn forbidden() -> Rejection {
1255 // Warp doesn't expose a built-in 403 rejection, so we use a custom one.
1256 warp::reject::custom(ForbiddenError)
1257}
1258
1259#[derive(Debug)]
1260struct ForbiddenError;
1261
1262impl warp::reject::Reject for ForbiddenError {}
1263
1264/// A collection of common HTTP status codes used with the reply helpers.
1265///
1266/// Covers 2xx success, 3xx redirect, 4xx client error, and 5xx server error ranges.
1267/// Converts into `warp::http::StatusCode` via [`From`] and is accepted by
1268/// [`reply_with_status`] and [`reply_with_status_and_json`]. Also usable directly in the
1269/// [`reply!`] macro via the `status => Status::X` argument.
1270#[derive(Clone, Copy, Debug)]
1271pub enum Status {
1272 // 2xx
1273 Ok,
1274 Created,
1275 Accepted,
1276 NoContent,
1277 // 3xx
1278 MovedPermanently,
1279 Found,
1280 NotModified,
1281 TemporaryRedirect,
1282 PermanentRedirect,
1283 // 4xx
1284 BadRequest,
1285 Unauthorized,
1286 Forbidden,
1287 NotFound,
1288 MethodNotAllowed,
1289 Conflict,
1290 Gone,
1291 UnprocessableEntity,
1292 TooManyRequests,
1293 // 5xx
1294 InternalServerError,
1295 NotImplemented,
1296 BadGateway,
1297 ServiceUnavailable,
1298 GatewayTimeout,
1299}
1300
1301impl From<Status> for warp::http::StatusCode {
1302 fn from(s: Status) -> Self {
1303 match s {
1304 Status::Ok => warp::http::StatusCode::OK,
1305 Status::Created => warp::http::StatusCode::CREATED,
1306 Status::Accepted => warp::http::StatusCode::ACCEPTED,
1307 Status::NoContent => warp::http::StatusCode::NO_CONTENT,
1308 Status::MovedPermanently => warp::http::StatusCode::MOVED_PERMANENTLY,
1309 Status::Found => warp::http::StatusCode::FOUND,
1310 Status::NotModified => warp::http::StatusCode::NOT_MODIFIED,
1311 Status::TemporaryRedirect => warp::http::StatusCode::TEMPORARY_REDIRECT,
1312 Status::PermanentRedirect => warp::http::StatusCode::PERMANENT_REDIRECT,
1313 Status::BadRequest => warp::http::StatusCode::BAD_REQUEST,
1314 Status::Unauthorized => warp::http::StatusCode::UNAUTHORIZED,
1315 Status::Forbidden => warp::http::StatusCode::FORBIDDEN,
1316 Status::NotFound => warp::http::StatusCode::NOT_FOUND,
1317 Status::MethodNotAllowed => warp::http::StatusCode::METHOD_NOT_ALLOWED,
1318 Status::Conflict => warp::http::StatusCode::CONFLICT,
1319 Status::Gone => warp::http::StatusCode::GONE,
1320 Status::UnprocessableEntity => warp::http::StatusCode::UNPROCESSABLE_ENTITY,
1321 Status::TooManyRequests => warp::http::StatusCode::TOO_MANY_REQUESTS,
1322 Status::InternalServerError => warp::http::StatusCode::INTERNAL_SERVER_ERROR,
1323 Status::NotImplemented => warp::http::StatusCode::NOT_IMPLEMENTED,
1324 Status::BadGateway => warp::http::StatusCode::BAD_GATEWAY,
1325 Status::ServiceUnavailable => warp::http::StatusCode::SERVICE_UNAVAILABLE,
1326 Status::GatewayTimeout => warp::http::StatusCode::GATEWAY_TIMEOUT,
1327 }
1328 }
1329}
1330
1331/// The HTTP server that owns and dispatches a collection of [`SocketType`] routes.
1332///
1333/// Build routes through the [`ServerMechanism`] builder chain, register each with
1334/// [`mechanism`](Server::mechanism), then start the server with [`serve`](Server::serve).
1335///
1336/// # Example
1337/// ```rust,no_run
1338/// # use serde::Serialize;
1339/// # #[derive(Serialize)] struct Pong { ok: bool }
1340///
1341/// let mut server = Server::default();
1342///
1343/// server
1344/// .mechanism(
1345/// ServerMechanism::get("/ping")
1346/// .onconnect(|| async { reply!(json => Pong { ok: true }) })
1347/// )
1348/// .mechanism(
1349/// ServerMechanism::delete("/session")
1350/// .onconnect(|| async { reply!(message => warp::reply(), status => Status::NoContent) })
1351/// );
1352///
1353/// // Blocks forever — call only to actually run the server:
1354/// // server.serve(([0, 0, 0, 0], 8080)).await;
1355/// ```
1356///
1357/// # Caution
1358/// Calling [`serve`](Server::serve) with no routes registered will **panic**.
1359pub struct Server {
1360 mechanisms: Vec<SocketType>,
1361}
1362
1363impl Default for Server {
1364 fn default() -> Self {
1365 Self::new()
1366 }
1367}
1368
1369impl Server {
1370 fn new() -> Self {
1371 Self { mechanisms: Vec::new() }
1372 }
1373
1374 /// Registers a [`SocketType`] route on this server.
1375 ///
1376 /// Routes are evaluated in registration order. Returns `&mut Self` for method chaining.
1377 pub fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1378 self.mechanisms.push(mech);
1379 log::debug!("Route registered (total: {})", self.mechanisms.len());
1380 self
1381 }
1382
1383 /// Binds to `addr` and starts serving all registered routes.
1384 ///
1385 /// This is an infinite async loop — it never returns under normal operation.
1386 ///
1387 /// # Panics
1388 /// Panics immediately if no routes have been registered via [`mechanism`](Server::mechanism).
1389 pub async fn serve(self, addr: impl Into<SocketAddr>) {
1390 let addr = addr.into();
1391 log::info!("Server binding to {}", addr);
1392 let mut iter = self.mechanisms.into_iter();
1393 let first = iter.next().unwrap_or_else(|| {
1394 log::trace!("No mechanisms are defined on the server, this will result in a panic!");
1395 log::error!("The server contains no mechanisms to follow through");
1396 panic!();
1397 });
1398
1399 let combined = iter.fold(first.boxed(), |acc, next| {
1400 acc.or(next).unify().boxed()
1401 });
1402
1403 log::info!("Server running on {}", addr);
1404 warp::serve(combined).run(addr).await;
1405 }
1406
1407 /// Binds to `addr`, serves all registered routes, and shuts down gracefully when
1408 /// `shutdown` resolves.
1409 ///
1410 /// Equivalent to calling [`serve_from_listener`](Server::serve_from_listener) with an
1411 /// address instead of a pre-bound listener.
1412 ///
1413 /// # Example
1414 ///
1415 /// ```rust,no_run
1416 /// use tokio::sync::oneshot;
1417 ///
1418 /// # async fn run() {
1419 /// let (tx, rx) = oneshot::channel::<()>();
1420 /// // ... build server and mount routes ...
1421 /// // trigger shutdown later by calling tx.send(())
1422 /// server.serve_with_graceful_shutdown(([127, 0, 0, 1], 8080), async move {
1423 /// rx.await.ok();
1424 /// }).await;
1425 /// # }
1426 /// ```
1427 pub async fn serve_with_graceful_shutdown(
1428 self,
1429 addr: impl Into<std::net::SocketAddr>,
1430 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1431 ) {
1432 let addr = addr.into();
1433 log::info!("Server binding to {} (graceful shutdown enabled)", addr);
1434 let mut iter = self.mechanisms.into_iter();
1435 let first = iter.next().unwrap_or_else(|| {
1436 log::error!("The server contains no mechanisms to follow through");
1437 panic!("serve_with_graceful_shutdown called with no routes registered");
1438 });
1439 let combined = iter.fold(first.boxed(), |acc, next| acc.or(next).unify().boxed());
1440 log::info!("Server running on {} (graceful shutdown enabled)", addr);
1441 warp::serve(combined)
1442 .bind(addr)
1443 .await
1444 .graceful(shutdown)
1445 .run()
1446 .await;
1447 }
1448
1449 /// Serves all registered routes from an already-bound `listener`, shutting down gracefully
1450 /// when `shutdown` resolves.
1451 ///
1452 /// Use this when port `0` is passed to `TcpListener::bind` and you need to know the actual
1453 /// OS-assigned port before the server starts (e.g. to open a browser to the correct URL).
1454 ///
1455 /// # Example
1456 ///
1457 /// ```rust,no_run
1458 /// use tokio::net::TcpListener;
1459 /// use tokio::sync::oneshot;
1460 ///
1461 /// # async fn run() -> std::io::Result<()> {
1462 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
1463 /// let port = listener.local_addr()?.port();
1464 /// println!("Will listen on port {port}");
1465 ///
1466 /// let (tx, rx) = oneshot::channel::<()>();
1467 /// // ... build server and mount routes ...
1468 /// server.serve_from_listener(listener, async move { rx.await.ok(); }).await;
1469 /// # Ok(())
1470 /// # }
1471 /// ```
1472 pub async fn serve_from_listener(
1473 self,
1474 listener: tokio::net::TcpListener,
1475 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1476 ) {
1477 log::info!("Server running on {} (graceful shutdown enabled)",
1478 listener.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "?".into()));
1479 let mut iter = self.mechanisms.into_iter();
1480 let first = iter.next().unwrap_or_else(|| {
1481 log::error!("The server contains no mechanisms to follow through");
1482 panic!("serve_from_listener called with no routes registered");
1483 });
1484 let combined = iter.fold(first.boxed(), |acc, next| acc.or(next).unify().boxed());
1485 warp::serve(combined)
1486 .incoming(listener)
1487 .graceful(shutdown)
1488 .run()
1489 .await;
1490 }
1491}
1492
1493/// Wraps `reply` with the given HTTP `status` code and returns it as a warp result.
1494///
1495/// Use when an endpoint needs to respond with a specific status alongside a plain message.
1496/// Pairs with the [`reply!`] macro form `reply!(message => ..., status => ...)`.
1497pub fn reply_with_status(status: Status, reply: impl Reply) -> Result<impl Reply, Rejection> {
1498 Ok::<_, Rejection>(warp::reply::with_status(reply, status.into()))
1499}
1500
1501/// Returns an empty `200 OK` warp reply result.
1502///
1503/// Useful for endpoints that need only to acknowledge a request with no body.
1504/// Equivalent to `reply!()`.
1505pub fn reply() -> Result<impl Reply, Rejection> {
1506 Ok::<_, Rejection>(warp::reply())
1507}
1508
1509/// Serialises `json` as a JSON body and returns it as a `200 OK` warp reply result.
1510///
1511/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ...)`.
1512pub fn reply_with_json<T: Serialize>(json: &T) -> Result<impl Reply + use<T>, Rejection> {
1513 Ok::<_, Rejection>(warp::reply::json(json))
1514}
1515
1516/// Serialises `json` as a JSON body, attaches the given HTTP `status`, and returns a warp result.
1517///
1518/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ..., status => ...)`.
1519pub fn reply_with_status_and_json<T: Serialize>(status: Status, json: &T) -> Result<impl Reply + use<T>, Rejection> {
1520 Ok::<_, Rejection>(warp::reply::with_status(warp::reply::json(json), status.into()))
1521}
1522
1523/// Seals `value` with `key` and returns it as an `application/octet-stream` response (`200 OK`).
1524///
1525/// `T` must implement `bincode::Encode`.
1526/// Equivalent to `reply!(sealed => value, key => key)`.
1527pub fn reply_sealed<T: bincode::Encode>(
1528 value: &T,
1529 key: SerializationKey,
1530) -> Result<warp::reply::Response, Rejection> {
1531 sealed_response(value, key, None)
1532}
1533
1534/// Seals `value` with `key`, attaches the given HTTP `status`, and returns it as a warp result.
1535///
1536/// Equivalent to `reply!(sealed => value, key => key, status => Status::X)`.
1537pub fn reply_sealed_with_status<T: bincode::Encode>(
1538 value: &T,
1539 key: SerializationKey,
1540 status: Status,
1541) -> Result<warp::reply::Response, Rejection> {
1542 sealed_response(value, key, Some(status))
1543}
1544
1545fn sealed_response<T: bincode::Encode>(
1546 value: &T,
1547 key: SerializationKey,
1548 status: Option<Status>,
1549) -> Result<warp::reply::Response, Rejection> {
1550 use warp::http::StatusCode;
1551 use warp::Reply;
1552 let code: StatusCode = status.map(Into::into).unwrap_or(StatusCode::OK);
1553 let sealed = crate::serialization::seal(value, key.veil_key()).map_err(|_| warp::reject())?;
1554 Ok(warp::reply::with_status(sealed, code).into_response())
1555}
1556
1557/// Convenience macro for constructing warp reply results inside route handlers.
1558///
1559/// | Syntax | Equivalent | Description |
1560/// |---|---|---|
1561/// | `reply!()` | [`reply()`] | Empty `200 OK` response. |
1562/// | `reply!(message => expr, status => Status::X)` | [`reply_with_status`] | Plain reply with a status code. |
1563/// | `reply!(json => expr)` | [`reply_with_json`] | JSON body with `200 OK`. |
1564/// | `reply!(json => expr, status => Status::X)` | [`reply_with_status_and_json`] | JSON body with a status code. |
1565/// | `reply!(sealed => expr, key => key)` | [`reply_sealed`] | VEIL-sealed (or JSON for `Plain`) body, `200 OK`. |
1566/// | `reply!(sealed => expr, key => key, status => Status::X)` | [`reply_sealed_with_status`] | VEIL-sealed (or JSON for `Plain`) body with status. |
1567///
1568/// # Example
1569/// ```rust,no_run
1570/// # use serde::{Deserialize, Serialize};
1571/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
1572///
1573/// // Empty 200 OK
1574/// let ping = ServerMechanism::get("/ping")
1575/// .onconnect(|| async { reply!() });
1576///
1577/// // Plain reply with a custom status
1578/// let gone = ServerMechanism::delete("/v1")
1579/// .onconnect(|| async {
1580/// reply!(message => warp::reply::html("endpoint deprecated"), status => Status::Gone)
1581/// });
1582///
1583/// // JSON body, 200 OK
1584/// let list = ServerMechanism::get("/items")
1585/// .onconnect(|| async {
1586/// let items: Vec<Item> = vec![];
1587/// reply!(json => items)
1588/// });
1589///
1590/// // JSON body with a custom status
1591/// let create = ServerMechanism::post("/items")
1592/// .json::<Item>()
1593/// .onconnect(|item| async move {
1594/// reply!(json => item, status => Status::Created)
1595/// });
1596/// ```
1597#[macro_export]
1598macro_rules! reply {
1599 () => {{
1600 $crate::socket::server::reply()
1601 }};
1602
1603 (message => $message: expr, status => $status: expr) => {{
1604 $crate::socket::server::reply_with_status($status, $message)
1605 }};
1606
1607 (json => $json: expr) => {{
1608 $crate::socket::server::reply_with_json(&$json)
1609 }};
1610
1611 (json => $json: expr, status => $status: expr) => {{
1612 $crate::socket::server::reply_with_status_and_json($status, &$json)
1613 }};
1614
1615 (sealed => $val: expr, key => $key: expr) => {{
1616 $crate::socket::server::reply_sealed(&$val, $key)
1617 }};
1618
1619 (sealed => $val: expr, key => $key: expr, status => $status: expr) => {{
1620 $crate::socket::server::reply_sealed_with_status(&$val, $key, $status)
1621 }};
1622}
1623