Skip to main content

obs_core/sink/
fanout.rs

//! `FanOutSink` — deliver one envelope to N child sinks.
//!
//! `StandardObserverBuilder::sink_for(tier, sink)` replaces the sink
//! slot for the tier. To fan a tier's output across multiple
//! destinations (e.g. `NdjsonFileSink` + `OtlpLogSink` during Phase 3,
//! or `S3Sink` + `OtlpLogSink` + live-tail at the boundary), wrap
//! them in a `FanOutSink`. See boundary-review § 3.4 (moved upstream
//! from `tok_obs::FanOutSink`).
//!
//! `ScrubbedEnvelope<'_>` is `Copy`, so the per-deliver fan-out is
//! allocation-free. Each child does its own small copy-into-queue
//! internally.
13
14use std::sync::Arc;
15
16use super::{Sink, SinkFut};
17use crate::registry::ScrubbedEnvelope;
18
19/// Multiplex a single tier's output across N sinks.
20///
21/// Construction enforces `!sinks.is_empty()` — a zero-child fan-out
22/// is almost certainly a config mistake (silent drop of every
23/// envelope); the empty case is better served by an explicit
24/// `NoopSink`.
25pub struct FanOutSink {
26    sinks: Vec<Arc<dyn Sink>>,
27}
28
29impl std::fmt::Debug for FanOutSink {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("FanOutSink")
32            .field("children", &self.sinks.len())
33            .finish()
34    }
35}
36
37impl FanOutSink {
38    /// Wrap `sinks` as a shared `Arc<dyn Sink>`.
39    ///
40    /// # Panics
41    ///
42    /// Panics when `sinks` is empty — caller bug.
43    #[must_use]
44    pub fn new(sinks: Vec<Arc<dyn Sink>>) -> Arc<Self> {
45        assert!(
46            !sinks.is_empty(),
47            "FanOutSink requires at least one child sink",
48        );
49        Arc::new(Self { sinks })
50    }
51
52    /// Number of child sinks.
53    #[must_use]
54    pub fn children(&self) -> usize {
55        self.sinks.len()
56    }
57}
58
59impl Sink for FanOutSink {
60    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
61        for sink in &self.sinks {
62            sink.deliver(env);
63        }
64    }
65
66    fn flush(&self) -> SinkFut<'_> {
67        Box::pin(async move {
68            // Sequential await is fine: `flush` is a shutdown-adjacent
69            // operation (drain batches). The downstream sinks aren't
70            // coupled to each other, so running in order keeps the code
71            // simple without noticeable latency.
72            for sink in &self.sinks {
73                sink.flush().await;
74            }
75        })
76    }
77
78    fn shutdown(&self) -> SinkFut<'_> {
79        Box::pin(async move {
80            for sink in &self.sinks {
81                sink.shutdown().await;
82            }
83        })
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Test double that only counts how often each `Sink` method is called.
    #[derive(Default)]
    struct CountingSink {
        deliveries: AtomicUsize,
        flushes: AtomicUsize,
        shutdowns: AtomicUsize,
    }

    impl CountingSink {
        /// Add one to `counter`.
        fn bump(counter: &AtomicUsize) {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl std::fmt::Debug for CountingSink {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // Field-less debug output: just the type name.
            f.debug_struct("CountingSink").finish()
        }
    }

    impl Sink for CountingSink {
        fn deliver(&self, _env: ScrubbedEnvelope<'_>) {
            Self::bump(&self.deliveries);
        }

        // The counter is bumped when `flush` is called, before the
        // returned (empty) future is awaited.
        fn flush(&self) -> SinkFut<'_> {
            Self::bump(&self.flushes);
            Box::pin(async {})
        }

        // Same as `flush`: counted at call time, nothing to await.
        fn shutdown(&self) -> SinkFut<'_> {
            Self::bump(&self.shutdowns);
            Box::pin(async {})
        }
    }

    /// Constructing a fan-out with no children must panic.
    #[test]
    #[should_panic(expected = "FanOutSink requires at least one child sink")]
    fn test_should_panic_on_empty_sinks() {
        let no_children: Vec<Arc<dyn Sink>> = Vec::new();
        let _ = FanOutSink::new(no_children);
    }

    /// One `flush` and one `shutdown` on the fan-out reach each child
    /// exactly once.
    #[tokio::test]
    async fn test_should_flush_and_shutdown_every_child() {
        let first = Arc::new(CountingSink::default());
        let second = Arc::new(CountingSink::default());

        let first_dyn: Arc<dyn Sink> = first.clone();
        let second_dyn: Arc<dyn Sink> = second.clone();
        let fan = FanOutSink::new(vec![first_dyn, second_dyn]);

        assert_eq!(fan.children(), 2);
        fan.flush().await;
        fan.shutdown().await;

        for child in [&first, &second] {
            assert_eq!(child.flushes.load(Ordering::Relaxed), 1);
            assert_eq!(child.shutdowns.load(Ordering::Relaxed), 1);
        }
    }
}