ruststream 0.5.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
//! Outgoing message and the publish middleware pipeline.
//!
//! When a handler's reply is published (via `#[subscriber(.., publish(..))]`), it flows through a
//! chain of [`PublishLayer`] before reaching the broker publisher. Middleware transform the
//! payload (for example, wrap it in a Confluent / Avro envelope) and enrich the headers
//! (content-type, schema id), or observe it (publish metrics). The chain is symmetric to the
//! consume-side static [`Stack`](super::Stack).

use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};

use bytes::BytesMut;
use serde::Serialize;
use tracing::warn;

use super::lifecycle::BoxError;
use super::publisher_registry::ErasedPublisher;
use crate::codec::Codec;
// `DefaultCodec` only exists when a codec feature is on; the impl that names it is gated the same
// way, so an ungated import would break `--no-default-features`.
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
use crate::codec::DefaultCodec;
use crate::runtime::publish::sealed::Sealed;
use crate::{Headers, Publisher, TransactionalPublisher};

/// The boxed, `Send` future returned by every publish-pipeline stage in this module: it resolves
/// to `Ok(())` or a `BoxError` and may borrow for `'a`.
type PublishFut<'a> = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'a>>;

/// A mutable outgoing message flowing through the publish pipeline.
///
/// The [`name`](Self::name) is a [`Cow`]: the macro reply path borrows a string literal
/// (`reply_name(&self) -> &str`), so the common case carries the destination without an
/// allocation; a computed name moves in owned. The [`payload`](Self::payload_mut) is a
/// [`BytesMut`]: codec output moves in directly (no copy), and middleware can still mutate it in
/// place (for example wrapping it in an envelope). Middleware may change the name, transform the
/// payload, and enrich the [`headers`](Self::headers_mut) before the message is sent.
///
/// The lifetime `'a` is the lifetime of a borrowed destination name; a message whose name was
/// moved in as an owned `String` borrows nothing for it.
#[derive(Debug, Clone)]
pub struct Outgoing<'a> {
    // Destination name: borrowed when built from a `&str`, owned when built from a `String`.
    name: Cow<'a, str>,
    // Message body; handed out mutably by `payload_mut` so it can be rewritten in place.
    payload: BytesMut,
    // Headers sent along with the payload; `new` starts them out as `Headers::new()`.
    headers: Headers,
}

impl<'a> Outgoing<'a> {
    /// Builds a message bound for `name` that carries `payload` and a fresh, empty header set.
    ///
    /// `name` is anything convertible into a `Cow<'a, str>`: a `&str` stays borrowed (the
    /// no-allocation case), a `String` moves in owned. `payload` is anything convertible into a
    /// [`BytesMut`], for example a `BytesMut` produced by a codec or a `&[u8]`.
    #[must_use]
    pub fn new(name: impl Into<Cow<'a, str>>, payload: impl Into<BytesMut>) -> Self {
        let name = name.into();
        let payload = payload.into();
        let headers = Headers::new();
        Self {
            name,
            payload,
            headers,
        }
    }

    /// Returns the destination name as a string slice.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Overwrites the destination name (borrowed or owned, like in [`new`](Self::new)).
    pub fn set_name(&mut self, name: impl Into<Cow<'a, str>>) {
        let renamed = name.into();
        self.name = renamed;
    }

    /// Returns a read-only view of the payload bytes.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        &self.payload[..]
    }

    /// Returns the payload buffer for in-place mutation (for example envelope wrapping).
    pub fn payload_mut(&mut self) -> &mut BytesMut {
        &mut self.payload
    }

    /// Swaps the whole payload for `payload`, dropping the previous buffer.
    pub fn set_payload(&mut self, payload: impl Into<BytesMut>) {
        let replacement = payload.into();
        self.payload = replacement;
    }

    /// Returns a read-only view of the outgoing headers.
    #[must_use]
    pub fn headers(&self) -> &Headers {
        &self.headers
    }

    /// Returns the outgoing headers for mutation (adding or changing entries).
    pub fn headers_mut(&mut self) -> &mut Headers {
        &mut self.headers
    }
}

/// A static, app-wide publish pipeline: an around-style chain of [`PublishLayer`] ending in
/// the broker send.
///
/// The publish-side analog of the consume-side static [`Stack`](super::Stack) /
/// [`Identity`](super::Identity): the app's publish middleware (added with
/// [`RustStream::publish_layer`](super::RustStream::publish_layer)) compose into a concrete type,
/// so the default path ([`PublishIdentity`], no middleware) is a zero-cost direct send with no
/// `dyn` dispatch. You rarely name this trait; it is built for you. For a middleware set decided
/// at runtime, [`PublishDynStack`] is the opt-in escape hatch (the publish counterpart of
/// [`DynStack`](super::DynStack)); it is added as an ordinary [`PublishLayer`], so it does not
/// change this contract.
pub trait PublishPipeline: Send + Sync {
    /// Runs `out` through the remaining middleware, then sends it via `send`.
    ///
    /// `self`, `out`, and `send` are all borrowed for `'a`, the lifetime the returned future may
    /// hold on to. The future resolves to `Ok(())`, or to the `BoxError` reported along the way.
    fn run<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        send: &'a dyn ErasedPublisher,
    ) -> PublishFut<'a>;
}

/// The end of every [`PublishPipeline`]: it holds no middleware and only performs the broker
/// send. This is the pipeline of an app that never calls
/// [`publish_layer`](super::RustStream::publish_layer).
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishIdentity;

impl PublishPipeline for PublishIdentity {
    /// Hands the message's name, payload, and headers to `send` and returns its future.
    fn run<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        send: &'a dyn ErasedPublisher,
    ) -> PublishFut<'a> {
        // Nothing is left to mutate the message: a shared view for the whole of `'a` is enough.
        let message: &'a Outgoing<'a> = out;
        let (name, payload, headers) = (message.name(), message.payload(), message.headers());
        send.publish_message(name, payload, headers)
    }
}

/// One link of the static publish chain: a [`PublishLayer`] `Head` placed in front of a
/// [`PublishPipeline`] `Tail`. Produced by
/// [`RustStream::publish_layer`](super::RustStream::publish_layer); you rarely name it directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishStack<Head, Tail> {
    head: Head,
    tail: Tail,
}

impl<Head, Tail> PublishStack<Head, Tail> {
    /// Builds the stack in which `head` runs before `tail`.
    pub(crate) const fn new(head: Head, tail: Tail) -> Self {
        Self { head, tail }
    }
}

impl<Head, Tail> PublishPipeline for PublishStack<Head, Tail>
where
    Head: PublishLayer,
    Tail: PublishPipeline,
{
    /// Gives `out` to the head layer together with a [`PublishNext`] cursor over the tail and the
    /// broker send; whether the tail runs is up to the head calling that cursor.
    fn run<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        send: &'a dyn ErasedPublisher,
    ) -> PublishFut<'a> {
        let Self { head, tail } = self;
        let next = PublishNext { tail, send };
        head.on_publish(out, next)
    }
}

/// Middleware that transforms (or observes) an [`Outgoing`] message before it is published.
///
/// Each middleware inspects / mutates `out`, then calls [`PublishNext::run`] to continue; the chain
/// ends in the actual broker publish. Static (no `dyn` dispatch): a middleware is generic over the
/// rest of the pipeline `N`, so the whole chain monomorphizes. Added app-wide with
/// [`RustStream::publish_layer`](super::RustStream::publish_layer).
pub trait PublishLayer: Send + Sync {
    /// Handle the outgoing message, calling `next` to continue the pipeline.
    ///
    /// `out` is the message being published; `next` is the cursor over the remaining pipeline `N`
    /// and the broker send. The returned future resolves to `Ok(())` or to a `BoxError`.
    fn on_publish<'a, N: PublishPipeline>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        next: PublishNext<'a, N>,
    ) -> PublishFut<'a>;
}

/// The continuation handed to a [`PublishLayer`]: the remaining static pipeline plus the broker
/// send it ends in. Call [`run`](Self::run) to continue publishing.
pub struct PublishNext<'a, N> {
    tail: &'a N,
    send: &'a dyn ErasedPublisher,
}

impl<'a, N> PublishNext<'a, N>
where
    N: PublishPipeline,
{
    /// Consumes the cursor and runs what is left of the pipeline (the remaining middleware, then
    /// the send) on `out`.
    #[must_use]
    pub fn run(self, out: &'a mut Outgoing<'a>) -> PublishFut<'a> {
        let Self { tail, send } = self;
        tail.run(out, send)
    }
}

impl<N> std::fmt::Debug for PublishNext<'_, N> {
    /// Prints only the type name: there is no `Debug` bound on `N`, so no field is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("PublishNext");
        repr.finish_non_exhaustive()
    }
}

/// An object-safe publish middleware, for a [`PublishDynStack`].
///
/// The dynamic counterpart of [`PublishLayer`]: it cannot name the rest of the pipeline as a
/// type parameter (that is what keeps it object-safe and lets a heterogeneous, runtime-built list
/// live in one [`PublishDynStack`]), so it continues through the type-erased [`PublishDynNext`]
/// instead. Use it only when the middleware set is decided at runtime; otherwise a static
/// [`PublishLayer`] is zero-cost.
pub trait PublishDynLayer: Send + Sync {
    /// Handle the outgoing message, calling `next` to continue.
    ///
    /// `out` is the message being published; `next` is the cursor over the remaining dynamic
    /// middleware. The returned future resolves to `Ok(())` or to a `BoxError`.
    fn on_publish<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        next: PublishDynNext<'a>,
    ) -> PublishFut<'a>;
}

/// The continuation handed to a [`PublishDynLayer`]: the not-yet-run part of a
/// [`PublishDynStack`] plus the way back into the surrounding static pipeline.
///
/// The dynamic twin of [`PublishNext`]: [`run`](Self::run) moves on to the next
/// [`PublishDynLayer`], and once none is left it resumes the static chain.
pub struct PublishDynNext<'a> {
    rest: &'a [Arc<dyn PublishDynLayer>],
    // The surrounding static `PublishNext::run`, erased so this cursor need not carry its type. A
    // one-shot continuation: `run` is called exactly once per published message.
    tail: Box<dyn FnOnce(&'a mut Outgoing<'a>) -> PublishFut<'a> + Send + 'a>,
}

impl<'a> PublishDynNext<'a> {
    /// Consumes the cursor: runs the first remaining dynamic middleware with a cursor over the
    /// others, or calls the static continuation when the list is empty.
    #[must_use]
    pub fn run(self, out: &'a mut Outgoing<'a>) -> PublishFut<'a> {
        let Self { rest, tail } = self;
        let Some((current, rest)) = rest.split_first() else {
            // Dynamic list exhausted: give control back to the static pipeline.
            return tail(out);
        };
        current.on_publish(out, Self { rest, tail })
    }
}

impl std::fmt::Debug for PublishDynNext<'_> {
    /// Shows how many dynamic middleware are still to run; the erased continuation is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let remaining = self.rest.len();
        f.debug_struct("PublishDynNext")
            .field("remaining", &remaining)
            .finish_non_exhaustive()
    }
}

/// A single static [`PublishLayer`] wrapping a runtime-built, frozen list of
/// [`PublishDynLayer`].
///
/// The publish-side counterpart of the consume-side [`DynStack`](super::DynStack): the opt-in
/// escape hatch for a middleware set decided at runtime (from config, a loop, feature flags) that
/// therefore cannot be a compile-time [`publish_layer`](super::RustStream::publish_layer) chain.
/// Add it like any other middleware; only the middleware inside it pay one boxed future per layer.
///
/// ```
/// # #[cfg(all(feature = "memory", feature = "json"))]
/// # {
/// use std::sync::Arc;
/// use ruststream::runtime::{PublishDynLayer, PublishDynStack};
///
/// fn stack(
///     middleware: Vec<Arc<dyn PublishDynLayer>>,
/// ) -> PublishDynStack {
///     PublishDynStack::new(middleware)
/// }
/// # }
/// ```
pub struct PublishDynStack(Arc<[Arc<dyn PublishDynLayer>]>);

// Hand-written `Clone`: the only field is an `Arc`, so cloning just bumps its reference count.
impl Clone for PublishDynStack {
    fn clone(&self) -> Self {
        let Self(shared) = self;
        Self(Arc::clone(shared))
    }
}

impl PublishDynStack {
    /// Freezes `middleware` into a stack. The layers run in iteration order, so the first item
    /// yielded is the outermost one.
    #[must_use]
    pub fn new(middleware: impl IntoIterator<Item = Arc<dyn PublishDynLayer>>) -> Self {
        let frozen: Arc<[Arc<dyn PublishDynLayer>]> = middleware.into_iter().collect();
        Self(frozen)
    }
}

impl std::fmt::Debug for PublishDynStack {
    /// Shows the number of middleware held; the trait objects themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.0.len();
        f.debug_struct("PublishDynStack")
            .field("middleware", &count)
            .finish_non_exhaustive()
    }
}

impl PublishLayer for PublishDynStack {
    /// Walks the dynamic list from its first middleware, then resumes the static pipeline through
    /// `next`.
    fn on_publish<'a, N: PublishPipeline>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        next: PublishNext<'a, N>,
    ) -> PublishFut<'a> {
        // The object-safe walker cannot name `N`, so the static continuation is boxed into a
        // one-shot closure that the walker calls once the dynamic list is exhausted.
        let continuation = Box::new(move |message: &'a mut Outgoing<'a>| next.run(message));
        let cursor = PublishDynNext {
            rest: &self.0,
            tail: continuation,
        };
        cursor.run(out)
    }
}

/// A read-only view of the originating delivery, handed to a [`PublishTransform`].
///
/// A reply is published from inside a handler, so the static publish transform can read the
/// delivery that produced it: its channel [`name`](Self::name), the incoming
/// [`headers`](Self::headers) (a W3C `traceparent`, a correlation id), and the broker's typed
/// per-delivery [`context`](Self::context) by [`Field`](crate::Field) key. This is how a trace /
/// correlation id propagates from the incoming message onto the reply (the static, zero-cost
/// path; the app-wide [`PublishLayer`] stays context-agnostic). `C` is the handler's context type
/// (`()` when it names none).
pub struct PublishContext<'a, C = ()> {
    // Channel name of the originating delivery.
    name: &'a str,
    // Headers of the originating delivery.
    headers: &'a Headers,
    // The typed per-delivery context, read through `Field` keys in `context`.
    cx: &'a C,
}

impl<'a, C> PublishContext<'a, C> {
    /// Builds the view from the parts the runtime already holds at publish time.
    ///
    /// All three parts are borrowed for `'a`; nothing is copied.
    pub(crate) fn new(name: &'a str, headers: &'a Headers, cx: &'a C) -> Self {
        Self { name, headers, cx }
    }

    /// The channel the originating message was delivered on.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name
    }

    /// The originating message's headers (the working copy the handler saw).
    #[must_use]
    pub fn headers(&self) -> &Headers {
        self.headers
    }

    /// Reads a broker-supplied per-delivery field off the typed context by compile-time `key`,
    /// mirroring [`Context::context`](super::Context::context).
    ///
    /// The value is whatever `key.get` returns for the wrapped context, typed as `K::Value`.
    pub fn context<K: crate::Field<C>>(&self, key: K) -> K::Value<'_> {
        key.get(self.cx)
    }
}

impl<C> std::fmt::Debug for PublishContext<'_, C> {
    /// Prints the channel name only; the headers and the context are left out (there is no
    /// `Debug` bound on `C`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublishContext")
            .field("name", &self.name)
            .finish_non_exhaustive()
    }
}

/// A static, compile-time publish transform: mutates an [`Outgoing`] before it is sent, with
/// read access to the originating delivery through [`PublishContext`].
///
/// The publish-side counterpart to the consume-side [`Layer`](super::Layer): zero-cost composition,
/// no `dyn` dispatch. Baked onto a [`TypedPublisher`] with [`TypedPublisher::transform`]. Use for
/// per-destination transforms that belong to the publisher itself - a Confluent / Avro envelope, a
/// fixed content-type header, or stamping the delivery's trace / correlation id onto the reply
/// (read it from `cx`). The `C` parameter is the originating handler's context type; a transform
/// that ignores the context is generic over it (mounts on any handler). For cross-cutting
/// *observation* across every publish (metrics), use the app-wide [`PublishLayer`] via
/// [`RustStream::publish_layer`](super::RustStream::publish_layer) instead; the per-publisher
/// transforms run first (closest to the value), then the app-wide publish pipeline, then the send.
pub trait PublishTransform<C = ()>: Send + Sync {
    /// Transforms `out` in place before it is sent, reading the delivery through `cx`.
    ///
    /// Synchronous and infallible by signature: it returns nothing and cannot report an error.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>);
}

/// The no-op [`PublishTransform`]: the default for a [`TypedPublisher`] with no static transforms.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishTransformIdentity;

impl<C> PublishTransform<C> for PublishTransformIdentity {
    /// Leaves `out` untouched and ignores `cx`; implemented for every context type `C`.
    fn apply(&self, _out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {}
}

/// A pair of [`PublishTransform`]s run back to back: `inner` first, `outer` second. Produced by
/// [`TypedPublisher::transform`]; you rarely name it directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishTransformStack<Inner, Outer> {
    inner: Inner,
    outer: Outer,
}

impl<C, Inner, Outer> PublishTransform<C> for PublishTransformStack<Inner, Outer>
where
    Inner: PublishTransform<C>,
    Outer: PublishTransform<C>,
{
    /// Applies `inner` to `out`, then `outer` to the result, both reading the same `cx`.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        let Self { inner, outer } = self;
        inner.apply(out, cx);
        outer.apply(out, cx);
    }
}

/// A static publish transform that runs only on a `#[subscriber(batch(..), publish(..))]` handler's
/// replies, not on single-message replies.
///
/// The batch counterpart of [`PublishTransform`], kept a distinct trait so a transform that
/// belongs to the batch path only (a header marking a reply as batched, a per-batch sampling
/// decision) cannot be added with [`TypedPublisher::transform`] by mistake; it is added with
/// [`TypedPublisher::batch_transform`], which the single-message mounts reject at compile time.
/// The per-message [`PublishTransform`] stack does not run for batched replies and this one does
/// not run for single-message replies - the two paths are independent. To use the same transform
/// on both, add it to each, reusing it on the batch side with [`for_batch`] (no second
/// implementation). Each reply in the batch is passed through it individually, reading the
/// delivery through [`PublishContext`].
pub trait BatchPublishTransform<C = ()>: Send + Sync {
    /// Transforms one of the batch's outgoing replies before it is sent.
    ///
    /// Same shape as [`PublishTransform::apply`]: `out` is mutated in place and `cx` gives
    /// read access to the originating delivery.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>);
}

/// The no-op [`BatchPublishTransform`]: the default for a [`TypedPublisher`] with no batch
/// transforms.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchTransformIdentity;

impl<C> BatchPublishTransform<C> for BatchTransformIdentity {
    /// Leaves `out` untouched and ignores `cx`; implemented for every context type `C`.
    fn apply(&self, _out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {}
}

/// A pair of [`BatchPublishTransform`]s run back to back: `inner` first, `outer` second. Produced
/// by [`TypedPublisher::batch_transform`]; you rarely name it directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchPublishTransformStack<Inner, Outer> {
    inner: Inner,
    outer: Outer,
}

impl<C, Inner, Outer> BatchPublishTransform<C> for BatchPublishTransformStack<Inner, Outer>
where
    Inner: BatchPublishTransform<C>,
    Outer: BatchPublishTransform<C>,
{
    /// Applies `inner` to `out`, then `outer` to the result, both reading the same `cx`.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        let Self { inner, outer } = self;
        inner.apply(out, cx);
        outer.apply(out, cx);
    }
}

/// Adapts a per-message [`PublishTransform`] into a [`BatchPublishTransform`], applying it to each reply of
/// a batch. Built by [`for_batch`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ForBatch<L>(L);

impl<C, L: PublishTransform<C>> BatchPublishTransform<C> for ForBatch<L> {
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        self.0.apply(out, cx);
    }
}

/// Lifts a per-message [`PublishTransform`] onto the batch path so the same transform can be
/// added with [`TypedPublisher::batch_transform`] without a second implementation.
///
/// Takes `transform` by value and returns it wrapped in a [`ForBatch`]; the function itself puts
/// no bound on `L`, the [`PublishTransform`] requirement sits on the [`BatchPublishTransform`]
/// impl of [`ForBatch`].
///
/// ```
/// # #[cfg(all(feature = "memory", feature = "json"))]
/// # {
/// use ruststream::memory::MemoryBroker;
/// use ruststream::runtime::{for_batch, Outgoing, PublishContext, PublishTransform, TypedPublisher};
///
/// struct Stamp;
/// impl<C> PublishTransform<C> for Stamp {
///     fn apply(&self, out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {
///         out.headers_mut().insert("x-stamp", b"1".to_vec());
///     }
/// }
///
/// let broker = MemoryBroker::new();
/// // The same `Stamp` on both paths: per message, and batched.
/// let publisher = TypedPublisher::new(broker.publisher())
///     .transform(Stamp)
///     .batch_transform(for_batch(Stamp));
/// # let _ = publisher;
/// # }
/// ```
#[must_use]
pub fn for_batch<L>(transform: L) -> ForBatch<L> {
    ForBatch(transform)
}

/// A byte [`Publisher`] paired with a [`Codec`] and a static [`PublishTransform`] stack, ready to
/// send typed values.
///
/// The publish-side counterpart to a typed subscriber: it carries *how* a value is encoded and the
/// per-publisher transforms ([`transform`](Self::transform)), while *where* it goes (the
/// destination name) is supplied per send - so one `TypedPublisher` is reused across handlers
/// replying to different names. The `#[subscriber(.., publish("name"))]` reply form supplies the
/// name; the `TypedPublisher` is passed at wiring.
///
/// Type parameters: `P` is the publisher, `C` the codec, `PL` the per-message transform stack, and
/// `BL` the batch-only transform stack; both stacks default to their no-op identity.
///
/// ```
/// # #[cfg(all(feature = "memory", feature = "json"))]
/// # {
/// use ruststream::codec::JsonCodec;
/// use ruststream::memory::MemoryBroker;
/// use ruststream::runtime::TypedPublisher;
///
/// let broker = MemoryBroker::new();
/// let with_default = TypedPublisher::new(broker.publisher()); // DefaultCodec
/// let explicit = TypedPublisher::with_codec(broker.publisher(), JsonCodec);
/// # let _ = (with_default, explicit);
/// # }
/// ```
///
/// [macro]: crate::subscriber
pub struct TypedPublisher<P, C, PL = PublishTransformIdentity, BL = BatchTransformIdentity> {
    // The publisher the encoded message is finally sent through.
    publisher: P,
    // Encodes reply values into the payload.
    codec: C,
    // Per-message transform stack, applied by `publish`.
    layers: PL,
    // Batch-only transform stack, applied by `publish_batched`.
    batch_layers: BL,
}

impl<P, C> TypedPublisher<P, C, PublishTransformIdentity, BatchTransformIdentity> {
    /// Pairs `publisher` with an explicit `codec` and no static transforms.
    ///
    /// Both transform stacks start as their no-op identity; add transforms afterwards with
    /// `transform` / `batch_transform`.
    #[must_use]
    pub fn with_codec(publisher: P, codec: C) -> Self {
        Self {
            publisher,
            codec,
            layers: PublishTransformIdentity,
            batch_layers: BatchTransformIdentity,
        }
    }
}

// Only compiled when at least one codec feature is enabled, because `DefaultCodec` is gated the
// same way.
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
impl<P> TypedPublisher<P, DefaultCodec, PublishTransformIdentity, BatchTransformIdentity> {
    /// Pairs `publisher` with the [`DefaultCodec`] (built with `DefaultCodec::default()`) and no
    /// static transforms. Use [`with_codec`](Self::with_codec) to name a codec explicitly.
    #[must_use]
    pub fn new(publisher: P) -> Self {
        Self::with_codec(publisher, DefaultCodec::default())
    }
}

impl<P, C, PL, BL> TypedPublisher<P, C, PL, BL> {
    /// Borrows the codec replies are encoded with, so the runtime can reuse it as the decode
    /// codec when a publishing handler is mounted without an explicit one.
    pub(crate) const fn codec(&self) -> &C {
        &self.codec
    }

    /// Appends a static [`PublishTransform`] that runs on every single-message reply sent through
    /// this publisher (a `#[subscriber(.., publish(..))]` handler). The batch path does not run
    /// it; use [`batch_transform`](Self::batch_transform) there. Transforms run in the order they
    /// were added, so the first one added is closest to the encoded value.
    #[must_use]
    pub fn transform<N>(
        self,
        transform: N,
    ) -> TypedPublisher<P, C, PublishTransformStack<PL, N>, BL> {
        let Self {
            publisher,
            codec,
            layers,
            batch_layers,
        } = self;
        // The existing stack becomes `inner` (runs first); the new transform is `outer`.
        let layers = PublishTransformStack {
            inner: layers,
            outer: transform,
        };
        TypedPublisher {
            publisher,
            codec,
            layers,
            batch_layers,
        }
    }

    /// Appends a static [`BatchPublishTransform`] that runs on every reply of a
    /// `#[subscriber(batch(..), publish(..))]` handler only (after the per-message
    /// [`PublishTransform`] stack), never on a single-message reply. Wrap a per-message
    /// [`PublishTransform`] with [`for_batch`] to reuse it here. The single-message mounts reject
    /// a publisher carrying a non-trivial batch stack, so a batch-only transform cannot leak onto
    /// the single path.
    #[must_use]
    pub fn batch_transform<N>(
        self,
        transform: N,
    ) -> TypedPublisher<P, C, PL, BatchPublishTransformStack<BL, N>> {
        let Self {
            publisher,
            codec,
            layers,
            batch_layers,
        } = self;
        // The existing batch stack becomes `inner` (runs first); the new transform is `outer`.
        let batch_layers = BatchPublishTransformStack {
            inner: batch_layers,
            outer: transform,
        };
        TypedPublisher {
            publisher,
            codec,
            layers,
            batch_layers,
        }
    }

    /// Switches batch reply publishing to one broker transaction per batch: the replies of a
    /// `#[subscriber(batch(..), publish(..))]` handler all become visible atomically on commit,
    /// or none of them do.
    ///
    /// Available only when the underlying publisher implements
    /// [`TransactionalPublisher`](crate::TransactionalPublisher); for brokers without
    /// transactions the bound is not met, so the call does not compile. The returned wiring is
    /// accepted by the batch publishing mounts only: a one-message transaction adds broker
    /// round-trips for no atomicity gain, so the single-message `include_publishing` forms keep
    /// taking a plain [`TypedPublisher`].
    #[must_use]
    pub fn transactional(self) -> Transactional<P, C, PL, BL>
    where
        P: TransactionalPublisher,
    {
        Transactional { inner: self }
    }
}

impl<P, C, PL, BL> TypedPublisher<P, C, PL, BL>
where
    P: Publisher,
    C: Codec,
{
    /// Publishes one single-message reply: encodes `value` with the codec, runs the per-message
    /// transform stack over the result (with `cx` as its view of the originating delivery), and
    /// sends it to `name` through `pipeline`.
    ///
    /// Returns the codec's error (boxed) if encoding fails, otherwise whatever `pipeline`
    /// reports.
    pub(crate) async fn publish<T: Serialize + Sync, Cx, PP>(
        &self,
        name: &str,
        value: &T,
        pipeline: &PP,
        cx: &PublishContext<'_, Cx>,
    ) -> Result<(), BoxError>
    where
        PL: PublishTransform<Cx>,
        BL: Sync,
        Cx: Sync,
        PP: PublishPipeline,
    {
        let encoded = match self.codec.encode(value) {
            Ok(bytes) => bytes,
            Err(err) => return Err(Box::new(err) as BoxError),
        };
        let mut message = Outgoing::new(name, encoded);
        self.layers.apply(&mut message, cx);
        pipeline.run(&mut message, &self.publisher).await
    }

    /// The batch-path twin of [`publish`](Self::publish): identical except that it runs the
    /// batch-only [`BatchPublishTransform`] stack in place of the per-message
    /// [`PublishTransform`] one. Called once per reply of a batch; the per-message transforms do
    /// not run here (a transform wanted on both paths is added to each, reusing it on the batch
    /// side with [`for_batch`]).
    pub(crate) async fn publish_batched<T: Serialize + Sync, Cx, PP>(
        &self,
        name: &str,
        value: &T,
        pipeline: &PP,
        cx: &PublishContext<'_, Cx>,
    ) -> Result<(), BoxError>
    where
        PL: Sync,
        BL: BatchPublishTransform<Cx>,
        Cx: Sync,
        PP: PublishPipeline,
    {
        let encoded = match self.codec.encode(value) {
            Ok(bytes) => bytes,
            Err(err) => return Err(Box::new(err) as BoxError),
        };
        let mut message = Outgoing::new(name, encoded);
        self.batch_layers.apply(&mut message, cx);
        pipeline.run(&mut message, &self.publisher).await
    }
}

// Manual `Debug` with no bounds on the type parameters: only the type name is printed, so a
// publisher, codec, or transform without `Debug` does not block the impl.
impl<P, C, PL, BL> std::fmt::Debug for TypedPublisher<P, C, PL, BL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TypedPublisher").finish_non_exhaustive()
    }
}

/// A [`TypedPublisher`] whose batch replies are published inside one broker transaction.
///
/// Built with [`TypedPublisher::transactional`]; accepted by the
/// `include_batch_publishing` mounts. Per batch, the runtime begins a transaction, publishes
/// every reply, then commits before the incoming batch is acked; any failure aborts the
/// transaction and the batch is retried, so replies are never half-visible.
pub struct Transactional<P, C, PL = PublishTransformIdentity, BL = BatchTransformIdentity> {
    // The wrapped wiring; its publisher drives the transaction and its codec / batch transforms
    // are used for each reply.
    inner: TypedPublisher<P, C, PL, BL>,
}

// Manual `Debug` with no bounds on the type parameters: only the type name is printed.
impl<P, C, PL, BL> std::fmt::Debug for Transactional<P, C, PL, BL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Transactional").finish_non_exhaustive()
    }
}

// Private module holding the sealing supertrait: code outside this crate cannot name `Sealed`,
// so it cannot implement `ReplyPublisher` for its own types.
mod sealed {
    /// Seals [`ReplyPublisher`](super::ReplyPublisher): the reply-publishing strategies are the
    /// two wirings above, not an extension point.
    pub trait Sealed {}

    // The only two implementors, for every choice of type parameters.
    impl<P, C, PL, BL> Sealed for super::TypedPublisher<P, C, PL, BL> {}
    impl<P, C, PL, BL> Sealed for super::Transactional<P, C, PL, BL> {}
}

/// The reply wiring accepted by the `include_batch_publishing` mounts.
///
/// Implemented by a plain [`TypedPublisher`] (each reply published independently) and by a
/// [`Transactional`] one (all replies of a batch inside one transaction). Sealed: implemented by
/// exactly those two types. `Cx` is the originating batch handler's context type, threaded so the
/// static batch transforms can read the delivery while each reply is published.
pub trait ReplyPublisher<Cx = ()>: Sealed + Send + Sync {
    /// The codec replies are encoded with (also reused as the decode codec when a batch
    /// publishing handler is mounted without an explicit one).
    type Codec: Codec;

    /// Returns the reply codec.
    #[doc(hidden)]
    fn reply_codec(&self) -> &Self::Codec;

    /// Publishes one batch's replies to `name` through `pipeline`, reading the originating
    /// delivery through `cx`.
    ///
    /// The returned future is `Send` and resolves to `Ok(())` once every reply has been
    /// published, or to the first error encountered.
    #[doc(hidden)]
    fn publish_batch<'a, T, PP>(
        &'a self,
        name: &'a str,
        replies: &'a [T],
        pipeline: &'a PP,
        cx: &'a PublishContext<'a, Cx>,
    ) -> impl Future<Output = Result<(), BoxError>> + Send
    where
        T: Serialize + Sync,
        PP: PublishPipeline;
}

// The non-transactional strategy: replies go out one by one through `publish_batched`.
impl<P, C, PL, BL, Cx> ReplyPublisher<Cx> for TypedPublisher<P, C, PL, BL>
where
    P: Publisher,
    C: Codec,
    PL: Send + Sync,
    BL: BatchPublishTransform<Cx>,
    Cx: Sync,
{
    type Codec = C;

    /// Returns this publisher's own codec.
    fn reply_codec(&self) -> &C {
        self.codec()
    }

    /// Each reply is published independently: a mid-batch failure leaves the earlier replies
    /// visible, and the retried batch may publish them again (at-least-once).
    ///
    /// Replies are sent sequentially in slice order; the first error stops the loop and is
    /// returned. An empty `replies` slice publishes nothing and returns `Ok(())`.
    async fn publish_batch<'a, T, PP>(
        &'a self,
        name: &'a str,
        replies: &'a [T],
        pipeline: &'a PP,
        cx: &'a PublishContext<'a, Cx>,
    ) -> Result<(), BoxError>
    where
        T: Serialize + Sync,
        PP: PublishPipeline,
    {
        for reply in replies {
            self.publish_batched(name, reply, pipeline, cx).await?;
        }
        Ok(())
    }
}

impl<P, C, PL, BL, Cx> ReplyPublisher<Cx> for Transactional<P, C, PL, BL>
where
    P: TransactionalPublisher,
    C: Codec,
    PL: Send + Sync,
    BL: BatchPublishTransform<Cx>,
    Cx: Sync,
{
    type Codec = C;

    /// Returns the wrapped publisher's codec.
    fn reply_codec(&self) -> &C {
        self.inner.codec()
    }

    /// Publishes the whole batch inside one transaction: begin, publish every reply in slice
    /// order, commit. If a publish or the commit fails, the transaction is aborted and that
    /// error is returned, so none of the batch's replies become visible. A failure to begin the
    /// transaction is returned directly, with no abort.
    async fn publish_batch<'a, T, PP>(
        &'a self,
        name: &'a str,
        replies: &'a [T],
        pipeline: &'a PP,
        cx: &'a PublishContext<'a, Cx>,
    ) -> Result<(), BoxError>
    where
        T: Serialize + Sync,
        PP: PublishPipeline,
    {
        let publisher = &self.inner.publisher;
        if let Err(err) = publisher.begin_transaction().await {
            return Err(Box::new(err) as BoxError);
        }

        // Publish until the first failure; `outcome` keeps that failure (if any).
        let mut outcome: Result<(), BoxError> = Ok(());
        for reply in replies {
            outcome = self.inner.publish_batched(name, reply, pipeline, cx).await;
            if outcome.is_err() {
                break;
            }
        }

        // Commit only when every reply went through.
        if outcome.is_ok() {
            outcome = publisher
                .commit()
                .await
                .map_err(|err| Box::new(err) as BoxError);
        }

        // A failed publish or a failed commit both end in an abort; the original error wins.
        if outcome.is_err() {
            abort_quietly(publisher).await;
        }
        outcome
    }
}

/// Rolls back a failed transaction on `publisher`. A failing abort is only logged as a warning
/// and never returned: the caller acts on the original publish / commit error instead.
async fn abort_quietly<P: TransactionalPublisher>(publisher: &P) {
    let Err(err) = publisher.abort().await else {
        return;
    };
    warn!(target: "ruststream::dispatch", error = %err, "transaction abort failed");
}

// Unit tests for `Outgoing`: construction from borrowed / owned names and the setters.
#[cfg(test)]
mod tests {
    use super::*;

    /// A `&str` name is stored as `Cow::Borrowed`, and the accessors return what was passed in.
    #[test]
    fn borrowed_name_is_not_owned() {
        // The macro-reply hot path passes a string literal: it must stay borrowed (no alloc),
        // which is the whole point of the Cow.
        let out = Outgoing::new("orders.created", b"payload".as_slice());
        assert!(matches!(out.name, Cow::Borrowed(_)));
        assert_eq!(out.name(), "orders.created");
        assert_eq!(out.payload(), b"payload");
    }

    /// A `String` name is stored as `Cow::Owned`.
    #[test]
    fn owned_name_moves_in() {
        let computed = format!("orders.{}", 42);
        let out = Outgoing::new(computed, BytesMut::from(&b"x"[..]));
        assert!(matches!(out.name, Cow::Owned(_)));
        assert_eq!(out.name(), "orders.42");
    }

    /// `payload_mut` edits the buffer in place; `set_payload` replaces it wholesale.
    #[test]
    fn payload_mutates_in_place() {
        let mut out = Outgoing::new("t", BytesMut::from(&b"body"[..]));
        out.payload_mut().extend_from_slice(b"!");
        assert_eq!(out.payload(), b"body!");

        out.set_payload(b"fresh".as_slice());
        assert_eq!(out.payload(), b"fresh");
    }

    /// `set_name` changes the destination and `headers_mut` allows inserting a header.
    #[test]
    fn set_name_and_headers() {
        let mut out = Outgoing::new("a", b"".as_slice());
        out.set_name("b");
        out.headers_mut().insert("k", "v");
        assert_eq!(out.name(), "b");
        assert_eq!(out.headers().get_str("k"), Some("v"));
    }
}