toolkit_zero/socket/server.rs
1//! Typed, fluent HTTP server construction.
2//!
3//! This module provides a builder-oriented API for declaring HTTP routes and
4//! serving them with a built-in hyper-based HTTP engine. The entry point for every
5//! route is [`ServerMechanism`], which pairs an HTTP method with a URL path and
6//! supports incremental enrichment — attaching a JSON body expectation, URL
7//! query parameter deserialisation, or shared state — before being finalised
8//! into a [`SocketType`] route handle via `onconnect`.
9//!
10//! Completed routes are registered on a [`Server`] and served via one of the
11//! `serve*` methods, each of which returns a [`ServerFuture`]. A `ServerFuture`
12//! can be `.await`'d to run the server inline **or** called `.background()` on to
13//! spawn it as a background Tokio task and get a [`tokio::task::JoinHandle`] back.
14//! Graceful shutdown is available via
15//! [`Server::serve_with_graceful_shutdown`] and [`Server::serve_from_listener`].
16//!
17//! For **runtime address migration**, use [`Server::serve_managed`]: it starts
18//! the server immediately and returns a [`BackgroundServer`] handle that supports
19//! [`BackgroundServer::rebind`] (graceful shutdown + restart on a new address,
20//! all existing routes preserved) and [`BackgroundServer::stop`].
21//!
22//! # Hot-reloading routes
23//!
24//! [`BackgroundServer::mechanism`] pushes a new route into the running
25//! server's shared route table **without any restart or port gap**. Because
26//! routes are stored in an `Arc<RwLock<Vec<SocketType>>>` shared between the
27//! caller and the server loop, the new route becomes visible to the next
28//! incoming request immediately.
29//!
30//! # Builder chains at a glance
31//!
32//! | Chain | Handler receives |
33//! |---|---|
34//! | `ServerMechanism::method(path).onconnect(f)` | nothing |
35//! | `.json::<T>().onconnect(f)` | `T: DeserializeOwned` |
36//! | `.query::<T>().onconnect(f)` | `T: DeserializeOwned` |
37//! | `.encryption::<T>(key).onconnect(f)` | `T: bincode::Decode<()>` (decrypted body) |
38//! | `.encrypted_query::<T>(key).onconnect(f)` | `T: bincode::Decode<()>` (decrypted query) |
39//! | `.state(s).onconnect(f)` | `S: Clone + Send + Sync` |
40//! | `.state(s).json::<T>().onconnect(f)` | `(S, T)` |
41//! | `.state(s).query::<T>().onconnect(f)` | `(S, T)` |
42//! | `.state(s).encryption::<T>(key).onconnect(f)` | `(S, T)` — decrypted body |
43//! | `.state(s).encrypted_query::<T>(key).onconnect(f)` | `(S, T)` — decrypted query |
44//!
45//! For blocking handlers (not recommended in production) every finaliser also
46//! has an unsafe `onconnect_sync` counterpart.
47//!
48//! # `#[mechanism]` attribute macro
49//!
50//! As an alternative to spelling out the builder chain by hand, the
51//! [`mechanism`] attribute macro collapses the entire
52//! `server.mechanism(ServerMechanism::method(path) … .onconnect(handler))` call
53//! into a single decorated `async fn`.
54//!
55//! # Response helpers
56//!
57//! Use the [`reply!`] macro as the most concise way to build a response:
58//!
59//! ```rust,no_run
60//! # use toolkit_zero::socket::server::*;
61//! # use serde::Serialize;
62//! # #[derive(Serialize)] struct Item { id: u32 }
63//! # let item = Item { id: 1 };
64//! // 200 OK, empty body
65//! // reply!()
66//! // 200 OK, JSON body
67//! // reply!(json => item)
68//! // 201 Created, JSON body
69//! // reply!(json => item, status => Status::Created)
70//! ```
71
72use std::{
73 future::Future,
74 net::SocketAddr,
75 pin::Pin,
76 sync::Arc,
77};
78use serde::{de::DeserializeOwned, Serialize};
79
80pub use super::SerializationKey;
81pub use toolkit_zero_macros::mechanism;
82
83// ─── Internal future / handler types ─────────────────────────────────────────
84
85type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
86type HandlerFn = Arc<dyn Fn(IncomingRequest) -> BoxFuture<http::Response<bytes::Bytes>> + Send + Sync>;
87
88/// Raw data extracted from an incoming HTTP request, passed to every route handler.
89pub(crate) struct IncomingRequest {
90 /// Raw body bytes (empty for GET / HEAD / etc.).
91 body: bytes::Bytes,
92 /// Raw query string — everything after `?`; empty string when absent.
93 query: String,
94 /// Request headers.
95 #[allow(dead_code)]
96 headers: http::HeaderMap,
97}
98
99// ─── Rejection ────────────────────────────────────────────────────────────────
100
101/// An HTTP-level error value returned from route handlers.
102///
103/// Returning `Err(rejection)` from an `onconnect` closure sends the corresponding
104/// HTTP status code with an empty body to the client.
105///
106/// The most common source is [`forbidden()`] (produces `403 Forbidden`).
107pub struct Rejection {
108 status: http::StatusCode,
109}
110
111impl Rejection {
112 fn new(status: http::StatusCode) -> Self {
113 Self { status }
114 }
115
116 /// Builds a `403 Forbidden` rejection.
117 pub fn forbidden() -> Self {
118 Self::new(http::StatusCode::FORBIDDEN)
119 }
120
121 /// Builds a `400 Bad Request` rejection.
122 pub fn bad_request() -> Self {
123 Self::new(http::StatusCode::BAD_REQUEST)
124 }
125
126 /// Builds a `500 Internal Server Error` rejection.
127 pub fn internal() -> Self {
128 Self::new(http::StatusCode::INTERNAL_SERVER_ERROR)
129 }
130
131 fn into_response(self) -> http::Response<bytes::Bytes> {
132 http::Response::builder()
133 .status(self.status)
134 .body(bytes::Bytes::new())
135 .unwrap()
136 }
137}
138
139// ─── Reply ────────────────────────────────────────────────────────────────────
140
141/// A value that can be converted into a fully-formed HTTP response.
142///
143/// Implemented for all types returned by the [`reply!`] macro and the standalone
144/// reply helpers. You may also implement it for your own types.
145pub trait Reply: Send {
146 /// Consumes `self` and returns a complete HTTP response.
147 fn into_response(self) -> http::Response<bytes::Bytes>;
148}
149
150/// An empty `200 OK` reply — returned by `reply!()`.
151pub struct EmptyReply;
152
153impl Reply for EmptyReply {
154 fn into_response(self) -> http::Response<bytes::Bytes> {
155 http::Response::builder()
156 .status(http::StatusCode::OK)
157 .body(bytes::Bytes::new())
158 .unwrap()
159 }
160}
161
162/// An HTML body reply — returned by [`html_reply`].
163pub struct HtmlReply {
164 body: String,
165}
166
167impl Reply for HtmlReply {
168 fn into_response(self) -> http::Response<bytes::Bytes> {
169 http::Response::builder()
170 .status(http::StatusCode::OK)
171 .header(http::header::CONTENT_TYPE, "text/html; charset=utf-8")
172 .body(bytes::Bytes::from(self.body.into_bytes()))
173 .unwrap()
174 }
175}
176
177/// Wraps `content` into a `200 OK` HTML response.
178pub fn html_reply(content: impl Into<String>) -> HtmlReply {
179 HtmlReply { body: content.into() }
180}
181
182// Internal JSON reply — used by reply_with_json / reply_with_status_and_json.
183struct JsonReply {
184 body: bytes::Bytes,
185 status: http::StatusCode,
186}
187
188impl Reply for JsonReply {
189 fn into_response(self) -> http::Response<bytes::Bytes> {
190 http::Response::builder()
191 .status(self.status)
192 .header(http::header::CONTENT_TYPE, "application/json")
193 .body(self.body)
194 .unwrap()
195 }
196}
197
198// Passthrough — allows `http::Response<bytes::Bytes>` to satisfy the Reply bound directly.
199impl Reply for http::Response<bytes::Bytes> {
200 fn into_response(self) -> http::Response<bytes::Bytes> {
201 self
202 }
203}
204
205// ─── SocketType ───────────────────────────────────────────────────────────────
206
207/// A fully assembled, type-erased HTTP route ready to be registered on a [`Server`].
208///
209/// This is the final product of every builder chain. Pass it to [`Server::mechanism`]
210/// to mount it. Internally stores the HTTP method, the path pattern, and a
211/// reference-counted async handler closure — cloning a `SocketType` is cheap
212/// (just an `Arc` clone of the handler).
213pub struct SocketType {
214 pub(crate) method: http::Method,
215 pub(crate) path: String,
216 pub(crate) handler: HandlerFn,
217}
218
219impl Clone for SocketType {
220 fn clone(&self) -> Self {
221 Self {
222 method: self.method.clone(),
223 path: self.path.clone(),
224 handler: Arc::clone(&self.handler),
225 }
226 }
227}
228
229// ─── HttpMethod ───────────────────────────────────────────────────────────────
230
231#[derive(Clone, Copy, Debug)]
232enum HttpMethod {
233 Get, Post, Put, Delete, Patch, Head, Options,
234}
235
236impl HttpMethod {
237 fn to_http(self) -> http::Method {
238 match self {
239 HttpMethod::Get => http::Method::GET,
240 HttpMethod::Post => http::Method::POST,
241 HttpMethod::Put => http::Method::PUT,
242 HttpMethod::Delete => http::Method::DELETE,
243 HttpMethod::Patch => http::Method::PATCH,
244 HttpMethod::Head => http::Method::HEAD,
245 HttpMethod::Options => http::Method::OPTIONS,
246 }
247 }
248}
249
250// ─── Path matching ────────────────────────────────────────────────────────────
251
252fn path_matches(pattern: &str, actual_path: &str) -> bool {
253 let pat: Vec<&str> = pattern
254 .trim_matches('/')
255 .split('/')
256 .filter(|s| !s.is_empty())
257 .collect();
258 let act: Vec<&str> = actual_path
259 .trim_matches('/')
260 .split('/')
261 .filter(|s| !s.is_empty())
262 .collect();
263 pat == act
264}
265
266// ─── ServerMechanism ──────────────────────────────────────────────────────────
267
268/// Entry point for building an HTTP route.
269///
270/// Pairs an HTTP method with a URL path and acts as the root of a fluent builder chain.
271/// Optionally attach shared state, a JSON body expectation, or URL query parameter
272/// deserialisation — then finalise with [`onconnect`](ServerMechanism::onconnect) (async)
273/// or [`onconnect_sync`](ServerMechanism::onconnect_sync) (sync) to produce a
274/// [`SocketType`] ready to be mounted on a [`Server`].
275///
276/// # Example
277/// ```rust,no_run
278/// # use toolkit_zero::socket::server::{ServerMechanism, Status};
279/// # use toolkit_zero::socket::server::reply;
280/// # use serde::{Deserialize, Serialize};
281/// # use std::sync::{Arc, Mutex};
282/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
283/// # #[derive(Deserialize)] struct CreateItem { name: String }
284/// # #[derive(Deserialize)] struct SearchQuery { q: String }
285///
286/// // Plain GET — no body, no state
287/// let health = ServerMechanism::get("/health")
288/// .onconnect(|| async { reply!() });
289///
290/// // POST — JSON body deserialised into `CreateItem`
291/// let create = ServerMechanism::post("/items")
292/// .json::<CreateItem>()
293/// .onconnect(|body| async move {
294/// let item = Item { id: 1, name: body.name };
295/// reply!(json => item, status => Status::Created)
296/// });
297///
298/// // GET — shared counter state injected on every request
299/// let counter: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
300/// let count_route = ServerMechanism::get("/count")
301/// .state(counter.clone())
302/// .onconnect(|state| async move {
303/// let n = *state.lock().unwrap();
304/// reply!(json => n)
305/// });
306/// ```
307pub struct ServerMechanism {
308 method: HttpMethod,
309 path: String,
310}
311
312impl ServerMechanism {
313 fn instance(method: HttpMethod, path: impl Into<String>) -> Self {
314 let path = path.into();
315 log::debug!("Creating {:?} route at '{}'", method, path);
316 Self { method, path }
317 }
318
319 /// Creates a route matching HTTP `GET` requests at `path`.
320 pub fn get(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Get, path) }
321
322 /// Creates a route matching HTTP `POST` requests at `path`.
323 pub fn post(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Post, path) }
324
325 /// Creates a route matching HTTP `PUT` requests at `path`.
326 pub fn put(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Put, path) }
327
328 /// Creates a route matching HTTP `DELETE` requests at `path`.
329 pub fn delete(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Delete, path) }
330
331 /// Creates a route matching HTTP `PATCH` requests at `path`.
332 pub fn patch(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Patch, path) }
333
334 /// Creates a route matching HTTP `HEAD` requests at `path`.
335 pub fn head(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Head, path) }
336
337 /// Creates a route matching HTTP `OPTIONS` requests at `path`.
338 pub fn options(path: impl Into<String>) -> Self { Self::instance(HttpMethod::Options, path) }
339
340 /// Attaches shared state `S` to this route, transitioning to [`StatefulSocketBuilder`].
341 ///
342 /// A fresh clone of `S` is injected into the handler on every request.
343 pub fn state<S: Clone + Send + Sync + 'static>(self, state: S) -> StatefulSocketBuilder<S> {
344 log::trace!("Attaching state to {:?} route at '{}'", self.method, self.path);
345 StatefulSocketBuilder { base: self, state }
346 }
347
348 /// Declares that this route expects a JSON-encoded request body, transitioning to
349 /// [`JsonSocketBuilder`].
350 pub fn json<T: DeserializeOwned + Send>(self) -> JsonSocketBuilder<T> {
351 log::trace!("Attaching JSON body expectation to {:?} route at '{}'", self.method, self.path);
352 JsonSocketBuilder { base: self, _phantom: std::marker::PhantomData }
353 }
354
355 /// Declares that this route extracts its input from URL query parameters, transitioning
356 /// to [`QuerySocketBuilder`].
357 pub fn query<T: DeserializeOwned + Send>(self) -> QuerySocketBuilder<T> {
358 log::trace!("Attaching query parameter expectation to {:?} route at '{}'", self.method, self.path);
359 QuerySocketBuilder { base: self, _phantom: std::marker::PhantomData }
360 }
361
362 /// Declares that this route expects an authenticated-encrypted request body
363 /// (ChaCha20-Poly1305), transitioning to [`EncryptedBodyBuilder`].
364 pub fn encryption<T>(self, key: SerializationKey) -> EncryptedBodyBuilder<T> {
365 log::trace!("Attaching encrypted body to {:?} route at '{}'", self.method, self.path);
366 EncryptedBodyBuilder { base: self, key, _phantom: std::marker::PhantomData }
367 }
368
369 /// Declares that this route expects authenticated-encrypted URL query parameters
370 /// (ChaCha20-Poly1305), transitioning to [`EncryptedQueryBuilder`].
371 pub fn encrypted_query<T>(self, key: SerializationKey) -> EncryptedQueryBuilder<T> {
372 log::trace!("Attaching encrypted query to {:?} route at '{}'", self.method, self.path);
373 EncryptedQueryBuilder { base: self, key, _phantom: std::marker::PhantomData }
374 }
375
376 /// Finalises this route with an async handler that receives no arguments.
377 ///
378 /// Returns a [`SocketType`] ready to be passed to [`Server::mechanism`].
379 ///
380 /// # Example
381 /// ```rust,no_run
382 /// # use toolkit_zero::socket::server::ServerMechanism;
383 /// # use toolkit_zero::socket::server::reply;
384 /// # use serde::Serialize;
385 /// # #[derive(Serialize)] struct Pong { ok: bool }
386 ///
387 /// let route = ServerMechanism::get("/ping")
388 /// .onconnect(|| async {
389 /// reply!(json => Pong { ok: true })
390 /// });
391 /// ```
392 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
393 where
394 F: Fn() -> Fut + Clone + Send + Sync + 'static,
395 Fut: Future<Output = Result<Re, Rejection>> + Send,
396 Re: Reply + Send,
397 {
398 log::debug!("Finalising async {:?} route at '{}' (no args)", self.method, self.path);
399 let method = self.method.to_http();
400 let path = self.path.clone();
401 SocketType {
402 method,
403 path,
404 handler: Arc::new(move |_req: IncomingRequest| {
405 let h = handler.clone();
406 Box::pin(async move {
407 match h().await {
408 Ok(r) => r.into_response(),
409 Err(e) => e.into_response(),
410 }
411 })
412 }),
413 }
414 }
415
416 /// Finalises this route with a **synchronous** handler that receives no arguments.
417 ///
418 /// # Safety
419 ///
420 /// Every incoming request spawns an independent task on Tokio's blocking thread pool.
421 /// The pool caps the number of live OS threads (default 512), but the queue of waiting
422 /// tasks is unbounded — under a traffic surge, tasks accumulate without limit, consuming
423 /// unbounded memory and causing severe latency spikes or OOM crashes. Additionally, any
424 /// panic inside the handler is silently converted into a 500 response, masking runtime
425 /// errors. Callers must ensure the handler completes quickly and that adequate
426 /// backpressure or rate limiting is applied externally.
427 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
428 where
429 F: Fn() -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
430 Re: Reply + Send + 'static,
431 {
432 log::warn!(
433 "Registering sync handler on {:?} '{}' — ensure rate-limiting is applied externally",
434 self.method, self.path
435 );
436 let method = self.method.to_http();
437 let path = self.path.clone();
438 SocketType {
439 method,
440 path,
441 handler: Arc::new(move |_req: IncomingRequest| {
442 let h = handler.clone();
443 Box::pin(async move {
444 match tokio::task::spawn_blocking(move || h()).await {
445 Ok(Ok(r)) => r.into_response(),
446 Ok(Err(e)) => e.into_response(),
447 Err(_) => {
448 log::warn!("Sync handler panicked; returning 500");
449 Rejection::internal().into_response()
450 }
451 }
452 })
453 }),
454 }
455 }
456}
457
458// ─── JsonSocketBuilder ────────────────────────────────────────────────────────
459
460/// Route builder that expects and deserialises a JSON request body of type `T`.
461///
462/// Obtained from [`ServerMechanism::json`]. Optionally attach shared state via
463/// [`state`](JsonSocketBuilder::state), or finalise immediately with
464/// [`onconnect`](JsonSocketBuilder::onconnect) /
465/// [`onconnect_sync`](JsonSocketBuilder::onconnect_sync).
466pub struct JsonSocketBuilder<T> {
467 base: ServerMechanism,
468 _phantom: std::marker::PhantomData<T>,
469}
470
471impl<T: DeserializeOwned + Send + 'static> JsonSocketBuilder<T> {
472 /// Finalises this route with an async handler that receives the deserialised JSON body.
473 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
474 where
475 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
476 Fut: Future<Output = Result<Re, Rejection>> + Send,
477 Re: Reply + Send,
478 {
479 log::debug!(
480 "Finalising async {:?} route at '{}' (JSON body)",
481 self.base.method, self.base.path
482 );
483 let method = self.base.method.to_http();
484 let path = self.base.path.clone();
485 SocketType {
486 method,
487 path,
488 handler: Arc::new(move |req: IncomingRequest| {
489 let h = handler.clone();
490 Box::pin(async move {
491 let body: T = match serde_json::from_slice(&req.body) {
492 Ok(v) => v,
493 Err(e) => {
494 log::debug!("JSON body parse failed: {}", e);
495 return Rejection::bad_request().into_response();
496 }
497 };
498 match h(body).await {
499 Ok(r) => r.into_response(),
500 Err(e) => e.into_response(),
501 }
502 })
503 }),
504 }
505 }
506
507 /// Finalises this route with a **synchronous** handler that receives the deserialised
508 /// JSON body.
509 ///
510 /// # Safety
511 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
512 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
513 where
514 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
515 Re: Reply + Send + 'static,
516 {
517 log::warn!(
518 "Registering sync handler on {:?} '{}' (JSON body) — ensure rate-limiting is applied externally",
519 self.base.method, self.base.path
520 );
521 let method = self.base.method.to_http();
522 let path = self.base.path.clone();
523 SocketType {
524 method,
525 path,
526 handler: Arc::new(move |req: IncomingRequest| {
527 let h = handler.clone();
528 Box::pin(async move {
529 let body: T = match serde_json::from_slice(&req.body) {
530 Ok(v) => v,
531 Err(e) => {
532 log::debug!("JSON body parse failed (sync): {}", e);
533 return Rejection::bad_request().into_response();
534 }
535 };
536 match tokio::task::spawn_blocking(move || h(body)).await {
537 Ok(Ok(r)) => r.into_response(),
538 Ok(Err(e)) => e.into_response(),
539 Err(_) => {
540 log::warn!("Sync handler (JSON body) panicked; returning 500");
541 Rejection::internal().into_response()
542 }
543 }
544 })
545 }),
546 }
547 }
548
549 /// Attaches shared state `S`, transitioning to [`StatefulJsonSocketBuilder`].
550 pub fn state<S: Clone + Send + Sync + 'static>(
551 self, state: S,
552 ) -> StatefulJsonSocketBuilder<T, S> {
553 StatefulJsonSocketBuilder {
554 base: self.base,
555 state,
556 _phantom: std::marker::PhantomData,
557 }
558 }
559}
560
561// ─── QuerySocketBuilder ───────────────────────────────────────────────────────
562
563/// Route builder that expects and deserialises URL query parameters of type `T`.
564///
565/// Obtained from [`ServerMechanism::query`]. Optionally attach shared state via
566/// [`state`](QuerySocketBuilder::state), or finalise immediately with
567/// [`onconnect`](QuerySocketBuilder::onconnect) /
568/// [`onconnect_sync`](QuerySocketBuilder::onconnect_sync).
569pub struct QuerySocketBuilder<T> {
570 base: ServerMechanism,
571 _phantom: std::marker::PhantomData<T>,
572}
573
574impl<T: DeserializeOwned + Send + 'static> QuerySocketBuilder<T> {
575 /// Finalises this route with an async handler that receives the deserialised query
576 /// parameters.
577 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
578 where
579 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
580 Fut: Future<Output = Result<Re, Rejection>> + Send,
581 Re: Reply + Send,
582 {
583 log::debug!(
584 "Finalising async {:?} route at '{}' (query params)",
585 self.base.method, self.base.path
586 );
587 let method = self.base.method.to_http();
588 let path = self.base.path.clone();
589 SocketType {
590 method,
591 path,
592 handler: Arc::new(move |req: IncomingRequest| {
593 let h = handler.clone();
594 Box::pin(async move {
595 let params: T = match serde_urlencoded::from_str(&req.query) {
596 Ok(v) => v,
597 Err(e) => {
598 log::debug!("query param parse failed: {}", e);
599 return Rejection::bad_request().into_response();
600 }
601 };
602 match h(params).await {
603 Ok(r) => r.into_response(),
604 Err(e) => e.into_response(),
605 }
606 })
607 }),
608 }
609 }
610
611 /// Finalises this route with a **synchronous** handler that receives the deserialised
612 /// query parameters.
613 ///
614 /// # Safety
615 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
616 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
617 where
618 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
619 Re: Reply + Send + 'static,
620 {
621 log::warn!(
622 "Registering sync handler on {:?} '{}' (query params) — ensure rate-limiting is applied externally",
623 self.base.method, self.base.path
624 );
625 let method = self.base.method.to_http();
626 let path = self.base.path.clone();
627 SocketType {
628 method,
629 path,
630 handler: Arc::new(move |req: IncomingRequest| {
631 let h = handler.clone();
632 Box::pin(async move {
633 let params: T = match serde_urlencoded::from_str(&req.query) {
634 Ok(v) => v,
635 Err(e) => {
636 log::debug!("query param parse failed (sync): {}", e);
637 return Rejection::bad_request().into_response();
638 }
639 };
640 match tokio::task::spawn_blocking(move || h(params)).await {
641 Ok(Ok(r)) => r.into_response(),
642 Ok(Err(e)) => e.into_response(),
643 Err(_) => {
644 log::warn!("Sync handler (query params) panicked; returning 500");
645 Rejection::internal().into_response()
646 }
647 }
648 })
649 }),
650 }
651 }
652
653 /// Attaches shared state `S`, transitioning to [`StatefulQuerySocketBuilder`].
654 pub fn state<S: Clone + Send + Sync + 'static>(
655 self, state: S,
656 ) -> StatefulQuerySocketBuilder<T, S> {
657 StatefulQuerySocketBuilder {
658 base: self.base,
659 state,
660 _phantom: std::marker::PhantomData,
661 }
662 }
663}
664
665// ─── StatefulSocketBuilder ────────────────────────────────────────────────────
666
667/// Route builder that carries shared state `S` with no body or query expectation.
668///
669/// Obtained from [`ServerMechanism::state`]. `S` must be `Clone + Send + Sync + 'static`.
670pub struct StatefulSocketBuilder<S> {
671 base: ServerMechanism,
672 state: S,
673}
674
675impl<S: Clone + Send + Sync + 'static> StatefulSocketBuilder<S> {
676 /// Adds a JSON body expectation, transitioning to [`StatefulJsonSocketBuilder`].
677 pub fn json<T: DeserializeOwned + Send>(self) -> StatefulJsonSocketBuilder<T, S> {
678 StatefulJsonSocketBuilder {
679 base: self.base,
680 state: self.state,
681 _phantom: std::marker::PhantomData,
682 }
683 }
684
685 /// Adds a query parameter expectation, transitioning to [`StatefulQuerySocketBuilder`].
686 pub fn query<T: DeserializeOwned + Send>(self) -> StatefulQuerySocketBuilder<T, S> {
687 StatefulQuerySocketBuilder {
688 base: self.base,
689 state: self.state,
690 _phantom: std::marker::PhantomData,
691 }
692 }
693
694 /// Adds an encrypted body expectation, transitioning to [`StatefulEncryptedBodyBuilder`].
695 pub fn encryption<T>(self, key: SerializationKey) -> StatefulEncryptedBodyBuilder<T, S> {
696 StatefulEncryptedBodyBuilder {
697 base: self.base,
698 key,
699 state: self.state,
700 _phantom: std::marker::PhantomData,
701 }
702 }
703
704 /// Adds an encrypted query expectation, transitioning to [`StatefulEncryptedQueryBuilder`].
705 pub fn encrypted_query<T>(self, key: SerializationKey) -> StatefulEncryptedQueryBuilder<T, S> {
706 StatefulEncryptedQueryBuilder {
707 base: self.base,
708 key,
709 state: self.state,
710 _phantom: std::marker::PhantomData,
711 }
712 }
713
714 /// Finalises this route with an async handler that receives only the shared state.
715 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
716 where
717 F: Fn(S) -> Fut + Clone + Send + Sync + 'static,
718 Fut: Future<Output = Result<Re, Rejection>> + Send,
719 Re: Reply + Send,
720 {
721 log::debug!(
722 "Finalising async {:?} route at '{}' (state)",
723 self.base.method, self.base.path
724 );
725 let method = self.base.method.to_http();
726 let path = self.base.path.clone();
727 let state = self.state;
728 SocketType {
729 method,
730 path,
731 handler: Arc::new(move |_req: IncomingRequest| {
732 let h = handler.clone();
733 let s = state.clone();
734 Box::pin(async move {
735 match h(s).await {
736 Ok(r) => r.into_response(),
737 Err(e) => e.into_response(),
738 }
739 })
740 }),
741 }
742 }
743
744 /// Finalises this route with a **synchronous** handler that receives only the shared
745 /// state.
746 ///
747 /// # Safety
748 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
749 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
750 where
751 F: Fn(S) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
752 Re: Reply + Send + 'static,
753 {
754 log::warn!(
755 "Registering sync handler on {:?} '{}' (state) — ensure rate-limiting and lock-free state are in place",
756 self.base.method, self.base.path
757 );
758 let method = self.base.method.to_http();
759 let path = self.base.path.clone();
760 let state = self.state;
761 SocketType {
762 method,
763 path,
764 handler: Arc::new(move |_req: IncomingRequest| {
765 let h = handler.clone();
766 let s = state.clone();
767 Box::pin(async move {
768 match tokio::task::spawn_blocking(move || h(s)).await {
769 Ok(Ok(r)) => r.into_response(),
770 Ok(Err(e)) => e.into_response(),
771 Err(_) => {
772 log::warn!("Sync handler (state) panicked; returning 500");
773 Rejection::internal().into_response()
774 }
775 }
776 })
777 }),
778 }
779 }
780}
781
782// ─── StatefulJsonSocketBuilder ────────────────────────────────────────────────
783
784/// Route builder that carries shared state `S` and expects a JSON body of type `T`.
785///
786/// Obtained from [`JsonSocketBuilder::state`] or [`StatefulSocketBuilder::json`].
787pub struct StatefulJsonSocketBuilder<T, S> {
788 base: ServerMechanism,
789 state: S,
790 _phantom: std::marker::PhantomData<T>,
791}
792
793impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
794 StatefulJsonSocketBuilder<T, S>
795{
796 /// Finalises this route with an async handler that receives `(state: S, body: T)`.
797 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
798 where
799 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
800 Fut: Future<Output = Result<Re, Rejection>> + Send,
801 Re: Reply + Send,
802 {
803 log::debug!(
804 "Finalising async {:?} route at '{}' (state + JSON body)",
805 self.base.method, self.base.path
806 );
807 let method = self.base.method.to_http();
808 let path = self.base.path.clone();
809 let state = self.state;
810 SocketType {
811 method,
812 path,
813 handler: Arc::new(move |req: IncomingRequest| {
814 let h = handler.clone();
815 let s = state.clone();
816 Box::pin(async move {
817 let body: T = match serde_json::from_slice(&req.body) {
818 Ok(v) => v,
819 Err(e) => {
820 log::debug!("JSON body parse failed (state): {}", e);
821 return Rejection::bad_request().into_response();
822 }
823 };
824 match h(s, body).await {
825 Ok(r) => r.into_response(),
826 Err(e) => e.into_response(),
827 }
828 })
829 }),
830 }
831 }
832
833 /// Finalises this route with a **synchronous** handler that receives `(state: S, body: T)`.
834 ///
835 /// # Safety
836 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
837 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
838 where
839 F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
840 Re: Reply + Send + 'static,
841 {
842 log::warn!(
843 "Registering sync handler on {:?} '{}' (state + JSON body) — ensure rate-limiting and lock-free state are in place",
844 self.base.method, self.base.path
845 );
846 let method = self.base.method.to_http();
847 let path = self.base.path.clone();
848 let state = self.state;
849 SocketType {
850 method,
851 path,
852 handler: Arc::new(move |req: IncomingRequest| {
853 let h = handler.clone();
854 let s = state.clone();
855 Box::pin(async move {
856 let body: T = match serde_json::from_slice(&req.body) {
857 Ok(v) => v,
858 Err(e) => {
859 log::debug!("JSON body parse failed (state+sync): {}", e);
860 return Rejection::bad_request().into_response();
861 }
862 };
863 match tokio::task::spawn_blocking(move || h(s, body)).await {
864 Ok(Ok(r)) => r.into_response(),
865 Ok(Err(e)) => e.into_response(),
866 Err(_) => {
867 log::warn!("Sync handler (state + JSON body) panicked; returning 500");
868 Rejection::internal().into_response()
869 }
870 }
871 })
872 }),
873 }
874 }
875}
876
877// ─── StatefulQuerySocketBuilder ───────────────────────────────────────────────
878
879/// Route builder that carries shared state `S` and expects URL query parameters of type `T`.
880///
881/// Obtained from [`QuerySocketBuilder::state`] or [`StatefulSocketBuilder::query`].
882pub struct StatefulQuerySocketBuilder<T, S> {
883 base: ServerMechanism,
884 state: S,
885 _phantom: std::marker::PhantomData<T>,
886}
887
888impl<T: DeserializeOwned + Send + 'static, S: Clone + Send + Sync + 'static>
889 StatefulQuerySocketBuilder<T, S>
890{
891 /// Finalises this route with an async handler that receives `(state: S, query: T)`.
892 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
893 where
894 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
895 Fut: Future<Output = Result<Re, Rejection>> + Send,
896 Re: Reply + Send,
897 {
898 log::debug!(
899 "Finalising async {:?} route at '{}' (state + query params)",
900 self.base.method, self.base.path
901 );
902 let method = self.base.method.to_http();
903 let path = self.base.path.clone();
904 let state = self.state;
905 SocketType {
906 method,
907 path,
908 handler: Arc::new(move |req: IncomingRequest| {
909 let h = handler.clone();
910 let s = state.clone();
911 Box::pin(async move {
912 let params: T = match serde_urlencoded::from_str(&req.query) {
913 Ok(v) => v,
914 Err(e) => {
915 log::debug!("query param parse failed (state): {}", e);
916 return Rejection::bad_request().into_response();
917 }
918 };
919 match h(s, params).await {
920 Ok(r) => r.into_response(),
921 Err(e) => e.into_response(),
922 }
923 })
924 }),
925 }
926 }
927
928 /// Finalises this route with a **synchronous** handler that receives `(state: S, query: T)`.
929 ///
930 /// # Safety
931 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
932 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
933 where
934 F: Fn(S, T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
935 Re: Reply + Send + 'static,
936 {
937 log::warn!(
938 "Registering sync handler on {:?} '{}' (state + query params) — ensure rate-limiting and lock-free state are in place",
939 self.base.method, self.base.path
940 );
941 let method = self.base.method.to_http();
942 let path = self.base.path.clone();
943 let state = self.state;
944 SocketType {
945 method,
946 path,
947 handler: Arc::new(move |req: IncomingRequest| {
948 let h = handler.clone();
949 let s = state.clone();
950 Box::pin(async move {
951 let params: T = match serde_urlencoded::from_str(&req.query) {
952 Ok(v) => v,
953 Err(e) => {
954 log::debug!("query param parse failed (state+sync): {}", e);
955 return Rejection::bad_request().into_response();
956 }
957 };
958 match tokio::task::spawn_blocking(move || h(s, params)).await {
959 Ok(Ok(r)) => r.into_response(),
960 Ok(Err(e)) => e.into_response(),
961 Err(_) => {
962 log::warn!("Sync handler (state + query params) panicked; returning 500");
963 Rejection::internal().into_response()
964 }
965 }
966 })
967 }),
968 }
969 }
970}
971
972// ─── EncryptedBodyBuilder ────────────────────────────────────────────────────
973
974/// Route builder that expects an authenticated-encrypted request body of type `T`
975/// (ChaCha20-Poly1305).
976///
977/// Obtained from [`ServerMechanism::encryption`]. On each matching request the raw
978/// body bytes are decrypted using the [`SerializationKey`] supplied there. If
979/// decryption fails the route immediately returns `403 Forbidden`.
980///
981/// Optionally attach shared state via [`state`](EncryptedBodyBuilder::state) before
982/// finalising with [`onconnect`](EncryptedBodyBuilder::onconnect) /
983/// [`onconnect_sync`](EncryptedBodyBuilder::onconnect_sync).
984pub struct EncryptedBodyBuilder<T> {
985 base: ServerMechanism,
986 key: SerializationKey,
987 _phantom: std::marker::PhantomData<T>,
988}
989
990impl<T> EncryptedBodyBuilder<T>
991where
992 T: bincode::Decode<()> + Send + 'static,
993{
994 /// Finalises this route with an async handler that receives the decrypted body as `T`.
995 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
996 where
997 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
998 Fut: Future<Output = Result<Re, Rejection>> + Send,
999 Re: Reply + Send,
1000 {
1001 log::debug!(
1002 "Finalising async {:?} route at '{}' (encrypted body)",
1003 self.base.method, self.base.path
1004 );
1005 let method = self.base.method.to_http();
1006 let path = self.base.path.clone();
1007 let key = self.key;
1008 SocketType {
1009 method,
1010 path,
1011 handler: Arc::new(move |req: IncomingRequest| {
1012 let h = handler.clone();
1013 let key = key.clone();
1014 Box::pin(async move {
1015 let value: T = match decode_body(&req.body, &key) {
1016 Ok(v) => v,
1017 Err(e) => return e.into_response(),
1018 };
1019 match h(value).await {
1020 Ok(r) => r.into_response(),
1021 Err(e) => e.into_response(),
1022 }
1023 })
1024 }),
1025 }
1026 }
1027
1028 /// Finalises this route with a **synchronous** handler that receives the decrypted
1029 /// body as `T`.
1030 ///
1031 /// # Safety
1032 /// See [`ServerMechanism::onconnect_sync`] for the thread-pool safety notes.
1033 pub unsafe fn onconnect_sync<F, Re>(self, handler: F) -> SocketType
1034 where
1035 F: Fn(T) -> Result<Re, Rejection> + Clone + Send + Sync + 'static,
1036 Re: Reply + Send + 'static,
1037 {
1038 log::warn!(
1039 "Registering sync handler on {:?} '{}' (encrypted body) — ensure rate-limiting is applied externally",
1040 self.base.method, self.base.path
1041 );
1042 let method = self.base.method.to_http();
1043 let path = self.base.path.clone();
1044 let key = self.key;
1045 SocketType {
1046 method,
1047 path,
1048 handler: Arc::new(move |req: IncomingRequest| {
1049 let h = handler.clone();
1050 let key = key.clone();
1051 Box::pin(async move {
1052 let value: T = match decode_body(&req.body, &key) {
1053 Ok(v) => v,
1054 Err(e) => return e.into_response(),
1055 };
1056 match tokio::task::spawn_blocking(move || h(value)).await {
1057 Ok(Ok(r)) => r.into_response(),
1058 Ok(Err(e)) => e.into_response(),
1059 Err(_) => {
1060 log::warn!("Sync encrypted handler panicked; returning 500");
1061 Rejection::internal().into_response()
1062 }
1063 }
1064 })
1065 }),
1066 }
1067 }
1068
1069 /// Attaches shared state `S`, transitioning to [`StatefulEncryptedBodyBuilder`].
1070 pub fn state<S: Clone + Send + Sync + 'static>(
1071 self, state: S,
1072 ) -> StatefulEncryptedBodyBuilder<T, S> {
1073 StatefulEncryptedBodyBuilder {
1074 base: self.base,
1075 key: self.key,
1076 state,
1077 _phantom: std::marker::PhantomData,
1078 }
1079 }
1080}
1081
1082// ─── EncryptedQueryBuilder ────────────────────────────────────────────────────
1083
1084/// Route builder that expects authenticated-encrypted URL query parameters of type `T`
1085/// (ChaCha20-Poly1305).
1086///
1087/// Obtained from [`ServerMechanism::encrypted_query`]. The client must send a single
1088/// `?data=<base64url>` query parameter. Any failure returns `403 Forbidden`.
1089///
1090/// Optionally attach shared state via [`state`](EncryptedQueryBuilder::state) before
1091/// finalising with [`onconnect`](EncryptedQueryBuilder::onconnect).
1092pub struct EncryptedQueryBuilder<T> {
1093 base: ServerMechanism,
1094 key: SerializationKey,
1095 _phantom: std::marker::PhantomData<T>,
1096}
1097
1098impl<T> EncryptedQueryBuilder<T>
1099where
1100 T: bincode::Decode<()> + Send + 'static,
1101{
1102 /// Finalises this route with an async handler that receives the decrypted query
1103 /// parameters as `T`.
1104 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1105 where
1106 F: Fn(T) -> Fut + Clone + Send + Sync + 'static,
1107 Fut: Future<Output = Result<Re, Rejection>> + Send,
1108 Re: Reply + Send,
1109 {
1110 log::debug!(
1111 "Finalising async {:?} route at '{}' (encrypted query)",
1112 self.base.method, self.base.path
1113 );
1114 let method = self.base.method.to_http();
1115 let path = self.base.path.clone();
1116 let key = self.key;
1117 SocketType {
1118 method,
1119 path,
1120 handler: Arc::new(move |req: IncomingRequest| {
1121 let h = handler.clone();
1122 let key = key.clone();
1123 Box::pin(async move {
1124 let value: T = match decode_query(&req.query, &key) {
1125 Ok(v) => v,
1126 Err(e) => return e.into_response(),
1127 };
1128 match h(value).await {
1129 Ok(r) => r.into_response(),
1130 Err(e) => e.into_response(),
1131 }
1132 })
1133 }),
1134 }
1135 }
1136
1137 /// Attaches shared state `S`, transitioning to [`StatefulEncryptedQueryBuilder`].
1138 pub fn state<S: Clone + Send + Sync + 'static>(
1139 self, state: S,
1140 ) -> StatefulEncryptedQueryBuilder<T, S> {
1141 StatefulEncryptedQueryBuilder {
1142 base: self.base,
1143 key: self.key,
1144 state,
1145 _phantom: std::marker::PhantomData,
1146 }
1147 }
1148}
1149
1150// ─── StatefulEncryptedBodyBuilder ────────────────────────────────────────────
1151
1152/// Route builder carrying shared state `S` and an authenticated-encrypted request body
1153/// of type `T` (ChaCha20-Poly1305).
1154///
1155/// Obtained from [`EncryptedBodyBuilder::state`] or [`StatefulSocketBuilder::encryption`].
1156pub struct StatefulEncryptedBodyBuilder<T, S> {
1157 base: ServerMechanism,
1158 key: SerializationKey,
1159 state: S,
1160 _phantom: std::marker::PhantomData<T>,
1161}
1162
1163impl<T, S> StatefulEncryptedBodyBuilder<T, S>
1164where
1165 T: bincode::Decode<()> + Send + 'static,
1166 S: Clone + Send + Sync + 'static,
1167{
1168 /// Finalises this route with an async handler that receives `(state: S, body: T)`.
1169 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1170 where
1171 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1172 Fut: Future<Output = Result<Re, Rejection>> + Send,
1173 Re: Reply + Send,
1174 {
1175 log::debug!(
1176 "Finalising async {:?} route at '{}' (state + encrypted body)",
1177 self.base.method, self.base.path
1178 );
1179 let method = self.base.method.to_http();
1180 let path = self.base.path.clone();
1181 let key = self.key;
1182 let state = self.state;
1183 SocketType {
1184 method,
1185 path,
1186 handler: Arc::new(move |req: IncomingRequest| {
1187 let h = handler.clone();
1188 let key = key.clone();
1189 let s = state.clone();
1190 Box::pin(async move {
1191 let value: T = match decode_body(&req.body, &key) {
1192 Ok(v) => v,
1193 Err(e) => return e.into_response(),
1194 };
1195 match h(s, value).await {
1196 Ok(r) => r.into_response(),
1197 Err(e) => e.into_response(),
1198 }
1199 })
1200 }),
1201 }
1202 }
1203}
1204
1205// ─── StatefulEncryptedQueryBuilder ───────────────────────────────────────────
1206
1207/// Route builder carrying shared state `S` and authenticated-encrypted query parameters
1208/// of type `T` (ChaCha20-Poly1305).
1209///
1210/// Obtained from [`EncryptedQueryBuilder::state`] or
1211/// [`StatefulSocketBuilder::encrypted_query`].
1212pub struct StatefulEncryptedQueryBuilder<T, S> {
1213 base: ServerMechanism,
1214 key: SerializationKey,
1215 state: S,
1216 _phantom: std::marker::PhantomData<T>,
1217}
1218
1219impl<T, S> StatefulEncryptedQueryBuilder<T, S>
1220where
1221 T: bincode::Decode<()> + Send + 'static,
1222 S: Clone + Send + Sync + 'static,
1223{
1224 /// Finalises this route with an async handler that receives `(state: S, query: T)`.
1225 pub fn onconnect<F, Fut, Re>(self, handler: F) -> SocketType
1226 where
1227 F: Fn(S, T) -> Fut + Clone + Send + Sync + 'static,
1228 Fut: Future<Output = Result<Re, Rejection>> + Send,
1229 Re: Reply + Send,
1230 {
1231 log::debug!(
1232 "Finalising async {:?} route at '{}' (state + encrypted query)",
1233 self.base.method, self.base.path
1234 );
1235 let method = self.base.method.to_http();
1236 let path = self.base.path.clone();
1237 let key = self.key;
1238 let state = self.state;
1239 SocketType {
1240 method,
1241 path,
1242 handler: Arc::new(move |req: IncomingRequest| {
1243 let h = handler.clone();
1244 let key = key.clone();
1245 let s = state.clone();
1246 Box::pin(async move {
1247 let value: T = match decode_query(&req.query, &key) {
1248 Ok(v) => v,
1249 Err(e) => return e.into_response(),
1250 };
1251 match h(s, value).await {
1252 Ok(r) => r.into_response(),
1253 Err(e) => e.into_response(),
1254 }
1255 })
1256 }),
1257 }
1258 }
1259}
1260
1261// ─── Internal decode helpers ─────────────────────────────────────────────────
1262
1263/// Decrypt an authenticated-encrypted request body into `T`, returning 403 on failure.
1264fn decode_body<T: bincode::Decode<()>>(
1265 raw: &bytes::Bytes,
1266 key: &SerializationKey,
1267) -> Result<T, Rejection> {
1268 crate::serialization::open(raw, key.veil_key()).map_err(|e| {
1269 log::debug!("body decryption failed (key mismatch or corrupt body): {}", e);
1270 Rejection::forbidden()
1271 })
1272}
1273
1274/// Decrypt an authenticated-encrypted `?data=<base64url>` query into `T`, returning 403 on failure.
1275fn decode_query<T: bincode::Decode<()>>(
1276 raw_query: &str,
1277 key: &SerializationKey,
1278) -> Result<T, Rejection> {
1279 use base64::Engine;
1280
1281 #[derive(serde::Deserialize)]
1282 struct DataParam { data: String }
1283
1284 let q: DataParam = serde_urlencoded::from_str(raw_query).map_err(|_| {
1285 log::debug!("encrypted query missing `data` parameter");
1286 Rejection::forbidden()
1287 })?;
1288
1289 let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
1290 .decode(&q.data)
1291 .map_err(|e| {
1292 log::debug!("base64url decode failed: {}", e);
1293 Rejection::forbidden()
1294 })?;
1295
1296 crate::serialization::open(&bytes, key.veil_key()).map_err(|e| {
1297 log::debug!("query decryption failed: {}", e);
1298 Rejection::forbidden()
1299 })
1300}
1301
1302/// Returns a `403 Forbidden` rejection.
1303///
1304/// Use in route handlers to deny access:
1305/// ```rust,no_run
1306/// # use toolkit_zero::socket::server::*;
1307/// # let _ = ServerMechanism::post("/secure")
1308/// # .onconnect(|| async { Err::<EmptyReply, _>(forbidden()) });
1309/// ```
1310pub fn forbidden() -> Rejection {
1311 Rejection::forbidden()
1312}
1313
1314// ─── Status ───────────────────────────────────────────────────────────────────
1315
1316/// A collection of common HTTP status codes used with the reply helpers.
1317///
1318/// Converts into `http::StatusCode` via [`From`] and is accepted by
1319/// [`reply_with_status`] and [`reply_with_status_and_json`]. Also usable directly
1320/// in the [`reply!`] macro via the `status => Status::X` argument.
1321#[derive(Clone, Copy, Debug)]
1322pub enum Status {
1323 // 2xx
1324 Ok,
1325 Created,
1326 Accepted,
1327 NoContent,
1328 // 3xx
1329 MovedPermanently,
1330 Found,
1331 NotModified,
1332 TemporaryRedirect,
1333 PermanentRedirect,
1334 // 4xx
1335 BadRequest,
1336 Unauthorized,
1337 Forbidden,
1338 NotFound,
1339 MethodNotAllowed,
1340 Conflict,
1341 Gone,
1342 UnprocessableEntity,
1343 TooManyRequests,
1344 // 5xx
1345 InternalServerError,
1346 NotImplemented,
1347 BadGateway,
1348 ServiceUnavailable,
1349 GatewayTimeout,
1350}
1351
1352impl From<Status> for http::StatusCode {
1353 fn from(s: Status) -> Self {
1354 match s {
1355 Status::Ok => http::StatusCode::OK,
1356 Status::Created => http::StatusCode::CREATED,
1357 Status::Accepted => http::StatusCode::ACCEPTED,
1358 Status::NoContent => http::StatusCode::NO_CONTENT,
1359 Status::MovedPermanently => http::StatusCode::MOVED_PERMANENTLY,
1360 Status::Found => http::StatusCode::FOUND,
1361 Status::NotModified => http::StatusCode::NOT_MODIFIED,
1362 Status::TemporaryRedirect => http::StatusCode::TEMPORARY_REDIRECT,
1363 Status::PermanentRedirect => http::StatusCode::PERMANENT_REDIRECT,
1364 Status::BadRequest => http::StatusCode::BAD_REQUEST,
1365 Status::Unauthorized => http::StatusCode::UNAUTHORIZED,
1366 Status::Forbidden => http::StatusCode::FORBIDDEN,
1367 Status::NotFound => http::StatusCode::NOT_FOUND,
1368 Status::MethodNotAllowed => http::StatusCode::METHOD_NOT_ALLOWED,
1369 Status::Conflict => http::StatusCode::CONFLICT,
1370 Status::Gone => http::StatusCode::GONE,
1371 Status::UnprocessableEntity => http::StatusCode::UNPROCESSABLE_ENTITY,
1372 Status::TooManyRequests => http::StatusCode::TOO_MANY_REQUESTS,
1373 Status::InternalServerError => http::StatusCode::INTERNAL_SERVER_ERROR,
1374 Status::NotImplemented => http::StatusCode::NOT_IMPLEMENTED,
1375 Status::BadGateway => http::StatusCode::BAD_GATEWAY,
1376 Status::ServiceUnavailable => http::StatusCode::SERVICE_UNAVAILABLE,
1377 Status::GatewayTimeout => http::StatusCode::GATEWAY_TIMEOUT,
1378 }
1379 }
1380}
1381
1382// ─── Server ───────────────────────────────────────────────────────────────────
1383
1384/// The HTTP server that owns and dispatches a collection of [`SocketType`] routes.
1385///
1386/// Build routes through the [`ServerMechanism`] builder chain, register each with
1387/// [`mechanism`](Server::mechanism), then start the server with [`serve`](Server::serve).
1388///
1389/// # Example
1390/// ```rust,no_run
1391/// # use toolkit_zero::socket::server::{Server, ServerMechanism, Status};
1392/// # use toolkit_zero::socket::server::reply;
1393/// # use serde::Serialize;
1394/// # #[derive(Serialize)] struct Pong { ok: bool }
1395///
1396/// let mut server = Server::default();
1397///
1398/// server
1399/// .mechanism(
1400/// ServerMechanism::get("/ping")
1401/// .onconnect(|| async { reply!(json => Pong { ok: true }) })
1402/// )
1403/// .mechanism(
1404/// ServerMechanism::delete("/session")
1405/// .onconnect(|| async { reply!() })
1406/// );
1407///
1408/// // Blocks forever — call only to actually run the server:
1409/// // server.serve(([0, 0, 0, 0], 8080)).await;
1410/// ```
1411///
1412/// # Caution
1413/// Calling [`serve`](Server::serve) with no routes registered will **panic**.
1414pub struct Server {
1415 mechanisms: Vec<SocketType>,
1416 /// Default bind address, set by [`rebind`](Server::rebind).
1417 bind_addr: Option<std::net::SocketAddr>,
1418}
1419
1420impl Default for Server {
1421 fn default() -> Self {
1422 Self::new()
1423 }
1424}
1425
1426impl Server {
1427 fn new() -> Self {
1428 Self { mechanisms: Vec::new(), bind_addr: None }
1429 }
1430
1431 /// Registers a [`SocketType`] route on this server.
1432 ///
1433 /// Routes are evaluated in registration order. Returns `&mut Self` for chaining.
1434 pub fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1435 self.mechanisms.push(mech);
1436 log::debug!("Route registered (total: {})", self.mechanisms.len());
1437 self
1438 }
1439
1440 /// Binds to `addr` and starts serving all registered routes.
1441 ///
1442 /// Returns a [`ServerFuture`] that can be:
1443 /// - **`.await`'d** — runs the server in the current task (infinite loop)
1444 /// - **`.background()`'d** — spawns the server as a Tokio background task
1445 ///
1446 /// # Panics
1447 /// Panics if no routes have been registered or if the address cannot be bound.
1448 pub fn serve(self, addr: impl Into<SocketAddr>) -> ServerFuture {
1449 let addr = addr.into();
1450 let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1451 ServerFuture::new(async move {
1452 log::info!("Server binding to {}", addr);
1453 run_hyper_server(routes, addr, std::future::pending::<()>()).await;
1454 })
1455 }
1456
1457 /// Binds to `addr`, serves all registered routes, and shuts down gracefully when
1458 /// `shutdown` resolves.
1459 ///
1460 /// Returns a [`ServerFuture`] that can be `.await`'d or `.background()`'d.
1461 ///
1462 /// # Example
1463 ///
1464 /// ```rust,no_run
1465 /// use tokio::sync::oneshot;
1466 ///
1467 /// # #[tokio::main]
1468 /// # async fn main() {
1469 /// let (tx, rx) = oneshot::channel::<()>();
1470 /// # let mut server = toolkit_zero::socket::server::Server::default();
1471 ///
1472 /// let handle = server.serve_with_graceful_shutdown(
1473 /// ([127, 0, 0, 1], 8080),
1474 /// async move { rx.await.ok(); },
1475 /// ).background();
1476 /// tx.send(()).ok();
1477 /// handle.await.ok();
1478 /// # }
1479 /// ```
1480 pub fn serve_with_graceful_shutdown(
1481 self,
1482 addr: impl Into<std::net::SocketAddr>,
1483 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1484 ) -> ServerFuture {
1485 let addr = addr.into();
1486 let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1487 ServerFuture::new(async move {
1488 log::info!("Server binding to {} (graceful shutdown enabled)", addr);
1489 run_hyper_server(routes, addr, shutdown).await;
1490 })
1491 }
1492
1493 /// Serves all registered routes from an already-bound `listener`, shutting down
1494 /// gracefully when `shutdown` resolves.
1495 ///
1496 /// Returns a [`ServerFuture`] that can be `.await`'d or `.background()`'d.
1497 ///
1498 /// Use this when port `0` is passed to `TcpListener::bind` and you need to know
1499 /// the actual OS-assigned port before the server starts.
1500 ///
1501 /// # Example
1502 ///
1503 /// ```rust,no_run
1504 /// use tokio::net::TcpListener;
1505 /// use tokio::sync::oneshot;
1506 ///
1507 /// # #[tokio::main]
1508 /// # async fn main() -> std::io::Result<()> {
1509 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
1510 /// let port = listener.local_addr()?.port();
1511 ///
1512 /// let (tx, rx) = oneshot::channel::<()>();
1513 /// # let mut server = toolkit_zero::socket::server::Server::default();
1514 ///
1515 /// let handle = server
1516 /// .serve_from_listener(listener, async move { rx.await.ok(); })
1517 /// .background();
1518 /// tx.send(()).ok();
1519 /// handle.await.ok();
1520 /// # Ok(())
1521 /// # }
1522 /// ```
1523 pub fn serve_from_listener(
1524 self,
1525 listener: tokio::net::TcpListener,
1526 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
1527 ) -> ServerFuture {
1528 let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1529 ServerFuture::new(async move {
1530 log::info!(
1531 "Server running on {} (graceful shutdown enabled)",
1532 listener.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "?".into())
1533 );
1534 run_hyper_server_inner(routes, listener, shutdown).await;
1535 })
1536 }
1537
1538 /// Stores `addr` as this server's default bind address.
1539 ///
1540 /// This is a pre-serve convenience setter. Call it before
1541 /// [`serve_managed`](Server::serve_managed) or any other `serve*` variant to
1542 /// record the initial address without starting the server.
1543 ///
1544 /// Returns `&mut Self` for method chaining.
1545 pub fn rebind(&mut self, addr: impl Into<std::net::SocketAddr>) -> &mut Self {
1546 self.bind_addr = Some(addr.into());
1547 log::debug!("Default bind address updated to {:?}", self.bind_addr);
1548 self
1549 }
1550
1551 /// Starts all registered routes in a background Tokio task and returns a
1552 /// [`BackgroundServer`] handle.
1553 ///
1554 /// Unlike `serve*` + `.background()`, this method keeps a live route table
1555 /// inside the handle, enabling:
1556 ///
1557 /// - [`BackgroundServer::rebind`] — graceful stop + restart on a new address
1558 /// - [`BackgroundServer::mechanism`] — add routes **without restarting**
1559 /// - [`BackgroundServer::addr`] — query the current bind address
1560 /// - [`BackgroundServer::stop`] — shut down and await completion
1561 ///
1562 /// # Panics
1563 /// Panics if no routes have been registered.
1564 ///
1565 /// # Example
1566 /// ```rust,no_run
1567 /// # use toolkit_zero::socket::server::Server;
1568 /// # use serde::Serialize;
1569 /// # use toolkit_zero::socket::server::reply;
1570 /// # #[derive(Serialize)] struct Pong { ok: bool }
1571 /// # #[tokio::main]
1572 /// # async fn main() {
1573 /// let mut server = Server::default();
1574 /// server.mechanism(
1575 /// toolkit_zero::socket::server::ServerMechanism::get("/ping")
1576 /// .onconnect(|| async { reply!(json => Pong { ok: true }) })
1577 /// );
1578 ///
1579 /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1580 /// println!("Running on {}", bg.addr());
1581 ///
1582 /// bg.rebind(([127, 0, 0, 1], 9090)).await;
1583 /// println!("Rebound to {}", bg.addr());
1584 ///
1585 /// bg.stop().await;
1586 /// # }
1587 /// ```
1588 pub fn serve_managed(self, addr: impl Into<std::net::SocketAddr>) -> BackgroundServer {
1589 let addr = addr.into();
1590 let routes = Arc::new(tokio::sync::RwLock::new(self.mechanisms));
1591 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1592 let routes_ref = Arc::clone(&routes);
1593 let handle = tokio::spawn(run_hyper_server(
1594 routes_ref,
1595 addr,
1596 async { rx.await.ok(); },
1597 ));
1598 BackgroundServer {
1599 routes,
1600 addr,
1601 shutdown_tx: Some(tx),
1602 handle: Some(handle),
1603 }
1604 }
1605}
1606
1607// ─── ServerFuture ────────────────────────────────────────────────────────────
1608
1609/// Opaque future returned by [`Server::serve`], [`Server::serve_with_graceful_shutdown`],
1610/// and [`Server::serve_from_listener`].
1611///
1612/// A `ServerFuture` can be used in two ways:
1613///
1614/// - **`.await`** — drives the server inline in the current task.
1615/// - **`.background()`** — spawns the server on a new Tokio task and returns a
1616/// [`tokio::task::JoinHandle<()>`] immediately.
1617pub struct ServerFuture(Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
1618
1619impl ServerFuture {
1620 fn new(fut: impl Future<Output = ()> + Send + 'static) -> Self {
1621 Self(Box::pin(fut))
1622 }
1623
1624 /// Spawns the server on a new Tokio background task and returns a `JoinHandle<()>`.
1625 ///
1626 /// # Panics
1627 /// Panics if called outside a Tokio runtime.
1628 pub fn background(self) -> tokio::task::JoinHandle<()> {
1629 tokio::spawn(self.0)
1630 }
1631}
1632
1633impl std::future::IntoFuture for ServerFuture {
1634 type Output = ();
1635 type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
1636 fn into_future(self) -> Self::IntoFuture {
1637 self.0
1638 }
1639}
1640
1641// ─── Internal: dispatch + server loop ────────────────────────────────────────
1642
1643/// Dispatch a single hyper request to the matching route handler.
1644async fn dispatch(
1645 routes: &Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1646 req: hyper::Request<hyper::body::Incoming>,
1647) -> http::Response<bytes::Bytes> {
1648 use http_body_util::BodyExt;
1649
1650 let (parts, body) = req.into_parts();
1651 let path = parts.uri.path().to_owned();
1652 let query = parts.uri.query().unwrap_or("").to_owned();
1653 let method = parts.method.clone();
1654 let headers = parts.headers.clone();
1655
1656 // Collect body bytes before acquiring the route lock.
1657 let body_bytes = match body.collect().await {
1658 Ok(c) => c.to_bytes(),
1659 Err(e) => {
1660 log::debug!("failed to read request body: {}", e);
1661 return http::Response::builder()
1662 .status(http::StatusCode::BAD_REQUEST)
1663 .body(bytes::Bytes::new())
1664 .unwrap();
1665 }
1666 };
1667
1668 // Hold lock only long enough to clone the matching handler Arc.
1669 let handler = {
1670 let guard = routes.read().await;
1671 guard
1672 .iter()
1673 .find(|s| s.method == method && path_matches(&s.path, &path))
1674 .map(|s| Arc::clone(&s.handler))
1675 };
1676
1677 match handler {
1678 Some(h) => {
1679 h(IncomingRequest { body: body_bytes, query, headers }).await
1680 }
1681 None => {
1682 log::debug!("No route matched {} {}", method, path);
1683 http::Response::builder()
1684 .status(http::StatusCode::NOT_FOUND)
1685 .body(bytes::Bytes::new())
1686 .unwrap()
1687 }
1688 }
1689}
1690
1691/// Core server loop — drives an already-bound listener with graceful shutdown.
1692///
1693/// Uses `hyper::server::conn::http1::Builder` (HTTP/1.1) whose `Connection<IO, S>`
1694/// has no lifetime tied to the builder, making it `'static` and compatible with
1695/// `tokio::spawn` and `GracefulShutdown::watch`.
1696async fn run_hyper_server_inner(
1697 routes: Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1698 listener: tokio::net::TcpListener,
1699 shutdown: impl Future<Output = ()> + Send + 'static,
1700) {
1701 use hyper_util::server::graceful::GracefulShutdown;
1702 use hyper_util::rt::TokioIo;
1703 use hyper::server::conn::http1;
1704
1705 let graceful = GracefulShutdown::new();
1706 let mut shutdown = std::pin::pin!(shutdown);
1707
1708 loop {
1709 tokio::select! {
1710 result = listener.accept() => {
1711 let (stream, remote) = match result {
1712 Ok(pair) => pair,
1713 Err(e) => {
1714 log::warn!("accept error: {}", e);
1715 continue;
1716 }
1717 };
1718 log::trace!("accepted connection from {}", remote);
1719
1720 let routes_ref = Arc::clone(&routes);
1721 // http1::Builder::serve_connection returns Connection<IO, S> with no
1722 // builder-lifetime, so it is 'static when IO + S are 'static.
1723 let conn = http1::Builder::new().serve_connection(
1724 TokioIo::new(stream),
1725 hyper::service::service_fn(move |req| {
1726 let r = Arc::clone(&routes_ref);
1727 async move {
1728 let resp = dispatch(&r, req).await;
1729 let (parts, body) = resp.into_parts();
1730 Ok::<_, std::convert::Infallible>(
1731 http::Response::from_parts(
1732 parts,
1733 http_body_util::Full::new(body),
1734 )
1735 )
1736 }
1737 }),
1738 );
1739 let fut = graceful.watch(conn);
1740 tokio::spawn(async move {
1741 if let Err(e) = fut.await {
1742 log::debug!("connection error: {}", e);
1743 }
1744 });
1745 }
1746 _ = &mut shutdown => {
1747 log::info!("shutdown signal received — draining in-flight connections");
1748 break;
1749 }
1750 }
1751 }
1752
1753 // Free the TCP port so a rebound server can bind immediately.
1754 drop(listener);
1755
1756 // Block until every in-flight request completes.
1757 graceful.shutdown().await;
1758 log::info!("all connections drained");
1759}
1760
1761/// Bind to `addr` then delegate to [`run_hyper_server_inner`].
1762async fn run_hyper_server(
1763 routes: Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1764 addr: SocketAddr,
1765 shutdown: impl Future<Output = ()> + Send + 'static,
1766) {
1767 let listener = match tokio::net::TcpListener::bind(addr).await {
1768 Ok(l) => {
1769 log::info!("server bound to {}", addr);
1770 l
1771 }
1772 Err(e) => {
1773 log::error!("failed to bind {}: {}", addr, e);
1774 panic!("server bind failed: {}", e);
1775 }
1776 };
1777 run_hyper_server_inner(routes, listener, shutdown).await;
1778}
1779
1780// ─── BackgroundServer ─────────────────────────────────────────────────────────
1781
1782/// A managed, background HTTP server returned by [`Server::serve_managed`].
1783///
1784/// The server starts as soon as `serve_managed` is called. Use this handle to:
1785///
1786/// - [`addr`](BackgroundServer::addr) — query the current bind address
1787/// - [`rebind`](BackgroundServer::rebind) — gracefully migrate to a new address
1788/// - [`mechanism`](BackgroundServer::mechanism) — add a route live, **no restart**
1789/// - [`stop`](BackgroundServer::stop) — shut down and await the task
1790///
1791/// # Routes are preserved across `rebind`
1792/// All routes registered before [`serve_managed`](Server::serve_managed), plus any
1793/// added via [`mechanism`](BackgroundServer::mechanism), are automatically carried over when
1794/// [`rebind`](BackgroundServer::rebind) restarts the server.
1795///
1796/// # Ownership
1797/// Dropping a `BackgroundServer` without calling [`stop`](BackgroundServer::stop)
1798/// leaves the background task running until the Tokio runtime exits.
1799///
1800/// # Example
1801/// ```rust,no_run
1802/// # use toolkit_zero::socket::server::Server;
1803/// # use serde::Serialize;
1804/// # use toolkit_zero::socket::server::reply;
1805/// # #[derive(Serialize)] struct Pong { ok: bool }
1806/// # #[tokio::main]
1807/// # async fn main() {
1808/// let mut server = Server::default();
1809/// server.mechanism(
1810/// toolkit_zero::socket::server::ServerMechanism::get("/ping")
1811/// .onconnect(|| async { reply!(json => Pong { ok: true }) })
1812/// );
1813/// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1814/// assert_eq!(bg.addr().port(), 8080);
1815///
1816/// bg.rebind(([127, 0, 0, 1], 9090)).await;
1817/// assert_eq!(bg.addr().port(), 9090);
1818///
1819/// bg.stop().await;
1820/// # }
1821/// ```
1822pub struct BackgroundServer {
1823 /// Shared mutable route table — written by [`mechanism`](BackgroundServer::mechanism), read by the server loop.
1824 routes: Arc<tokio::sync::RwLock<Vec<SocketType>>>,
1825 addr: std::net::SocketAddr,
1826 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
1827 handle: Option<tokio::task::JoinHandle<()>>,
1828}
1829
1830impl BackgroundServer {
1831 /// Returns the address the server is currently bound to.
1832 pub fn addr(&self) -> std::net::SocketAddr {
1833 self.addr
1834 }
1835
1836 /// Shuts the server down gracefully and awaits the background task.
1837 ///
1838 /// In-flight requests complete before the server stops.
1839 pub async fn stop(mut self) {
1840 if let Some(tx) = self.shutdown_tx.take() {
1841 let _ = tx.send(());
1842 }
1843 if let Some(h) = self.handle.take() {
1844 let _ = h.await;
1845 }
1846 }
1847
1848 /// Migrates the server to `addr` with zero route loss:
1849 ///
1850 /// 1. Sends a graceful shutdown signal to the current instance.
1851 /// 2. Waits for all in-flight requests to complete.
1852 /// 3. Spawns a fresh server task on the new address with the same routes.
1853 ///
1854 /// After this method returns, [`addr`](BackgroundServer::addr) reflects the
1855 /// new address and the server is accepting connections.
1856 ///
1857 /// # Example
1858 /// ```rust,no_run
1859 /// # use toolkit_zero::socket::server::Server;
1860 /// # use serde::Serialize;
1861 /// # use toolkit_zero::socket::server::reply;
1862 /// # #[derive(Serialize)] struct Pong { ok: bool }
1863 /// # #[tokio::main]
1864 /// # async fn main() {
1865 /// # let mut server = Server::default();
1866 /// # server.mechanism(
1867 /// # toolkit_zero::socket::server::ServerMechanism::get("/ping")
1868 /// # .onconnect(|| async { reply!(json => Pong { ok: true }) })
1869 /// # );
1870 /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1871 ///
1872 /// bg.rebind(([127, 0, 0, 1], 9090)).await;
1873 /// assert_eq!(bg.addr().port(), 9090);
1874 ///
1875 /// bg.stop().await;
1876 /// # }
1877 /// ```
1878 pub async fn rebind(&mut self, addr: impl Into<std::net::SocketAddr>) {
1879 // 1. Graceful shutdown of the current server.
1880 if let Some(tx) = self.shutdown_tx.take() {
1881 let _ = tx.send(());
1882 }
1883 // 2. Wait for all in-flight requests to drain.
1884 if let Some(h) = self.handle.take() {
1885 let _ = h.await;
1886 }
1887 // 3. Start on the new address, sharing the existing route table.
1888 let new_addr = addr.into();
1889 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
1890 self.shutdown_tx = Some(tx);
1891 self.addr = new_addr;
1892 let routes = Arc::clone(&self.routes);
1893 self.handle = Some(tokio::spawn(run_hyper_server(
1894 routes,
1895 new_addr,
1896 async { rx.await.ok(); },
1897 )));
1898 log::info!("Server rebound to {}", new_addr);
1899 }
1900
1901 /// Registers a new route on the **running** server without any restart.
1902 ///
1903 /// Because routes are stored in an `Arc<RwLock<Vec<SocketType>>>` shared between
1904 /// this handle and the server's dispatch loop, writing through the lock makes the
1905 /// new route visible to the next incoming request immediately — no TCP port gap,
1906 /// no in-flight request interruption.
1907 ///
1908 /// Returns `&mut Self` for chaining.
1909 ///
1910 /// # Example
1911 ///
1912 /// ```rust,no_run
1913 /// use toolkit_zero::socket::server::{Server, ServerMechanism, reply};
1914 /// use serde::Serialize;
1915 ///
1916 /// #[derive(Serialize)] struct Pong { ok: bool }
1917 /// #[derive(Serialize)] struct Status { msg: String }
1918 ///
1919 /// # #[tokio::main]
1920 /// # async fn main() {
1921 /// let mut server = Server::default();
1922 /// server.mechanism(
1923 /// ServerMechanism::get("/ping")
1924 /// .onconnect(|| async { reply!(json => Pong { ok: true }) })
1925 /// );
1926 ///
1927 /// let mut bg = server.serve_managed(([127, 0, 0, 1], 8080));
1928 ///
1929 /// bg.mechanism(
1930 /// ServerMechanism::get("/status")
1931 /// .onconnect(|| async { reply!(json => Status { msg: "enabled".into() }) })
1932 /// ).await;
1933 ///
1934 /// // /status is now live alongside /ping — no restart.
1935 /// bg.stop().await;
1936 /// # }
1937 /// ```
1938 pub async fn mechanism(&mut self, mech: SocketType) -> &mut Self {
1939 self.routes.write().await.push(mech);
1940 log::debug!(
1941 "mechanism: live route added (total = {})",
1942 self.routes.read().await.len()
1943 );
1944 self
1945 }
1946}
1947
1948// ─── Reply helpers ────────────────────────────────────────────────────────────
1949
1950/// Wraps `reply` with the given HTTP `status` code and returns it as a result.
1951///
1952/// Pairs with the [`reply!`] macro form `reply!(message => ..., status => ...)`.
1953pub fn reply_with_status(
1954 status: Status,
1955 reply: impl Reply,
1956) -> Result<http::Response<bytes::Bytes>, Rejection> {
1957 let mut resp = reply.into_response();
1958 *resp.status_mut() = status.into();
1959 Ok(resp)
1960}
1961
1962/// Returns an empty `200 OK` reply result.
1963///
1964/// Equivalent to `reply!()`.
1965pub fn reply() -> Result<impl Reply, Rejection> {
1966 Ok::<_, Rejection>(EmptyReply)
1967}
1968
1969/// Serialises `json` as a JSON body and returns it as a `200 OK` reply result.
1970///
1971/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ...)`.
1972pub fn reply_with_json<T: Serialize>(
1973 json: &T,
1974) -> Result<impl Reply + use<T>, Rejection> {
1975 let bytes = serde_json::to_vec(json).map_err(|_| Rejection::internal())?;
1976 Ok::<_, Rejection>(JsonReply {
1977 body: bytes::Bytes::from(bytes),
1978 status: http::StatusCode::OK,
1979 })
1980}
1981
1982/// Serialises `json` as a JSON body, attaches the given HTTP `status`, and returns a result.
1983///
1984/// `T` must implement `serde::Serialize`. Equivalent to `reply!(json => ..., status => ...)`.
1985pub fn reply_with_status_and_json<T: Serialize>(
1986 status: Status,
1987 json: &T,
1988) -> Result<impl Reply + use<T>, Rejection> {
1989 let bytes = serde_json::to_vec(json).map_err(|_| Rejection::internal())?;
1990 Ok::<_, Rejection>(JsonReply {
1991 body: bytes::Bytes::from(bytes),
1992 status: status.into(),
1993 })
1994}
1995
1996/// Seals `value` with `key` and returns it as an `application/octet-stream` response (`200 OK`).
1997///
1998/// `T` must implement `bincode::Encode`.
1999/// Equivalent to `reply!(sealed => value, key => key)`.
2000pub fn reply_sealed<T: bincode::Encode>(
2001 value: &T,
2002 key: SerializationKey,
2003) -> Result<http::Response<bytes::Bytes>, Rejection> {
2004 sealed_response(value, key, None)
2005}
2006
2007/// Seals `value` with `key`, attaches the given HTTP `status`, and returns it as a result.
2008///
2009/// Equivalent to `reply!(sealed => value, key => key, status => Status::X)`.
2010pub fn reply_sealed_with_status<T: bincode::Encode>(
2011 value: &T,
2012 key: SerializationKey,
2013 status: Status,
2014) -> Result<http::Response<bytes::Bytes>, Rejection> {
2015 sealed_response(value, key, Some(status))
2016}
2017
2018fn sealed_response<T: bincode::Encode>(
2019 value: &T,
2020 key: SerializationKey,
2021 status: Option<Status>,
2022) -> Result<http::Response<bytes::Bytes>, Rejection> {
2023 let code: http::StatusCode = status.map(Into::into).unwrap_or(http::StatusCode::OK);
2024 let sealed = crate::serialization::seal(value, key.veil_key())
2025 .map_err(|_| Rejection::internal())?;
2026 Ok(http::Response::builder()
2027 .status(code)
2028 .header(http::header::CONTENT_TYPE, "application/octet-stream")
2029 .body(bytes::Bytes::from(sealed))
2030 .unwrap())
2031}
2032
2033// ─── reply! macro ─────────────────────────────────────────────────────────────
2034
2035/// Convenience macro for constructing reply results inside route handlers.
2036///
2037/// | Syntax | Equivalent | Description |
2038/// |---|---|---|
2039/// | `reply!()` | [`reply()`] | Empty `200 OK` response. |
2040/// | `reply!(message => expr, status => Status::X)` | [`reply_with_status`] | Any `Reply` with a status code. |
2041/// | `reply!(json => expr)` | [`reply_with_json`] | JSON body with `200 OK`. |
2042/// | `reply!(json => expr, status => Status::X)` | [`reply_with_status_and_json`] | JSON body with a status code. |
2043/// | `reply!(sealed => expr, key => key)` | [`reply_sealed`] | Encrypted body, `200 OK`. |
2044/// | `reply!(sealed => expr, key => key, status => Status::X)` | [`reply_sealed_with_status`] | Encrypted body with status. |
2045///
2046/// # Example
2047/// ```rust,no_run
2048/// # use toolkit_zero::socket::server::{ServerMechanism, Status};
2049/// # use toolkit_zero::socket::server::reply;
2050/// # use serde::{Deserialize, Serialize};
2051/// # #[derive(Deserialize, Serialize)] struct Item { id: u32, name: String }
2052///
2053/// // Empty 200 OK
2054/// let ping = ServerMechanism::get("/ping")
2055/// .onconnect(|| async { reply!() });
2056///
2057/// // JSON body, 200 OK
2058/// let list = ServerMechanism::get("/items")
2059/// .onconnect(|| async {
2060/// let items: Vec<Item> = vec![];
2061/// reply!(json => items)
2062/// });
2063///
2064/// // JSON body with a custom status
2065/// let create = ServerMechanism::post("/items")
2066/// .json::<Item>()
2067/// .onconnect(|item| async move {
2068/// reply!(json => item, status => Status::Created)
2069/// });
2070/// ```
2071#[doc(hidden)]
2072#[macro_export]
2073macro_rules! reply {
2074 () => {{
2075 $crate::socket::server::reply()
2076 }};
2077
2078 (message => $message: expr, status => $status: expr) => {{
2079 $crate::socket::server::reply_with_status($status, $message)
2080 }};
2081
2082 (json => $json: expr) => {{
2083 $crate::socket::server::reply_with_json(&$json)
2084 }};
2085
2086 (json => $json: expr, status => $status: expr) => {{
2087 $crate::socket::server::reply_with_status_and_json($status, &$json)
2088 }};
2089
2090 (sealed => $val: expr, key => $key: expr) => {{
2091 $crate::socket::server::reply_sealed(&$val, $key)
2092 }};
2093
2094 (sealed => $val: expr, key => $key: expr, status => $status: expr) => {{
2095 $crate::socket::server::reply_sealed_with_status(&$val, $key, $status)
2096 }};
2097}
2098
2099/// Re-export the [`reply!`] macro so it is accessible as
2100/// `toolkit_zero::socket::server::reply` and included in glob imports.
2101pub use crate::reply;