connectrpc/response.rs
1//! Handler request/response types.
2//!
3//! This module splits the old `Context` struct into a read-only
4//! [`RequestContext`] (passed *into* handlers) and a [`Response<B>`]
5//! wrapper (returned *from* handlers). The body type `B` is bounded by
6//! [`Encodable<M>`] in the generated trait so handlers can return either
7//! the owned message `M`, a borrowing `MView<'_>` /
8//! [`OwnedView<MView<'static>>`](buffa::view::OwnedView), or
9//! [`MaybeBorrowed`] for the conditional case.
10
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::time::{Duration, Instant};
14
15use buffa::Message;
16use buffa::view::{MessageView, ViewEncode};
17use bytes::Bytes;
18use futures::Stream;
19use http::HeaderMap;
20use http::header::{HeaderName, HeaderValue};
21use serde::Serialize;
22
23use crate::codec::CodecFormat;
24use crate::error::ConnectError;
25
26// ---------------------------------------------------------------------------
27// RequestContext
28// ---------------------------------------------------------------------------
29
30/// Read-only request context passed to RPC handlers.
31///
32/// Carries the request headers, parsed deadline, and any
33/// connection-scoped extensions (peer address, TLS certs, auth context)
34/// inserted by a tower layer in front of the service. Handlers do *not*
35/// return this; response-side metadata lives on [`Response`].
36///
37/// `RequestContext` is `#[non_exhaustive]`: construct it with
38/// [`RequestContext::new`] and the `with_*` builders, and read fields
39/// through the accessor methods (`headers()`, `deadline()`,
40/// `extensions()`, …). New request-scoped metadata can be added in minor
41/// releases without breaking downstream code.
42#[derive(Debug, Clone, Default)]
43#[non_exhaustive]
44pub struct RequestContext {
45 /// Request headers (after protocol-prefix stripping).
46 pub(crate) headers: HeaderMap,
47 /// Absolute request deadline parsed from the protocol's timeout header,
48 /// if any. Propagate to downstream calls.
49 ///
50 /// If a [`DeadlinePolicy`](crate::DeadlinePolicy) is configured on the
51 /// service, this is the *moderated* value — clamped to the policy's
52 /// `[min, max]` range, or the policy default when the client asserted
53 /// nothing — not the raw client header.
54 pub(crate) deadline: Option<Instant>,
55 /// Request extensions carried from the underlying `http::Request`.
56 pub(crate) extensions: http::Extensions,
57 /// Static metadata for the dispatched RPC method, when known.
58 pub(crate) spec: Option<crate::spec::Spec>,
59 /// The wire protocol negotiated for this request, when known.
60 pub(crate) protocol: Option<crate::Protocol>,
61 /// The procedure path the client requested, with a leading slash.
62 pub(crate) path: Option<String>,
63}
64
65impl RequestContext {
66 /// Create a new context with the given request headers.
67 pub fn new(headers: HeaderMap) -> Self {
68 Self {
69 headers,
70 deadline: None,
71 extensions: http::Extensions::new(),
72 spec: None,
73 protocol: None,
74 path: None,
75 }
76 }
77
78 /// Set the request deadline (absolute `Instant`).
79 #[must_use]
80 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
81 self.deadline = deadline;
82 self
83 }
84
85 /// Attach request extensions captured from the underlying `http::Request`.
86 #[must_use]
87 pub fn with_extensions(mut self, extensions: http::Extensions) -> Self {
88 self.extensions = extensions;
89 self
90 }
91
92 /// Attach the static method metadata for the dispatched RPC.
93 #[must_use]
94 pub fn with_spec(mut self, spec: Option<crate::spec::Spec>) -> Self {
95 self.spec = spec;
96 self
97 }
98
99 /// Attach the negotiated wire protocol.
100 #[must_use]
101 pub fn with_protocol(mut self, protocol: Option<crate::Protocol>) -> Self {
102 self.protocol = protocol;
103 self
104 }
105
106 /// Attach the procedure path the client requested. The dispatch path
107 /// always supplies the leading-slash form (`"/package.Service/Method"`),
108 /// matching [`Spec::procedure`](crate::Spec::procedure); custom
109 /// dispatch shims and test fixtures should do the same so consumers
110 /// of [`path()`](Self::path) see a consistent shape.
111 #[must_use]
112 pub fn with_path(mut self, path: impl Into<String>) -> Self {
113 self.path = Some(path.into());
114 self
115 }
116
117 /// Request headers (after protocol-prefix stripping).
118 ///
119 /// For a single header lookup, [`header`](Self::header) is simpler.
120 pub fn headers(&self) -> &HeaderMap {
121 &self.headers
122 }
123
124 /// Get a request header value.
125 pub fn header(&self, key: impl http::header::AsHeaderName) -> Option<&HeaderValue> {
126 self.headers.get(key)
127 }
128
129 /// Absolute request deadline parsed from the protocol's timeout header
130 /// (`Connect-Timeout-Ms` or `grpc-timeout`), if the client asserted one.
131 ///
132 /// Propagate this to downstream calls so the whole call chain shares a
133 /// single budget. For the remaining budget as a `Duration`, see
134 /// [`time_remaining`](Self::time_remaining).
135 ///
136 /// If a [`DeadlinePolicy`](crate::DeadlinePolicy) is configured on the
137 /// service, this is the *moderated* value — clamped to the policy's
138 /// `[min, max]` range, or the policy default when the client asserted
139 /// nothing — not the raw client header.
140 pub fn deadline(&self) -> Option<Instant> {
141 self.deadline
142 }
143
144 /// Time remaining until the request deadline, saturating at zero.
145 ///
146 /// `None` if the client did not assert a timeout. Use this to budget
147 /// downstream calls — for example, subtract a margin before passing the
148 /// remainder as a downstream RPC's per-call timeout. See also issue
149 /// [#92](https://github.com/anthropics/connect-rust/issues/92) for
150 /// server-side deadline enforcement.
151 pub fn time_remaining(&self) -> Option<Duration> {
152 self.deadline
153 .map(|d| d.saturating_duration_since(Instant::now()))
154 }
155
156 /// Request extensions carried from the underlying `http::Request`.
157 ///
158 /// This is the passthrough for connection-scoped metadata that a tower
159 /// layer in front of the service can attach — TLS peer certificates,
160 /// remote socket address, auth context, etc. The dispatch path moves
161 /// `parts.extensions` here verbatim; handlers read it with
162 /// `ctx.extensions().get::<T>()`. For the well-known peer types, prefer
163 /// the typed accessors `peer_addr()` and `peer_certs()` (gated on the
164 /// `server` and `server-tls` features respectively) — they return
165 /// `None` instead of panicking when the transport didn't insert the
166 /// extension.
167 pub fn extensions(&self) -> &http::Extensions {
168 &self.extensions
169 }
170
171 /// Mutable access to the request extensions.
172 ///
173 /// Useful for code that constructs a `RequestContext` directly — e.g.
174 /// a custom dispatch shim or test fixture — and needs to insert
175 /// connection-scoped values before calling a handler.
176 ///
177 /// # Note
178 ///
179 /// Handlers receive `RequestContext` **by value**, so calling
180 /// `ctx.extensions_mut().insert(...)` inside a handler mutates a local
181 /// copy that the framework never sees again — it has no effect on the
182 /// dispatch path or on downstream layers. To pass values *into* a
183 /// handler from middleware, mutate `http::Request::extensions_mut()`
184 /// in the layer instead; the dispatcher moves request extensions into
185 /// `RequestContext` automatically before dispatch.
186 pub fn extensions_mut(&mut self) -> &mut http::Extensions {
187 &mut self.extensions
188 }
189
190 /// Static metadata for the dispatched RPC method, when known.
191 ///
192 /// Populated by code-generated `FooServiceServer<T>` dispatchers and
193 /// by the dynamic [`Router`](crate::Router) when registered through
194 /// the generated `register()` (which chains
195 /// [`Router::with_spec`](crate::Router::with_spec) per route).
196 /// `None` only for routes registered through the manual `route_*`
197 /// builders without a `with_spec` call. See [`path`](Self::path) for
198 /// the always-present procedure path.
199 pub fn spec(&self) -> Option<crate::spec::Spec> {
200 self.spec
201 }
202
203 /// The wire protocol negotiated for this request, when known.
204 ///
205 /// `None` if the runtime constructed the context outside the dispatch
206 /// path (e.g. unit tests calling handlers directly).
207 pub fn protocol(&self) -> Option<crate::Protocol> {
208 self.protocol
209 }
210
211 /// The procedure path the client requested, `"/package.Service/Method"`.
212 ///
213 /// Always present when constructed by the dispatch path: it is taken
214 /// from the request URI, so it is populated whenever a handler is
215 /// dispatched — including dispatch through the dynamic
216 /// [`Router`](crate::Router), which does not supply a
217 /// [`Spec`](crate::Spec). `None` only for hand-built contexts (unit
218 /// tests calling handlers directly, custom dispatch shims). Code that
219 /// must label or gate every request — auth interceptors, span
220 /// builders, rate limiters — should read `path()`, not `spec()`, and
221 /// treat `None` as a misconfigured or synthetic context rather than a
222 /// real RPC.
223 ///
224 /// Compare [`spec()`](Self::spec): that is the registered method's
225 /// *static* metadata, populated only when a generated
226 /// `FooServiceServer<T>` dispatcher resolved the route, and
227 /// [`Spec::procedure`](crate::Spec::procedure) is its `&'static str`
228 /// procedure name. When both are present they are identical strings;
229 /// `path()` exists for the cases where `spec()` cannot be.
230 ///
231 /// The leading slash is included to match `Spec::procedure`, the
232 /// `connect-go` `Spec.Procedure` convention, and `http::Uri::path()`
233 /// for any HTTP request that reached the dispatch layer. To compare
234 /// against [`Dispatcher::lookup`](crate::Dispatcher::lookup) keys
235 /// (which omit it), use `path.strip_prefix('/').unwrap_or(path)`.
236 pub fn path(&self) -> Option<&str> {
237 self.path.as_deref()
238 }
239
240 /// Remote peer socket address, if the transport recorded one.
241 ///
242 /// Present when the request arrived through
243 /// [`Server::serve`](crate::server::Server::serve) (plain) or
244 /// `Server::with_tls(...)` (TLS), or any integration that inserts
245 /// [`PeerAddr`](crate::server::PeerAddr) into the request extensions
246 /// (`connectrpc::axum::serve_tls` does).
247 /// Returns `None` otherwise (e.g. an axum app without a layer that
248 /// captures the connect info), so prefer this over
249 /// `ctx.extensions().get::<PeerAddr>().unwrap()` — the latter compiles,
250 /// passes in unit tests, and panics in production behind a transport
251 /// that didn't insert it.
252 #[cfg(feature = "server")]
253 #[cfg_attr(docsrs, doc(cfg(feature = "server")))]
254 pub fn peer_addr(&self) -> Option<std::net::SocketAddr> {
255 self.extensions
256 .get::<crate::server::PeerAddr>()
257 .map(|p| p.0)
258 }
259
260 /// TLS client certificate chain presented by the peer (leaf first), if any.
261 ///
262 /// Present only when the request arrived over a TLS listener that
263 /// requested a client certificate and the client presented one — see
264 /// [`Server::with_tls`](crate::server::Server::with_tls) and
265 /// `connectrpc::axum::serve_tls`. Returns `None` for plaintext
266 /// transports, for TLS without mutual auth, and for integrations that
267 /// don't insert [`PeerCerts`](crate::server::PeerCerts) into the
268 /// request extensions. Like [`peer_addr`](Self::peer_addr), prefer
269 /// this over a raw `extensions().get()` + `unwrap()`.
270 #[cfg(feature = "server-tls")]
271 #[cfg_attr(docsrs, doc(cfg(feature = "server-tls")))]
272 pub fn peer_certs(&self) -> Option<&[rustls::pki_types::CertificateDer<'static>]> {
273 self.extensions
274 .get::<crate::server::PeerCerts>()
275 .map(|p| &p.0[..])
276 }
277}
278
279// ---------------------------------------------------------------------------
280// Response<B>
281// ---------------------------------------------------------------------------
282
283/// Handler response wrapper: a body plus optional response headers,
284/// trailers, and compression hint.
285///
286/// `B` is bounded by [`Encodable<M>`] in the generated service trait so
287/// handlers can return the owned message `M` (the common case), or any
288/// type that encodes to the same wire bytes.
289///
290/// # Happy path
291///
292/// [`Response::ok`] is the bare-body shorthand:
293///
294/// ```rust,ignore
295/// async fn say(&self, _ctx: RequestContext, req: OwnedSayRequestView)
296/// -> ServiceResult<SayResponse>
297/// {
298/// Response::ok(SayResponse { sentence: reply, ..Default::default() })
299/// }
300/// ```
301///
302/// # With metadata
303///
304/// ```rust,ignore
305/// Ok(Response::new(reply)
306/// .with_header("x-request-id", id)
307/// .with_trailer("x-timing", elapsed))
308/// ```
309#[derive(Debug, Clone)]
310pub struct Response<B> {
311 /// The response body.
312 pub body: B,
313 /// Response headers to send before the body.
314 pub headers: HeaderMap,
315 /// Trailers to send after the body. Sent as HTTP/2 trailing
316 /// HEADERS for gRPC, or as `trailer-`-prefixed headers / the
317 /// EndStreamResponse JSON for Connect.
318 pub trailers: HeaderMap,
319 /// Whether to compress the response. `None` uses the server's
320 /// compression policy; `Some(false)` disables compression for this
321 /// response, `Some(true)` forces it.
322 pub compress: Option<bool>,
323}
324
325impl<B> Response<B> {
326 /// Shorthand for `Ok(Response::from(body))` — the bare-body happy
327 /// path.
328 ///
329 /// Use `Ok(Response::new(body).with_header(...))` when setting
330 /// response metadata; this constructor is for the common case of
331 /// "just the body".
332 pub fn ok(body: B) -> ServiceResult<B> {
333 Ok(Self::from(body))
334 }
335
336 /// Wrap a body with empty response metadata.
337 pub fn new(body: B) -> Self {
338 Self {
339 body,
340 headers: HeaderMap::new(),
341 trailers: HeaderMap::new(),
342 compress: None,
343 }
344 }
345
346 /// Append a response header.
347 ///
348 /// Uses [`HeaderMap::append`], so calling twice with the same name
349 /// accumulates values rather than replacing.
350 ///
351 /// # Panics
352 ///
353 /// Panics if `name` or `value` cannot be converted into the
354 /// corresponding header type (invalid characters, non-ASCII name,
355 /// etc.). Use [`try_with_header`](Self::try_with_header) for
356 /// dynamic values, or the `headers` field directly for full
357 /// control.
358 #[must_use]
359 pub fn with_header<K, V>(mut self, name: K, value: V) -> Self
360 where
361 K: TryInto<HeaderName>,
362 K::Error: std::fmt::Debug,
363 V: TryInto<HeaderValue>,
364 V::Error: std::fmt::Debug,
365 {
366 self.headers
367 .append(name.try_into().unwrap(), value.try_into().unwrap());
368 self
369 }
370
371 /// Append a response header, returning an error if `name` or
372 /// `value` is invalid.
373 ///
374 /// Non-panicking sibling of [`with_header`](Self::with_header) for
375 /// dynamic values. Uses [`HeaderMap::append`], so repeated calls
376 /// accumulate.
377 pub fn try_with_header<K, V>(mut self, name: K, value: V) -> Result<Self, http::Error>
378 where
379 K: TryInto<HeaderName>,
380 K::Error: Into<http::Error>,
381 V: TryInto<HeaderValue>,
382 V::Error: Into<http::Error>,
383 {
384 self.headers.append(
385 name.try_into().map_err(Into::into)?,
386 value.try_into().map_err(Into::into)?,
387 );
388 Ok(self)
389 }
390
391 /// Append a response trailer.
392 ///
393 /// Uses [`HeaderMap::append`], so calling twice with the same name
394 /// accumulates values rather than replacing.
395 ///
396 /// # Panics
397 ///
398 /// Panics if `name` or `value` cannot be converted into the
399 /// corresponding header type. Use
400 /// [`try_with_trailer`](Self::try_with_trailer) for dynamic
401 /// values, or the `trailers` field directly for full control.
402 #[must_use]
403 pub fn with_trailer<K, V>(mut self, name: K, value: V) -> Self
404 where
405 K: TryInto<HeaderName>,
406 K::Error: std::fmt::Debug,
407 V: TryInto<HeaderValue>,
408 V::Error: std::fmt::Debug,
409 {
410 self.trailers
411 .append(name.try_into().unwrap(), value.try_into().unwrap());
412 self
413 }
414
415 /// Append a response trailer, returning an error if `name` or
416 /// `value` is invalid.
417 ///
418 /// Non-panicking sibling of [`with_trailer`](Self::with_trailer)
419 /// for dynamic values. Uses [`HeaderMap::append`], so repeated
420 /// calls accumulate.
421 pub fn try_with_trailer<K, V>(mut self, name: K, value: V) -> Result<Self, http::Error>
422 where
423 K: TryInto<HeaderName>,
424 K::Error: Into<http::Error>,
425 V: TryInto<HeaderValue>,
426 V::Error: Into<http::Error>,
427 {
428 self.trailers.append(
429 name.try_into().map_err(Into::into)?,
430 value.try_into().map_err(Into::into)?,
431 );
432 Ok(self)
433 }
434
435 /// Override the server's compression policy for this response.
436 ///
437 /// `true` forces compression, `false` disables it, `None` (or
438 /// never calling this) defers to the server's policy.
439 #[must_use]
440 pub fn compress(mut self, enabled: impl Into<Option<bool>>) -> Self {
441 self.compress = enabled.into();
442 self
443 }
444
445 /// Replace the body, preserving headers/trailers/compression.
446 pub fn map_body<C>(self, f: impl FnOnce(B) -> C) -> Response<C> {
447 Response {
448 body: f(self.body),
449 headers: self.headers,
450 trailers: self.trailers,
451 compress: self.compress,
452 }
453 }
454}
455
456impl<B> From<B> for Response<B> {
457 fn from(body: B) -> Self {
458 Self::new(body)
459 }
460}
461
462impl<T> Response<ServiceStream<T>> {
463 /// Wrap a streaming body, boxing and unsize-coercing it to
464 /// [`ServiceStream<T>`]. Handles the explicit coercion that
465 /// `Ok(Box::pin(s).into())` would otherwise need.
466 pub fn stream(s: impl Stream<Item = Result<T, ConnectError>> + Send + 'static) -> Self {
467 Self::new(Box::pin(s))
468 }
469
470 /// Shorthand for `Ok(Response::stream(s))` — the bare-stream
471 /// happy path.
472 pub fn stream_ok(
473 s: impl Stream<Item = Result<T, ConnectError>> + Send + 'static,
474 ) -> ServiceResult<ServiceStream<T>> {
475 Ok(Self::stream(s))
476 }
477}
478
479/// Result type returned by handler trait methods.
480///
481/// `B` is the body type — typically the owned response message, or any
482/// `impl Encodable<M>`.
483pub type ServiceResult<B> = Result<Response<B>, ConnectError>;
484
485/// Boxed `Send` stream of `Result<T, ConnectError>`.
486///
487/// Used as the request type for client/bidi-streaming handlers and the
488/// body type for server/bidi-streaming responses.
489///
490/// For an inbound request stream, `None` means the client finished the
491/// stream cleanly; `Some(Err(..))` means the stream ended abnormally — a
492/// decode failure or a request body that failed mid-upload (truncated or
493/// broken transport). Treat only `None` as a complete stream; propagating
494/// the error with `?` fails the RPC, which is the right default for
495/// handlers that aggregate inbound messages.
496pub type ServiceStream<T> = Pin<Box<dyn Stream<Item = Result<T, ConnectError>> + Send>>;
497
498// ---------------------------------------------------------------------------
499// Encodable<M>
500// ---------------------------------------------------------------------------
501
502/// Encodes to the same wire bytes as proto message `M`.
503///
504/// This is the bound on the response body in generated trait methods.
505/// Provided implementations:
506/// - the owned `M` itself (blanket `M: Message + Serialize` below);
507/// - `MView<'_>` and [`OwnedView<MView<'static>>`](buffa::view::OwnedView),
508/// emitted by codegen per RPC output type;
509/// - [`MaybeBorrowed<M, V>`] for handlers that conditionally return
510/// either;
511/// - [`StreamMessage<M>`](crate::StreamMessage) for echoing inbound
512/// stream items back out (re-encodes from the retained wire bytes);
513/// - [`PreEncoded`] for handlers that encode a non-`'static` view
514/// internally and pass the bytes across the handler boundary.
515///
516/// # Contract
517///
518/// Implementations must produce bytes that decode as a valid `M` in
519/// the given format.
520///
521/// `encode` is fallible: the owned-message impl never errors. The
522/// view-body impls are proto-only (view types lack `Serialize`) and return
523/// [`ErrorCode::Unimplemented`](crate::ErrorCode::Unimplemented) for
524/// `CodecFormat::Json`. [`PreEncoded`] supports both codecs but the JSON
525/// path is a slow fallback (decode + re-serialize) — see its
526/// `# Codec behaviour` doc.
527pub trait Encodable<M> {
528 /// Encode `self` as wire bytes for `M` in the requested format.
529 fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError>;
530}
531
532impl<M: Message + Serialize> Encodable<M> for M {
533 fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
534 match codec {
535 CodecFormat::Proto => Ok(self.encode_to_bytes()),
536 CodecFormat::Json => serde_json::to_vec(self).map(Bytes::from).map_err(|e| {
537 ConnectError::internal(format!("failed to encode JSON response: {e}"))
538 }),
539 }
540 }
541}
542
543/// Encode a view body via [`ViewEncode`] for [`CodecFormat::Proto`], or
544/// return [`ErrorCode::Unimplemented`](crate::ErrorCode::Unimplemented)
545/// for [`CodecFormat::Json`] (view types don't implement `Serialize`).
546///
547/// Used by codegen-emitted `impl Encodable<Foo> for FooView<'_>` /
548/// `impl Encodable<Foo> for OwnedView<FooView<'static>>` blocks. A
549/// runtime blanket on [`OwnedView`](buffa::view::OwnedView) would
550/// conflict with the `M: Message + Serialize` blanket above (coherence
551/// can't rule out upstream adding `Message`/`Serialize` for
552/// `OwnedView`), so the impls are emitted per output type instead.
553#[doc(hidden)]
554pub fn encode_view_body<'a, V: ViewEncode<'a>>(
555 view: &V,
556 codec: CodecFormat,
557) -> Result<Bytes, ConnectError> {
558 match codec {
559 CodecFormat::Proto => Ok(view.encode_to_bytes()),
560 CodecFormat::Json => Err(ConnectError::unimplemented(
561 "view-body responses do not support the JSON codec; return the owned message type for JSON-serving handlers",
562 )),
563 }
564}
565
566// ---------------------------------------------------------------------------
567// MaybeBorrowed
568// ---------------------------------------------------------------------------
569
/// A response body that is either an owned message `M` or a borrowing
/// view `V`, where both are [`Encodable<M>`].
///
/// Reach for this when a handler sometimes passes the request through
/// untouched (return the view, zero allocations) and sometimes modifies
/// it (clone to owned, mutate, return owned). Having one concrete return
/// type is what satisfies the `impl Encodable<M>` bound on the generated
/// trait.
///
/// Despite the name, this is not [`std::borrow::Cow`]. `V` is a separate
/// [`Encodable<M>`] type (e.g. `MView<'a>` or `OwnedView<MView>`), not a
/// `&M`, and the two arms have no `ToOwned` relationship — each one
/// encodes independently.
///
/// ```rust,ignore
/// async fn redact(&self, _ctx: RequestContext, req: OwnedRecordView)
///     -> ServiceResult<MaybeBorrowed<Record, OwnedRecordView>>
/// {
///     if req.email.is_empty() && req.ssn.is_empty() {
///         // pass-through: re-encode straight from the request bytes
///         return Response::ok(MaybeBorrowed::Borrowed(req));
///     }
///     let mut owned = req.to_owned_message();
///     owned.email.clear();
///     owned.ssn.clear();
///     Response::ok(MaybeBorrowed::Owned(owned))
/// }
/// ```
///
/// # Codec compatibility
///
/// The `Borrowed` arm encodes only for [`CodecFormat::Proto`]; JSON
/// clients get an `unimplemented` error. A service that has to support
/// JSON should return `Owned` (or simply the owned message) on every
/// path.
#[derive(Debug, Clone)]
pub enum MaybeBorrowed<M, V> {
    /// The body is an owned message.
    Owned(M),
    /// The body is a borrowing value that encodes to the same wire bytes
    /// as `M`.
    Borrowed(V),
}
611
612impl<M, V> Encodable<M> for MaybeBorrowed<M, V>
613where
614 // satisfied via the blanket impl for M: Message + Serialize
615 M: Encodable<M>,
616 V: Encodable<M>,
617{
618 fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
619 match self {
620 Self::Owned(m) => m.encode(codec),
621 Self::Borrowed(v) => v.encode(codec),
622 }
623 }
624}
625
626// ---------------------------------------------------------------------------
627// PreEncoded
628// ---------------------------------------------------------------------------
629
630/// Pre-encoded protobuf response body for message type `M`.
631///
632/// Use when the handler builds and encodes a borrowing view internally —
633/// e.g. a `FooView<'a>` borrowing from a local snapshot — rather than
634/// returning the view itself. The `'static` bound on `Handler::Body` (and
635/// on streaming items, see the `use<Self>` note in the
636/// [`StreamingHandler`](crate::StreamingHandler) docs) means a view with a
637/// non-`'static` lifetime can't cross the handler
638/// boundary; `PreEncoded` carries the bytes across instead.
639///
640/// The `M` type parameter is a compile-time witness for which RPC output
641/// type the bytes encode. Three construction paths, in decreasing order
642/// of compile-time guarantee:
643///
644/// - [`from_message(&m)`](PreEncoded::from_message) — encodes an owned
645/// `M`; the receiver type *is* the witness.
646/// - [`from_view(&view)`](PreEncoded::from_view) — encodes a borrowing
647/// view; `MessageView::Owned = M` is the witness.
648/// - [`from_bytes_unchecked(bytes)`](PreEncoded::from_bytes_unchecked) —
649/// wraps already-encoded bytes from elsewhere (a cache, storage,
650/// another service). No witness; you're asserting the bytes decode as
651/// `M`.
652///
653/// `from_message` and `from_view` produce the same `PreEncoded<M>` type,
654/// so a stream can mix items built either way (e.g. a cache-hit path
655/// returning the cached owned `M`, a cache-miss path building a view from
656/// a snapshot) — the same role [`MaybeBorrowed`] fills for unary
657/// handlers, but with the encode happening eagerly inside the stream
658/// body.
659///
660/// # Streaming example
661///
662/// The motivating shape — a server-streaming handler that builds and
663/// encodes per-item views borrowing from a local store snapshot, then
664/// yields the bytes:
665///
666/// ```rust,ignore
667/// use connectrpc::{PreEncoded, Response, RequestContext, ServiceResult, ServiceStream};
668///
669/// async fn watch(
670/// &self,
671/// _ctx: RequestContext,
672/// req: OwnedWatchRequestView,
673/// ) -> ServiceResult<ServiceStream<PreEncoded<WatchResponse>>> {
674/// let store = self.store.clone();
675/// let stream = futures::stream::unfold(store, |store| async move {
676/// let snapshot = store.load();
677/// // `view` borrows from `snapshot`; encode while the borrow is live.
678/// let view = build_view_from_snapshot(&snapshot);
679/// let item = PreEncoded::from_view(&view);
680/// Some((Ok(item), store))
681/// });
682/// Response::stream_ok(stream)
683/// }
684/// ```
685///
686/// For a unary handler, the same pattern applies — return
687/// `ServiceResult<PreEncoded<MyResponse>>`.
688///
689/// # Codec behaviour
690///
691/// `PreEncoded` is optimized for the `proto` codec: the wrapped bytes are
692/// passed through verbatim with no re-encoding. The motivating use case
693/// (high-throughput fanout) is proto-only.
694///
695/// For the `json` codec, `PreEncoded` falls back to decoding the bytes as
696/// `M` and re-serializing as JSON. **This is correct but not fast** — a
697/// full proto decode plus a JSON serialize per response (or per stream
698/// item). The fallback exists so that registering a `PreEncoded` handler
699/// on a JSON-capable router degrades gracefully instead of returning a
700/// runtime error. If your service serves a meaningful JSON traffic share,
701/// build and return the owned message (or [`MaybeBorrowed::Owned`])
702/// instead — that lets the codec layer pick the right encoding without
703/// the proto round-trip.
704///
705/// If the wrapped bytes don't decode as `M` (e.g. you passed mismatched
706/// bytes to [`from_bytes_unchecked`](PreEncoded::from_bytes_unchecked)),
707/// the JSON path returns an [`internal`](crate::ErrorCode::Internal)
708/// error at the server; the proto path passes the bytes through and the
709/// client sees a decode error.
710///
711/// ## Codec-dependent fidelity
712///
713/// The proto path is byte-exact; the JSON path is **only as faithful as
714/// decoding the bytes to an owned `M` and re-serializing**. The two
715/// diverge when the wrapped bytes carry information not representable in
716/// `M` itself:
717///
718/// - **Unknown fields** (proto bytes encoded against a *newer* schema
719/// than the server's `M`) are preserved on the proto path and dropped
720/// on the JSON path. This matters only for
721/// [`from_bytes_unchecked`](PreEncoded::from_bytes_unchecked) bytes
722/// sourced externally; bytes produced by
723/// [`from_message`](PreEncoded::from_message) /
724/// [`from_view`](PreEncoded::from_view) cannot carry unknown fields.
725/// - **Non-canonical proto encodings** (out-of-order fields, redundant
726/// length prefixes, repeated non-`repeated` fields) are passed through
727/// verbatim on the proto path and normalized by the decode on the JSON
728/// path.
729///
730/// If byte-exact fidelity across codecs matters (e.g. signature
731/// verification, content-addressed storage), do not use `PreEncoded` with
732/// JSON-capable routes.
733///
734/// ## Cost is selected by the client
735///
736/// The codec is chosen per-request by the client's `Content-Type` header.
737/// For a service that adopted `PreEncoded` for proto throughput, a client
738/// sending JSON requests (intentionally, by misconfiguration, or
739/// adversarially) shifts those requests onto the slow decode-reserialize
740/// path. The marginal cost is bounded by the response size and is usually
741/// small relative to the handler's own work, but a streaming RPC pays it
742/// per item. A service that wants to *enforce* proto-only should reject
743/// non-proto `Content-Type` at the middleware layer (e.g. an axum
744/// middleware that returns `415 Unsupported Media Type`) rather than rely
745/// on the body type — that keeps the policy outside the handler and
746/// applies before the request body is read.
747///
748/// # Contract
749///
750/// `PreEncoded` is a transparent byte container — it does not validate
751/// the wrapped bytes on the proto path. [`PreEncoded::from_view`] gives a
752/// compile-time witness via `MessageView::Owned = M`;
753/// [`PreEncoded::from_bytes_unchecked`] trusts the caller. Returning bytes
754/// that don't decode as `M` will produce decode errors on the client (or,
755/// for JSON clients, an `internal` error from the server-side fallback
756/// decode).
757#[must_use = "PreEncoded must be returned from a handler to take effect"]
758pub struct PreEncoded<M> {
759 bytes: Bytes,
760 // `fn() -> M` keeps `PreEncoded<M>` `Send + Sync` regardless of `M`'s
761 // auto-trait surface (the bytes are owned; `M` is only a type witness).
762 _marker: PhantomData<fn() -> M>,
763}
764
765// Manual derives: `#[derive(Debug, Clone)]` would add a spurious `M: Debug` /
766// `M: Clone` bound (PhantomData carries it through to the where-clause).
767impl<M> std::fmt::Debug for PreEncoded<M> {
768 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
769 f.debug_tuple("PreEncoded").field(&self.bytes).finish()
770 }
771}
772
773impl<M> Clone for PreEncoded<M> {
774 fn clone(&self) -> Self {
775 Self {
776 bytes: self.bytes.clone(),
777 _marker: PhantomData,
778 }
779 }
780}
781
782impl<M: Message> PreEncoded<M> {
783 /// Encode an owned `M` to protobuf bytes.
784 ///
785 /// The receiver type is the compile-time witness — there's no way to
786 /// produce a `PreEncoded<M>` from a `&Other`. This is the right
787 /// constructor when the handler builds an owned `M` and wants to
788 /// share the encoding (e.g. encode once, clone the
789 /// [`Bytes`]-backed `PreEncoded` for N readers in a fanout) or when a
790 /// stream needs to mix owned-message and view-built items under a
791 /// single `type Item = PreEncoded<M>`.
792 ///
793 /// Equivalent to `PreEncoded::from_bytes_unchecked(m.encode_to_bytes())`,
794 /// but with `M` enforced by the type system rather than asserted by
795 /// the caller.
796 pub fn from_message(msg: &M) -> Self {
797 Self {
798 bytes: msg.encode_to_bytes(),
799 _marker: PhantomData,
800 }
801 }
802
803 /// Encode a [`ViewEncode`] view to protobuf bytes.
804 ///
805 /// The `MessageView<'a, Owned = M>` bound is the compile-time witness
806 /// that the bytes decode as `M` — passing `OtherView<'a>` won't
807 /// type-check unless `OtherView::Owned == M`.
808 pub fn from_view<'a, V>(view: &V) -> Self
809 where
810 V: ViewEncode<'a> + MessageView<'a, Owned = M>,
811 {
812 Self {
813 bytes: view.encode_to_bytes(),
814 _marker: PhantomData,
815 }
816 }
817
818 /// Wrap already-encoded protobuf bytes without validating them.
819 ///
820 /// Use when the bytes come from somewhere with no structural type
821 /// guarantee — a byte cache, a blob store, a sidecar service. You are
822 /// asserting the bytes decode as `M`; the proto path does not
823 /// validate this. In debug builds, the bytes are decoded once as a
824 /// `debug_assert!` to surface mismatches early.
825 ///
826 /// Prefer [`from_message`](PreEncoded::from_message) when you have an
827 /// owned `M` in hand and [`from_view`](PreEncoded::from_view) when
828 /// you have a view — both enforce `M` at compile time.
829 ///
830 /// Zero-copy for `Bytes` and `Vec<u8>`; passing `&[u8]` allocates and
831 /// copies.
832 pub fn from_bytes_unchecked(bytes: impl Into<Bytes>) -> Self {
833 let bytes = bytes.into();
834 debug_assert!(
835 M::decode_from_slice(&bytes).is_ok(),
836 "PreEncoded::from_bytes_unchecked: bytes do not decode as {}",
837 std::any::type_name::<M>(),
838 );
839 Self {
840 bytes,
841 _marker: PhantomData,
842 }
843 }
844}
845
846/// Encode an owned `M` to a [`PreEncoded<M>`].
847///
848/// Equivalent to [`PreEncoded::from_message`]; provided for `.into()`
849/// ergonomics.
850impl<M: Message> From<&M> for PreEncoded<M> {
851 fn from(msg: &M) -> Self {
852 Self::from_message(msg)
853 }
854}
855
856// Coherence: this impl is non-overlapping with the
857// `impl<M: Message + Serialize> Encodable<M> for M` blanket above for
858// structural reasons. For the two to overlap, some `T` would have to satisfy
859// both `T: Encodable<T>` (blanket, with `T: Message + Serialize`) and
860// `T = PreEncoded<U>` with `T: Encodable<U>` (this impl) for the *same* trait
861// parameter — i.e. `T = U`, i.e. `PreEncoded<U> = U`, which is infinite. So
862// the impls cannot overlap even if a future change made `PreEncoded` a
863// `Message` (which would only add `PreEncoded<M>: Encodable<PreEncoded<M>>` —
864// a different trait instantiation). No invariant to maintain here.
865//
866// The `M: Message + Serialize` bound matches the blanket so a `PreEncoded<M>`
867// is `Encodable<M>` exactly when an owned `M` would be — and is what makes the
868// JSON fallback path possible (decode as `M`, re-serialize).
869impl<M: Message + Serialize> Encodable<M> for PreEncoded<M> {
870 fn encode(&self, codec: CodecFormat) -> Result<Bytes, ConnectError> {
871 match codec {
872 CodecFormat::Proto => Ok(self.bytes.clone()),
873 // Slow path: decode the proto bytes back to `M`, then serialize
874 // as JSON. This exists for correctness (JSON clients should get
875 // a response, not `unimplemented`), not throughput; the owned
876 // message path skips the proto round-trip and is preferable for
877 // JSON-heavy services. See the type-level docs.
878 CodecFormat::Json => {
879 let msg = M::decode_from_slice(&self.bytes).map_err(|e| {
880 ConnectError::internal(format!(
881 "pre-encoded bytes did not decode as {}: {e}",
882 std::any::type_name::<M>(),
883 ))
884 })?;
885 serde_json::to_vec(&msg).map(Bytes::from).map_err(|e| {
886 ConnectError::internal(format!("failed to encode JSON response: {e}"))
887 })
888 }
889 }
890 }
891}
892
893// ---------------------------------------------------------------------------
894// EncodedResponse (dispatcher boundary)
895// ---------------------------------------------------------------------------
896
/// A [`Response`] with the body already encoded to bytes.
///
/// This is what the [`Dispatcher`](crate::Dispatcher) returns to the
/// protocol layer — encoding happens inside the dispatcher so the body
/// type stays generic across the trait boundary. Headers, trailers and the
/// compression setting travel alongside the encoded body unchanged
/// (`Response::encode` produces this type).
pub type EncodedResponse = Response<Bytes>;
903
904impl<B> Response<B> {
905 /// Encode the body to bytes via [`Encodable<M>`], preserving
906 /// response metadata.
907 #[doc(hidden)] // exposed for dispatcher::codegen (generated code)
908 pub fn encode<M>(self, codec: CodecFormat) -> Result<EncodedResponse, ConnectError>
909 where
910 B: Encodable<M>,
911 {
912 let bytes = self.body.encode(codec)?;
913 Ok(Response {
914 body: bytes,
915 headers: self.headers,
916 trailers: self.trailers,
917 compress: self.compress,
918 })
919 }
920}
921
// Unit tests for `RequestContext`, `Response`, the `Encodable` impls and
// `PreEncoded`. Everything the parent module imports (`HeaderMap`, `Bytes`,
// `Instant`, …) is reached through `use super::*`.
#[cfg(test)]
mod tests {
    use super::*;
    use buffa_types::google::protobuf::StringValue;

    #[tokio::test]
    async fn response_stream_ok_shorthand() {
        use futures::StreamExt;
        let r: ServiceResult<ServiceStream<i32>> =
            Response::stream_ok(futures::stream::iter([Ok(7)]));
        let collected: Vec<_> = r.unwrap().body.map(|x| x.unwrap()).collect().await;
        assert_eq!(collected, vec![7]);
    }

    #[test]
    fn compress_tristate() {
        assert_eq!(Response::new(()).compress(true).compress, Some(true));
        assert_eq!(Response::new(()).compress(false).compress, Some(false));
        assert_eq!(Response::new(()).compress(None).compress, None);
    }

    #[test]
    fn header_accepts_str() {
        let mut h = HeaderMap::new();
        h.insert("x-custom", HeaderValue::from_static("v"));
        let ctx = RequestContext::new(h);
        assert_eq!(ctx.header("x-custom").unwrap(), "v");
    }

    #[test]
    fn response_ok_shorthand() {
        let r: ServiceResult<u32> = Response::ok(42);
        let r = r.unwrap();
        assert_eq!(r.body, 42);
        assert!(r.headers.is_empty());
    }

    #[test]
    fn response_from_body() {
        let r: Response<StringValue> = StringValue::from("hi").into();
        assert_eq!(r.body.value, "hi");
        assert!(r.headers.is_empty());
        assert!(r.trailers.is_empty());
        assert_eq!(r.compress, None);
    }

    #[test]
    fn response_builder() {
        let r = Response::new(StringValue::from("hi"))
            .with_header("x-a", "1")
            .with_trailer("x-b", "2")
            .compress(true);
        assert_eq!(r.headers.get("x-a").unwrap(), "1");
        assert_eq!(r.trailers.get("x-b").unwrap(), "2");
        assert_eq!(r.compress, Some(true));
    }

    #[test]
    fn encodable_owned_proto() {
        let m = StringValue::from("hello");
        let bytes = Encodable::<StringValue>::encode(&m, CodecFormat::Proto).unwrap();
        assert_eq!(
            StringValue::decode_from_slice(&bytes).unwrap().value,
            "hello"
        );
    }

    #[test]
    fn encodable_owned_json() {
        let m = StringValue::from("hello");
        let bytes = Encodable::<StringValue>::encode(&m, CodecFormat::Json).unwrap();
        assert_eq!(&bytes[..], b"\"hello\"");
    }

    #[test]
    fn response_encode() {
        let r = Response::new(StringValue::from("hi")).with_header("x-a", "1");
        let enc = r.encode::<StringValue>(CodecFormat::Proto).unwrap();
        assert_eq!(enc.headers.get("x-a").unwrap(), "1");
        assert_eq!(
            StringValue::decode_from_slice(&enc.body).unwrap().value,
            "hi"
        );
    }

    #[test]
    fn request_context_new() {
        let mut h = HeaderMap::new();
        h.insert("x-custom", HeaderValue::from_static("v"));
        let ctx = RequestContext::new(h);
        assert_eq!(
            ctx.header(HeaderName::from_static("x-custom")).unwrap(),
            "v"
        );
        assert_eq!(ctx.headers().get("x-custom").unwrap(), "v");
        assert!(ctx.deadline().is_none());
        assert!(ctx.time_remaining().is_none());
        assert!(ctx.extensions().is_empty());
    }

    #[test]
    fn request_context_with_deadline() {
        let d = Instant::now();
        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(d));
        assert_eq!(ctx.deadline(), Some(d));
    }

    #[test]
    fn request_context_time_remaining_saturates_at_zero() {
        // Deadline in the past — `time_remaining()` should clamp to zero,
        // not underflow.
        //
        // `Instant - Duration` panics when the result is not representable
        // (e.g. the monotonic clock started less than 60s ago), so subtract
        // with `checked_sub` and fall back to `now`. That fallback deadline
        // has also been reached by the time `time_remaining()` reads the
        // clock, so the expected result is still zero.
        let now = Instant::now();
        let past = now.checked_sub(Duration::from_secs(60)).unwrap_or(now);
        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(past));
        assert_eq!(ctx.time_remaining(), Some(Duration::ZERO));
    }

    #[test]
    fn request_context_time_remaining_future() {
        let future = Instant::now() + Duration::from_secs(60);
        let ctx = RequestContext::new(HeaderMap::new()).with_deadline(Some(future));
        let remaining = ctx.time_remaining().unwrap();
        // Some elapsed time between `with_deadline` and the assertion is
        // expected; just bound it.
        assert!(remaining > Duration::from_secs(55));
        assert!(remaining <= Duration::from_secs(60));
    }

    #[test]
    fn request_context_extensions_mut() {
        #[derive(Clone, Debug, PartialEq)]
        struct Tag(u8);
        let mut ctx = RequestContext::new(HeaderMap::new());
        ctx.extensions_mut().insert(Tag(1));
        assert_eq!(ctx.extensions().get::<Tag>(), Some(&Tag(1)));
    }

    #[cfg(feature = "server")]
    #[test]
    fn request_context_peer_addr_absent() {
        // No transport inserted `PeerAddr`; the typed accessor returns
        // `None` rather than panicking.
        let ctx = RequestContext::new(HeaderMap::new());
        assert_eq!(ctx.peer_addr(), None);
    }

    #[cfg(feature = "server")]
    #[test]
    fn request_context_peer_addr_present() {
        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let mut ext = http::Extensions::new();
        ext.insert(crate::server::PeerAddr(addr));
        let ctx = RequestContext::new(HeaderMap::new()).with_extensions(ext);
        assert_eq!(ctx.peer_addr(), Some(addr));
    }

    #[cfg(feature = "server-tls")]
    #[test]
    fn request_context_peer_certs_absent() {
        let ctx = RequestContext::new(HeaderMap::new());
        assert!(ctx.peer_certs().is_none());
    }

    #[test]
    fn response_map_body_preserves_metadata() {
        let r = Response::new(2u32)
            .with_header("x-h", "1")
            .with_trailer("x-t", "2")
            .compress(true);
        let r = r.map_body(|n| n.to_string());
        assert_eq!(r.body, "2");
        assert_eq!(r.headers.get("x-h").unwrap(), "1");
        assert_eq!(r.trailers.get("x-t").unwrap(), "2");
        assert_eq!(r.compress, Some(true));
    }

    #[tokio::test]
    async fn response_stream_yields_items() {
        use futures::StreamExt;
        let r: Response<ServiceStream<i32>> =
            Response::stream(futures::stream::iter([Ok(1), Ok(2), Ok(3)]));
        let collected: Vec<_> = r.body.map(|x| x.unwrap()).collect().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    #[test]
    #[should_panic]
    fn with_header_panics_on_invalid_name() {
        let _ = Response::new(()).with_header("invalid header name", "v");
    }

    #[test]
    fn try_with_header_errors_on_invalid_name() {
        let err = Response::new(())
            .try_with_header("invalid header name", "v")
            .unwrap_err();
        assert!(err.is::<http::header::InvalidHeaderName>());
    }

    #[test]
    fn try_with_header_ok_appends() {
        let r = Response::new(())
            .try_with_header("x-a", "1")
            .unwrap()
            .try_with_header("x-a", "2")
            .unwrap();
        let vals: Vec<_> = r.headers.get_all("x-a").iter().collect();
        assert_eq!(vals.len(), 2);
    }

    #[test]
    fn try_with_trailer_errors_on_invalid_value() {
        // Newlines are not permitted in header values.
        let err = Response::new(())
            .try_with_trailer("x-t", "bad\nvalue")
            .unwrap_err();
        assert!(err.is::<http::header::InvalidHeaderValue>());
    }

    #[test]
    fn encode_view_body_proto() {
        use buffa_types::google::protobuf::__buffa::view::StringValueView;
        let v = StringValueView {
            value: "hi",
            ..Default::default()
        };
        let bytes = encode_view_body(&v, CodecFormat::Proto).unwrap();
        assert_eq!(StringValue::decode_from_slice(&bytes).unwrap().value, "hi");
    }

    #[test]
    fn encode_view_body_json_errors() {
        use buffa_types::google::protobuf::__buffa::view::StringValueView;
        let v = StringValueView::default();
        let err = encode_view_body(&v, CodecFormat::Json).unwrap_err();
        assert_eq!(err.code, crate::ErrorCode::Unimplemented);
        assert!(err.message.as_deref().unwrap().contains("JSON codec"));
    }

    // Manual Encodable<StringValue> impl modelling what codegen emits
    // for FooView<'_>. Shared by the MaybeBorrowed tests below.
    struct V<'a>(buffa_types::google::protobuf::__buffa::view::StringValueView<'a>);
    impl Encodable<StringValue> for V<'_> {
        fn encode(&self, c: CodecFormat) -> Result<Bytes, ConnectError> {
            encode_view_body(&self.0, c)
        }
    }

    #[test]
    fn maybe_borrowed_dispatch() {
        use buffa_types::google::protobuf::__buffa::view::StringValueView;
        let owned: MaybeBorrowed<StringValue, V<'_>> =
            MaybeBorrowed::Owned(StringValue::from("owned"));
        let borrowed = MaybeBorrowed::Borrowed(V(StringValueView {
            value: "view",
            ..Default::default()
        }));
        assert_eq!(
            StringValue::decode_from_slice(&owned.encode(CodecFormat::Proto).unwrap())
                .unwrap()
                .value,
            "owned"
        );
        assert_eq!(
            StringValue::decode_from_slice(&borrowed.encode(CodecFormat::Proto).unwrap())
                .unwrap()
                .value,
            "view"
        );
    }

    #[test]
    fn maybe_borrowed_borrowed_json_unimplemented() {
        use buffa_types::google::protobuf::__buffa::view::StringValueView;
        let borrowed: MaybeBorrowed<StringValue, V<'_>> =
            MaybeBorrowed::Borrowed(V(StringValueView::default()));
        let err = borrowed.encode(CodecFormat::Json).unwrap_err();
        assert_eq!(err.code, crate::ErrorCode::Unimplemented);
    }

    #[test]
    fn pre_encoded_proto_round_trip() {
        let m = StringValue::from("pre-encoded");
        let bytes = m.encode_to_bytes();
        let body = PreEncoded::<StringValue>::from_bytes_unchecked(bytes.clone());
        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
        assert_eq!(out, bytes);
        assert_eq!(
            StringValue::decode_from_slice(&out).unwrap().value,
            "pre-encoded"
        );
    }

    #[test]
    fn pre_encoded_json_decodes_then_serializes() {
        // The JSON path round-trips: proto bytes → owned `M` → JSON. Slow,
        // but correct — see the `# Codec behaviour` doc on `PreEncoded`.
        let m = StringValue::from("hi");
        let body = PreEncoded::<StringValue>::from_bytes_unchecked(m.encode_to_bytes());
        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap();
        // Output should match what serializing the owned message directly
        // would produce.
        assert_eq!(out, Bytes::from(serde_json::to_vec(&m).unwrap()));
    }

    #[test]
    fn pre_encoded_json_decode_failure_is_internal_error() {
        // `from_bytes_unchecked` is unvalidated on the proto path. The JSON
        // fallback necessarily decodes; if that fails (the wrapped bytes
        // were never a valid `M`), the server-side `internal` error surfaces
        // closer to the construction bug than the proto path would.
        //
        // Field 1 (LEN) declares 99 bytes but only 2 follow — guaranteed
        // truncated for `StringValue`.
        let body = PreEncoded::<StringValue> {
            bytes: Bytes::from_static(&[0x0a, 0x63, b'h', b'i']),
            _marker: std::marker::PhantomData,
        };
        let err = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap_err();
        assert_eq!(err.code, crate::ErrorCode::Internal);
        assert!(err.message.as_deref().unwrap().contains("did not decode"));
    }

    #[test]
    fn pre_encoded_from_view() {
        use buffa::view::ViewEncode;
        use buffa_types::google::protobuf::__buffa::view::StringValueView;
        let v = StringValueView {
            value: "from-view",
            ..Default::default()
        };
        // `from_view` infers `M = StringValue` from `StringValueView::Owned`.
        let body = PreEncoded::from_view(&v);
        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
        assert_eq!(out, v.encode_to_bytes());
        assert_eq!(
            StringValue::decode_from_slice(&out).unwrap().value,
            "from-view"
        );
    }

    #[test]
    fn pre_encoded_from_message() {
        let m = StringValue::from("from-message");
        // `from_message` infers `M` from its argument — no annotation.
        let body = PreEncoded::from_message(&m);
        let out = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
        assert_eq!(out, m.encode_to_bytes());

        // `From<&M>` is the same conversion via `.into()`.
        let body2: PreEncoded<StringValue> = (&m).into();
        let out2 = Encodable::<StringValue>::encode(&body2, CodecFormat::Proto).unwrap();
        assert_eq!(out2, out);
    }

    #[test]
    fn pre_encoded_codec_fidelity_diverges_on_unknown_fields() {
        // Documents the codec-dependent fidelity caveat: the proto path
        // is byte-exact (unknown fields preserved); the JSON path
        // round-trips through `M` (unknown fields dropped). Only relevant
        // for `from_bytes_unchecked` bytes sourced externally.
        //
        // Wire bytes: field 1 = "hi" (the known `StringValue.value`),
        // plus field 2 = varint 42 (unknown to `StringValue`).
        let bytes_with_unknown =
            Bytes::from_static(&[0x0a, 0x02, b'h', b'i', /* tag 2 varint */ 0x10, 42]);
        let body = PreEncoded::<StringValue> {
            bytes: bytes_with_unknown.clone(),
            _marker: std::marker::PhantomData,
        };

        // Proto: byte-exact passthrough, unknown field preserved.
        let proto = Encodable::<StringValue>::encode(&body, CodecFormat::Proto).unwrap();
        assert_eq!(proto, bytes_with_unknown);

        // JSON: round-trips through `StringValue`, which drops the
        // unknown field. Output equals serializing the bare known
        // message.
        let json = Encodable::<StringValue>::encode(&body, CodecFormat::Json).unwrap();
        assert_eq!(
            json,
            Bytes::from(serde_json::to_vec(&StringValue::from("hi")).unwrap())
        );
    }

    #[test]
    fn pre_encoded_is_typed() {
        // `PreEncoded<M>` only implements `Encodable<M>` — the type witness
        // means `PreEncoded<StringValue>` cannot be used where
        // `Encodable<Int32Value>` is required. Verified at compile time;
        // this test just exercises the happy path for both types.
        use buffa_types::google::protobuf::Int32Value;
        let s = PreEncoded::<StringValue>::from_bytes_unchecked(
            StringValue::from("a").encode_to_bytes(),
        );
        let i =
            PreEncoded::<Int32Value>::from_bytes_unchecked(Int32Value::from(1).encode_to_bytes());
        Encodable::<StringValue>::encode(&s, CodecFormat::Proto).unwrap();
        Encodable::<Int32Value>::encode(&i, CodecFormat::Proto).unwrap();
        // The following would not compile:
        // Encodable::<Int32Value>::encode(&s, CodecFormat::Proto)
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "do not decode as")]
    fn pre_encoded_from_bytes_unchecked_debug_asserts() {
        // In debug builds, `from_bytes_unchecked` decodes once to surface
        // mismatched bytes early. Field 1 (LEN) declares 99 bytes; only 2
        // follow.
        let _ = PreEncoded::<StringValue>::from_bytes_unchecked(Bytes::from_static(&[
            0x0a, 0x63, b'h', b'i',
        ]));
    }

    #[test]
    fn request_context_with_extensions() {
        #[derive(Clone, Debug, PartialEq)]
        struct Peer(u32);
        let mut ext = http::Extensions::new();
        ext.insert(Peer(7));
        let ctx = RequestContext::new(HeaderMap::new()).with_extensions(ext);
        assert_eq!(ctx.extensions().get::<Peer>(), Some(&Peer(7)));
    }

    #[test]
    fn request_context_with_spec_and_protocol() {
        use crate::spec::{Spec, StreamType};

        // Default-constructed context has neither.
        let ctx = RequestContext::new(HeaderMap::new());
        assert_eq!(ctx.spec(), None);
        assert_eq!(ctx.protocol(), None);

        // Both round-trip through the builders.
        const SPEC: Spec = Spec::server("/pkg.Svc/M", StreamType::Unary);
        let ctx = RequestContext::new(HeaderMap::new())
            .with_spec(Some(SPEC))
            .with_protocol(Some(crate::Protocol::Grpc));
        assert_eq!(ctx.spec(), Some(SPEC));
        assert_eq!(ctx.protocol(), Some(crate::Protocol::Grpc));

        // Builders accept `None` to clear (matches `with_deadline`).
        let ctx = ctx.with_spec(None).with_protocol(None);
        assert_eq!(ctx.spec(), None);
        assert_eq!(ctx.protocol(), None);
    }

    #[test]
    fn request_context_with_path() {
        // Hand-built contexts (tests, custom dispatchers) have no path.
        let ctx = RequestContext::new(HeaderMap::new());
        assert_eq!(ctx.path(), None);

        // Round-trips through the builder.
        let ctx = RequestContext::new(HeaderMap::new()).with_path("/pkg.Svc/M");
        assert_eq!(ctx.path(), Some("/pkg.Svc/M"));

        // The builder takes ownership (Into<String>) so callers can pass
        // borrowed or owned without an extra clone.
        let owned = String::from("/pkg.Svc/Other");
        let ctx = RequestContext::new(HeaderMap::new()).with_path(owned);
        assert_eq!(ctx.path(), Some("/pkg.Svc/Other"));

        // The builder does not normalize or validate — `Some("")` is
        // preserved verbatim. The dispatch path always supplies a non-empty
        // leading-slash form; `Some("")` only reaches consumers from a
        // misconfigured custom dispatch shim, which is a wiring bug they
        // should surface rather than silently coerce to `None`.
        let ctx = RequestContext::new(HeaderMap::new()).with_path("");
        assert_eq!(ctx.path(), Some(""));
    }
}