1use std::sync::Arc;
15
16use super::{Sink, SinkFut};
17use crate::registry::ScrubbedEnvelope;
18
/// A [`Sink`] that forwards every call to each of its child sinks.
///
/// Children are held as shared `Arc<dyn Sink>` handles and are visited in the
/// order in which they were supplied to [`FanOutSink::new`], which also
/// rejects an empty list.
pub struct FanOutSink {
    /// Child sinks, in fan-out order.
    sinks: Vec<Arc<dyn Sink>>,
}
28
29impl std::fmt::Debug for FanOutSink {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("FanOutSink")
32 .field("children", &self.sinks.len())
33 .finish()
34 }
35}
36
37impl FanOutSink {
38 #[must_use]
44 pub fn new(sinks: Vec<Arc<dyn Sink>>) -> Arc<Self> {
45 assert!(
46 !sinks.is_empty(),
47 "FanOutSink requires at least one child sink",
48 );
49 Arc::new(Self { sinks })
50 }
51
52 #[must_use]
54 pub fn children(&self) -> usize {
55 self.sinks.len()
56 }
57}
58
59impl Sink for FanOutSink {
60 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
61 for sink in &self.sinks {
62 sink.deliver(env);
63 }
64 }
65
66 fn flush(&self) -> SinkFut<'_> {
67 Box::pin(async move {
68 for sink in &self.sinks {
73 sink.flush().await;
74 }
75 })
76 }
77
78 fn shutdown(&self) -> SinkFut<'_> {
79 Box::pin(async move {
80 for sink in &self.sinks {
81 sink.shutdown().await;
82 }
83 })
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Test double that counts how often each `Sink` method is invoked.
    #[derive(Default)]
    struct CountingSink {
        deliveries: AtomicUsize,
        flushes: AtomicUsize,
        shutdowns: AtomicUsize,
    }

    impl CountingSink {
        /// Adds one to `counter`.
        fn bump(counter: &AtomicUsize) {
            counter.fetch_add(1, Ordering::Relaxed);
        }

        /// Number of `flush` calls seen so far.
        fn flush_count(&self) -> usize {
            self.flushes.load(Ordering::Relaxed)
        }

        /// Number of `shutdown` calls seen so far.
        fn shutdown_count(&self) -> usize {
            self.shutdowns.load(Ordering::Relaxed)
        }
    }

    impl std::fmt::Debug for CountingSink {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // A field-less `debug_struct` prints just the type name.
            f.write_str("CountingSink")
        }
    }

    impl Sink for CountingSink {
        fn deliver(&self, _env: ScrubbedEnvelope<'_>) {
            Self::bump(&self.deliveries);
        }

        // The counters are bumped when the method is called, not when the
        // returned future is awaited.
        fn flush(&self) -> SinkFut<'_> {
            Self::bump(&self.flushes);
            Box::pin(async {})
        }

        fn shutdown(&self) -> SinkFut<'_> {
            Self::bump(&self.shutdowns);
            Box::pin(async {})
        }
    }

    #[test]
    #[should_panic(expected = "FanOutSink requires at least one child sink")]
    fn test_should_panic_on_empty_sinks() {
        let no_children: Vec<Arc<dyn Sink>> = Vec::new();
        let _ = FanOutSink::new(no_children);
    }

    #[tokio::test]
    async fn test_should_flush_and_shutdown_every_child() {
        let first = Arc::new(CountingSink::default());
        let second = Arc::new(CountingSink::default());

        // Keep the concrete handles for the assertions below and hand
        // trait-object clones to the fan-out.
        let first_dyn: Arc<dyn Sink> = first.clone();
        let second_dyn: Arc<dyn Sink> = second.clone();
        let fan = FanOutSink::new(vec![first_dyn, second_dyn]);

        assert_eq!(fan.children(), 2);
        fan.flush().await;
        fan.shutdown().await;

        assert_eq!(first.flush_count(), 1);
        assert_eq!(first.shutdown_count(), 1);
        assert_eq!(second.flush_count(), 1);
        assert_eq!(second.shutdown_count(), 1);
    }
}