Skip to main content

actus_reply/
reply.rs

1//! Contains the user-facing API for creating replies.
2use bytes::Bytes;
3use futures_util::{Stream, StreamExt};
4use serde::Serialize;
5use serde_json::Value;
6use std::any::Any;
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::time::Duration;
13
14// ===================== Core Types =====================
15
16/// A boxed, `Send + Sync` stream of body chunks, used by [`ReplyData::Stream`]
17/// and SSE responses.
18pub type BodyStream = Pin<Box<dyn Stream<Item = Result<Bytes, io::Error>> + Send + Sync>>;
19
20/// The concrete payload of a successful reply. The [`Finalizer`](crate::Finalizer)
21/// turns it into an HTTP response. Build one with the [`reply!`](crate::reply!)
22/// macro or the `reply::*` constructor functions rather than by hand.
23#[derive()]
24pub enum ReplyData {
25    /// A JSON body (`Content-Type: application/json`).
26    Json(Value),
27    /// Raw bytes plus the `Content-Type` to send. There is no default —
28    /// callers pick the type at the call site (`application/zip`,
29    /// `image/png`, `application/octet-stream`, …). Construct via
30    /// [`bytes()`](crate::bytes). The finalizer emits the header verbatim.
31    Bytes {
32        /// The `Content-Type` header value to send with the bytes.
33        content_type: Cow<'static, str>,
34        /// The raw response body.
35        data: Vec<u8>,
36    },
37    /// An empty body — `204 No Content` by default.
38    Empty,
39    /// A reply carrying explicit status / headers (a [`ReplySpec`]) on top of
40    /// one of the other payload kinds.
41    Rich(Box<ReplySpec>),
42    /// A streaming body, written out as the stream yields.
43    Stream(BodyStream),
44    /// An HTTP connection-upgrade reply (e.g. WebSocket). Produced by
45    /// `actus::ws::upgrade(...)` (with the `websocket` feature) and consumed
46    /// by the server, which completes the handshake (`101 Switching Protocols`)
47    /// and then hands the upgraded connection to the handler. The boxed value
48    /// is opaque server plumbing — don't construct or inspect it directly.
49    Upgrade(Box<dyn Any + Send>),
50}
51
52// Manual `Debug` implementation to handle the non-Debug variants.
53impl fmt::Debug for ReplyData {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        match self {
56            ReplyData::Json(j) => f.debug_tuple("Json").field(j).finish(),
57            ReplyData::Bytes { content_type, data } => f
58                .debug_struct("Bytes")
59                .field("content_type", content_type)
60                .field("data", data)
61                .finish(),
62            ReplyData::Empty => write!(f, "Empty"),
63            ReplyData::Rich(r) => f.debug_tuple("Rich").field(r).finish(),
64            ReplyData::Stream(_) => f.debug_tuple("Stream").field(&"...").finish(),
65            ReplyData::Upgrade(_) => f.debug_tuple("Upgrade").field(&"...").finish(),
66        }
67    }
68}
69
70// Manual `PartialEq` implementation. Streams are never equal.
71impl PartialEq for ReplyData {
72    fn eq(&self, other: &Self) -> bool {
73        match (self, other) {
74            (ReplyData::Json(l), ReplyData::Json(r)) => l == r,
75            (
76                ReplyData::Bytes {
77                    content_type: lc,
78                    data: ld,
79                },
80                ReplyData::Bytes {
81                    content_type: rc,
82                    data: rd,
83                },
84            ) => lc == rc && ld == rd,
85            (ReplyData::Empty, ReplyData::Empty) => true,
86            (ReplyData::Rich(l), ReplyData::Rich(r)) => l == r,
87            _ => false, // Streams and different variants are not equal
88        }
89    }
90}
91
92/// A handler's return type: `Ok(`[`ReplyData`]`)` for a success payload, or
93/// `Err(`[`WebError`]`)` for an error the framework renders as an HTTP error
94/// response. The [`reply!`](crate::reply!) macro produces the `Ok` side.
95pub type Reply = Result<ReplyData, WebError>;
96
97impl ReplyData {
98    /// Add a response header. Lifts `self` into [`ReplyData::Rich`] if the
99    /// current variant doesn't already carry headers (Json/Bytes/Empty/Stream),
100    /// so middleware `after` hooks (and any code that has a `ReplyData` it
101    /// wants to decorate) can stamp headers without manually wrangling
102    /// [`ReplySpec`]. The original payload is preserved.
103    ///
104    /// No-op on [`ReplyData::Upgrade`] — that variant is intercepted by the
105    /// server before the response body is finalized, and lifting it into
106    /// `Rich` would hide it from that interception.
107    pub fn add_header(&mut self, name: impl Into<String>, value: impl Into<String>) {
108        if matches!(self, ReplyData::Upgrade(_)) {
109            return;
110        }
111        let name = name.into();
112        let value = value.into();
113        if let ReplyData::Rich(spec) = self {
114            spec.headers.insert(name, value);
115            return;
116        }
117        let payload = std::mem::replace(self, ReplyData::Empty);
118        let mut headers = HashMap::new();
119        headers.insert(name, value);
120        *self = ReplyData::Rich(Box::new(ReplySpec {
121            payload,
122            status: None,
123            headers,
124        }));
125    }
126
127    /// Set the response status, lifting into [`ReplyData::Rich`] if needed
128    /// (see [`add_header`](Self::add_header)).
129    pub fn set_status(&mut self, status: http::StatusCode) {
130        if matches!(self, ReplyData::Upgrade(_)) {
131            return;
132        }
133        if let ReplyData::Rich(spec) = self {
134            spec.status = Some(status);
135            return;
136        }
137        let payload = std::mem::replace(self, ReplyData::Empty);
138        *self = ReplyData::Rich(Box::new(ReplySpec {
139            payload,
140            status: Some(status),
141            headers: HashMap::new(),
142        }));
143    }
144}
145
146// ===================== Simple Constructors =====================
147// All constructor functions must be `pub`.
148
149/// Build a JSON `ReplyData` from any [`Serialize`] value.
150///
151/// Panics on serialization failure. Most domain types (those with derived
152/// `Serialize` over stringy fields) never fail, but custom impls can.
153/// The `reply!` macro uses [`try_json`] internally to convert that case
154/// into a `WebError::Internal` (→ 500) rather than a handler-level panic
155/// that would drop the whole connection.
156///
157/// Prefer `reply!(my_value)` over calling this directly.
158pub fn json<T: Serialize>(val: T) -> ReplyData {
159    ReplyData::Json(serde_json::to_value(val).expect("serialize"))
160}
161
162/// Fallible counterpart of [`json`]. Returns the raw `serde_json::Error`
163/// on failure so callers (notably the `reply!` macro) can map it to a
164/// structured response instead of panicking.
165pub fn try_json<T: Serialize>(val: T) -> Result<ReplyData, serde_json::Error> {
166    Ok(ReplyData::Json(serde_json::to_value(val)?))
167}
168
169/// Build a `ReplyData::Bytes` with an explicit `Content-Type`. The
170/// content-type is required: there is no honest default for arbitrary
171/// bytes, and `application/octet-stream` masquerading as one tends to
172/// cause more bugs than it fixes (browsers guess; downstream HTTP
173/// caches mis-cache). Pass the right media type at the call site.
174///
175/// ```ignore
176/// reply::bytes("application/zip", zip_payload)
177/// reply::bytes("image/png", png_payload)
178/// ```
179pub fn bytes(content_type: impl Into<Cow<'static, str>>, data: impl Into<Vec<u8>>) -> ReplyData {
180    ReplyData::Bytes {
181        content_type: content_type.into(),
182        data: data.into(),
183    }
184}
185
186/// An empty-body reply (`204 No Content` unless a status is set via
187/// [`build_reply`]).
188pub fn empty() -> ReplyData {
189    ReplyData::Empty
190}
191
192/// Start a [`ReplySpec`] builder for a reply that needs an explicit status
193/// and/or headers on top of its body.
194pub fn build_reply() -> ReplySpec {
195    ReplySpec::new()
196}
197
198/// Build a streaming-body reply from a stream of byte chunks. The body is
199/// written out as the stream yields, not buffered.
200pub fn stream<S>(s: S) -> ReplyData
201where
202    S: Stream<Item = Result<Bytes, io::Error>> + Send + Sync + 'static,
203{
204    ReplyData::Stream(Box::pin(s))
205}
206
207// ===================== Server-Sent Events =====================
208
/// A single frame of a [Server-Sent Events][sse] stream.
///
/// Usually a frame carries `data`; it can instead (or additionally) carry a
/// comment — handy for heartbeats and debug pings — and the optional
/// `event` / `id` / `retry` fields. [`SseEvent::to_bytes`] produces the wire
/// encoding; embedded newlines in `data` become multiple `data:` lines, the
/// way the spec requires.
///
/// ```ignore
/// SseEvent::data("hello").event("greeting").id("42")
/// SseEvent::data("line1\nline2")  // becomes two `data: ...` lines
/// SseEvent::comment("keep-alive")
/// ```
///
/// [sse]: https://html.spec.whatwg.org/multipage/server-sent-events.html
#[derive(Debug, Clone, Default)]
pub struct SseEvent {
    // Value of the `event:` field (the event-type name), if any.
    event: Option<String>,
    // Value of the `id:` field, if any.
    id: Option<String>,
    // Reconnect delay for the `retry:` field, if any.
    retry: Option<Duration>,
    // Payload for the `data:` field(s), if any.
    data: Option<String>,
    // Text of the comment line(s), if any.
    comment: Option<String>,
}
230
231impl SseEvent {
232    /// New event with a `data:` field. The most common entry point.
233    pub fn data(data: impl Into<String>) -> Self {
234        Self {
235            data: Some(data.into()),
236            ..Self::default()
237        }
238    }
239
240    /// New event with just a comment line (`: ...\n\n`). Comments are
241    /// ignored by `EventSource` clients but keep the connection alive
242    /// through intermediaries — useful as a heartbeat when the stream is
243    /// idle for long stretches.
244    pub fn comment(text: impl Into<String>) -> Self {
245        Self {
246            comment: Some(text.into()),
247            ..Self::default()
248        }
249    }
250
251    /// Set the event-type name (the `event:` field). Clients can route on
252    /// this with `EventSource.addEventListener(name, …)`. Embedded newlines
253    /// are replaced with spaces (the spec doesn't allow newlines in event
254    /// names).
255    pub fn event(mut self, name: impl Into<String>) -> Self {
256        self.event = Some(name.into());
257        self
258    }
259
260    /// Set the `id:` field. The browser will replay it on reconnect via
261    /// the `Last-Event-ID` request header — use it to support resumable
262    /// streams.
263    pub fn id(mut self, id: impl Into<String>) -> Self {
264        self.id = Some(id.into());
265        self
266    }
267
268    /// Set the `retry:` field — tells the client how long (in
269    /// milliseconds) to wait before reconnecting after a drop.
270    pub fn retry(mut self, d: Duration) -> Self {
271        self.retry = Some(d);
272        self
273    }
274
275    /// Encode this event in the SSE wire format, ending with the blank-line
276    /// frame separator (`\n`). Used by [`sse`]; you rarely need to call it
277    /// directly unless you're building a custom streaming pipeline.
278    pub fn to_bytes(&self) -> Bytes {
279        // Per HTML spec §9.2.6: lines starting with `:` are comments; each
280        // field is `field-name: value\n` (the leading space after the colon
281        // is consumed by the parser, so it's pretty but not significant);
282        // a blank line ends the event. `data` spanning multiple lines is
283        // expressed as one `data:` line per source line, then reassembled
284        // by the client with `\n` joins.
285        let mut out = String::new();
286        if let Some(event) = &self.event {
287            out.push_str("event: ");
288            out.push_str(&single_line(event));
289            out.push('\n');
290        }
291        if let Some(id) = &self.id {
292            out.push_str("id: ");
293            out.push_str(&single_line(id));
294            out.push('\n');
295        }
296        if let Some(retry) = self.retry {
297            // u128 → u64 saturate. >584M years of retry delay would
298            // overflow; nobody cares.
299            let ms = u64::try_from(retry.as_millis()).unwrap_or(u64::MAX);
300            out.push_str("retry: ");
301            out.push_str(&ms.to_string());
302            out.push('\n');
303        }
304        if let Some(comment) = &self.comment {
305            for line in comment.split('\n') {
306                out.push(':');
307                if !line.is_empty() {
308                    out.push(' ');
309                    out.push_str(line);
310                }
311                out.push('\n');
312            }
313        }
314        if let Some(data) = &self.data {
315            for line in data.split('\n') {
316                out.push_str("data: ");
317                out.push_str(line);
318                out.push('\n');
319            }
320        }
321        out.push('\n'); // blank line ends the event
322        Bytes::from(out)
323    }
324}
325
326/// Build a streaming Server-Sent Events response from a stream of
327/// [`SseEvent`]s. Sets `Content-Type: text/event-stream` and
328/// `Cache-Control: no-cache` (without which an intermediate cache might
329/// hold the live stream, defeating the point).
330///
331/// ```ignore
332/// use actus::prelude::*;
333/// use futures_util::stream;
334/// use std::time::Duration;
335///
336/// pub async fn updates(&self) -> Reply {
337///     let events = stream::iter(vec![
338///         SseEvent::data("tick").id("1"),
339///         SseEvent::data("tick").id("2").retry(Duration::from_secs(5)),
340///         SseEvent::comment("keep-alive"),
341///     ]);
342///     Ok(reply::sse(events))
343/// }
344/// ```
345///
346/// The stream is consumed lazily by the connection — frames are written
347/// out as the stream yields, not buffered. When the stream ends, the
348/// connection closes.
349pub fn sse<S>(events: S) -> ReplyData
350where
351    S: Stream<Item = SseEvent> + Send + Sync + 'static,
352{
353    let byte_stream = events.map(|ev| Ok::<Bytes, io::Error>(ev.to_bytes()));
354    let mut headers = HashMap::new();
355    headers.insert("content-type".to_string(), "text/event-stream".to_string());
356    headers.insert("cache-control".to_string(), "no-cache".to_string());
357    ReplyData::Rich(Box::new(ReplySpec {
358        payload: ReplyData::Stream(Box::pin(byte_stream)),
359        status: None,
360        headers,
361    }))
362}
363
/// Flatten a value onto one line: every `\n` and `\r` becomes a single
/// space (`\x20`). Used for the `event` and `id` fields, where the SSE spec
/// doesn't permit embedded newlines.
fn single_line(s: &str) -> String {
    s.chars()
        .map(|ch| if matches!(ch, '\n' | '\r') { ' ' } else { ch })
        .collect()
}
369
370// ===================== Builder Pattern =====================
371
372/// A builder for a reply with an explicit status and/or headers wrapped
373/// around a payload. Start one with [`build_reply`], chain the setters, and
374/// finish with [`ReplySpec::done`] (which yields a [`ReplyData::Rich`]).
375#[derive(Debug, PartialEq)]
376pub struct ReplySpec {
377    /// The body payload this spec decorates.
378    pub payload: ReplyData,
379    /// The status code to send, or `None` to use the payload's default.
380    pub status: Option<http::StatusCode>,
381    /// Extra response headers, as name → value.
382    pub headers: HashMap<String, String>,
383}
384
385impl Default for ReplySpec {
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391impl ReplySpec {
392    /// Start an empty spec (no body, no status, no headers).
393    pub fn new() -> Self {
394        Self {
395            payload: ReplyData::Empty,
396            status: None,
397            headers: HashMap::new(),
398        }
399    }
400
401    /// Set the body payload.
402    pub fn body(mut self, data: ReplyData) -> Self {
403        self.payload = data;
404        self
405    }
406
407    /// Set the response status code.
408    pub fn status(mut self, s: http::StatusCode) -> Self {
409        self.status = Some(s);
410        self
411    }
412
413    /// Add a response header.
414    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
415        self.headers.insert(key.into(), value.into());
416        self
417    }
418
419    /// Finish the builder, producing a [`ReplyData::Rich`].
420    pub fn done(self) -> ReplyData {
421        ReplyData::Rich(Box::new(self))
422    }
423}
424
425// ===================== Ergonomic Macro =====================
/// Creates a `Reply` (a `Result<ReplyData, WebError>`) for a success case.
///
/// Accepts any [`Serialize`] value; failures during JSON conversion become
/// `Err(WebError::Internal(...))` (→ 500) rather than panicking. The
/// underlying [`json`] function still panics for callers that prefer to
/// surface bugs loudly; if you need that, call it directly.
///
/// Header values may be any expression with a `to_string()` method. Every
/// form accepts an optional trailing comma.
///
/// ```ignore
/// reply!(my_struct)                          // serialize as JSON
/// reply!(json!({"status": "ok"}))            // inline JSON literal
/// reply!()                                   // 204 No Content
/// reply!(stream: byte_stream)                // streaming body
/// reply!(sse: event_stream)                  // Server-Sent Events
/// reply!(status = StatusCode::CREATED, value)
/// reply!(
///     status = StatusCode::CREATED,
///     headers = { "Location": "/users/123" },
///     value
/// )
/// ```
#[macro_export]
macro_rules! reply {
    // With status and headers
    (status = $status:expr, headers = { $($k:literal : $v:expr),* $(,)? }, $body:expr $(,)?) => {{
        // A `match` (not a closure) keeps `$status` / `$v` in the caller's
        // control-flow context, so `?` and `.await` inside them still work.
        match $crate::reply::try_json($body) {
            Ok(__rd) => {
                let mut spec = $crate::reply::build_reply()
                    .status($status)
                    .body(__rd);
                $(
                    // `to_string()` already yields an owned `String`; pass it
                    // on as-is instead of borrowing it and cloning again.
                    spec = spec.header($k, $v.to_string());
                )*
                Ok(spec.done())
            }
            Err(e) => Err($crate::reply::WebError::Internal(
                ::std::format!("serialize response: {}", e),
            )),
        }
    }};
    // With just status
    (status = $status:expr, $body:expr $(,)?) => {
        match $crate::reply::try_json($body) {
            Ok(__rd) => Ok($crate::reply::build_reply()
                .status($status)
                .body(__rd)
                .done()),
            Err(e) => Err($crate::reply::WebError::Internal(
                ::std::format!("serialize response: {}", e),
            )),
        }
    };
    // Stream
    (stream: $s:expr $(,)?) => {
        Ok($crate::reply::stream($s))
    };
    // Server-Sent Events
    (sse: $s:expr $(,)?) => {
        Ok($crate::reply::sse($s))
    };
    // Empty
    () => {
        Ok($crate::reply::empty())
    };
    // JSON from struct or literal
    ($expr:expr $(,)?) => {
        match $crate::reply::try_json($expr) {
            Ok(__rd) => Ok(__rd),
            Err(e) => Err($crate::reply::WebError::Internal(
                ::std::format!("serialize response: {}", e),
            )),
        }
    };
}
499
500// ===================== Error Type =====================
501
502/// Structured error response shaped like an [RFC 7807 Problem Details]
503/// document, plus arbitrary extension members that get serialized into
504/// the JSON body.
505///
506/// Use this when one of the simple `WebError::*` variants doesn't carry
507/// enough information (e.g., a validation failure that wants to name
508/// the failing field and rule).
509///
510/// ```ignore
511/// return Err(WebError::Problem(
512///     ProblemDetails::new(StatusCode::CONFLICT, "Validation")
513///         .detail("title is required")
514///         .extra("field", "data.title")
515///         .extra("rule", "required"),
516/// ));
517/// ```
518///
519/// Wire shape (`application/problem+json`):
520/// ```json
521/// { "status": 409, "title": "Validation", "detail": "title is required",
522///   "field": "data.title", "rule": "required" }
523/// ```
524///
525/// [RFC 7807 Problem Details]: https://datatracker.ietf.org/doc/html/rfc7807
526#[derive(Debug, Clone)]
527pub struct ProblemDetails {
528    /// The HTTP status code, serialized as the `status` member.
529    pub status: http::StatusCode,
530    /// A short, human-readable summary, serialized as the `title` member.
531    pub title: String,
532    /// An optional longer explanation, serialized as the `detail` member.
533    pub detail: Option<String>,
534    /// Extension members. Serialized alongside `status` / `title` / `detail`.
535    /// Keys that collide with the standard members are ignored.
536    ///
537    /// Boxed to keep `WebError` small: this map dominates the struct's size,
538    /// yet extensions are optional and most errors carry none — so the common
539    /// case pays one pointer, not a full inline map, on every `Result` return.
540    pub extra: Box<serde_json::Map<String, serde_json::Value>>,
541}
542
543impl ProblemDetails {
544    /// Construct a new problem with the given status code and title.
545    pub fn new(status: http::StatusCode, title: impl Into<String>) -> Self {
546        Self {
547            status,
548            title: title.into(),
549            detail: None,
550            extra: Box::new(serde_json::Map::new()),
551        }
552    }
553
554    /// Set the human-readable detail message.
555    pub fn detail(mut self, detail: impl Into<String>) -> Self {
556        self.detail = Some(detail.into());
557        self
558    }
559
560    /// Add an extension member. Use any JSON-serializable value via `.into()`,
561    /// `serde_json::json!(…)`, or `serde_json::to_value(…)`.
562    pub fn extra(mut self, key: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
563        self.extra.insert(key.into(), value.into());
564        self
565    }
566}
567
568impl fmt::Display for ProblemDetails {
569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570        match &self.detail {
571            Some(d) => write!(f, "{}: {}", self.title, d),
572            None => write!(f, "{}", self.title),
573        }
574    }
575}
576
577/// An error returned from a handler, a `prepare` hook, or middleware. Each
578/// variant maps to an HTTP status the framework renders as the response
579/// ([`WebError::Problem`] as an `application/problem+json` body).
580#[derive(thiserror::Error, Debug)]
581pub enum WebError {
582    /// `404 Not Found`.
583    #[error("Not found")]
584    NotFound,
585    /// `405`. Carries the methods the matched resource *does* accept, so the
586    /// response can include the `Allow` header RFC 7231 §6.5.5 requires.
587    /// Tokens are the canonical uppercase verb names (`"GET"`, `"POST"`, …);
588    /// `actus-server`'s router builds this list, so handlers rarely construct
589    /// it directly.
590    #[error("Method not allowed; allowed methods: {0:?}")]
591    MethodNotAllowed(Vec<&'static str>),
592    /// `400 Bad Request`. The string is a human-readable reason (e.g. a
593    /// malformed-body or missing-parameter explanation).
594    #[error("Bad request: {0}")]
595    BadRequest(String),
596    /// `413`. The request body exceeded the configured limit (see
597    /// `Server::with_max_body_bytes`).
598    #[error("Payload too large")]
599    PayloadTooLarge,
600    /// `429`. Rate limit exceeded. The optional [`Duration`] is the
601    /// retry-after hint — when present, the framework sets a
602    /// `Retry-After: <seconds>` header on the response, per RFC 7231
603    /// §7.1.3. Used by an application-supplied rate-limit middleware /
604    /// prepare hook; the framework itself doesn't ship a limiter (policy
605    /// belongs in the application; see the rate-limiting pattern in the
606    /// README).
607    #[error("Too many requests")]
608    TooManyRequests(Option<Duration>),
609    /// `504`. The request didn't complete within the configured timeout
610    /// (see `Server::with_request_timeout`). Mapped to `504 Gateway
611    /// Timeout`: the framework is acting as a gateway between the HTTP
612    /// connection and the handler/middleware stack, and the upstream
613    /// (handler) didn't respond in time. Produced by the framework when
614    /// the per-request timer elapses; handlers can also `Err(Timeout)`
615    /// from a downstream call that timed out themselves.
616    #[error("Request timeout")]
617    Timeout,
618    /// `503`. The server is overloaded — typically the framework-level
619    /// concurrency / buffer budget is exhausted (see
620    /// `Server::with_max_inflight_body_bytes`). The optional [`Duration`]
621    /// is a retry-after hint; when present, the framework sets a
622    /// `Retry-After: <seconds>` header on the response. Distinct from
623    /// `TooManyRequests` (429): `Busy` is "the server is overwhelmed
624    /// globally," not "this client has hit its specific limit."
625    #[error("Server busy")]
626    Busy(Option<Duration>),
627    /// `401 Unauthorized` — authentication is missing or invalid.
628    #[error("Unauthorized")]
629    Unauthorized,
630    /// `403 Forbidden` — authenticated but not permitted.
631    #[error("Forbidden")]
632    Forbidden,
633    /// `500 Internal Server Error`. The string is logged/returned as the
634    /// reason; use it for unexpected failures, not for client mistakes.
635    #[error("Internal error: {0}")]
636    Internal(String),
637    /// Structured error with an explicit status code, title, optional detail,
638    /// and arbitrary extension members. Renders as `application/problem+json`.
639    #[error("{0}")]
640    Problem(ProblemDetails),
641}
642
643// ===================== Tests =====================
644#[cfg(test)]
645mod tests {
646    // Note: bring everything from the parent module into scope for tests
647    use super::*;
648    use futures_util::{StreamExt, stream};
649    use http::StatusCode;
650    use serde_json::json;
651
652    // Test that the macro now correctly returns a Result
653    #[test]
654    fn test_macro_returns_result() {
655        let success_reply: Reply = reply!(json!({"status": "ok"}));
656        assert!(success_reply.is_ok());
657
658        let empty_reply: Reply = reply!();
659        assert!(empty_reply.is_ok());
660    }
661
662    #[test]
663    fn test_constructors() {
664        matches!(json(json!({"ok": true})), ReplyData::Json(_));
665        matches!(
666            bytes("application/octet-stream", vec![0, 1]),
667            ReplyData::Bytes { .. }
668        );
669        matches!(empty(), ReplyData::Empty);
670    }
671
672    #[tokio::test]
673    async fn test_stream_constructor() {
674        let test_stream = stream::once(async { Ok::<_, io::Error>(Bytes::from("test")) });
675        let reply_data = stream(test_stream);
676        if let ReplyData::Stream(mut s) = reply_data {
677            assert_eq!(s.next().await.unwrap().unwrap(), "test");
678        } else {
679            panic!("Expected a stream reply");
680        }
681    }
682
683    #[test]
684    fn test_reply_macro_variants() {
685        // JSON literal
686        let data_res: Reply = reply!(json!({"a":1}));
687        matches!(data_res.unwrap(), ReplyData::Json(_));
688
689        // Struct
690        #[derive(Serialize)]
691        struct TestStruct {
692            val: i32,
693        }
694        let data_res: Reply = reply!(TestStruct { val: 10 });
695        matches!(data_res.unwrap(), ReplyData::Json(_));
696
697        // Empty
698        let data_res: Reply = reply!();
699        matches!(data_res.unwrap(), ReplyData::Empty);
700
701        // With status
702        let data_res: Reply = reply!(status = StatusCode::CREATED, json!({"id": 1}));
703        match data_res.unwrap() {
704            ReplyData::Rich(spec) => {
705                assert_eq!(spec.status, Some(StatusCode::CREATED));
706            }
707            _ => panic!("Expected Rich reply"),
708        }
709
710        // With status and headers
711        let data_res: Reply =
712            reply!(status = StatusCode::CREATED, headers = {"X-Test": "value"}, json!({"id": 1}));
713        match data_res.unwrap() {
714            ReplyData::Rich(spec) => {
715                assert_eq!(spec.headers.get("X-Test"), Some(&"value".to_string()));
716            }
717            _ => panic!("Expected Rich reply"),
718        }
719    }
720
721    #[test]
722    fn add_header_lifts_non_rich_into_rich_preserving_payload() {
723        // Json → Rich(Json), header set.
724        let mut r = ReplyData::Json(json!({"x": 1}));
725        r.add_header("X-Trace-Id", "abc");
726        match &r {
727            ReplyData::Rich(spec) => {
728                assert!(matches!(spec.payload, ReplyData::Json(_)));
729                assert_eq!(spec.headers.get("X-Trace-Id"), Some(&"abc".to_string()));
730                assert_eq!(spec.status, None);
731            }
732            _ => panic!("expected Rich, got {r:?}"),
733        }
734
735        // Empty → Rich(Empty), header set.
736        let mut r = ReplyData::Empty;
737        r.add_header("X-K", "v");
738        assert!(matches!(&r, ReplyData::Rich(s) if matches!(s.payload, ReplyData::Empty)));
739
740        // Already Rich → mutate in place, payload untouched.
741        let mut r = ReplyData::Rich(Box::new(ReplySpec {
742            payload: ReplyData::Json(json!({"y": 2})),
743            status: Some(StatusCode::CREATED),
744            headers: HashMap::from([("Existing".into(), "1".into())]),
745        }));
746        r.add_header("New", "2");
747        match &r {
748            ReplyData::Rich(spec) => {
749                assert_eq!(spec.status, Some(StatusCode::CREATED));
750                assert_eq!(spec.headers.get("Existing"), Some(&"1".to_string()));
751                assert_eq!(spec.headers.get("New"), Some(&"2".to_string()));
752            }
753            _ => unreachable!(),
754        }
755    }
756
757    #[test]
758    fn set_status_lifts_or_mutates() {
759        let mut r = ReplyData::Json(json!({"x": 1}));
760        r.set_status(StatusCode::ACCEPTED);
761        match &r {
762            ReplyData::Rich(spec) => {
763                assert_eq!(spec.status, Some(StatusCode::ACCEPTED));
764                assert!(matches!(spec.payload, ReplyData::Json(_)));
765            }
766            _ => panic!("expected Rich"),
767        }
768
769        // Already Rich: mutate.
770        let mut r = ReplyData::Rich(Box::new(ReplySpec {
771            payload: ReplyData::Empty,
772            status: None,
773            headers: HashMap::new(),
774        }));
775        r.set_status(StatusCode::NOT_FOUND);
776        if let ReplyData::Rich(spec) = &r {
777            assert_eq!(spec.status, Some(StatusCode::NOT_FOUND));
778        }
779    }
780
781    #[test]
782    fn add_header_and_set_status_are_a_noop_on_upgrade() {
783        let task: Box<dyn std::any::Any + Send> = Box::new(42i32); // any `Any+Send` is fine for this test
784        let mut r = ReplyData::Upgrade(task);
785        r.add_header("X-K", "v");
786        r.set_status(StatusCode::CREATED);
787        assert!(matches!(r, ReplyData::Upgrade(_))); // unchanged
788    }
789
    // ===== SSE encoding =====
    //
    // Each test pins one detail of the SSE wire format (WHATWG HTML Living
    // Standard, "Server-sent events"). Round-tripping through a real SSE
    // parser would be the obvious sanity check, but pulling in a parser
    // dependency isn't worth it for what amounts to "did we emit the right
    // bytes" — we compare the encoded string directly.
796
797    fn s(b: &Bytes) -> &str {
798        std::str::from_utf8(b).expect("SSE encoding is always valid UTF-8")
799    }
800
801    #[test]
802    fn sse_data_only_event_emits_one_data_line_and_blank_separator() {
803        let bytes = SseEvent::data("hello").to_bytes();
804        assert_eq!(s(&bytes), "data: hello\n\n");
805    }
806
807    #[test]
808    fn sse_multi_line_data_emits_one_data_line_per_source_line() {
809        // The browser reassembles by `\n`-joining the values, so a source
810        // string "a\nb\nc" must round-trip as the same string.
811        let bytes = SseEvent::data("a\nb\nc").to_bytes();
812        assert_eq!(s(&bytes), "data: a\ndata: b\ndata: c\n\n");
813    }
814
815    #[test]
816    fn sse_event_id_retry_data_all_in_one_frame() {
817        let bytes = SseEvent::data("payload")
818            .event("update")
819            .id("17")
820            .retry(Duration::from_millis(2500))
821            .to_bytes();
822        assert_eq!(
823            s(&bytes),
824            "event: update\nid: 17\nretry: 2500\ndata: payload\n\n",
825        );
826    }
827
828    #[test]
829    fn sse_event_and_id_strip_embedded_newlines() {
830        // The spec doesn't allow `\n` in single-line fields. We replace
831        // with a space rather than panic / reject — the value still
832        // travels, the field stays well-formed.
833        let bytes = SseEvent::data("ok").event("a\nb").id("x\ry").to_bytes();
834        assert_eq!(s(&bytes), "event: a b\nid: x y\ndata: ok\n\n");
835    }
836
837    #[test]
838    fn sse_comment_only_event_is_a_heartbeat() {
839        // Multi-line comments: each source line becomes its own `:` line.
840        // Empty comment lines emit a bare `:\n` (still a valid comment).
841        let single = SseEvent::comment("keep-alive").to_bytes();
842        assert_eq!(s(&single), ": keep-alive\n\n");
843        let multi = SseEvent::comment("line1\n\nline3").to_bytes();
844        assert_eq!(s(&multi), ": line1\n:\n: line3\n\n");
845    }
846
847    #[test]
848    fn sse_empty_data_string_still_emits_a_data_field() {
849        // `data("")` is explicit "send an empty data field"; absent data
850        // (a comment-only event) skips the field entirely. The previous
851        // test covers absent.
852        let bytes = SseEvent::data("").to_bytes();
853        assert_eq!(s(&bytes), "data: \n\n");
854    }
855
856    #[tokio::test]
857    async fn sse_stream_wraps_in_rich_with_correct_headers_and_streams_each_event() {
858        use futures_util::stream;
859        let events = stream::iter(vec![
860            SseEvent::data("first").id("1"),
861            SseEvent::data("second").event("update"),
862        ]);
863        let reply = sse(events);
864        let ReplyData::Rich(spec) = reply else {
865            panic!("sse() produces a Rich reply for the headers");
866        };
867        assert_eq!(
868            spec.headers.get("content-type"),
869            Some(&"text/event-stream".to_string())
870        );
871        assert_eq!(
872            spec.headers.get("cache-control"),
873            Some(&"no-cache".to_string())
874        );
875
876        let ReplyData::Stream(byte_stream) = spec.payload else {
877            panic!("sse() payload is a Stream of bytes");
878        };
879        // Drain the stream and concat to verify the wire framing of the
880        // whole conversation.
881        let chunks: Vec<Bytes> = byte_stream
882            .map(|r| r.expect("infallible map"))
883            .collect()
884            .await;
885        let joined: String = chunks
886            .iter()
887            .map(|b| std::str::from_utf8(b).unwrap())
888            .collect();
889        assert_eq!(
890            joined,
891            "id: 1\ndata: first\n\nevent: update\ndata: second\n\n",
892        );
893    }
894
895    #[test]
896    fn test_builder_pattern() {
897        let data = build_reply()
898            .status(StatusCode::ACCEPTED)
899            .header("X-Custom", "value123")
900            .body(bytes("application/zip", vec![1, 2, 3]))
901            .done();
902
903        match data {
904            ReplyData::Rich(spec) => {
905                assert_eq!(spec.status, Some(StatusCode::ACCEPTED));
906                assert_eq!(spec.headers.get("X-Custom"), Some(&"value123".to_string()));
907                assert_eq!(
908                    spec.payload,
909                    ReplyData::Bytes {
910                        content_type: Cow::Borrowed("application/zip"),
911                        data: vec![1, 2, 3],
912                    }
913                );
914            }
915            _ => panic!("Expected Rich reply"),
916        }
917    }
918}