actus_reply/reply.rs
1//! Contains the user-facing API for creating replies.
2use bytes::Bytes;
3use futures_util::{Stream, StreamExt};
4use serde::Serialize;
5use serde_json::Value;
6use std::any::Any;
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::time::Duration;
13
14// ===================== Core Types =====================
15
16/// A boxed, `Send + Sync` stream of body chunks, used by [`ReplyData::Stream`]
17/// and SSE responses.
18pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, io::Error>> + Send + Sync>>;
19
20/// The concrete payload of a successful reply. The [`Finalizer`](crate::Finalizer)
21/// turns it into an HTTP response. Build one with the [`reply!`](crate::reply!)
22/// macro or the `reply::*` constructor functions rather than by hand.
23#[derive()]
24pub enum ReplyData {
25 /// A JSON body (`Content-Type: application/json`).
26 Json(Value),
27 /// Raw bytes plus the `Content-Type` to send. There is no default —
28 /// callers pick the type at the call site (`application/zip`,
29 /// `image/png`, `application/octet-stream`, …). Construct via
30 /// [`bytes()`](crate::bytes). The finalizer emits the header verbatim.
31 Bytes {
32 /// The `Content-Type` header value to send with the bytes.
33 content_type: Cow<'static, str>,
34 /// The raw response body.
35 data: Vec<u8>,
36 },
37 /// An empty body — `204 No Content` by default.
38 Empty,
39 /// A reply carrying explicit status / headers (a [`ReplySpec`]) on top of
40 /// one of the other payload kinds.
41 Rich(Box<ReplySpec>),
42 /// A streaming body, written out as the stream yields.
43 Stream(BodyStream),
44 /// An HTTP connection-upgrade reply (e.g. WebSocket). Produced by
45 /// `actus::ws::upgrade(...)` (with the `websocket` feature) and consumed
46 /// by the server, which completes the handshake (`101 Switching Protocols`)
47 /// and then hands the upgraded connection to the handler. The boxed value
48 /// is opaque server plumbing — don't construct or inspect it directly.
49 Upgrade(Box<dyn Any + Send>),
50}
51
52// Manual `Debug` implementation to handle the non-Debug variants.
53impl fmt::Debug for ReplyData {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 ReplyData::Json(j) => f.debug_tuple("Json").field(j).finish(),
57 ReplyData::Bytes { content_type, data } => f
58 .debug_struct("Bytes")
59 .field("content_type", content_type)
60 .field("data", data)
61 .finish(),
62 ReplyData::Empty => write!(f, "Empty"),
63 ReplyData::Rich(r) => f.debug_tuple("Rich").field(r).finish(),
64 ReplyData::Stream(_) => f.debug_tuple("Stream").field(&"...").finish(),
65 ReplyData::Upgrade(_) => f.debug_tuple("Upgrade").field(&"...").finish(),
66 }
67 }
68}
69
70// Manual `PartialEq` implementation. Streams are never equal.
71impl PartialEq for ReplyData {
72 fn eq(&self, other: &Self) -> bool {
73 match (self, other) {
74 (ReplyData::Json(l), ReplyData::Json(r)) => l == r,
75 (
76 ReplyData::Bytes {
77 content_type: lc,
78 data: ld,
79 },
80 ReplyData::Bytes {
81 content_type: rc,
82 data: rd,
83 },
84 ) => lc == rc && ld == rd,
85 (ReplyData::Empty, ReplyData::Empty) => true,
86 (ReplyData::Rich(l), ReplyData::Rich(r)) => l == r,
87 _ => false, // Streams and different variants are not equal
88 }
89 }
90}
91
92/// A handler's return type: `Ok(`[`ReplyData`]`)` for a success payload, or
93/// `Err(`[`WebError`]`)` for an error the framework renders as an HTTP error
94/// response. The [`reply!`](crate::reply!) macro produces the `Ok` side.
95pub type Reply = Result<ReplyData, WebError>;
96
97impl ReplyData {
98 /// Add a response header. Lifts `self` into [`ReplyData::Rich`] if the
99 /// current variant doesn't already carry headers (Json/Bytes/Empty/Stream),
100 /// so middleware `after` hooks (and any code that has a `ReplyData` it
101 /// wants to decorate) can stamp headers without manually wrangling
102 /// [`ReplySpec`]. The original payload is preserved.
103 ///
104 /// No-op on [`ReplyData::Upgrade`] — that variant is intercepted by the
105 /// server before the response body is finalized, and lifting it into
106 /// `Rich` would hide it from that interception.
107 pub fn add_header(&mut self, name: impl Into<String>, value: impl Into<String>) {
108 if matches!(self, ReplyData::Upgrade(_)) {
109 return;
110 }
111 let name = name.into();
112 let value = value.into();
113 if let ReplyData::Rich(spec) = self {
114 spec.headers.insert(name, value);
115 return;
116 }
117 let payload = std::mem::replace(self, ReplyData::Empty);
118 let mut headers = HashMap::new();
119 headers.insert(name, value);
120 *self = ReplyData::Rich(Box::new(ReplySpec {
121 payload,
122 status: None,
123 headers,
124 }));
125 }
126
127 /// Set the response status, lifting into [`ReplyData::Rich`] if needed
128 /// (see [`add_header`](Self::add_header)).
129 pub fn set_status(&mut self, status: http::StatusCode) {
130 if matches!(self, ReplyData::Upgrade(_)) {
131 return;
132 }
133 if let ReplyData::Rich(spec) = self {
134 spec.status = Some(status);
135 return;
136 }
137 let payload = std::mem::replace(self, ReplyData::Empty);
138 *self = ReplyData::Rich(Box::new(ReplySpec {
139 payload,
140 status: Some(status),
141 headers: HashMap::new(),
142 }));
143 }
144}
145
146// ===================== Simple Constructors =====================
147// All constructor functions must be `pub`.
148
149/// Build a JSON `ReplyData` from any [`Serialize`] value.
150///
151/// Panics on serialization failure. Most domain types (those with derived
152/// `Serialize` over stringy fields) never fail, but custom impls can.
153/// The `reply!` macro uses [`try_json`] internally to convert that case
154/// into a `WebError::Internal` (→ 500) rather than a handler-level panic
155/// that would drop the whole connection.
156///
157/// Prefer `reply!(my_value)` over calling this directly.
158pub fn json<T: Serialize>(val: T) -> ReplyData {
159 ReplyData::Json(serde_json::to_value(val).expect("serialize"))
160}
161
162/// Fallible counterpart of [`json`]. Returns the raw `serde_json::Error`
163/// on failure so callers (notably the `reply!` macro) can map it to a
164/// structured response instead of panicking.
165pub fn try_json<T: Serialize>(val: T) -> Result<ReplyData, serde_json::Error> {
166 Ok(ReplyData::Json(serde_json::to_value(val)?))
167}
168
169/// Build a `ReplyData::Bytes` with an explicit `Content-Type`. The
170/// content-type is required: there is no honest default for arbitrary
171/// bytes, and `application/octet-stream` masquerading as one tends to
172/// cause more bugs than it fixes (browsers guess; downstream HTTP
173/// caches mis-cache). Pass the right media type at the call site.
174///
175/// ```ignore
176/// reply::bytes("application/zip", zip_payload)
177/// reply::bytes("image/png", png_payload)
178/// ```
179pub fn bytes(content_type: impl Into<Cow<'static, str>>, data: impl Into<Vec<u8>>) -> ReplyData {
180 ReplyData::Bytes {
181 content_type: content_type.into(),
182 data: data.into(),
183 }
184}
185
186/// An empty-body reply (`204 No Content` unless a status is set via
187/// [`build_reply`]).
188pub fn empty() -> ReplyData {
189 ReplyData::Empty
190}
191
192/// Start a [`ReplySpec`] builder for a reply that needs an explicit status
193/// and/or headers on top of its body.
194pub fn build_reply() -> ReplySpec {
195 ReplySpec::new()
196}
197
198/// Build a streaming-body reply from a stream of byte chunks. The body is
199/// written out as the stream yields, not buffered.
200pub fn stream<S>(s: S) -> ReplyData
201where
202 S: Stream<Item = Result<Bytes, io::Error>> + Send + Sync + 'static,
203{
204 ReplyData::Stream(Box::pin(s))
205}
206
207// ===================== Server-Sent Events =====================
208
209/// One frame in a [Server-Sent Events][sse] stream. A frame may carry data
210/// (the common case), a comment (heartbeats and debug pings), and the
211/// optional `event` / `id` / `retry` fields. The wire encoding is handled
212/// by [`SseEvent::to_bytes`] — embedded newlines in `data` become multiple
213/// `data:` lines, the way the spec requires.
214///
215/// ```ignore
216/// SseEvent::data("hello").event("greeting").id("42")
217/// SseEvent::data("line1\nline2") // becomes two `data: ...` lines
218/// SseEvent::comment("keep-alive")
219/// ```
220///
221/// [sse]: https://html.spec.whatwg.org/multipage/server-sent-events.html
222#[derive(Default, Debug, Clone)]
223pub struct SseEvent {
224 event: Option<String>,
225 id: Option<String>,
226 retry: Option<Duration>,
227 data: Option<String>,
228 comment: Option<String>,
229}
230
231impl SseEvent {
232 /// New event with a `data:` field. The most common entry point.
233 pub fn data(data: impl Into<String>) -> Self {
234 Self {
235 data: Some(data.into()),
236 ..Self::default()
237 }
238 }
239
240 /// New event with just a comment line (`: ...\n\n`). Comments are
241 /// ignored by `EventSource` clients but keep the connection alive
242 /// through intermediaries — useful as a heartbeat when the stream is
243 /// idle for long stretches.
244 pub fn comment(text: impl Into<String>) -> Self {
245 Self {
246 comment: Some(text.into()),
247 ..Self::default()
248 }
249 }
250
251 /// Set the event-type name (the `event:` field). Clients can route on
252 /// this with `EventSource.addEventListener(name, …)`. Embedded newlines
253 /// are replaced with spaces (the spec doesn't allow newlines in event
254 /// names).
255 pub fn event(mut self, name: impl Into<String>) -> Self {
256 self.event = Some(name.into());
257 self
258 }
259
260 /// Set the `id:` field. The browser will replay it on reconnect via
261 /// the `Last-Event-ID` request header — use it to support resumable
262 /// streams.
263 pub fn id(mut self, id: impl Into<String>) -> Self {
264 self.id = Some(id.into());
265 self
266 }
267
268 /// Set the `retry:` field — tells the client how long (in
269 /// milliseconds) to wait before reconnecting after a drop.
270 pub fn retry(mut self, d: Duration) -> Self {
271 self.retry = Some(d);
272 self
273 }
274
275 /// Encode this event in the SSE wire format, ending with the blank-line
276 /// frame separator (`\n`). Used by [`sse`]; you rarely need to call it
277 /// directly unless you're building a custom streaming pipeline.
278 pub fn to_bytes(&self) -> Bytes {
279 // Per HTML spec §9.2.6: lines starting with `:` are comments; each
280 // field is `field-name: value\n` (the leading space after the colon
281 // is consumed by the parser, so it's pretty but not significant);
282 // a blank line ends the event. `data` spanning multiple lines is
283 // expressed as one `data:` line per source line, then reassembled
284 // by the client with `\n` joins.
285 let mut out = String::new();
286 if let Some(event) = &self.event {
287 out.push_str("event: ");
288 out.push_str(&single_line(event));
289 out.push('\n');
290 }
291 if let Some(id) = &self.id {
292 out.push_str("id: ");
293 out.push_str(&single_line(id));
294 out.push('\n');
295 }
296 if let Some(retry) = self.retry {
297 // u128 → u64 saturate. >584M years of retry delay would
298 // overflow; nobody cares.
299 let ms = u64::try_from(retry.as_millis()).unwrap_or(u64::MAX);
300 out.push_str("retry: ");
301 out.push_str(&ms.to_string());
302 out.push('\n');
303 }
304 if let Some(comment) = &self.comment {
305 for line in comment.split('\n') {
306 out.push(':');
307 if !line.is_empty() {
308 out.push(' ');
309 out.push_str(line);
310 }
311 out.push('\n');
312 }
313 }
314 if let Some(data) = &self.data {
315 for line in data.split('\n') {
316 out.push_str("data: ");
317 out.push_str(line);
318 out.push('\n');
319 }
320 }
321 out.push('\n'); // blank line ends the event
322 Bytes::from(out)
323 }
324}
325
326/// Build a streaming Server-Sent Events response from a stream of
327/// [`SseEvent`]s. Sets `Content-Type: text/event-stream` and
328/// `Cache-Control: no-cache` (without which an intermediate cache might
329/// hold the live stream, defeating the point).
330///
331/// ```ignore
332/// use actus::prelude::*;
333/// use futures_util::stream;
334/// use std::time::Duration;
335///
336/// pub async fn updates(&self) -> Reply {
337/// let events = stream::iter(vec![
338/// SseEvent::data("tick").id("1"),
339/// SseEvent::data("tick").id("2").retry(Duration::from_secs(5)),
340/// SseEvent::comment("keep-alive"),
341/// ]);
342/// Ok(reply::sse(events))
343/// }
344/// ```
345///
346/// The stream is consumed lazily by the connection — frames are written
347/// out as the stream yields, not buffered. When the stream ends, the
348/// connection closes.
349pub fn sse<S>(events: S) -> ReplyData
350where
351 S: Stream<Item = SseEvent> + Send + Sync + 'static,
352{
353 let byte_stream = events.map(|ev| Ok::<Bytes, io::Error>(ev.to_bytes()));
354 let mut headers = HashMap::new();
355 headers.insert("content-type".to_string(), "text/event-stream".to_string());
356 headers.insert("cache-control".to_string(), "no-cache".to_string());
357 ReplyData::Rich(Box::new(ReplySpec {
358 payload: ReplyData::Stream(Box::pin(byte_stream)),
359 status: None,
360 headers,
361 }))
362}
363
364/// Replace `\n` and `\r` with `\x20` (space). For the `event` and `id`
365/// fields, where the SSE spec doesn't permit embedded newlines.
366fn single_line(s: &str) -> String {
367 s.replace(['\n', '\r'], " ")
368}
369
370// ===================== Builder Pattern =====================
371
372/// A builder for a reply with an explicit status and/or headers wrapped
373/// around a payload. Start one with [`build_reply`], chain the setters, and
374/// finish with [`ReplySpec::done`] (which yields a [`ReplyData::Rich`]).
375#[derive(Debug, PartialEq)]
376pub struct ReplySpec {
377 /// The body payload this spec decorates.
378 pub payload: ReplyData,
379 /// The status code to send, or `None` to use the payload's default.
380 pub status: Option<http::StatusCode>,
381 /// Extra response headers, as name → value.
382 pub headers: HashMap<String, String>,
383}
384
385impl Default for ReplySpec {
386 fn default() -> Self {
387 Self::new()
388 }
389}
390
391impl ReplySpec {
392 /// Start an empty spec (no body, no status, no headers).
393 pub fn new() -> Self {
394 Self {
395 payload: ReplyData::Empty,
396 status: None,
397 headers: HashMap::new(),
398 }
399 }
400
401 /// Set the body payload.
402 pub fn body(mut self, data: ReplyData) -> Self {
403 self.payload = data;
404 self
405 }
406
407 /// Set the response status code.
408 pub fn status(mut self, s: http::StatusCode) -> Self {
409 self.status = Some(s);
410 self
411 }
412
413 /// Add a response header.
414 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
415 self.headers.insert(key.into(), value.into());
416 self
417 }
418
419 /// Finish the builder, producing a [`ReplyData::Rich`].
420 pub fn done(self) -> ReplyData {
421 ReplyData::Rich(Box::new(self))
422 }
423}
424
425// ===================== Ergonomic Macro =====================
426/// Creates a `Reply` (a `Result<ReplyData, WebError>`) for a success case.
427///
428/// Accepts any [`Serialize`] value; failures during JSON conversion become
429/// `Err(WebError::Internal(...))` (→ 500) rather than panicking. The
430/// underlying [`json`] function still panics for callers that prefer to
431/// surface bugs loudly; if you need that, call it directly.
432///
433/// ```ignore
434/// reply!(my_struct) // serialize as JSON
435/// reply!(json!({"status": "ok"})) // inline JSON literal
436/// reply!() // 204 No Content
437/// reply!(stream: byte_stream) // streaming body
438/// reply!(sse: event_stream) // Server-Sent Events
439/// reply!(status = StatusCode::CREATED, value)
440/// reply!(
441/// status = StatusCode::CREATED,
442/// headers = { "Location": "/users/123" },
443/// value
444/// )
445/// ```
446#[macro_export]
447macro_rules! reply {
448 // With status and headers
449 (status = $status:expr, headers = { $($k:literal : $v:expr),* $(,)? }, $body:expr) => {{
450 match $crate::reply::try_json($body) {
451 Ok(__rd) => {
452 let mut spec = $crate::reply::build_reply()
453 .status($status)
454 .body(__rd);
455 $(
456 spec = spec.header($k, &$v.to_string());
457 )*
458 Ok(spec.done())
459 }
460 Err(e) => Err($crate::reply::WebError::Internal(
461 ::std::format!("serialize response: {}", e),
462 )),
463 }
464 }};
465 // With just status
466 (status = $status:expr, $body:expr) => {
467 match $crate::reply::try_json($body) {
468 Ok(__rd) => Ok($crate::reply::build_reply()
469 .status($status)
470 .body(__rd)
471 .done()),
472 Err(e) => Err($crate::reply::WebError::Internal(
473 ::std::format!("serialize response: {}", e),
474 )),
475 }
476 };
477 // Stream
478 (stream: $s:expr) => {
479 Ok($crate::reply::stream($s))
480 };
481 // Server-Sent Events
482 (sse: $s:expr) => {
483 Ok($crate::reply::sse($s))
484 };
485 // Empty
486 () => {
487 Ok($crate::reply::empty())
488 };
489 // JSON from struct or literal
490 ($expr:expr) => {
491 match $crate::reply::try_json($expr) {
492 Ok(__rd) => Ok(__rd),
493 Err(e) => Err($crate::reply::WebError::Internal(
494 ::std::format!("serialize response: {}", e),
495 )),
496 }
497 };
498}
499
500// ===================== Error Type =====================
501
502/// Structured error response shaped like an [RFC 7807 Problem Details]
503/// document, plus arbitrary extension members that get serialized into
504/// the JSON body.
505///
506/// Use this when one of the simple `WebError::*` variants doesn't carry
507/// enough information (e.g., a validation failure that wants to name
508/// the failing field and rule).
509///
510/// ```ignore
511/// return Err(WebError::Problem(
512/// ProblemDetails::new(StatusCode::CONFLICT, "Validation")
513/// .detail("title is required")
514/// .extra("field", "data.title")
515/// .extra("rule", "required"),
516/// ));
517/// ```
518///
519/// Wire shape (`application/problem+json`):
520/// ```json
521/// { "status": 409, "title": "Validation", "detail": "title is required",
522/// "field": "data.title", "rule": "required" }
523/// ```
524///
525/// [RFC 7807 Problem Details]: https://datatracker.ietf.org/doc/html/rfc7807
526#[derive(Debug, Clone)]
527pub struct ProblemDetails {
528 /// The HTTP status code, serialized as the `status` member.
529 pub status: http::StatusCode,
530 /// A short, human-readable summary, serialized as the `title` member.
531 pub title: String,
532 /// An optional longer explanation, serialized as the `detail` member.
533 pub detail: Option<String>,
534 /// Extension members. Serialized alongside `status` / `title` / `detail`.
535 /// Keys that collide with the standard members are ignored.
536 ///
537 /// Boxed to keep `WebError` small: this map dominates the struct's size,
538 /// yet extensions are optional and most errors carry none — so the common
539 /// case pays one pointer, not a full inline map, on every `Result` return.
540 pub extra: Box<serde_json::Map<String, serde_json::Value>>,
541}
542
543impl ProblemDetails {
544 /// Construct a new problem with the given status code and title.
545 pub fn new(status: http::StatusCode, title: impl Into<String>) -> Self {
546 Self {
547 status,
548 title: title.into(),
549 detail: None,
550 extra: Box::new(serde_json::Map::new()),
551 }
552 }
553
554 /// Set the human-readable detail message.
555 pub fn detail(mut self, detail: impl Into<String>) -> Self {
556 self.detail = Some(detail.into());
557 self
558 }
559
560 /// Add an extension member. Use any JSON-serializable value via `.into()`,
561 /// `serde_json::json!(…)`, or `serde_json::to_value(…)`.
562 pub fn extra(mut self, key: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
563 self.extra.insert(key.into(), value.into());
564 self
565 }
566}
567
568impl fmt::Display for ProblemDetails {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 match &self.detail {
571 Some(d) => write!(f, "{}: {}", self.title, d),
572 None => write!(f, "{}", self.title),
573 }
574 }
575}
576
577/// An error returned from a handler, a `prepare` hook, or middleware. Each
578/// variant maps to an HTTP status the framework renders as the response
579/// ([`WebError::Problem`] as an `application/problem+json` body).
580#[derive(thiserror::Error, Debug)]
581pub enum WebError {
582 /// `404 Not Found`.
583 #[error("Not found")]
584 NotFound,
585 /// `405`. Carries the methods the matched resource *does* accept, so the
586 /// response can include the `Allow` header RFC 7231 §6.5.5 requires.
587 /// Tokens are the canonical uppercase verb names (`"GET"`, `"POST"`, …);
588 /// `actus-server`'s router builds this list, so handlers rarely construct
589 /// it directly.
590 #[error("Method not allowed; allowed methods: {0:?}")]
591 MethodNotAllowed(Vec<&'static str>),
592 /// `400 Bad Request`. The string is a human-readable reason (e.g. a
593 /// malformed-body or missing-parameter explanation).
594 #[error("Bad request: {0}")]
595 BadRequest(String),
596 /// `413`. The request body exceeded the configured limit (see
597 /// `Server::with_max_body_bytes`).
598 #[error("Payload too large")]
599 PayloadTooLarge,
600 /// `429`. Rate limit exceeded. The optional [`Duration`] is the
601 /// retry-after hint — when present, the framework sets a
602 /// `Retry-After: <seconds>` header on the response, per RFC 7231
603 /// §7.1.3. Used by an application-supplied rate-limit middleware /
604 /// prepare hook; the framework itself doesn't ship a limiter (policy
605 /// belongs in the application; see the rate-limiting pattern in the
606 /// README).
607 #[error("Too many requests")]
608 TooManyRequests(Option<Duration>),
609 /// `504`. The request didn't complete within the configured timeout
610 /// (see `Server::with_request_timeout`). Mapped to `504 Gateway
611 /// Timeout`: the framework is acting as a gateway between the HTTP
612 /// connection and the handler/middleware stack, and the upstream
613 /// (handler) didn't respond in time. Produced by the framework when
614 /// the per-request timer elapses; handlers can also `Err(Timeout)`
615 /// from a downstream call that timed out themselves.
616 #[error("Request timeout")]
617 Timeout,
618 /// `503`. The server is overloaded — typically the framework-level
619 /// concurrency / buffer budget is exhausted (see
620 /// `Server::with_max_inflight_body_bytes`). The optional [`Duration`]
621 /// is a retry-after hint; when present, the framework sets a
622 /// `Retry-After: <seconds>` header on the response. Distinct from
623 /// `TooManyRequests` (429): `Busy` is "the server is overwhelmed
624 /// globally," not "this client has hit its specific limit."
625 #[error("Server busy")]
626 Busy(Option<Duration>),
627 /// `401 Unauthorized` — authentication is missing or invalid.
628 #[error("Unauthorized")]
629 Unauthorized,
630 /// `403 Forbidden` — authenticated but not permitted.
631 #[error("Forbidden")]
632 Forbidden,
633 /// `500 Internal Server Error`. The string is logged/returned as the
634 /// reason; use it for unexpected failures, not for client mistakes.
635 #[error("Internal error: {0}")]
636 Internal(String),
637 /// Structured error with an explicit status code, title, optional detail,
638 /// and arbitrary extension members. Renders as `application/problem+json`.
639 #[error("{0}")]
640 Problem(ProblemDetails),
641}
642
643// ===================== Tests =====================
644#[cfg(test)]
645mod tests {
646 // Note: bring everything from the parent module into scope for tests
647 use super::*;
648 use futures_util::{StreamExt, stream};
649 use http::StatusCode;
650 use serde_json::json;
651
652 // Test that the macro now correctly returns a Result
653 #[test]
654 fn test_macro_returns_result() {
655 let success_reply: Reply = reply!(json!({"status": "ok"}));
656 assert!(success_reply.is_ok());
657
658 let empty_reply: Reply = reply!();
659 assert!(empty_reply.is_ok());
660 }
661
662 #[test]
663 fn test_constructors() {
664 matches!(json(json!({"ok": true})), ReplyData::Json(_));
665 matches!(
666 bytes("application/octet-stream", vec![0, 1]),
667 ReplyData::Bytes { .. }
668 );
669 matches!(empty(), ReplyData::Empty);
670 }
671
672 #[tokio::test]
673 async fn test_stream_constructor() {
674 let test_stream = stream::once(async { Ok::<_, io::Error>(Bytes::from("test")) });
675 let reply_data = stream(test_stream);
676 if let ReplyData::Stream(mut s) = reply_data {
677 assert_eq!(s.next().await.unwrap().unwrap(), "test");
678 } else {
679 panic!("Expected a stream reply");
680 }
681 }
682
683 #[test]
684 fn test_reply_macro_variants() {
685 // JSON literal
686 let data_res: Reply = reply!(json!({"a":1}));
687 matches!(data_res.unwrap(), ReplyData::Json(_));
688
689 // Struct
690 #[derive(Serialize)]
691 struct TestStruct {
692 val: i32,
693 }
694 let data_res: Reply = reply!(TestStruct { val: 10 });
695 matches!(data_res.unwrap(), ReplyData::Json(_));
696
697 // Empty
698 let data_res: Reply = reply!();
699 matches!(data_res.unwrap(), ReplyData::Empty);
700
701 // With status
702 let data_res: Reply = reply!(status = StatusCode::CREATED, json!({"id": 1}));
703 match data_res.unwrap() {
704 ReplyData::Rich(spec) => {
705 assert_eq!(spec.status, Some(StatusCode::CREATED));
706 }
707 _ => panic!("Expected Rich reply"),
708 }
709
710 // With status and headers
711 let data_res: Reply =
712 reply!(status = StatusCode::CREATED, headers = {"X-Test": "value"}, json!({"id": 1}));
713 match data_res.unwrap() {
714 ReplyData::Rich(spec) => {
715 assert_eq!(spec.headers.get("X-Test"), Some(&"value".to_string()));
716 }
717 _ => panic!("Expected Rich reply"),
718 }
719 }
720
721 #[test]
722 fn add_header_lifts_non_rich_into_rich_preserving_payload() {
723 // Json → Rich(Json), header set.
724 let mut r = ReplyData::Json(json!({"x": 1}));
725 r.add_header("X-Trace-Id", "abc");
726 match &r {
727 ReplyData::Rich(spec) => {
728 assert!(matches!(spec.payload, ReplyData::Json(_)));
729 assert_eq!(spec.headers.get("X-Trace-Id"), Some(&"abc".to_string()));
730 assert_eq!(spec.status, None);
731 }
732 _ => panic!("expected Rich, got {r:?}"),
733 }
734
735 // Empty → Rich(Empty), header set.
736 let mut r = ReplyData::Empty;
737 r.add_header("X-K", "v");
738 assert!(matches!(&r, ReplyData::Rich(s) if matches!(s.payload, ReplyData::Empty)));
739
740 // Already Rich → mutate in place, payload untouched.
741 let mut r = ReplyData::Rich(Box::new(ReplySpec {
742 payload: ReplyData::Json(json!({"y": 2})),
743 status: Some(StatusCode::CREATED),
744 headers: HashMap::from([("Existing".into(), "1".into())]),
745 }));
746 r.add_header("New", "2");
747 match &r {
748 ReplyData::Rich(spec) => {
749 assert_eq!(spec.status, Some(StatusCode::CREATED));
750 assert_eq!(spec.headers.get("Existing"), Some(&"1".to_string()));
751 assert_eq!(spec.headers.get("New"), Some(&"2".to_string()));
752 }
753 _ => unreachable!(),
754 }
755 }
756
757 #[test]
758 fn set_status_lifts_or_mutates() {
759 let mut r = ReplyData::Json(json!({"x": 1}));
760 r.set_status(StatusCode::ACCEPTED);
761 match &r {
762 ReplyData::Rich(spec) => {
763 assert_eq!(spec.status, Some(StatusCode::ACCEPTED));
764 assert!(matches!(spec.payload, ReplyData::Json(_)));
765 }
766 _ => panic!("expected Rich"),
767 }
768
769 // Already Rich: mutate.
770 let mut r = ReplyData::Rich(Box::new(ReplySpec {
771 payload: ReplyData::Empty,
772 status: None,
773 headers: HashMap::new(),
774 }));
775 r.set_status(StatusCode::NOT_FOUND);
776 if let ReplyData::Rich(spec) = &r {
777 assert_eq!(spec.status, Some(StatusCode::NOT_FOUND));
778 }
779 }
780
781 #[test]
782 fn add_header_and_set_status_are_a_noop_on_upgrade() {
783 let task: Box<dyn std::any::Any + Send> = Box::new(42i32); // any `Any+Send` is fine for this test
784 let mut r = ReplyData::Upgrade(task);
785 r.add_header("X-K", "v");
786 r.set_status(StatusCode::CREATED);
787 assert!(matches!(r, ReplyData::Upgrade(_))); // unchanged
788 }
789
790 // ===== SSE encoding =====
791 //
792 // Each test pins one bit of the W3C wire spec. Decode-side reassembly
793 // would be the obvious sanity check, but a `tokio-tungstenite`-style
794 // SSE-parser dep isn't worth pulling in for what amounts to "did we
795 // emit the right bytes" — we read the encoded string directly.
796
797 fn s(b: &Bytes) -> &str {
798 std::str::from_utf8(b).expect("SSE encoding is always valid UTF-8")
799 }
800
801 #[test]
802 fn sse_data_only_event_emits_one_data_line_and_blank_separator() {
803 let bytes = SseEvent::data("hello").to_bytes();
804 assert_eq!(s(&bytes), "data: hello\n\n");
805 }
806
807 #[test]
808 fn sse_multi_line_data_emits_one_data_line_per_source_line() {
809 // The browser reassembles by `\n`-joining the values, so a source
810 // string "a\nb\nc" must round-trip as the same string.
811 let bytes = SseEvent::data("a\nb\nc").to_bytes();
812 assert_eq!(s(&bytes), "data: a\ndata: b\ndata: c\n\n");
813 }
814
815 #[test]
816 fn sse_event_id_retry_data_all_in_one_frame() {
817 let bytes = SseEvent::data("payload")
818 .event("update")
819 .id("17")
820 .retry(Duration::from_millis(2500))
821 .to_bytes();
822 assert_eq!(
823 s(&bytes),
824 "event: update\nid: 17\nretry: 2500\ndata: payload\n\n",
825 );
826 }
827
828 #[test]
829 fn sse_event_and_id_strip_embedded_newlines() {
830 // The spec doesn't allow `\n` in single-line fields. We replace
831 // with a space rather than panic / reject — the value still
832 // travels, the field stays well-formed.
833 let bytes = SseEvent::data("ok").event("a\nb").id("x\ry").to_bytes();
834 assert_eq!(s(&bytes), "event: a b\nid: x y\ndata: ok\n\n");
835 }
836
837 #[test]
838 fn sse_comment_only_event_is_a_heartbeat() {
839 // Multi-line comments: each source line becomes its own `:` line.
840 // Empty comment lines emit a bare `:\n` (still a valid comment).
841 let single = SseEvent::comment("keep-alive").to_bytes();
842 assert_eq!(s(&single), ": keep-alive\n\n");
843 let multi = SseEvent::comment("line1\n\nline3").to_bytes();
844 assert_eq!(s(&multi), ": line1\n:\n: line3\n\n");
845 }
846
847 #[test]
848 fn sse_empty_data_string_still_emits_a_data_field() {
849 // `data("")` is explicit "send an empty data field"; absent data
850 // (a comment-only event) skips the field entirely. The previous
851 // test covers absent.
852 let bytes = SseEvent::data("").to_bytes();
853 assert_eq!(s(&bytes), "data: \n\n");
854 }
855
856 #[tokio::test]
857 async fn sse_stream_wraps_in_rich_with_correct_headers_and_streams_each_event() {
858 use futures_util::stream;
859 let events = stream::iter(vec![
860 SseEvent::data("first").id("1"),
861 SseEvent::data("second").event("update"),
862 ]);
863 let reply = sse(events);
864 let ReplyData::Rich(spec) = reply else {
865 panic!("sse() produces a Rich reply for the headers");
866 };
867 assert_eq!(
868 spec.headers.get("content-type"),
869 Some(&"text/event-stream".to_string())
870 );
871 assert_eq!(
872 spec.headers.get("cache-control"),
873 Some(&"no-cache".to_string())
874 );
875
876 let ReplyData::Stream(byte_stream) = spec.payload else {
877 panic!("sse() payload is a Stream of bytes");
878 };
879 // Drain the stream and concat to verify the wire framing of the
880 // whole conversation.
881 let chunks: Vec<Bytes> = byte_stream
882 .map(|r| r.expect("infallible map"))
883 .collect()
884 .await;
885 let joined: String = chunks
886 .iter()
887 .map(|b| std::str::from_utf8(b).unwrap())
888 .collect();
889 assert_eq!(
890 joined,
891 "id: 1\ndata: first\n\nevent: update\ndata: second\n\n",
892 );
893 }
894
895 #[test]
896 fn test_builder_pattern() {
897 let data = build_reply()
898 .status(StatusCode::ACCEPTED)
899 .header("X-Custom", "value123")
900 .body(bytes("application/zip", vec![1, 2, 3]))
901 .done();
902
903 match data {
904 ReplyData::Rich(spec) => {
905 assert_eq!(spec.status, Some(StatusCode::ACCEPTED));
906 assert_eq!(spec.headers.get("X-Custom"), Some(&"value123".to_string()));
907 assert_eq!(
908 spec.payload,
909 ReplyData::Bytes {
910 content_type: Cow::Borrowed("application/zip"),
911 data: vec![1, 2, 3],
912 }
913 );
914 }
915 _ => panic!("Expected Rich reply"),
916 }
917 }
918}