entelix-agents 0.5.5

entelix production agent runtime — ReAct / Supervisor / Hierarchical / Chat recipes, tool-side layer ecosystem (approval / event / hook), sink adapters (broadcast / capture / channel / dropping / fail-open / fan-out / state-erasure), chat-shape state helpers
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
//! `AgentEventSink<S>` — consumer trait for [`AgentEvent<S>`] emissions.
//!
//! All sinks share one `async fn send(&self, event)` signature so the
//! agent runtime can drive any of them uniformly:
//!
//! - [`DroppingSink`] — silently discards. Default for tests and
//!   CLI flows that only consume the awaited result of
//!   `Agent::execute`.
//! - [`ChannelSink`] — `tokio::sync::mpsc` backed; the consumer
//!   owns the receiver. Bounded capacity gives backpressure: when
//!   the buffer is full, `send` waits for the consumer to drain;
//!   if the receiver has been dropped, `send` returns an error.
//! - [`BroadcastSink`] — `tokio::sync::broadcast` backed for
//!   multi-consumer subscribe-shaped fan-out (`SSE` clients +
//!   `OTel` exporter via `subscribe()`). Slow consumers receive
//!   `Lagged` errors but the agent never blocks.
//! - [`CaptureSink`] — captures every event into an
//!   `Arc<Mutex<Vec<_>>>` for assertions in integration tests.
//! - [`FanOutSink`] — callback-shaped composition primitive that
//!   dispatches every event to a fixed set of inner sinks
//!   sequentially. Stops at the first failing sink — wrap
//!   best-effort sinks with [`FailOpenSink`] so their failures
//!   never cut off the sinks registered after them.
//! - [`FailOpenSink`] — composition adapter that swallows the
//!   inner sink's `Err` (logging each failure via `tracing::warn!`)
//!   and always returns `Ok(())`. Lets an observe-only sink
//!   (embedding indexer, billing, anomaly detector) live under the
//!   `Err`-halts-the-agent contract of [`AgentEventSink::send`]
//!   without ever halting the run.
//!
//! Composition pattern for production deployments — audit must
//! succeed, telemetry is best-effort:
//!
//! ```ignore
//! use std::sync::Arc;
//! use entelix_agents::{FanOutSink, FailOpenSink};
//!
//! let sink = FanOutSink::<MyState>::new()
//!     .push(audit_sink)                          // must succeed → propagates Err
//!     .push(Arc::new(FailOpenSink::new(otel)))   // observe-only → swallowed
//!     .push(Arc::new(FailOpenSink::new(billing))); // observe-only → swallowed
//! ```
//!
//! [`AgentEvent<S>`]: crate::agent::event::AgentEvent

use std::sync::Arc;

use async_trait::async_trait;
use entelix_core::error::{Error, Result};
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc};

use crate::agent::event::AgentEvent;

/// Consumer trait the agent calls for every emitted event.
///
/// Implementations should be cheap to clone — the agent runtime
/// holds the sink as `Arc<dyn AgentEventSink<S>>` and may share it
/// across nested sub-agents and observers.
///
/// `S` is the state type carried by [`AgentEvent<S>`] and must be
/// `Clone + Send + Sync + 'static`; every implementor must itself be
/// `Send + Sync`.
#[async_trait]
pub trait AgentEventSink<S>: Send + Sync
where
    S: Clone + Send + Sync + 'static,
{
    /// Consume a single event. Returning `Err` halts the agent —
    /// sinks that want best-effort semantics should swallow their
    /// own errors and return `Ok(())`.
    ///
    /// # Errors
    ///
    /// Whatever error the implementation chooses to surface; per the
    /// contract above, an `Err` is treated as fatal for the run.
    async fn send(&self, event: AgentEvent<S>) -> Result<()>;
}

/// Sink that throws every event away — the agent runs to completion
/// without surfacing per-event telemetry. Used as the default when no
/// sink is configured.
///
/// Operators wiring OpenTelemetry / SSE / log forwarding without
/// also calling `Agent::builder().add_sink(...)` end up here by
/// accident: the agent runs, telemetry is silently discarded,
/// alerts never fire because no data arrives.
///
/// To make that misconfiguration discoverable without adding
/// overhead to test-only paths, the first event dropped in a process
/// emits a single `tracing::debug!` line naming the alternative
/// sinks; every later drop is silent.
#[derive(Clone, Copy, Debug, Default)]
pub struct DroppingSink;

#[async_trait]
impl<S: Clone + Send + Sync + 'static> AgentEventSink<S> for DroppingSink {
    /// Discards the event and always succeeds; only the very first
    /// call in the process triggers the one-shot diagnostic.
    async fn send(&self, _discarded: AgentEvent<S>) -> Result<()> {
        warn_dropped_first_event();
        Ok(())
    }
}

/// Emits the dropped-event diagnostic at most once per process.
///
/// The flag is a function-local `static AtomicBool` rather than a
/// field on [`DroppingSink`]: that keeps the sink `Copy` (so call
/// sites passing it by value keep compiling) and means a process
/// running 50 agents on the `DroppingSink` default logs one line,
/// not fifty. Despite the `warn_` prefix, the line is emitted at
/// `debug` level.
fn warn_dropped_first_event() {
    use std::sync::atomic::{AtomicBool, Ordering};

    static WARNED: AtomicBool = AtomicBool::new(false);

    // Only the caller that flips `false -> true` wins the exchange;
    // everyone else sees a failed exchange and leaves quietly.
    let already_reported = WARNED
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
        .is_err();
    if already_reported {
        return;
    }

    tracing::debug!(
        target: "entelix_agents",
        "DroppingSink dropped first agent event — telemetry is not wired. \
         Pass an explicit sink via Agent::builder().add_sink(...) — see \
         ChannelSink, BroadcastSink, CaptureSink, FanOutSink, or wire OtelLayer."
    );
}

/// Single-consumer mpsc sink. Construct via [`Self::new`] — the
/// caller keeps the [`tokio::sync::mpsc::Receiver`] for downstream
/// consumption (HTTP SSE driver, file logger, etc.).
///
/// Bounded capacity provides backpressure: when the buffer fills,
/// `send` waits for the consumer to drain. If the consumer is
/// dropped, `send` returns [`Error::Cancelled`] so the agent can
/// shut down cleanly.
pub struct ChannelSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    tx: mpsc::Sender<AgentEvent<S>>,
}

impl<S> ChannelSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Build a sink with an mpsc channel of the given capacity.
    /// Returns the sink and the matching receiver.
    #[must_use]
    pub fn new(capacity: usize) -> (Self, mpsc::Receiver<AgentEvent<S>>) {
        let (tx, rx) = mpsc::channel(capacity);
        (Self { tx }, rx)
    }
}

impl<S> Clone for ChannelSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<S> std::fmt::Debug for ChannelSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChannelSink")
            .field("capacity", &self.tx.max_capacity())
            .field("closed", &self.tx.is_closed())
            .finish()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for ChannelSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        self.tx.send(event).await.map_err(|_| Error::Cancelled)
    }
}

/// Multi-consumer sink backed by `tokio::sync::broadcast`.
///
/// Every subscriber owns its own `Receiver`; a consumer that falls
/// behind receives `Lagged` errors, but the agent never blocks.
/// Suitable for serving `SSE` fan-out, an `OTel` exporter and a
/// recording sink at the same time.
pub struct BroadcastSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    tx: broadcast::Sender<AgentEvent<S>>,
}

impl<S> BroadcastSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Create a broadcast sink with the given buffer depth. The
    /// receiver created alongside the channel is dropped right away;
    /// consumers register through [`Self::subscribe`].
    ///
    /// # Panics
    ///
    /// `tokio::sync::broadcast::channel` panics when `capacity` is
    /// `0` or larger than `usize::MAX / 2`.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let (sender, _initial_receiver) = broadcast::channel(capacity);
        Self { tx: sender }
    }

    /// Register a new subscriber. Each receiver sees every event
    /// emitted *after* its registration.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<AgentEvent<S>> {
        let sender = &self.tx;
        sender.subscribe()
    }

    /// How many subscribers are currently registered.
    #[must_use]
    pub fn receiver_count(&self) -> usize {
        let sender = &self.tx;
        sender.receiver_count()
    }
}

impl<S> Clone for BroadcastSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Clones the sender handle; both sinks publish to the same
    /// subscribers.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        Self { tx }
    }
}

impl<S> std::fmt::Debug for BroadcastSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Shows only the live subscriber count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscribers = self.tx.receiver_count();
        f.debug_struct("BroadcastSink")
            .field("subscribers", &subscribers)
            .finish()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for BroadcastSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Publish the event to all current subscribers. Never fails: a
    /// send with nobody listening is reported through the one-shot
    /// diagnostic and otherwise ignored.
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        // The broadcast `send` errors only when *all* receivers are
        // gone. That is not fatal for the agent, and a crashed or
        // detached consumer would otherwise produce one debug line
        // per event for the rest of the run — so the report goes
        // through a process-wide one-shot guard instead.
        let nobody_listening = self.tx.send(event).is_err();
        if nobody_listening {
            warn_no_subscribers_once();
        }
        Ok(())
    }
}

/// Emits the no-subscribers diagnostic at most once per process.
///
/// A function-local `static AtomicBool` keeps `BroadcastSink<S>`
/// zero-cost (no extra field per instance, no `S`-monomorphisation
/// tax) and means N agents observing N consumer drops produce one
/// log line, not N × N. Operators see one alert and investigate.
fn warn_no_subscribers_once() {
    use std::sync::atomic::{AtomicBool, Ordering};

    static WARNED: AtomicBool = AtomicBool::new(false);

    // Only the first caller flips the flag; later callers observe a
    // failed exchange and return without logging.
    let already_reported = WARNED
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
        .is_err();
    if already_reported {
        return;
    }

    tracing::debug!(
        target: "entelix_agents::sink",
        "BroadcastSink: no active subscribers; event dropped (further drops will be silent — \
         investigate whether the consumer crashed or was detached)"
    );
}

/// In-memory recording sink for integration tests. Each emitted
/// event is appended to a shared `Arc<Mutex<Vec<_>>>`; tests read the
/// vector back to assert ordering and content.
pub struct CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    events: Arc<Mutex<Vec<AgentEvent<S>>>>,
}

impl<S> CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Create a capture sink with nothing recorded yet.
    #[must_use]
    pub fn new() -> Self {
        let events = Arc::new(Mutex::new(Vec::new()));
        Self { events }
    }

    /// Snapshot of the events captured so far. The internal vector
    /// is cloned under the lock, so later emissions do not affect
    /// the returned snapshot.
    #[must_use]
    pub fn events(&self) -> Vec<AgentEvent<S>> {
        let recorded = self.events.lock();
        Vec::clone(&recorded)
    }

    /// Number of events captured.
    #[must_use]
    pub fn len(&self) -> usize {
        let recorded = self.events.lock();
        recorded.len()
    }

    /// Whether nothing has been captured yet.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<S> Default for CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Same as [`CaptureSink::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<S> Clone for CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Clones the handle, not the data: the clone appends to and
    /// reads from the same underlying buffer.
    fn clone(&self) -> Self {
        let events = Arc::clone(&self.events);
        Self { events }
    }
}

impl<S> std::fmt::Debug for CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Shows how many events were captured, not the events.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let captured = self.len();
        f.debug_struct("CaptureSink")
            .field("captured", &captured)
            .finish()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for CaptureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Append the event to the shared buffer; always succeeds.
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        {
            let mut recorded = self.events.lock();
            recorded.push(event);
        }
        Ok(())
    }
}

/// Callback-shaped fan-out — every emitted event reaches every
/// inner sink in registration order.
///
/// Stops at the first failing sink: subsequent sinks do **not** see
/// the event. Operators add sinks in priority order — highest-priority
/// (must-succeed: audit, compliance) first, lower-priority (telemetry,
/// embedding indexer) wrapped in [`FailOpenSink`] later. A failing
/// must-succeed sink halts the run before the lower-priority work
/// runs at all.
///
/// Sequential, not parallel — events arrive at sinks in stable
/// registration order so debugging traces stay reproducible across
/// runs. Fan-out across many sinks is bounded by the slowest sink;
/// observe-only sinks that may stall (HTTP exporter, remote DB)
/// belong behind a [`ChannelSink`] in front of [`FanOutSink`] so the
/// agent loop never blocks on their I/O.
///
/// Distinct from [`BroadcastSink`]: `FanOutSink` is callback-shaped
/// (every sink runs synchronously inside `send`), `BroadcastSink` is
/// subscribe-shaped (consumers pull from their own `Receiver`).
/// Compose freely — a `FanOutSink` whose pushed sinks include a
/// `BroadcastSink` gives both shapes simultaneously.
pub struct FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    sinks: Vec<Arc<dyn AgentEventSink<S>>>,
}

impl<S> FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Empty fan-out — `send` is a no-op until at least one inner
    /// sink is pushed.
    #[must_use]
    pub fn new() -> Self {
        Self { sinks: Vec::new() }
    }

    /// Append an inner sink. Builder-style — chains naturally with
    /// `.push(Arc::new(MySink::new()))`; the concrete `Arc<MySink>`
    /// coerces to `Arc<dyn AgentEventSink<S>>` at the call site.
    #[must_use]
    pub fn push(mut self, sink: Arc<dyn AgentEventSink<S>>) -> Self {
        self.sinks.push(sink);
        self
    }

    /// Number of registered inner sinks.
    #[must_use]
    pub fn len(&self) -> usize {
        self.sinks.len()
    }

    /// Whether no inner sinks are registered (`send` is a no-op).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.sinks.is_empty()
    }
}

impl<S> Default for FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Same as [`FanOutSink::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<S> Clone for FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Clones the list of `Arc` handles; the clone dispatches to the
    /// same inner sinks.
    fn clone(&self) -> Self {
        Self {
            sinks: self.sinks.clone(),
        }
    }
}

impl<S> std::fmt::Debug for FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Shows the number of inner sinks (trait objects carry no
    /// `Debug` bound).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FanOutSink")
            .field("sinks", &self.sinks.len())
            .finish()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for FanOutSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Deliver the event to every inner sink in registration order.
    ///
    /// # Errors
    ///
    /// Returns the first inner sink's error and skips every sink
    /// registered after it.
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        // The final sink takes ownership of `event`, so only the
        // sinks before it pay for a clone. Events can carry the
        // agent's full state, and the common one-sink fan-out now
        // clones nothing at all.
        match self.sinks.split_last() {
            None => Ok(()),
            Some((last, earlier)) => {
                for sink in earlier {
                    sink.send(event.clone()).await?;
                }
                last.send(event).await
            }
        }
    }
}

/// Composition adapter that turns every inner-sink failure into
/// `Ok(())`.
///
/// `AgentEventSink::send` returning `Err` halts the agent (the
/// trait's contract). For sinks whose failure should NOT propagate
/// — embedding indexer, billing meter, anomaly detector,
/// best-effort telemetry — wrap them in `FailOpenSink`: each
/// downstream failure is logged via `tracing::warn!` and the run
/// continues.
///
/// The lift is one-way and explicit: there is no `is_observe_only`
/// flag on the sink itself. Operators express intent at the
/// composition site (`FailOpenSink::new(my_sink)`) so the dispatch
/// loop stays simple — every sink is treated identically.
pub struct FailOpenSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    inner: Arc<dyn AgentEventSink<S>>,
}

impl<S> FailOpenSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Wrap `inner` so that its errors are logged via
    /// `tracing::warn!` and never reach the agent runtime.
    #[must_use]
    pub fn new(inner: Arc<dyn AgentEventSink<S>>) -> Self {
        Self { inner }
    }
}

impl<S> Clone for FailOpenSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Clones the `Arc` handle; both wrappers forward to the same
    /// inner sink.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}

impl<S> std::fmt::Debug for FailOpenSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Opaque output — the inner trait object carries no `Debug`
    /// bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("FailOpenSink");
        repr.finish_non_exhaustive()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for FailOpenSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Forward the event to the inner sink; a failure is logged and
    /// then dropped, so this always returns `Ok(())`.
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        match self.inner.send(event).await {
            Ok(()) => {}
            Err(err) => {
                tracing::warn!(
                    target: "entelix_agents::sink",
                    error = %err,
                    "FailOpenSink: inner sink errored — discarding event and continuing"
                );
            }
        }
        Ok(())
    }
}

/// Adapter that erases an agent's state type so a single
/// [`AgentEventSink<()>`] can fan in from heterogeneous agents
/// (`Agent<ReActState>`, `Agent<SupervisorState>`, operator-defined
/// state types) in a multi-agent system.
///
/// The wrapper takes any `Arc<dyn AgentEventSink<()>>` (the canonical
/// state-agnostic sink shape) and implements [`AgentEventSink<S>`]
/// for any `S: Clone + Send + Sync + 'static`. On every dispatch it
/// calls [`AgentEvent::erase_state`], replacing the agent's terminal
/// state with `()` while every other field (`run_id`, `tenant_id`,
/// `parent_run_id`, tool I/O, error envelope, usage snapshot,
/// approval decisions) reaches the sink unchanged.
///
/// Wire pattern:
///
/// ```ignore
/// let audit: Arc<dyn AgentEventSink<()>> = Arc::new(MyAuditSink);
///
/// // ReAct agent — own state-typed sink built from the same audit.
/// let react = ReActAgentBuilder::new(model)
///     .add_sink(Arc::new(StateErasureSink::new(Arc::clone(&audit))))
///     .build()?;
///
/// // Supervisor agent — sharing the same audit pipeline.
/// let supervisor = create_supervisor_agent(...)
///     .add_sink(Arc::new(StateErasureSink::new(audit)))
///     .build()?;
/// ```
pub struct StateErasureSink<S> {
    inner: Arc<dyn AgentEventSink<()>>,
    _phantom: std::marker::PhantomData<S>,
}

impl<S> StateErasureSink<S> {
    /// Wrap a state-agnostic sink so it can be attached to an
    /// `Agent<S>`-typed event stream.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use entelix::{AgentEventSink, ReActState, StateErasureSink, SupervisorState};
    ///
    /// // One audit pipeline consumed by every agent in a multi-agent
    /// // topology — the adapter projects each typed event to
    /// // `AgentEvent<()>` via `erase_state` before the audit sink sees it.
    /// let audit: Arc<dyn AgentEventSink<()>> = Arc::new(MyAuditSink);
    /// let react_adapter: Arc<dyn AgentEventSink<ReActState>> =
    ///     Arc::new(StateErasureSink::new(Arc::clone(&audit)));
    /// let supervisor_adapter: Arc<dyn AgentEventSink<SupervisorState>> =
    ///     Arc::new(StateErasureSink::new(audit));
    /// # struct MyAuditSink;
    /// # #[async_trait::async_trait]
    /// # impl AgentEventSink<()> for MyAuditSink {
    /// #     async fn send(&self, _: entelix::AgentEvent<()>) -> entelix::Result<()> { Ok(()) }
    /// # }
    /// ```
    #[must_use]
    pub fn new(inner: Arc<dyn AgentEventSink<()>>) -> Self {
        let _phantom = std::marker::PhantomData;
        Self { inner, _phantom }
    }
}

impl<S> Clone for StateErasureSink<S> {
    /// Clones the `Arc` handle; no bound on `S` is needed because no
    /// `S` value is stored.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self {
            inner,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<S> std::fmt::Debug for StateErasureSink<S> {
    /// Opaque output — the inner trait object carries no `Debug`
    /// bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("StateErasureSink");
        repr.finish_non_exhaustive()
    }
}

#[async_trait]
impl<S> AgentEventSink<S> for StateErasureSink<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Erase the event's state and forward the result to the inner
    /// sink, propagating whatever the inner sink returns.
    async fn send(&self, event: AgentEvent<S>) -> Result<()> {
        let erased = event.erase_state();
        self.inner.send(erased).await
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    type TestEvent = AgentEvent<i32>;

    fn started(agent: impl Into<String>) -> TestEvent {
        TestEvent::Started {
            run_id: "test-run".into(),
            tenant_id: entelix_core::TenantId::new("t-test"),
            parent_run_id: None,
            agent: agent.into(),
        }
    }

    fn complete(state: i32) -> TestEvent {
        TestEvent::Complete {
            run_id: "test-run".into(),
            tenant_id: entelix_core::TenantId::new("t-test"),
            state,
            usage: None,
        }
    }

    /// Ten sends into a `DroppingSink` all succeed.
    #[tokio::test]
    async fn dropping_sink_silently_discards_events() {
        let sink = DroppingSink;
        for agent in (0..10).map(|i| format!("a{i}")) {
            let outcome = sink.send(started(agent)).await;
            outcome.unwrap();
        }
    }

    /// Events sent through a `ChannelSink` come out of the receiver
    /// in the order they went in.
    #[tokio::test]
    async fn channel_sink_round_trips_in_order() {
        let (sink, mut rx) = ChannelSink::<i32>::new(4);
        for agent in ["a0", "a1", "a2"] {
            sink.send(started(agent)).await.unwrap();
        }
        // Dropping the only sender closes the channel so the drain
        // loop below terminates.
        drop(sink);

        let mut drained = Vec::new();
        while let Some(event) = rx.recv().await {
            drained.push(event);
        }

        assert_eq!(drained.len(), 3);
        let first = &drained[0];
        assert!(matches!(first, AgentEvent::Started { agent, .. } if agent == "a0"));
    }

    /// Once the receiver is gone, `send` reports `Error::Cancelled`.
    #[tokio::test]
    async fn channel_sink_returns_cancelled_when_receiver_dropped() {
        let (sink, rx) = ChannelSink::<i32>::new(1);
        sink.send(started("a")).await.unwrap();
        drop(rx);

        let outcome = sink.send(complete(0)).await;
        let err = outcome.unwrap_err();
        assert!(matches!(err, Error::Cancelled));
    }

    /// Two subscribers both receive the same broadcast event.
    #[tokio::test]
    async fn broadcast_sink_fans_out_to_multiple_subscribers() {
        let sink = BroadcastSink::<i32>::new(8);
        let mut first_rx = sink.subscribe();
        let mut second_rx = sink.subscribe();
        assert_eq!(sink.receiver_count(), 2);

        sink.send(complete(7)).await.unwrap();

        let from_first = first_rx.recv().await.unwrap();
        let from_second = second_rx.recv().await.unwrap();
        assert!(matches!(from_first, AgentEvent::Complete { state: 7, .. }));
        assert!(matches!(from_second, AgentEvent::Complete { state: 7, .. }));
    }

    /// Sending with zero subscribers still returns `Ok`.
    #[tokio::test]
    async fn broadcast_sink_no_subscribers_does_not_error() {
        let sink = BroadcastSink::<i32>::new(8);
        let outcome = sink.send(complete(0)).await;
        outcome.unwrap();
    }

    /// Captured events keep their send order and payload.
    #[tokio::test]
    async fn capture_sink_preserves_order_and_content() {
        let sink = CaptureSink::<i32>::new();
        for event in [started("test"), complete(42)] {
            sink.send(event).await.unwrap();
        }

        assert_eq!(sink.len(), 2);
        let captured = sink.events();
        assert!(matches!(captured[0], AgentEvent::Started { .. }));
        assert!(matches!(captured[1], AgentEvent::Complete { state: 42, .. }));
    }

    /// A cloned `CaptureSink` writes into the same buffer as the
    /// original.
    #[tokio::test]
    async fn capture_sink_clones_share_underlying_buffer() {
        let original = CaptureSink::<i32>::new();
        let twin = original.clone();

        original.send(complete(1)).await.unwrap();
        twin.send(complete(2)).await.unwrap();

        assert_eq!(original.len(), 2);
        assert_eq!(twin.len(), 2);
    }

    /// Test double whose `send` always fails with
    /// `Error::Cancelled` while counting how often it was invoked —
    /// regression coverage for `FailOpenSink` and `FanOutSink`.
    #[derive(Default)]
    struct FailingSink {
        calls: parking_lot::Mutex<u32>,
    }

    #[async_trait]
    impl AgentEventSink<i32> for FailingSink {
        async fn send(&self, _event: AgentEvent<i32>) -> Result<()> {
            {
                let mut seen = self.calls.lock();
                *seen += 1;
            }
            Err(Error::Cancelled)
        }
    }

    #[tokio::test]
    async fn fan_out_sink_dispatches_in_registration_order() {
        let a = CaptureSink::<i32>::new();
        let b = CaptureSink::<i32>::new();
        let fan = FanOutSink::<i32>::new()
            .push(Arc::new(a.clone()))
            .push(Arc::new(b.clone()));
        fan.send(complete(1)).await.unwrap();
        fan.send(complete(2)).await.unwrap();
        assert_eq!(a.len(), 2);
        assert_eq!(b.len(), 2);
        assert!(matches!(
            a.events()[0],
            AgentEvent::Complete { state: 1, .. }
        ));
    }

    /// A failing sink registered first surfaces its error and keeps
    /// the event from the sinks registered after it.
    #[tokio::test]
    async fn fan_out_sink_propagates_first_error_and_stops() {
        let recorded = CaptureSink::<i32>::new();
        let failing = Arc::new(FailingSink::default());
        let fan = FanOutSink::<i32>::new()
            .push(Arc::clone(&failing) as Arc<dyn AgentEventSink<i32>>)
            .push(Arc::new(recorded.clone()));

        let outcome = fan.send(complete(1)).await;
        let err = outcome.unwrap_err();

        assert!(matches!(err, Error::Cancelled));
        assert_eq!(*failing.calls.lock(), 1, "failing sink saw the event");
        assert_eq!(
            recorded.len(),
            0,
            "downstream sinks must not see the event after an upstream failure"
        );
    }

    /// `FailOpenSink` reports `Ok` even though the wrapped sink
    /// fails on every call.
    #[tokio::test]
    async fn fail_open_sink_swallows_inner_error() {
        let failing: Arc<dyn AgentEventSink<i32>> = Arc::new(FailingSink::default());
        let lifted = FailOpenSink::new(failing);
        // Three failing sends — every one returns Ok.
        let mut remaining = 3;
        while remaining > 0 {
            lifted.send(complete(0)).await.unwrap();
            remaining -= 1;
        }
    }

    /// A failing sink wrapped in `FailOpenSink` no longer blocks the
    /// sinks registered after it in a `FanOutSink`.
    #[tokio::test]
    async fn fail_open_sink_lifts_into_fan_out_to_isolate_observe_only() {
        let recorded = CaptureSink::<i32>::new();
        let failing: Arc<dyn AgentEventSink<i32>> = Arc::new(FailingSink::default());
        // observe-only — lifted
        let observe_only: Arc<dyn AgentEventSink<i32>> = Arc::new(FailOpenSink::new(failing));
        // must-succeed — sees the event
        let must_succeed: Arc<dyn AgentEventSink<i32>> = Arc::new(recorded.clone());
        let fan = FanOutSink::<i32>::new()
            .push(observe_only)
            .push(must_succeed);

        fan.send(complete(7)).await.unwrap();

        assert_eq!(
            recorded.len(),
            1,
            "lifting the failing sink with FailOpenSink must keep downstream sinks reachable"
        );
    }

    #[tokio::test]
    async fn state_erasure_sink_fans_in_heterogeneous_agents_to_one_unit_sink() {
        // Two agents with distinct state types (i32, String) share a
        // single `AgentEventSink<()>` audit pipeline through
        // StateErasureSink wrappers. The audit sees every event from
        // both agents with `Complete::state == ()` and every header
        // field (run_id / tenant_id / parent_run_id) preserved.
        let audit: Arc<CaptureSink<()>> = Arc::new(CaptureSink::<()>::new());
        let audit_dyn: Arc<dyn AgentEventSink<()>> = audit.clone();

        // Agent A: state-typed `i32`.
        let a_adapter: StateErasureSink<i32> = StateErasureSink::new(Arc::clone(&audit_dyn));
        a_adapter
            .send(AgentEvent::Started {
                run_id: "a-run".into(),
                tenant_id: entelix_core::TenantId::new("t-shared"),
                parent_run_id: None,
                agent: "agent-a".into(),
            })
            .await
            .unwrap();
        a_adapter
            .send(AgentEvent::Complete {
                run_id: "a-run".into(),
                tenant_id: entelix_core::TenantId::new("t-shared"),
                state: 42_i32,
                usage: None,
            })
            .await
            .unwrap();

        // Agent B: state-typed `String`.
        let b_adapter: StateErasureSink<String> = StateErasureSink::new(audit_dyn);
        b_adapter
            .send(AgentEvent::Started {
                run_id: "b-run".into(),
                tenant_id: entelix_core::TenantId::new("t-shared"),
                parent_run_id: Some("a-run".into()),
                agent: "agent-b".into(),
            })
            .await
            .unwrap();
        b_adapter
            .send(AgentEvent::Complete {
                run_id: "b-run".into(),
                tenant_id: entelix_core::TenantId::new("t-shared"),
                state: "done".to_owned(),
                usage: None,
            })
            .await
            .unwrap();

        // Audit captured all four events with Complete::state == ().
        let events = audit.events();
        assert_eq!(events.len(), 4);

        // Headers survived erasure.
        match &events[0] {
            AgentEvent::Started {
                run_id,
                tenant_id,
                parent_run_id,
                agent,
            } => {
                assert_eq!(run_id, "a-run");
                assert_eq!(tenant_id.as_str(), "t-shared");
                assert_eq!(parent_run_id.as_deref(), None);
                assert_eq!(agent, "agent-a");
            }
            other => panic!("unexpected event: {other:?}"),
        }
        match &events[2] {
            AgentEvent::Started {
                parent_run_id,
                agent,
                ..
            } => {
                assert_eq!(parent_run_id.as_deref(), Some("a-run"));
                assert_eq!(agent, "agent-b");
            }
            other => panic!("unexpected event: {other:?}"),
        }

        // Complete events lost their typed state via erasure — the
        // single-unit audit sink observes `state: ()` from both.
        match &events[1] {
            AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
            other => panic!("unexpected event: {other:?}"),
        }
        match &events[3] {
            AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn state_erasure_sink_pins_first_party_recipe_state_types() {
        // Companion to the toy-type fan-in test: this one pins the
        // adapter against the *actual* state shapes the SDK ships
        // — `ReActState` / `ChatState` / `SupervisorState`. A future
        // change to the impl bounds (e.g. tightening from
        // `Clone + Send + Sync + 'static` to something narrower)
        // breaks here at compile time rather than reaching operators.
        use crate::state::{ChatState, ReActState, SupervisorState};

        let audit: Arc<CaptureSink<()>> = Arc::new(CaptureSink::<()>::new());
        let audit_dyn: Arc<dyn AgentEventSink<()>> = audit.clone();

        let react_adapter: StateErasureSink<ReActState> =
            StateErasureSink::new(Arc::clone(&audit_dyn));
        let chat_adapter: StateErasureSink<ChatState> =
            StateErasureSink::new(Arc::clone(&audit_dyn));
        let supervisor_adapter: StateErasureSink<SupervisorState> =
            StateErasureSink::new(audit_dyn);

        let tenant = entelix_core::TenantId::new("t-multi");

        // ReAct agent — typed Complete state with `steps` + messages.
        react_adapter
            .send(AgentEvent::Complete {
                run_id: "react-run".into(),
                tenant_id: tenant.clone(),
                state: ReActState::from_user("hello react"),
                usage: None,
            })
            .await
            .unwrap();

        // Chat agent — typed Complete state with messages only.
        chat_adapter
            .send(AgentEvent::Complete {
                run_id: "chat-run".into(),
                tenant_id: tenant.clone(),
                state: ChatState::from_user("hello chat"),
                usage: None,
            })
            .await
            .unwrap();

        // Supervisor agent — typed Complete state with routing fields.
        supervisor_adapter
            .send(AgentEvent::Complete {
                run_id: "super-run".into(),
                tenant_id: tenant,
                state: SupervisorState::from_user("hello supervisor"),
                usage: None,
            })
            .await
            .unwrap();

        let events = audit.events();
        assert_eq!(events.len(), 3);
        for event in &events {
            match event {
                AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
                other => panic!("unexpected event: {other:?}"),
            }
        }
    }
}