connectrpc 0.6.0

A Tower-based Rust implementation of the ConnectRPC protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
//! Type-erased, lazily-decoded RPC message bodies.
//!
//! Interceptors run on every RPC and most of them never look inside the
//! request or response message — they read the [`Spec`](crate::Spec),
//! the headers, or the deadline and pass the call through. Decoding the
//! message eagerly would tax every call to pay for the rare interceptor
//! that inspects fields.
//!
//! [`Payload`] solves this by holding the wire bytes (always available,
//! reference-counted) and decoding to a typed message on first access.
//! Interceptors that want a typed message call [`Payload::message`]
//! (owned, works for both proto and JSON wires) or [`Payload::view`]
//! (zero-copy, proto only). Interceptors that want to *replace* the
//! message call [`Payload::set_message`]; the dispatch path re-encodes
//! the replacement on the way out.
//!
//! [`AnyMessage`] is the object-safe surface that lets a `Payload` cache
//! a decoded message without knowing its concrete type. It has a blanket
//! impl over every protobuf message, so user code never implements it
//! directly.

use std::any::Any;
use std::fmt;
use std::sync::OnceLock;

use buffa::Message;
use buffa::view::{MessageView, OwnedView};
use bytes::Bytes;
use serde::Serialize;
use serde::de::DeserializeOwned;

use crate::codec::{CodecFormat, decode_json, decode_proto, encode_json, encode_proto};
use crate::error::ConnectError;

/// Object-safe, type-erased RPC message.
///
/// `AnyMessage` is what a [`Payload`] caches once it has decoded its wire
/// bytes — a `Box<dyn AnyMessage>` that can be downcast back to the
/// concrete request/response type and re-encoded for the wire if an
/// interceptor swaps it.
///
/// You will almost never implement this trait directly. A blanket
/// implementation covers every type that is `Message + Serialize`, which
/// includes every owned message the code generator emits. A manual impl
/// must uphold a round-trip invariant: bytes returned by
/// [`encode`](AnyMessage::encode)`(format)` must decode back to an
/// equivalent value in that same format — the dispatch path relies on it
/// when re-encoding a replacement set via [`Payload::set_message`].
/// Violating it does not panic or error — [`Payload::encoded`] silently
/// produces wrong-shape bytes — so a manual impl must be tested for the
/// round-trip explicitly.
pub trait AnyMessage: Send + Sync + 'static {
    /// Borrow the message as `dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Mutably borrow the message as `dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any;
    /// Convert the boxed message into a `Box<dyn Any>` for owned downcasting.
    ///
    /// [`Payload::take_message`] uses this to move the cached decode out
    /// of the `Payload` and into the handler without a clone.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
    /// Serialize the message to wire bytes in the given format.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails. Proto encoding is infallible
    /// for valid messages; JSON encoding can fail on non-UTF-8 `bytes`
    /// fields and similar serde edge cases.
    fn encode(&self, format: CodecFormat) -> Result<Bytes, ConnectError>;
    /// The concrete type's name, for diagnostics. The default uses
    /// [`std::any::type_name`].
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}

impl<T> AnyMessage for T
where
    // Message already requires Send + Sync as supertraits.
    T: Message + Serialize + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
    fn encode(&self, format: CodecFormat) -> Result<Bytes, ConnectError> {
        match format {
            CodecFormat::Proto => encode_proto(self),
            CodecFormat::Json => encode_json(self),
        }
    }
}

/// A lazily-decoded, replaceable RPC message body.
///
/// A `Payload` always holds the wire-encoded body bytes ([`Bytes`], so
/// clones are reference-counted) and the [`CodecFormat`] they came in.
/// Typed access happens on demand:
///
/// - [`message`](Payload::message) — decode once into an owned message,
///   cache it, return a borrow. Works for both `Proto` and `Json` wires.
/// - [`view`](Payload::view) — zero-copy view borrowing the wire bytes
///   directly. `Proto` wires only; returns an error for `Json`.
/// - [`set_message`](Payload::set_message) — replace the body. The
///   replacement takes priority for all subsequent reads, and the
///   dispatch path re-encodes it on the way out.
/// - [`encoded`](Payload::encoded) — the wire bytes that will actually be
///   sent: either the original `bytes` or the re-encoded replacement.
///
/// `Payload` is normally constructed by the dispatch path and received
/// by user code through [`UnaryRequest`](crate::UnaryRequest) and
/// [`UnaryResponse`](crate::UnaryResponse). [`Payload::new`] is `pub`
/// so test fixtures and custom transports can build one directly.
///
/// `message` borrows the cached owned decode; `view` returns a fresh
/// self-contained [`OwnedView`] (a [`Bytes`] refcount bump, not a copy)
/// because a zero-copy view cannot be stored in the type-erased cache.
///
/// `Payload` is intentionally not `Clone`: a clone would either drop the
/// decode cache (surprising) or duplicate it (defeating the laziness).
/// Pass it by reference, or move it through the call chain.
pub struct Payload {
    bytes: Bytes,
    format: CodecFormat,
    decoded: OnceLock<Box<dyn AnyMessage>>,
    replaced: Option<Box<dyn AnyMessage>>,
}

impl Payload {
    /// Wrap wire bytes in a `Payload`. No decoding happens until a typed
    /// accessor is called.
    pub fn new(bytes: Bytes, format: CodecFormat) -> Self {
        Self {
            bytes,
            format,
            decoded: OnceLock::new(),
            replaced: None,
        }
    }

    /// The original wire bytes the peer sent, **ignoring** any
    /// replacement set with [`set_message`](Payload::set_message). For
    /// the bytes the dispatch path will actually send downstream, use
    /// [`encoded()`](Payload::encoded).
    pub fn bytes(&self) -> &Bytes {
        &self.bytes
    }

    /// The codec format the wire bytes are encoded in.
    pub fn format(&self) -> CodecFormat {
        self.format
    }

    /// Decode and cache the body as an owned `M`, returning a borrow.
    ///
    /// The decode runs at most once per `Payload`; subsequent calls
    /// return the cached value. If a replacement has been set with
    /// [`set_message`](Payload::set_message), that is returned instead.
    ///
    /// # Errors
    ///
    /// - [`invalid_argument`](ConnectError::invalid_argument) if the wire
    ///   bytes fail to decode as `M` — peer-supplied data, not a server
    ///   bug.
    /// - [`internal`](ConnectError::internal) if a replacement set with
    ///   [`set_message`](Payload::set_message) is not an `M`, or if a
    ///   prior `message::<N>()` cached a different type than `M` — both
    ///   are server-side programming errors (interceptors and handlers
    ///   for the same RPC must agree on the message types), and the cache
    ///   holds whichever type decoded first.
    pub fn message<M>(&self) -> Result<&M, ConnectError>
    where
        M: Message + Serialize + DeserializeOwned + 'static,
    {
        if let Some(replaced) = &self.replaced {
            return replaced.as_any().downcast_ref::<M>().ok_or_else(|| {
                ConnectError::internal(format!(
                    "payload replacement is a {}, not a {}",
                    replaced.type_name(),
                    std::any::type_name::<M>()
                ))
            });
        }
        // `get_or_try_init` is unstable, so probe-then-set. Two threads
        // racing decode the same bytes; only one `set` wins and the loser
        // discards its copy. With the same `M` (the normal case), the loser
        // still returns the winner's cached value. With *different* `M`
        // (a caller-side type bug), only the winner's type is cached and
        // the other caller gets the wrong-type error below.
        if self.decoded.get().is_none() {
            let m: M = match self.format {
                CodecFormat::Proto => decode_proto(&self.bytes)?,
                CodecFormat::Json => decode_json(&self.bytes)?,
            };
            let _ = self.decoded.set(Box::new(m));
        }
        // The `set` above (or a concurrent winner's) guarantees the cell is
        // now populated. The downcast can still miss if a prior call cached
        // a different `M`; that's a caller-side type mismatch, not a panic.
        let cached = self.decoded.get().expect("decoded cell populated above");
        cached.as_any().downcast_ref::<M>().ok_or_else(|| {
            ConnectError::internal(format!(
                "payload was previously decoded as a {}, not a {}",
                cached.type_name(),
                std::any::type_name::<M>()
            ))
        })
    }

    /// Decode the body into an owned `M`, consuming `self`.
    ///
    /// If the body was already decoded — an interceptor called
    /// [`message`](Payload::message) — and the cached value is an `M`,
    /// it is moved out: no second decode, no clone. If a replacement was
    /// set with [`set_message`](Payload::set_message), it is moved out
    /// instead. Otherwise the wire bytes are decoded fresh.
    ///
    /// The dispatch path uses this to hand the request to the handler
    /// without re-decoding bytes an interceptor already decoded. Because
    /// it consumes the `Payload`, it must be the last access — call
    /// [`message`](Payload::message) (which caches a borrow) for repeated
    /// reads.
    ///
    /// # Errors
    ///
    /// The same error contract as [`message`](Payload::message), with one
    /// behavioral difference worth noting: a wrong-typed cache that an
    /// interceptor created is now an error *for the handler*. Before
    /// `take_message`, the handler decoded the wire bytes independently
    /// and never saw the interceptor's cache, so an interceptor that
    /// decoded as the wrong `M` failed silently. Surfacing it loudly is
    /// the intent — interceptors and handlers for the same RPC must agree
    /// on the message types.
    ///
    /// - [`invalid_argument`](ConnectError::invalid_argument) if there is
    ///   no cache and the wire bytes fail to decode as `M` — peer-supplied
    ///   data, not a server bug.
    /// - [`internal`](ConnectError::internal) if a cached decode or a
    ///   replacement set with [`set_message`](Payload::set_message) is not
    ///   an `M` — a server-side type bug.
    pub fn take_message<M>(self) -> Result<M, ConnectError>
    where
        // Unlike `message()`, this never *populates* the cache, so it
        // does not need `M: Serialize` (the bound `message()` carries to
        // box `M` as `dyn AnyMessage`). It only reads the cache or
        // decodes fresh.
        M: Message + DeserializeOwned + 'static,
    {
        if let Some(replaced) = self.replaced {
            let type_name = replaced.type_name();
            return replaced
                .into_any()
                .downcast::<M>()
                .map(|b| *b)
                .map_err(|_| {
                    ConnectError::internal(format!(
                        "payload replacement is a {}, not a {}",
                        type_name,
                        std::any::type_name::<M>()
                    ))
                });
        }
        if let Some(cached) = self.decoded.into_inner() {
            let type_name = cached.type_name();
            return cached.into_any().downcast::<M>().map(|b| *b).map_err(|_| {
                ConnectError::internal(format!(
                    "payload was previously decoded as a {}, not a {}",
                    type_name,
                    std::any::type_name::<M>()
                ))
            });
        }
        match self.format {
            CodecFormat::Proto => decode_proto(&self.bytes),
            CodecFormat::Json => decode_json(&self.bytes),
        }
    }

    /// Decode the body as a zero-copy [`OwnedView`].
    ///
    /// Borrows directly from the wire bytes — no copy, no allocation
    /// beyond the [`Bytes`] refcount bump. If a replacement has been set
    /// with [`set_message`](Payload::set_message), it is encoded to proto
    /// (regardless of [`format`](Payload::format)) and decoded as a view
    /// — note this re-encodes on **every** call (unlike
    /// [`message`](Payload::message), there is no cache for views). Hold
    /// onto the returned `OwnedView` rather than re-fetching in a loop.
    ///
    /// # Errors
    ///
    /// - [`internal`](ConnectError::internal) for JSON-encoded wires:
    ///   JSON cannot back a zero-copy proto view. This is a server-side
    ///   programming error and escapes to the peer as a 500 if uncaught —
    ///   branch on [`format()`](Payload::format) and call
    ///   [`message()`](Payload::message) for JSON wires instead.
    /// - [`invalid_argument`](ConnectError::invalid_argument) if the wire
    ///   bytes fail to decode as `V` — peer-supplied data, not a server
    ///   bug.
    /// - [`internal`](ConnectError::internal) if a replacement set with
    ///   [`set_message`](Payload::set_message) fails to re-encode or
    ///   decode as `V` — server-supplied data, so the asymmetry with the
    ///   wire-bytes case is intentional.
    pub fn view<V>(&self) -> Result<OwnedView<V>, ConnectError>
    where
        V: MessageView<'static>,
    {
        if let Some(replaced) = &self.replaced {
            let bytes = replaced.encode(CodecFormat::Proto)?;
            return OwnedView::decode(bytes).map_err(|e| {
                ConnectError::internal(format!("failed to decode replacement as view: {e}"))
            });
        }
        if self.format != CodecFormat::Proto {
            return Err(ConnectError::internal(
                "Payload::view requires a proto-encoded wire; use Payload::message for JSON",
            ));
        }
        OwnedView::decode(self.bytes.clone()).map_err(|e| {
            ConnectError::invalid_argument(format!("failed to decode payload as view: {e}"))
        })
    }

    /// Replace the body with a new message.
    ///
    /// Subsequent [`message`](Payload::message) and
    /// [`view`](Payload::view) calls return the replacement, and
    /// [`encoded`](Payload::encoded) re-encodes it for the wire.
    pub fn set_message<M>(&mut self, message: M)
    where
        M: AnyMessage,
    {
        self.replaced = Some(Box::new(message));
        // Drop the prior decode cache so the original message doesn't pin
        // memory for the Payload's lifetime. `replaced` is checked first,
        // so a stale cache would never be visible — this is purely a
        // memory concern.
        if self.decoded.get().is_some() {
            self.decoded = OnceLock::new();
        }
    }

    /// The wire bytes the dispatch path should actually send.
    ///
    /// Returns the original `bytes` (a cheap [`Bytes`] clone) unless a
    /// replacement was set with [`set_message`](Payload::set_message),
    /// in which case the replacement is re-encoded in the original
    /// [`format`](Payload::format).
    ///
    /// # Errors
    ///
    /// Returns an error if re-encoding a replacement fails.
    pub fn encoded(&self) -> Result<Bytes, ConnectError> {
        match &self.replaced {
            Some(r) => r.encode(self.format),
            None => Ok(self.bytes.clone()),
        }
    }
}

impl fmt::Debug for Payload {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Payload")
            .field("len", &self.bytes.len())
            .field("format", &self.format)
            .field("decoded", &self.decoded.get().is_some())
            .field("replaced", &self.replaced.is_some())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use buffa_types::google::protobuf::__buffa::view::StringValueView;
    use buffa_types::google::protobuf::StringValue;

    fn proto_payload(value: &str) -> Payload {
        let msg = StringValue {
            value: value.into(),
            ..Default::default()
        };
        Payload::new(encode_proto(&msg).unwrap(), CodecFormat::Proto)
    }

    #[test]
    fn message_decodes_and_caches() {
        let p = proto_payload("hello");
        let m1: &StringValue = p.message().unwrap();
        assert_eq!(m1.value, "hello");
        // Second call returns the same cached value (same address).
        let m2: &StringValue = p.message().unwrap();
        assert!(std::ptr::eq(m1, m2), "second call should hit the cache");
    }

    #[test]
    fn message_decodes_json() {
        let bytes = encode_json(&StringValue {
            value: "json".into(),
            ..Default::default()
        })
        .unwrap();
        let p = Payload::new(bytes, CodecFormat::Json);
        let m: &StringValue = p.message().unwrap();
        assert_eq!(m.value, "json");
    }

    #[test]
    fn view_zero_copy_proto() {
        let p = proto_payload("zero copy");
        let v = p.view::<StringValueView>().unwrap();
        assert_eq!(v.value, "zero copy");
        // Borrows the payload's bytes — same backing storage.
        let value_ptr = v.value.as_ptr() as usize;
        let bytes_range =
            p.bytes().as_ptr() as usize..p.bytes().as_ptr() as usize + p.bytes().len();
        assert!(
            bytes_range.contains(&value_ptr),
            "view should borrow from the payload's wire bytes"
        );
    }

    #[test]
    fn view_errors_on_json() {
        let bytes = encode_json(&StringValue {
            value: "x".into(),
            ..Default::default()
        })
        .unwrap();
        let p = Payload::new(bytes, CodecFormat::Json);
        let err = p.view::<StringValueView>().unwrap_err();
        assert!(
            err.message
                .as_deref()
                .unwrap_or_default()
                .contains("requires a proto-encoded wire"),
            "{err:?}"
        );
    }

    #[test]
    fn set_message_round_trips() {
        let mut p = proto_payload("before");
        p.set_message(StringValue {
            value: "after".into(),
            ..Default::default()
        });
        // message() returns the replacement.
        let m: &StringValue = p.message().unwrap();
        assert_eq!(m.value, "after");
        // view() re-encodes the replacement and views it.
        let v = p.view::<StringValueView>().unwrap();
        assert_eq!(v.value, "after");
        // encoded() re-encodes for the original format.
        let encoded = p.encoded().unwrap();
        let rt: StringValue = decode_proto(&encoded).unwrap();
        assert_eq!(rt.value, "after");
        // bytes() is unchanged.
        let orig: StringValue = decode_proto(p.bytes()).unwrap();
        assert_eq!(orig.value, "before");
    }

    #[test]
    fn set_message_round_trips_json_format() {
        let bytes = encode_json(&StringValue {
            value: "before".into(),
            ..Default::default()
        })
        .unwrap();
        let mut p = Payload::new(bytes, CodecFormat::Json);
        p.set_message(StringValue {
            value: "after".into(),
            ..Default::default()
        });
        // encoded() re-encodes in the original (JSON) format.
        let encoded = p.encoded().unwrap();
        let rt: StringValue = decode_json(&encoded).unwrap();
        assert_eq!(rt.value, "after");
    }

    #[test]
    fn encoded_without_replacement_returns_original() {
        let p = proto_payload("x");
        // Same backing storage — refcounted Bytes clone, not a copy.
        assert!(std::ptr::eq(
            p.encoded().unwrap().as_ptr(),
            p.bytes().as_ptr()
        ));
    }

    #[test]
    fn message_wrong_type_errors() {
        use buffa_types::google::protobuf::Int32Value;
        let p = proto_payload("x");
        // Cache as StringValue.
        let _: &StringValue = p.message().unwrap();
        // Now ask for a different type — downcast fails. The error names
        // both types so the bug is locatable.
        let err = p.message::<Int32Value>().unwrap_err();
        let msg = err.message.as_deref().unwrap_or_default();
        assert!(msg.contains("previously decoded as a"), "{err:?}");
        assert!(msg.contains("StringValue"), "{err:?}");
        assert!(msg.contains("Int32Value"), "{err:?}");
    }

    #[test]
    fn message_decode_error_is_invalid_argument() {
        use crate::ErrorCode;
        // Bytes that cannot decode as a StringValue — peer-supplied data,
        // so the error code blames the peer, not the server.
        let p = Payload::new(Bytes::from_static(&[0xff, 0xff, 0xff]), CodecFormat::Proto);
        let err = p.message::<StringValue>().unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidArgument, "{err:?}");
    }

    #[test]
    fn message_replacement_wrong_type_errors() {
        use buffa_types::google::protobuf::Int32Value;
        let mut p = proto_payload("x");
        p.set_message(Int32Value {
            value: 7,
            ..Default::default()
        });
        // The replacement path has a distinct error message from the
        // post-decode wrong-type path covered by message_wrong_type_errors.
        let err = p.message::<StringValue>().unwrap_err();
        let msg = err.message.as_deref().unwrap_or_default();
        assert!(msg.contains("replacement is a"), "{err:?}");
        assert!(msg.contains("Int32Value"), "{err:?}");
        assert!(msg.contains("StringValue"), "{err:?}");
    }

    #[test]
    fn view_replaced_json_format_payload() {
        // A replacement is always re-encoded to proto for view(), so a
        // JSON-format payload with a replacement still views successfully.
        let bytes = encode_json(&StringValue {
            value: "before".into(),
            ..Default::default()
        })
        .unwrap();
        let mut p = Payload::new(bytes, CodecFormat::Json);
        p.set_message(StringValue {
            value: "after".into(),
            ..Default::default()
        });
        let v = p.view::<StringValueView>().unwrap();
        assert_eq!(v.value, "after");
    }

    #[test]
    fn set_message_twice_supersedes() {
        let mut p = proto_payload("original");
        p.set_message(StringValue {
            value: "first".into(),
            ..Default::default()
        });
        p.set_message(StringValue {
            value: "second".into(),
            ..Default::default()
        });
        let m: &StringValue = p.message().unwrap();
        assert_eq!(m.value, "second");
    }

    #[test]
    fn take_message_decodes_fresh_when_no_cache() {
        let p = proto_payload("fresh");
        let m: StringValue = p.take_message().unwrap();
        assert_eq!(m.value, "fresh");
    }

    #[test]
    fn take_message_reuses_cache() {
        let p = proto_payload("cached");
        // Populate the cache (an interceptor would do this).
        let _ = p.message::<StringValue>().unwrap();
        // `take_message` reads the cache, not the wire bytes. The
        // no-second-decode property has no observable proof in safe Rust
        // (the cached value and a fresh decode are bitwise identical), so
        // it's pinned indirectly by `take_message_returns_replacement` —
        // if `take_message` decoded the bytes there, it would never see
        // the replacement. The wrong-type test below pins the other
        // direction (the cache, not the bytes, is the source of truth).
        let m: StringValue = p.take_message().unwrap();
        assert_eq!(m.value, "cached");
    }

    #[test]
    fn take_message_returns_replacement() {
        // Build a payload whose wire bytes are *garbage* — not a valid
        // proto. If `take_message` decoded the bytes instead of moving
        // the replacement out, this would error.
        let mut p = Payload::new(Bytes::from_static(&[0xff, 0xff, 0xff]), CodecFormat::Proto);
        p.set_message(StringValue {
            value: "replaced".into(),
            ..Default::default()
        });
        let m: StringValue = p.take_message().unwrap();
        assert_eq!(m.value, "replaced");
    }

    #[test]
    fn take_message_wrong_cached_type_errors() {
        use buffa_types::google::protobuf::Int32Value;
        let p = proto_payload("x");
        // An interceptor cached the wrong type for this route. Before
        // `take_message`, the handler would silently re-decode and never
        // notice. Now the bug is loud.
        let _: &StringValue = p.message().unwrap();
        let err = p.take_message::<Int32Value>().unwrap_err();
        let msg = err.message.as_deref().unwrap_or_default();
        assert!(msg.contains("previously decoded as a"), "{err:?}");
        assert!(msg.contains("StringValue"), "{err:?}");
        assert!(msg.contains("Int32Value"), "{err:?}");
    }

    #[test]
    fn take_message_wrong_replacement_type_errors() {
        use buffa_types::google::protobuf::Int32Value;
        let mut p = proto_payload("x");
        p.set_message(Int32Value {
            value: 7,
            ..Default::default()
        });
        let err = p.take_message::<StringValue>().unwrap_err();
        let msg = err.message.as_deref().unwrap_or_default();
        assert!(msg.contains("replacement is a"), "{err:?}");
    }

    #[test]
    fn take_message_decode_error_is_invalid_argument() {
        use crate::ErrorCode;
        let p = Payload::new(Bytes::from_static(&[0xff, 0xff, 0xff]), CodecFormat::Proto);
        let err = p.take_message::<StringValue>().unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidArgument, "{err:?}");
    }

    #[test]
    fn payload_debug_redacts_body() {
        let p = proto_payload("secret");
        let dbg = format!("{p:?}");
        assert!(!dbg.contains("secret"), "Debug must not leak body: {dbg}");
        assert!(dbg.contains("Proto"), "{dbg}");
    }

    /// `Payload` and `Box<dyn AnyMessage>` cross task boundaries inside
    /// the dispatch path; assert the auto-trait bounds hold.
    #[test]
    fn payload_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Payload>();
        assert_send_sync::<Box<dyn AnyMessage>>();
    }

    /// Hammer the `OnceLock` probe-then-set race from multiple threads.
    /// Same `M` everywhere: every thread must succeed and all returned
    /// borrows must alias the same cache slot.
    #[test]
    fn message_concurrent_same_type() {
        let p = proto_payload("race");
        std::thread::scope(|s| {
            let handles: Vec<_> = (0..16)
                .map(|_| {
                    let p = &p;
                    // Return the cache slot's address as an integer so the
                    // closure stays `Send` (raw pointers are not).
                    s.spawn(move || p.message::<StringValue>().unwrap() as *const _ as usize)
                })
                .collect();
            let addrs: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
            assert!(
                addrs.iter().all(|&a| a == addrs[0]),
                "all callers should observe the same cached value"
            );
        });
    }
}