Skip to main content

connectrpc/
response.rs

1//! Handler request/response types.
2//!
3//! This module splits the old `Context` struct into a read-only
4//! [`RequestContext`] (passed *into* handlers) and a [`Response<B>`]
5//! wrapper (returned *from* handlers). The body type `B` is bounded by
6//! [`Encodable<M>`] in the generated trait so handlers can return either
7//! the owned message `M`, a borrowing `MView<'_>` /
8//! [`OwnedView<MView<'static>>`](buffa::view::OwnedView), or
9//! [`MaybeBorrowed`] for the conditional case.
10
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::time::{Duration, Instant};
14
15use buffa::Message;
16use buffa::view::{MessageView, ViewEncode};
17use bytes::Bytes;
18use futures::Stream;
19use http::HeaderMap;
20use http::header::{HeaderName, HeaderValue};
21use serde::Serialize;
22
23use crate::codec::CodecFormat;
24use crate::error::ConnectError;
25
26// ---------------------------------------------------------------------------
27// RequestContext
28// ---------------------------------------------------------------------------
29
30/// Read-only request context passed to RPC handlers.
31///
32/// Carries the request headers, parsed deadline, and any
33/// connection-scoped extensions (peer address, TLS certs, auth context)
34/// inserted by a tower layer in front of the service. Handlers do *not*
35/// return this; response-side metadata lives on [`Response`].
36///
37/// `RequestContext` is `#[non_exhaustive]`: construct it with
38/// [`RequestContext::new`] and the `with_*` builders, and read fields
39/// through the accessor methods (`headers()`, `deadline()`,
40/// `extensions()`, …). New request-scoped metadata can be added in minor
41/// releases without breaking downstream code.
42#[derive(Debug, Clone, Default)]
43#[non_exhaustive]
44pub struct RequestContext {
45    /// Request headers (after protocol-prefix stripping).
46    pub(crate) headers: HeaderMap,
47    /// Absolute request deadline parsed from the protocol's timeout header,
48    /// if any. Propagate to downstream calls.
49    ///
50    /// If a [`DeadlinePolicy`](crate::DeadlinePolicy) is configured on the
51    /// service, this is the *moderated* value — clamped to the policy's
52    /// `[min, max]` range, or the policy default when the client asserted
53    /// nothing — not the raw client header.
54    pub(crate) deadline: Option<Instant>,
55    /// Request extensions carried from the underlying `http::Request`.
56    pub(crate) extensions: http::Extensions,
57    /// Static metadata for the dispatched RPC method, when known.
58    pub(crate) spec: Option<crate::spec::Spec>,
59    /// The wire protocol negotiated for this request, when known.
60    pub(crate) protocol: Option<crate::Protocol>,
61    /// The procedure path the client requested, with a leading slash.
62    pub(crate) path: Option<String>,
63}
64
65impl RequestContext {
66    /// Create a new context with the given request headers.
67    pub fn new(headers: HeaderMap) -> Self {
68        Self {
69            headers,
70            deadline: None,
71            extensions: http::Extensions::new(),
72            spec: None,
73            protocol: None,
74            path: None,
75        }
76    }
77
78    /// Set the request deadline (absolute `Instant`).
79    #[must_use]
80    pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
81        self.deadline = deadline;
82        self
83    }
84
85    /// Attach request extensions captured from the underlying `http::Request`.
86    #[must_use]
87    pub fn with_extensions(mut self, extensions: http::Extensions) -> Self {
88        self.extensions = extensions;
89        self
90    }
91
92    /// Attach the static method metadata for the dispatched RPC.
93    #[must_use]
94    pub fn with_spec(mut self, spec: Option<crate::spec::Spec>) -> Self {
95        self.spec = spec;
96        self
97    }
98
99    /// Attach the negotiated wire protocol.
100    #[must_use]
101    pub fn with_protocol(mut self, protocol: Option<crate::Protocol>) -> Self {
102        self.protocol = protocol;
103        self
104    }
105
106    /// Attach the procedure path the client requested. The dispatch path
107    /// always supplies the leading-slash form (`"/package.Service/Method"`),
108    /// matching [`Spec::procedure`](crate::Spec::procedure); custom
109    /// dispatch shims and test fixtures should do the same so consumers
110    /// of [`path()`](Self::path) see a consistent shape.
111    #[must_use]
112    pub fn with_path(mut self, path: impl Into<String>) -> Self {
113        self.path = Some(path.into());
114        self
115    }
116
117    /// Request headers (after protocol-prefix stripping).
118    ///
119    /// For a single header lookup, [`header`](Self::header) is simpler.
120    pub fn headers(&self) -> &HeaderMap {
121        &self.headers
122    }
123
124    /// Get a request header value.
125    pub fn header(&self, key: impl http::header::AsHeaderName) -> Option<&HeaderValue> {
126        self.headers.get(key)
127    }
128
129    /// Absolute request deadline parsed from the protocol's timeout header
130    /// (`Connect-Timeout-Ms` or `grpc-timeout`), if the client asserted one.
131    ///
132    /// Propagate this to downstream calls so the whole call chain shares a
133    /// single budget. For the remaining budget as a `Duration`, see
134    /// [`time_remaining`](Self::time_remaining).
135    ///
136    /// If a [`DeadlinePolicy`](crate::DeadlinePolicy) is configured on the
137    /// service, this is the *moderated* value — clamped to the policy's
138    /// `[min, max]` range, or the policy default when the client asserted
139    /// nothing — not the raw client header.
140    pub fn deadline(&self) -> Option<Instant> {
141        self.deadline
142    }
143
144    /// Time remaining until the request deadline, saturating at zero.
145    ///
146    /// `None` if the client did not assert a timeout. Use this to budget
147    /// downstream calls — for example, subtract a margin before passing the
148    /// remainder as a downstream RPC's per-call timeout. See also issue
149    /// [#92](https://github.com/anthropics/connect-rust/issues/92) for
150    /// server-side deadline enforcement.
151    pub fn time_remaining(&self) -> Option<Duration> {
152        self.deadline
153            .map(|d| d.saturating_duration_since(Instant::now()))
154    }
155
156    /// Request extensions carried from the underlying `http::Request`.
157    ///
158    /// This is the passthrough for connection-scoped metadata that a tower
159    /// layer in front of the service can attach — TLS peer certificates,
160    /// remote socket address, auth context, etc. The dispatch path moves
161    /// `parts.extensions` here verbatim; handlers read it with
162    /// `ctx.extensions().get::<T>()`. For the well-known peer types, prefer
163    /// the typed accessors `peer_addr()` and `peer_certs()` (gated on the
164    /// `server` and `server-tls` features respectively) — they return
165    /// `None` instead of panicking when the transport didn't insert the
166    /// extension.
167    pub fn extensions(&self) -> &http::Extensions {
168        &self.extensions
169    }
170
171    /// Mutable access to the request extensions.
172    ///
173    /// Useful for code that constructs a `RequestContext` directly — e.g.
174    /// a custom dispatch shim or test fixture — and needs to insert
175    /// connection-scoped values before calling a handler.
176    ///
177    /// # Note
178    ///
179    /// Handlers receive `RequestContext` **by value**, so calling
180    /// `ctx.extensions_mut().insert(...)` inside a handler mutates a local
181    /// copy that the framework never sees again — it has no effect on the
182    /// dispatch path or on downstream layers. To pass values *into* a
183    /// handler from middleware, mutate `http::Request::extensions_mut()`
184    /// in the layer instead; the dispatcher moves request extensions into
185    /// `RequestContext` automatically before dispatch.
186    pub fn extensions_mut(&mut self) -> &mut http::Extensions {
187        &mut self.extensions
188    }
189
190    /// Static metadata for the dispatched RPC method, when known.
191    ///
192    /// Populated by code-generated `FooServiceServer<T>` dispatchers and
193    /// by the dynamic [`Router`](crate::Router) when registered through
194    /// the generated `register()` (which chains
195    /// [`Router::with_spec`](crate::Router::with_spec) per route).
196    /// `None` only for routes registered through the manual `route_*`
197    /// builders without a `with_spec` call. See [`path`](Self::path) for
198    /// the always-present procedure path.
199    pub fn spec(&self) -> Option<crate::spec::Spec> {
200        self.spec
201    }
202
203    /// The wire protocol negotiated for this request, when known.
204    ///
205    /// `None` if the runtime constructed the context outside the dispatch
206    /// path (e.g. unit tests calling handlers directly).
207    pub fn protocol(&self) -> Option<crate::Protocol> {
208        self.protocol
209    }
210
211    /// The procedure path the client requested, `"/package.Service/Method"`.
212    ///
213    /// Always present when constructed by the dispatch path: it is taken
214    /// from the request URI, so it is populated whenever a handler is
215    /// dispatched — including dispatch through the dynamic
216    /// [`Router`](crate::Router), which does not supply a
217    /// [`Spec`](crate::Spec). `None` only for hand-built contexts (unit
218    /// tests calling handlers directly, custom dispatch shims). Code that
219    /// must label or gate every request — auth interceptors, span
220    /// builders, rate limiters — should read `path()`, not `spec()`, and
221    /// treat `None` as a misconfigured or synthetic context rather than a
222    /// real RPC.
223    ///
224    /// Compare [`spec()`](Self::spec): that is the registered method's
225    /// *static* metadata, populated only when a generated
226    /// `FooServiceServer<T>` dispatcher resolved the route, and
227    /// [`Spec::procedure`](crate::Spec::procedure) is its `&'static str`
228    /// procedure name. When both are present they are identical strings;
229    /// `path()` exists for the cases where `spec()` cannot be.
230    ///
231    /// The leading slash is included to match `Spec::procedure`, the
232    /// `connect-go` `Spec.Procedure` convention, and `http::Uri::path()`
233    /// for any HTTP request that reached the dispatch layer. To compare
234    /// against [`Dispatcher::lookup`](crate::Dispatcher::lookup) keys
235    /// (which omit it), use `path.strip_prefix('/').unwrap_or(path)`.
236    pub fn path(&self) -> Option<&str> {
237        self.path.as_deref()
238    }
239
240    /// Remote peer socket address, if the transport recorded one.
241    ///
242    /// Present when the request arrived through
243    /// [`Server::serve`](crate::server::Server::serve) (plain) or
244    /// `Server::with_tls(...)` (TLS), or any integration that inserts
245    /// [`PeerAddr`](crate::server::PeerAddr) into the request extensions
246    /// (`connectrpc::axum::serve_tls` does).
247    /// Returns `None` otherwise (e.g. an axum app without a layer that
248    /// captures the connect info), so prefer this over
249    /// `ctx.extensions().get::<PeerAddr>().unwrap()` — the latter compiles,
250    /// passes in unit tests, and panics in production behind a transport
251    /// that didn't insert it.
252    #[cfg(feature = "server")]
253    #[cfg_attr(docsrs, doc(cfg(feature = "server")))]
254    pub fn peer_addr(&self) -> Option<std::net::SocketAddr> {
255        self.extensions
256            .get::<crate::server::PeerAddr>()
257            .map(|p| p.0)
258    }
259
260    /// TLS client certificate chain presented by the peer (leaf first), if any.
261    ///
262    /// Present only when the request arrived over a TLS listener that
263    /// requested a client certificate and the client presented one — see
264    /// [`Server::with_tls`](crate::server::Server::with_tls) and
265    /// `connectrpc::axum::serve_tls`. Returns `None` for plaintext
266    /// transports, for TLS without mutual auth, and for integrations that
267    /// don't insert [`PeerCerts`](crate::server::PeerCerts) into the
268    /// request extensions. Like [`peer_addr`](Self::peer_addr), prefer
269    /// this over a raw `extensions().get()` + `unwrap()`.
270    #[cfg(feature = "server-tls")]
271    #[cfg_attr(docsrs, doc(cfg(feature = "server-tls")))]
272    pub fn peer_certs(&self) -> Option<&[rustls::pki_types::CertificateDer<'static>]> {
273        self.extensions
274            .get::<crate::server::PeerCerts>()
275            .map(|p| &p.0[..])
276    }
277}
278
279// ---------------------------------------------------------------------------
280// Response<B>
281// ---------------------------------------------------------------------------
282
283/// Handler response wrapper: a body plus optional response headers,
284/// trailers, and compression hint.
285///
286/// `B` is bounded by [`Encodable<M>`] in the generated service trait so
287/// handlers can return the owned message `M` (the common case), or any
288/// type that encodes to the same wire bytes.
289///
290/// # Happy path
291///
292/// [`Response::ok`] is the bare-body shorthand:
293///
294/// ```rust,ignore
295/// async fn say(&self, _ctx: RequestContext, req: OwnedSayRequestView)
296///     -> ServiceResult<SayResponse>
297/// {
298///     Response::ok(SayResponse { sentence: reply, ..Default::default() })
299/// }
300/// ```
301///
302/// # With metadata
303///
304/// ```rust,ignore
305/// Ok(Response::new(reply)
306///     .with_header("x-request-id", id)
307///     .with_trailer("x-timing", elapsed))
308/// ```
309#[derive(Debug, Clone)]
310pub struct Response<B> {
311    /// The response body.
312    pub body: B,
313    /// Response headers to send before the body.
314    pub headers: HeaderMap,
315    /// Trailers to send after the body. Sent as HTTP/2 trailing
316    /// HEADERS for gRPC, or as `trailer-`-prefixed headers / the
317    /// EndStreamResponse JSON for Connect.
318    pub trailers: HeaderMap,
319    /// Whether to compress the response. `None` uses the server's
320    /// compression policy; `Some(false)` disables compression for this
321    /// response, `Some(true)` forces it.
322    pub compress: Option<bool>,
323}
324
325impl<B> Response<B> {
326    /// Shorthand for `Ok(Response::from(body))` — the bare-body happy
327    /// path.
328    ///
329    /// Use `Ok(Response::new(body).with_header(...))` when setting
330    /// response metadata; this constructor is for the common case of
331    /// "just the body".
332    pub fn ok(body: B) -> ServiceResult<B> {
333        Ok(Self::from(body))
334    }
335
336    /// Wrap a body with empty response metadata.
337    pub fn new(body: B) -> Self {
338        Self {
339            body,
340            headers: HeaderMap::new(),
341            trailers: HeaderMap::new(),
342            compress: None,
343        }
344    }
345
346    /// Append a response header.
347    ///
348    /// Uses [`HeaderMap::append`], so calling twice with the same name
349    /// accumulates values rather than replacing.
350    ///
351    /// # Panics
352    ///
353    /// Panics if `name` or `value` cannot be converted into the
354    /// corresponding header type (invalid characters, non-ASCII name,
355    /// etc.). Use [`try_with_header`](Self::try_with_header) for
356    /// dynamic values, or the `headers` field directly for full
357    /// control.
358    #[must_use]
359    pub fn with_header<K, V>(mut self, name: K, value: V) -> Self
360    where
361        K: TryInto<HeaderName>,
362        K::Error: std::fmt::Debug,
363        V: TryInto<HeaderValue>,
364        V::Error: std::fmt::Debug,
365    {
366        self.headers
367            .append(name.try_into().unwrap(), value.try_into().unwrap());
368        self
369    }
370
371    /// Append a response header, returning an error if `name` or
372    /// `value` is invalid.
373    ///
374    /// Non-panicking sibling of [`with_header`](Self::with_header) for
375    /// dynamic values. Uses [`HeaderMap::append`], so repeated calls
376    /// accumulate.
377    pub fn try_with_header<K, V>(mut self, name: K, value: V) -> Result<Self, http::Error>
378    where
379        K: TryInto<HeaderName>,
380        K::Error: Into<http::Error>,
381        V: TryInto<HeaderValue>,
382        V::Error: Into<http::Error>,
383    {
384        self.headers.append(
385            name.try_into().map_err(Into::into)?,
386            value.try_into().map_err(Into::into)?,
387        );
388        Ok(self)
389    }
390
391    /// Append a response trailer.
392    ///
393    /// Uses [`HeaderMap::append`], so calling twice with the same name
394    /// accumulates values rather than replacing.
395    ///
396    /// # Panics
397    ///
398    /// Panics if `name` or `value` cannot be converted into the
399    /// corresponding header type. Use
400    /// [`try_with_trailer`](Self::try_with_trailer) for dynamic
401    /// values, or the `trailers` field directly for full control.
402    #[must_use]
403    pub fn with_trailer<K, V>(mut self, name: K, value: V) -> Self
404    where
405        K: TryInto<HeaderName>,
406        K::Error: std::fmt::Debug,
407        V: TryInto<HeaderValue>,
408        V::Error: std::fmt::Debug,
409    {
410        self.trailers
411            .append(name.try_into().unwrap(), value.try_into().unwrap());
412        self
413    }
414
415    /// Append a response trailer, returning an error if `name` or
416    /// `value` is invalid.
417    ///
418    /// Non-panicking sibling of [`with_trailer`](Self::with_trailer)
419    /// for dynamic values. Uses [`HeaderMap::append`], so repeated
420    /// calls accumulate.
421    pub fn try_with_trailer<K, V>(mut self, name: K, value: V) -> Result<Self, http::Error>
422    where
423        K: TryInto<HeaderName>,
424        K::Error: Into<http::Error>,
425        V: TryInto<HeaderValue>,
426        V::Error: Into<http::Error>,
427    {
428        self.trailers.append(
429            name.try_into().map_err(Into::into)?,
430            value.try_into().map_err(Into::into)?,
431        );
432        Ok(self)
433    }
434
435    /// Override the server's compression policy for this response.
436    ///
437    /// `true` forces compression, `false` disables it, `None` (or
438    /// never calling this) defers to the server's policy.
439    #[must_use]
440    pub fn compress(mut self, enabled: impl Into<Option<bool>>) -> Self {
441        self.compress = enabled.into();
442        self
443    }
444
445    /// Replace the body, preserving headers/trailers/compression.
446    pub fn map_body<C>(self, f: impl FnOnce(B) -> C) -> Response<C> {
447        Response {
448            body: f(self.body),
449            headers: self.headers,
450            trailers: self.trailers,
451            compress: self.compress,
452        }
453    }
454}
455
456impl<B> From<B> for Response<B> {
457    fn from(body: B) -> Self {
458        Self::new(body)
459    }
460}
461
462impl<T> Response<ServiceStream<T>> {
463    /// Wrap a streaming body, boxing and unsize-coercing it to
464    /// [`ServiceStream<T>`]. Handles the explicit coercion that
465    /// `Ok(Box::pin(s).into())` would otherwise need.
466    pub fn stream(s: impl Stream<Item = Result<T, ConnectError>> + Send + 'static) -> Self {
467        Self::new(Box::pin(s))
468    }
469
470    /// Shorthand for `Ok(Response::stream(s))` — the bare-stream
471    /// happy path.
472    pub fn stream_ok(
473        s: impl Stream<Item = Result<T, ConnectError>> + Send + 'static,
474    ) -> ServiceResult<ServiceStream<T>> {
475        Ok(Self::stream(s))
476    }
477}
478
479/// Result type returned by handler trait methods.
480///
481/// `B` is the body type — typically the owned response message, or any
482/// `impl Encodable<M>`.
483pub type ServiceResult<B> = Result<Response<B>, ConnectError>;
484
485/// Boxed `Send` stream of `Result<T, ConnectError>`.
486///
487/// Used as the request type for client/bidi-streaming handlers and the
488/// body type for server/bidi-streaming responses.
489///
490/// For an inbound request stream, `None` means the client finished the
491/// stream cleanly; `Some(Err(..))` means the stream ended abnormally — a
492/// decode failure or a request body that failed mid-upload (truncated or
493/// broken transport). Treat only `None` as a complete stream; propagating
494/// the error with `?` fails the RPC, which is the right default for
495/// handlers that aggregate inbound messages.
496pub type ServiceStream<T> = Pin<Box<dyn Stream<Item = Result<T, ConnectError>> + Send>>;
497
498// ---------------------------------------------------------------------------
499// Encodable<M>
500// ---------------------------------------------------------------------------
501
502/// Encodes to the same wire bytes as proto message `M`.
503///
504/// This is the bound on the response body in generated trait methods.
505/// Provided implementations:
506/// - the owned `M` itself (blanket `M: Message + Serialize` below);
507/// - `MView<'_>` and [`OwnedView<MView<'static>>`](buffa::view::OwnedView),
508///   emitted by codegen per RPC output type;
509/// - [`MaybeBorrowed<M, V>`] for handlers that conditionally return
510///   either;
511/// - [`StreamMessage<M>`](crate::StreamMessage) for echoing inbound
512///   stream items back out (re-encodes from the retained wire bytes);
513/// - [`PreEncoded`] for handlers that encode a non-`'static` view
514///   internally and pass the bytes across the handler boundary.
515///
516/// # Contract
517///
518/// Implementations must produce bytes that decode as a valid `M` in
519/// the given format.
520///
521/// `encode` is fallible: the owned-message impl never errors. The
522/// view-body impls are proto-only (view types lack `Serialize`) and return
523/// [`ErrorCode::Unimplemented`](crate::ErrorCode::Unimplemented) for
524/// `CodecFormat::Json`. [`PreEncoded`] supports both codecs but the JSON
525/// path is a slow fallback (decode + re-serialize) — see its
526/// `# Codec behaviour` doc.
527pub trait Encodable<M> {
528    /// Encode `self` as wire bytes for `M` in the requested format.
529    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError>;
530}
531
532impl<M: Message + Serialize> Encodable<M> for M {
533    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
534        match codec {
535            CodecFormat::Proto => Ok(self.encode_to_bytes()),
536            CodecFormat::Json => serde_json::to_vec(self).map(Bytes::from).map_err(|e| {
537                ConnectError::internal(format!("failed to encode JSON response: {e}"))
538            }),
539        }
540    }
541}
542
543/// Encode a view body via [`ViewEncode`] for [`CodecFormat::Proto`], or
544/// return [`ErrorCode::Unimplemented`](crate::ErrorCode::Unimplemented)
545/// for [`CodecFormat::Json`] (view types don't implement `Serialize`).
546///
547/// Used by codegen-emitted `impl Encodable<Foo> for FooView<'_>` /
548/// `impl Encodable<Foo> for OwnedView<FooView<'static>>` blocks. A
549/// runtime blanket on [`OwnedView`](buffa::view::OwnedView) would
550/// conflict with the `M: Message + Serialize` blanket above (coherence
551/// can't rule out upstream adding `Message`/`Serialize` for
552/// `OwnedView`), so the impls are emitted per output type instead.
553#[doc(hidden)]
554pub fn encode_view_body<'a, V: ViewEncode<'a>>(
555    view: &V,
556    codec: CodecFormat,
557) -> Result<Bytes, ConnectError> {
558    match codec {
559        CodecFormat::Proto => Ok(view.encode_to_bytes()),
560        CodecFormat::Json => Err(ConnectError::unimplemented(
561            "view-body responses do not support the JSON codec; return the owned message type for JSON-serving handlers",
562        )),
563    }
564}
565
566// ---------------------------------------------------------------------------
567// MaybeBorrowed
568// ---------------------------------------------------------------------------
569
570/// Either an owned message `M` or a borrowing view `V`, both
571/// [`Encodable<M>`].
572///
573/// Use this when a handler conditionally passes the request through
574/// unchanged (return the view, zero allocations) versus modifying it
575/// (clone to owned, mutate, return owned). The single concrete return
576/// type satisfies the `impl Encodable<M>` bound on the generated trait.
577///
578/// This is not [`std::borrow::Cow`]: `V` is a separate
579/// [`Encodable<M>`] type (e.g. `MView<'a>` or `OwnedView<MView>`),
580/// not a `&M`, and there is no `ToOwned` relationship between the
581/// arms — each encodes independently.
582///
583/// ```rust,ignore
584/// async fn redact(&self, _ctx: RequestContext, req: OwnedRecordView)
585///     -> ServiceResult<MaybeBorrowed<Record, OwnedRecordView>>
586/// {
587///     if req.email.is_empty() && req.ssn.is_empty() {
588///         // pass-through: re-encode straight from the request bytes
589///         return Response::ok(MaybeBorrowed::Borrowed(req));
590///     }
591///     let mut owned = req.to_owned_message();
592///     owned.email.clear();
593///     owned.ssn.clear();
594///     Response::ok(MaybeBorrowed::Owned(owned))
595/// }
596/// ```
597///
598/// # Codec compatibility
599///
600/// The `Borrowed` arm only encodes for [`CodecFormat::Proto`]. JSON
601/// clients receive an `unimplemented` error; if your service must
602/// support JSON, return `Owned` (or just the owned message) on every
603/// path.
604#[derive(Debug, Clone)]
605pub enum MaybeBorrowed<M, V> {
606    /// An owned message body.
607    Owned(M),
608    /// A borrowing body that encodes to the same wire bytes as `M`.
609    Borrowed(V),
610}
611
612impl<M, V> Encodable<M> for MaybeBorrowed<M, V>
613where
614    // satisfied via the blanket impl for M: Message + Serialize
615    M: Encodable<M>,
616    V: Encodable<M>,
617{
618    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
619        match self {
620            Self::Owned(m) => m.encode(codec),
621            Self::Borrowed(v) => v.encode(codec),
622        }
623    }
624}
625
626// ---------------------------------------------------------------------------
627// PreEncoded
628// ---------------------------------------------------------------------------
629
630/// Pre-encoded protobuf response body for message type `M`.
631///
632/// Use when the handler builds and encodes a borrowing view internally —
633/// e.g. a `FooView<'a>` borrowing from a local snapshot — rather than
634/// returning the view itself. The `'static` bound on `Handler::Body` (and
635/// on streaming items, see the `use<Self>` note in the
636/// [`StreamingHandler`](crate::StreamingHandler) docs) means a view with a
637/// non-`'static` lifetime can't cross the handler
638/// boundary; `PreEncoded` carries the bytes across instead.
639///
640/// The `M` type parameter is a compile-time witness for which RPC output
641/// type the bytes encode. Three construction paths, in decreasing order
642/// of compile-time guarantee:
643///
644/// - [`from_message(&m)`](PreEncoded::from_message) — encodes an owned
645///   `M`; the receiver type *is* the witness.
646/// - [`from_view(&view)`](PreEncoded::from_view) — encodes a borrowing
647///   view; `MessageView::Owned = M` is the witness.
648/// - [`from_bytes_unchecked(bytes)`](PreEncoded::from_bytes_unchecked) —
649///   wraps already-encoded bytes from elsewhere (a cache, storage,
650///   another service). No witness; you're asserting the bytes decode as
651///   `M`.
652///
653/// `from_message` and `from_view` produce the same `PreEncoded<M>` type,
654/// so a stream can mix items built either way (e.g. a cache-hit path
655/// returning the cached owned `M`, a cache-miss path building a view from
656/// a snapshot) — the same role [`MaybeBorrowed`] fills for unary
657/// handlers, but with the encode happening eagerly inside the stream
658/// body.
659///
660/// # Streaming example
661///
662/// The motivating shape — a server-streaming handler that builds and
663/// encodes per-item views borrowing from a local store snapshot, then
664/// yields the bytes:
665///
666/// ```rust,ignore
667/// use connectrpc::{PreEncoded, Response, RequestContext, ServiceResult, ServiceStream};
668///
669/// async fn watch(
670///     &self,
671///     _ctx: RequestContext,
672///     req: OwnedWatchRequestView,
673/// ) -> ServiceResult<ServiceStream<PreEncoded<WatchResponse>>> {
674///     let store = self.store.clone();
675///     let stream = futures::stream::unfold(store, |store| async move {
676///         let snapshot = store.load();
677///         // `view` borrows from `snapshot`; encode while the borrow is live.
678///         let view = build_view_from_snapshot(&snapshot);
679///         let item = PreEncoded::from_view(&view);
680///         Some((Ok(item), store))
681///     });
682///     Response::stream_ok(stream)
683/// }
684/// ```
685///
686/// For a unary handler, the same pattern applies — return
687/// `ServiceResult<PreEncoded<MyResponse>>`.
688///
689/// # Codec behaviour
690///
691/// `PreEncoded` is optimized for the `proto` codec: the wrapped bytes are
692/// passed through verbatim with no re-encoding. The motivating use case
693/// (high-throughput fanout) is proto-only.
694///
695/// For the `json` codec, `PreEncoded` falls back to decoding the bytes as
696/// `M` and re-serializing as JSON. **This is correct but not fast** — a
697/// full proto decode plus a JSON serialize per response (or per stream
698/// item). The fallback exists so that registering a `PreEncoded` handler
699/// on a JSON-capable router degrades gracefully instead of returning a
700/// runtime error. If your service serves a meaningful JSON traffic share,
701/// build and return the owned message (or [`MaybeBorrowed::Owned`])
702/// instead — that lets the codec layer pick the right encoding without
703/// the proto round-trip.
704///
705/// If the wrapped bytes don't decode as `M` (e.g. you passed mismatched
706/// bytes to [`from_bytes_unchecked`](PreEncoded::from_bytes_unchecked)),
707/// the JSON path returns an [`internal`](crate::ErrorCode::Internal)
708/// error at the server; the proto path passes the bytes through and the
709/// client sees a decode error.
710///
711/// ## Codec-dependent fidelity
712///
713/// The proto path is byte-exact; the JSON path is **only as faithful as
714/// decoding the bytes to an owned `M` and re-serializing**. The two
715/// diverge when the wrapped bytes carry information not representable in
716/// `M` itself:
717///
718/// - **Unknown fields** (proto bytes encoded against a *newer* schema
719///   than the server's `M`) are preserved on the proto path and dropped
720///   on the JSON path. This matters only for
721///   [`from_bytes_unchecked`](PreEncoded::from_bytes_unchecked) bytes
722///   sourced externally; bytes produced by
723///   [`from_message`](PreEncoded::from_message) /
724///   [`from_view`](PreEncoded::from_view) cannot carry unknown fields.
725/// - **Non-canonical proto encodings** (out-of-order fields, redundant
726///   length prefixes, repeated non-`repeated` fields) are passed through
727///   verbatim on the proto path and normalized by the decode on the JSON
728///   path.
729///
730/// If byte-exact fidelity across codecs matters (e.g. signature
731/// verification, content-addressed storage), do not use `PreEncoded` with
732/// JSON-capable routes.
733///
734/// ## Cost is selected by the client
735///
736/// The codec is chosen per-request by the client's `Content-Type` header.
737/// For a service that adopted `PreEncoded` for proto throughput, a client
738/// sending JSON requests (intentionally, by misconfiguration, or
739/// adversarially) shifts those requests onto the slow decode-reserialize
740/// path. The marginal cost is bounded by the response size and is usually
741/// small relative to the handler's own work, but a streaming RPC pays it
742/// per item. A service that wants to *enforce* proto-only should reject
743/// non-proto `Content-Type` at the middleware layer (e.g. an axum
744/// middleware that returns `415 Unsupported Media Type`) rather than rely
745/// on the body type — that keeps the policy outside the handler and
746/// applies before the request body is read.
747///
748/// # Contract
749///
750/// `PreEncoded` is a transparent byte container — it does not validate
751/// the wrapped bytes on the proto path. [`PreEncoded::from_view`] gives a
752/// compile-time witness via `MessageView::Owned = M`;
753/// [`PreEncoded::from_bytes_unchecked`] trusts the caller. Returning bytes
754/// that don't decode as `M` will produce decode errors on the client (or,
755/// for JSON clients, an `internal` error from the server-side fallback
756/// decode).
757#[must_use = "PreEncoded must be returned from a handler to take effect"]
758pub struct PreEncoded<M> {
759    bytes: Bytes,
760    // `fn() -> M` keeps `PreEncoded<M>` `Send + Sync` regardless of `M`'s
761    // auto-trait surface (the bytes are owned; `M` is only a type witness).
762    _marker: PhantomData<fn() -> M>,
763}
764
765// Manual derives: `#[derive(Debug, Clone)]` would add a spurious `M: Debug` /
766// `M: Clone` bound (PhantomData carries it through to the where-clause).
767impl<M> std::fmt::Debug for PreEncoded<M> {
768    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
769        f.debug_tuple("PreEncoded").field(&self.bytes).finish()
770    }
771}
772
773impl<M> Clone for PreEncoded<M> {
774    fn clone(&self) -> Self {
775        Self {
776            bytes: self.bytes.clone(),
777            _marker: PhantomData,
778        }
779    }
780}
781
782impl<M: Message> PreEncoded<M> {
783    /// Encode an owned `M` to protobuf bytes.
784    ///
785    /// The receiver type is the compile-time witness — there's no way to
786    /// produce a `PreEncoded<M>` from a `&Other`. This is the right
787    /// constructor when the handler builds an owned `M` and wants to
788    /// share the encoding (e.g. encode once, clone the
789    /// [`Bytes`]-backed `PreEncoded` for N readers in a fanout) or when a
790    /// stream needs to mix owned-message and view-built items under a
791    /// single `type Item = PreEncoded<M>`.
792    ///
793    /// Equivalent to `PreEncoded::from_bytes_unchecked(m.encode_to_bytes())`,
794    /// but with `M` enforced by the type system rather than asserted by
795    /// the caller.
796    pub fn from_message(msg: &M) -> Self {
797        Self {
798            bytes: msg.encode_to_bytes(),
799            _marker: PhantomData,
800        }
801    }
802
803    /// Encode a [`ViewEncode`] view to protobuf bytes.
804    ///
805    /// The `MessageView<'a, Owned = M>` bound is the compile-time witness
806    /// that the bytes decode as `M` — passing `OtherView<'a>` won't
807    /// type-check unless `OtherView::Owned == M`.
808    pub fn from_view<'a, V>(view: &V) -> Self
809    where
810        V: ViewEncode<'a> + MessageView<'a, Owned = M>,
811    {
812        Self {
813            bytes: view.encode_to_bytes(),
814            _marker: PhantomData,
815        }
816    }
817
818    /// Wrap already-encoded protobuf bytes without validating them.
819    ///
820    /// Use when the bytes come from somewhere with no structural type
821    /// guarantee — a byte cache, a blob store, a sidecar service. You are
822    /// asserting the bytes decode as `M`; the proto path does not
823    /// validate this. In debug builds, the bytes are decoded once as a
824    /// `debug_assert!` to surface mismatches early.
825    ///
826    /// Prefer [`from_message`](PreEncoded::from_message) when you have an
827    /// owned `M` in hand and [`from_view`](PreEncoded::from_view) when
828    /// you have a view — both enforce `M` at compile time.
829    ///
830    /// Zero-copy for `Bytes` and `Vec<u8>`; passing `&[u8]` allocates and
831    /// copies.
832    pub fn from_bytes_unchecked(bytes: impl Into<Bytes>) -> Self {
833        let bytes = bytes.into();
834        debug_assert!(
835            M::decode_from_slice(&bytes).is_ok(),
836            "PreEncoded::from_bytes_unchecked: bytes do not decode as {}",
837            std::any::type_name::<M>(),
838        );
839        Self {
840            bytes,
841            _marker: PhantomData,
842        }
843    }
844}
845
846/// Encode an owned `M` to a [`PreEncoded<M>`].
847///
848/// Equivalent to [`PreEncoded::from_message`]; provided for `.into()`
849/// ergonomics.
850impl<M: Message> From<&M> for PreEncoded<M> {
851    fn from(msg: &M) -> Self {
852        Self::from_message(msg)
853    }
854}
855
856// Coherence: this impl is non-overlapping with the
857// `impl<M: Message + Serialize> Encodable<M> for M` blanket above for
858// structural reasons. For the two to overlap, some `T` would have to satisfy
859// both `T: Encodable<T>` (blanket, with `T: Message + Serialize`) and
860// `T = PreEncoded<U>` with `T: Encodable<U>` (this impl) for the *same* trait
861// parameter — i.e. `T = U`, i.e. `PreEncoded<U> = U`, which is infinite. So
862// the impls cannot overlap even if a future change made `PreEncoded` a
863// `Message` (which would only add `PreEncoded<M>: Encodable<PreEncoded<M>>` —
864// a different trait instantiation). No invariant to maintain here.
865//
866// The `M: Message + Serialize` bound matches the blanket so a `PreEncoded<M>`
867// is `Encodable<M>` exactly when an owned `M` would be — and is what makes the
868// JSON fallback path possible (decode as `M`, re-serialize).
869impl<M: Message + Serialize> Encodable<M> for PreEncoded<M> {
870    fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
871        match codec {
872            CodecFormat::Proto => Ok(self.bytes.clone()),
873            // Slow path: decode the proto bytes back to `M`, then serialize
874            // as JSON. This exists for correctness (JSON clients should get
875            // a response, not `unimplemented`), not throughput; the owned
876            // message path skips the proto round-trip and is preferable for
877            // JSON-heavy services. See the type-level docs.
878            CodecFormat::Json => {
879                let msg = M::decode_from_slice(&self.bytes).map_err(|e| {
880                    ConnectError::internal(format!(
881                        "pre-encoded bytes did not decode as {}: {e}",
882                        std::any::type_name::<M>(),
883                    ))
884                })?;
885                serde_json::to_vec(&msg).map(Bytes::from).map_err(|e| {
886                    ConnectError::internal(format!("failed to encode JSON response: {e}"))
887                })
888            }
889        }
890    }
891}
892
893// ---------------------------------------------------------------------------
894// EncodedResponse (dispatcher boundary)
895// ---------------------------------------------------------------------------
896
897/// A [`Response`] with the body already encoded to bytes.
898///
899/// This is what the [`Dispatcher`](crate::Dispatcher) returns to the
900/// protocol layer — encoding happens inside the dispatcher so the body
901/// type stays generic across the trait boundary.
902pub type EncodedResponse = Response<Bytes>;
903
904impl<B> Response<B> {
905    /// Encode the body to bytes via [`Encodable<M>`], preserving
906    /// response metadata.
907    #[doc(hidden)] // exposed for dispatcher::codegen (generated code)
908    pub fn encode<M>(self, codec: CodecFormat) -> Result<EncodedResponse, ConnectError>
909    where
910        B: Encodable<M>,
911    {
912        let bytes = self.body.encode(codec)?;
913        Ok(Response {
914            body: bytes,
915            headers: self.headers,
916            trailers: self.trailers,
917            compress: self.compress,
918        })
919    }
920}
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925    use buffa_types::google::protobuf::StringValue;
926
927    #[tokio::test]
928    async fn response_stream_ok_shorthand() {
929        use futures::StreamExt;
930        let r: ServiceResult<ServiceStream<i32>> =
931            Response::stream_ok(futures::stream::iter([Ok(7)]));
932        let collected: Vec<_> = r.unwrap().body.map(|x| x.unwrap()).collect().await;
933        assert_eq!(collected, vec![7]);
934    }
935
936    #[test]
937    fn compress_tristate() {
938        assert_eq!(Response::new(()).compress(true).compress, Some(true));
939        assert_eq!(Response::new(()).compress(false).compress, Some(false));
940        assert_eq!(Response::new(()).compress(None).compress, None);
941    }
942
943    #[test]
944    fn header_accepts_str() {
945        let mut h = HeaderMap::new();
946        h.insert("x-custom", HeaderValue::from_static("v"));
947        let ctx = RequestContext::new(h);
948        assert_eq!(ctx.header("x-custom").unwrap(), "v");
949    }
950
951    #[test]
952    fn response_ok_shorthand() {
953        let r: ServiceResult<u32> = Response::ok(42);
954        let r = r.unwrap();
955        assert_eq!(r.body, 42);
956        assert!(r.headers.is_empty());
957    }
958
959    #[test]
960    fn response_from_body() {
961        let r: Response<StringValue> = StringValue::from("hi").into();
962        assert_eq!(r.body.value, "hi");
963        assert!(r.headers.is_empty());
964        assert!(r.trailers.is_empty());
965        assert_eq!(r.compress, None);
966    }
967
968    #[test]
969    fn response_builder() {
970        let r = Response::new(StringValue::from("hi"))
971            .with_header("x-a", "1")
972            .with_trailer("x-b", "2")
973            .compress(true);
974        assert_eq!(r.headers.get("x-a").unwrap(), "1");
975        assert_eq!(r.trailers.get("x-b").unwrap(), "2");
976        assert_eq!(r.compress, Some(true));
977    }
978
979    #[test]
980    fn encodable_owned_proto() {
981        let m = StringValue::from("hello");
982        let bytes = Encodable::<StringValue>::encode(&m, CodecFormat::Proto).unwrap();
983        assert_eq!(
984            StringValue::decode_from_slice(&bytes).unwrap().value,
985            "hello"
986        );
987    }
988
989    #[test]
990    fn encodable_owned_json() {
991        let m = StringValue::from("hello");
992        let bytes = Encodable::<StringValue>::encode(&m, CodecFormat::Json).unwrap();
993        assert_eq!(&bytes[..], b"\"hello\"");
994    }
995
996    #[test]
997    fn response_encode() {
998        let r = Response::new(StringValue::from("hi")).with_header("x-a", "1");
999        let enc = r.encode::<StringValue>(CodecFormat::Proto).unwrap();
1000        assert_eq!(enc.headers.get("x-a").unwrap(), "1");
1001        assert_eq!(
1002            StringValue::decode_from_slice(&enc.body).unwrap().value,
1003            "hi"
1004        );
1005    }
1006
1007    #[test]
1008    fn request_context_new() {
1009        let mut h = HeaderMap::new();
1010        h.insert("x-custom", HeaderValue::from_static("v"));
1011        let ctx = RequestContext::new(h);
1012        assert_eq!(
1013            ctx.header(HeaderName::from_static("x-custom")).unwrap(),
1014            "v"
1015        );
1016        assert_eq!(ctx.headers().get("x-custom").unwrap(), "v");
1017        assert!(ctx.deadline().is_none());
1018        assert!(ctx.time_remaining().is_none());
1019        assert!(ctx.extensions().is_empty());
1020    }
1021
1022    #[test]
1023    fn request_context_with_deadline() {
1024        let d = Instant::now();
1025        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(d));
1026        assert_eq!(ctx.deadline(), Some(d));
1027    }
1028
1029    #[test]
1030    fn request_context_time_remaining_saturates_at_zero() {
1031        // Deadline in the past — `time_remaining()` should clamp to zero,
1032        // not underflow.
1033        let past = Instant::now() - Duration::from_secs(60);
1034        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(past));
1035        assert_eq!(ctx.time_remaining(), Some(Duration::ZERO));
1036    }
1037
1038    #[test]
1039    fn request_context_time_remaining_future() {
1040        let future = Instant::now() + Duration::from_secs(60);
1041        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(future));
1042        let remaining = ctx.time_remaining().unwrap();
1043        // Some elapsed time between `with_deadline` and the assertion is
1044        // expected; just bound it.
1045        assert!(remaining > Duration::from_secs(55));
1046        assert!(remaining <= Duration::from_secs(60));
1047    }
1048
1049    #[test]
1050    fn request_context_extensions_mut() {
1051        #[derive(Clone, Debug, PartialEq)]
1052        struct Tag(u8);
1053        let mut ctx = RequestContext::new(HeaderMap::new());
1054        ctx.extensions_mut().insert(Tag(1));
1055        assert_eq!(ctx.extensions().get::<Tag>(), Some(&Tag(1)));
1056    }
1057
1058    #[cfg(feature = "server")]
1059    #[test]
1060    fn request_context_peer_addr_absent() {
1061        // No transport inserted `PeerAddr`; the typed accessor returns
1062        // `None` rather than panicking.
1063        let ctx = RequestContext::new(HeaderMap::new());
1064        assert_eq!(ctx.peer_addr(), None);
1065    }
1066
1067    #[cfg(feature = "server")]
1068    #[test]
1069    fn request_context_peer_addr_present() {
1070        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1071        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
1072        let mut ext = http::Extensions::new();
1073        ext.insert(crate::server::PeerAddr(addr));
1074        let ctx = RequestContext::new(HeaderMap::new()).with_extensions(ext);
1075        assert_eq!(ctx.peer_addr(), Some(addr));
1076    }
1077
1078    #[cfg(feature = "server-tls")]
1079    #[test]
1080    fn request_context_peer_certs_absent() {
1081        let ctx = RequestContext::new(HeaderMap::new());
1082        assert!(ctx.peer_certs().is_none());
1083    }
1084
1085    #[test]
1086    fn response_map_body_preserves_metadata() {
1087        let r = Response::new(2u32)
1088            .with_header("x-h", "1")
1089            .with_trailer("x-t", "2")
1090            .compress(true);
1091        let r = r.map_body(|n| n.to_string());
1092        assert_eq!(r.body, "2");
1093        assert_eq!(r.headers.get("x-h").unwrap(), "1");
1094        assert_eq!(r.trailers.get("x-t").unwrap(), "2");
1095        assert_eq!(r.compress, Some(true));
1096    }
1097
1098    #[tokio::test]
1099    async fn response_stream_yields_items() {
1100        use futures::StreamExt;
1101        let r: Response<ServiceStream<i32>> =
1102            Response::stream(futures::stream::iter([Ok(1), Ok(2), Ok(3)]));
1103        let collected: Vec<_> = r.body.map(|x| x.unwrap()).collect().await;
1104        assert_eq!(collected, vec![1, 2, 3]);
1105    }
1106
1107    #[test]
1108    #[should_panic]
1109    fn with_header_panics_on_invalid_name() {
1110        let _ = Response::new(()).with_header("invalid header name", "v");
1111    }
1112
1113    #[test]
1114    fn try_with_header_errors_on_invalid_name() {
1115        let err = Response::new(())
1116            .try_with_header("invalid header name", "v")
1117            .unwrap_err();
1118        assert!(err.is::<http::header::InvalidHeaderName>());
1119    }
1120
1121    #[test]
1122    fn try_with_header_ok_appends() {
1123        let r = Response::new(())
1124            .try_with_header("x-a", "1")
1125            .unwrap()
1126            .try_with_header("x-a", "2")
1127            .unwrap();
1128        let vals: Vec<_> = r.headers.get_all("x-a").iter().collect();
1129        assert_eq!(vals.len(), 2);
1130    }
1131
1132    #[test]
1133    fn try_with_trailer_errors_on_invalid_value() {
1134        // Newlines are not permitted in header values.
1135        let err = Response::new(())
1136            .try_with_trailer("x-t", "bad\nvalue")
1137            .unwrap_err();
1138        assert!(err.is::<http::header::InvalidHeaderValue>());
1139    }
1140
1141    #[test]
1142    fn encode_view_body_proto() {
1143        use buffa_types::google::protobuf::__buffa::view::StringValueView;
1144        let v = StringValueView {
1145            value: "hi",
1146            ..Default::default()
1147        };
1148        let bytes = encode_view_body(&v, CodecFormat::Proto).unwrap();
1149        assert_eq!(StringValue::decode_from_slice(&bytes).unwrap().value, "hi");
1150    }
1151
1152    #[test]
1153    fn encode_view_body_json_errors() {
1154        use buffa_types::google::protobuf::__buffa::view::StringValueView;
1155        let v = StringValueView::default();
1156        let err = encode_view_body(&v, CodecFormat::Json).unwrap_err();
1157        assert_eq!(err.code, crate::ErrorCode::Unimplemented);
1158        assert!(err.message.as_deref().unwrap().contains("JSON codec"));
1159    }
1160
1161    // Manual Encodable<StringValue> impl modelling what codegen emits
1162    // for FooView<'_>. Shared by the MaybeBorrowed tests below.
1163    struct V<'a>(buffa_types::google::protobuf::__buffa::view::StringValueView<'a>);
1164    impl Encodable<StringValue> for V<'_> {
1165        fn encode(&self, c: CodecFormat) -> Result<Bytes, ConnectError> {
1166            encode_view_body(&self.0, c)
1167        }
1168    }
1169
1170    #[test]
1171    fn maybe_borrowed_dispatch() {
1172        use buffa_types::google::protobuf::__buffa::view::StringValueView;
1173        let owned: MaybeBorrowed<StringValue, V<'_>> =
1174            MaybeBorrowed::Owned(StringValue::from("owned"));
1175        let borrowed = MaybeBorrowed::Borrowed(V(StringValueView {
1176            value: "view",
1177            ..Default::default()
1178        }));
1179        assert_eq!(
1180            StringValue::decode_from_slice(&owned.encode(CodecFormat::Proto).unwrap())
1181                .unwrap()
1182                .value,
1183            "owned"
1184        );
1185        assert_eq!(
1186            StringValue::decode_from_slice(&borrowed.encode(CodecFormat::Proto).unwrap())
1187                .unwrap()
1188                .value,
1189            "view"
1190        );
1191    }
1192
1193    #[test]
1194    fn maybe_borrowed_borrowed_json_unimplemented() {
1195        use buffa_types::google::protobuf::__buffa::view::StringValueView;
1196        let borrowed: MaybeBorrowed<StringValue, V<'_>> =
1197            MaybeBorrowed::Borrowed(V(StringValueView::default()));
1198        let err = borrowed.encode(CodecFormat::Json).unwrap_err();
1199        assert_eq!(err.code, crate::ErrorCode::Unimplemented);
1200    }
1201
1202    #[test]
1203    fn pre_encoded_proto_round_trip() {
1204        let m = StringValue::from("pre-encoded");
1205        let bytes = m.encode_to_bytes();
1206        let body = PreEncoded::<StringValue>::from_bytes_unchecked(bytes.clone());
1207        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
1208        assert_eq!(out, bytes);
1209        assert_eq!(
1210            StringValue::decode_from_slice(&out).unwrap().value,
1211            "pre-encoded"
1212        );
1213    }
1214
1215    #[test]
1216    fn pre_encoded_json_decodes_then_serializes() {
1217        // The JSON path round-trips: proto bytes → owned `M` → JSON. Slow,
1218        // but correct — see the `# Codec behaviour` doc on `PreEncoded`.
1219        let m = StringValue::from("hi");
1220        let body = PreEncoded::<StringValue>::from_bytes_unchecked(m.encode_to_bytes());
1221        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap();
1222        // Output should match what serializing the owned message directly
1223        // would produce.
1224        assert_eq!(out, Bytes::from(serde_json::to_vec(&m).unwrap()));
1225    }
1226
1227    #[test]
1228    fn pre_encoded_json_decode_failure_is_internal_error() {
1229        // `from_bytes_unchecked` is unvalidated on the proto path. The JSON
1230        // fallback necessarily decodes; if that fails (the wrapped bytes
1231        // were never a valid `M`), the server-side `internal` error surfaces
1232        // closer to the construction bug than the proto path would.
1233        //
1234        // Field 1 (LEN) declares 99 bytes but only 2 follow — guaranteed
1235        // truncated for `StringValue`.
1236        let body = PreEncoded::<StringValue> {
1237            bytes: Bytes::from_static(&[0x0a, 0x63, b'h', b'i']),
1238            _marker: std::marker::PhantomData,
1239        };
1240        let err = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap_err();
1241        assert_eq!(err.code, crate::ErrorCode::Internal);
1242        assert!(err.message.as_deref().unwrap().contains("did not decode"));
1243    }
1244
1245    #[test]
1246    fn pre_encoded_from_view() {
1247        use buffa::view::ViewEncode;
1248        use buffa_types::google::protobuf::__buffa::view::StringValueView;
1249        let v = StringValueView {
1250            value: "from-view",
1251            ..Default::default()
1252        };
1253        // `from_view` infers `M = StringValue` from `StringValueView::Owned`.
1254        let body = PreEncoded::from_view(&v);
1255        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
1256        assert_eq!(out, v.encode_to_bytes());
1257        assert_eq!(
1258            StringValue::decode_from_slice(&out).unwrap().value,
1259            "from-view"
1260        );
1261    }
1262
1263    #[test]
1264    fn pre_encoded_from_message() {
1265        let m = StringValue::from("from-message");
1266        // `from_message` infers `M` from the receiver — no annotation.
1267        let body = PreEncoded::from_message(&m);
1268        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
1269        assert_eq!(out, m.encode_to_bytes());
1270
1271        // `From<&M>` is the same conversion via `.into()`.
1272        let body2: PreEncoded<StringValue> = (&m).into();
1273        let out2 = Encodable::<StringValue>::encode(&body2, CodecFormat::Proto).unwrap();
1274        assert_eq!(out2, out);
1275    }
1276
1277    #[test]
1278    fn pre_encoded_codec_fidelity_diverges_on_unknown_fields() {
1279        // Documents the codec-dependent fidelity caveat: the proto path
1280        // is byte-exact (unknown fields preserved); the JSON path
1281        // round-trips through `M` (unknown fields dropped). Only relevant
1282        // for `from_bytes_unchecked` bytes sourced externally.
1283        //
1284        // Wire bytes: field 1 = "hi" (the known `StringValue.value`),
1285        // plus field 2 = varint 42 (unknown to `StringValue`).
1286        let bytes_with_unknown =
1287            Bytes::from_static(&[0x0a, 0x02, b'h', b'i', /* tag 2 varint */ 0x10, 42]);
1288        let body = PreEncoded::<StringValue> {
1289            bytes: bytes_with_unknown.clone(),
1290            _marker: std::marker::PhantomData,
1291        };
1292
1293        // Proto: byte-exact passthrough, unknown field preserved.
1294        let proto = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
1295        assert_eq!(proto, bytes_with_unknown);
1296
1297        // JSON: round-trips through `StringValue`, which drops the
1298        // unknown field. Output equals serializing the bare known
1299        // message.
1300        let json = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap();
1301        assert_eq!(
1302            json,
1303            Bytes::from(serde_json::to_vec(&StringValue::from("hi")).unwrap())
1304        );
1305    }
1306
1307    #[test]
1308    fn pre_encoded_is_typed() {
1309        // `PreEncoded<M>` only implements `Encodable<M>` — the type witness
1310        // means `PreEncoded<StringValue>` cannot be used where
1311        // `Encodable<Int32Value>` is required. Verified at compile time;
1312        // this test just exercises the happy path for both types.
1313        use buffa_types::google::protobuf::Int32Value;
1314        let s = PreEncoded::<StringValue>::from_bytes_unchecked(
1315            StringValue::from("a").encode_to_bytes(),
1316        );
1317        let i =
1318            PreEncoded::<Int32Value>::from_bytes_unchecked(Int32Value::from(1).encode_to_bytes());
1319        Encodable::<StringValue>::encode(&s, CodecFormat::Proto).unwrap();
1320        Encodable::<Int32Value>::encode(&i, CodecFormat::Proto).unwrap();
1321        // The following would not compile:
1322        //   Encodable::<Int32Value>::encode(&s, CodecFormat::Proto)
1323    }
1324
1325    #[test]
1326    #[cfg(debug_assertions)]
1327    #[should_panic(expected = "do not decode as")]
1328    fn pre_encoded_from_bytes_unchecked_debug_asserts() {
1329        // In debug builds, `from_bytes_unchecked` decodes once to surface
1330        // mismatched bytes early. Field 1 (LEN) declares 99 bytes; only 2
1331        // follow.
1332        let _ = PreEncoded::<StringValue>::from_bytes_unchecked(Bytes::from_static(&[
1333            0x0a, 0x63, b'h', b'i',
1334        ]));
1335    }
1336
1337    #[test]
1338    fn request_context_with_extensions() {
1339        #[derive(Clone, Debug, PartialEq)]
1340        struct Peer(u32);
1341        let mut ext = http::Extensions::new();
1342        ext.insert(Peer(7));
1343        let ctx = RequestContext::new(HeaderMap::new()).with_extensions(ext);
1344        assert_eq!(ctx.extensions().get::<Peer>(), Some(&Peer(7)));
1345    }
1346
1347    #[test]
1348    fn request_context_with_spec_and_protocol() {
1349        use crate::spec::{Spec, StreamType};
1350
1351        // Default-constructed context has neither.
1352        let ctx = RequestContext::new(HeaderMap::new());
1353        assert_eq!(ctx.spec(), None);
1354        assert_eq!(ctx.protocol(), None);
1355
1356        // Both round-trip through the builders.
1357        const SPEC: Spec = Spec::server("/pkg.Svc/M", StreamType::Unary);
1358        let ctx = RequestContext::new(HeaderMap::new())
1359            .with_spec(Some(SPEC))
1360            .with_protocol(Some(crate::Protocol::Grpc));
1361        assert_eq!(ctx.spec(), Some(SPEC));
1362        assert_eq!(ctx.protocol(), Some(crate::Protocol::Grpc));
1363
1364        // Builders accept `None` to clear (matches `with_deadline`).
1365        let ctx = ctx.with_spec(None).with_protocol(None);
1366        assert_eq!(ctx.spec(), None);
1367        assert_eq!(ctx.protocol(), None);
1368    }
1369
1370    #[test]
1371    fn request_context_with_path() {
1372        // Hand-built contexts (tests, custom dispatchers) have no path.
1373        let ctx = RequestContext::new(HeaderMap::new());
1374        assert_eq!(ctx.path(), None);
1375
1376        // Round-trips through the builder.
1377        let ctx = RequestContext::new(HeaderMap::new()).with_path("/pkg.Svc/M");
1378        assert_eq!(ctx.path(), Some("/pkg.Svc/M"));
1379
1380        // The builder takes ownership (Into<String>) so callers can pass
1381        // borrowed or owned without an extra clone.
1382        let owned = String::from("/pkg.Svc/Other");
1383        let ctx = RequestContext::new(HeaderMap::new()).with_path(owned);
1384        assert_eq!(ctx.path(), Some("/pkg.Svc/Other"));
1385
1386        // The builder does not normalize or validate — `Some("")` is
1387        // preserved verbatim. The dispatch path always supplies a non-empty
1388        // leading-slash form; `Some("")` only reaches consumers from a
1389        // misconfigured custom dispatch shim, which is a wiring bug they
1390        // should surface rather than silently coerce to `None`.
1391        let ctx = RequestContext::new(HeaderMap::new()).with_path("");
1392        assert_eq!(ctx.path(), Some(""));
1393    }
1394}