use std::sync::Arc;
use super::{Sink, SinkFut};
use crate::registry::ScrubbedEnvelope;
pub struct FanOutSink {
sinks: Vec<Arc<dyn Sink>>,
}
impl std::fmt::Debug for FanOutSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FanOutSink")
.field("children", &self.sinks.len())
.finish()
}
}
impl FanOutSink {
#[must_use]
pub fn new(sinks: Vec<Arc<dyn Sink>>) -> Arc<Self> {
assert!(
!sinks.is_empty(),
"FanOutSink requires at least one child sink",
);
Arc::new(Self { sinks })
}
#[must_use]
pub fn children(&self) -> usize {
self.sinks.len()
}
}
impl Sink for FanOutSink {
fn deliver(&self, env: ScrubbedEnvelope<'_>) {
for sink in &self.sinks {
sink.deliver(env);
}
}
fn flush(&self) -> SinkFut<'_> {
Box::pin(async move {
for sink in &self.sinks {
sink.flush().await;
}
})
}
fn shutdown(&self) -> SinkFut<'_> {
Box::pin(async move {
for sink in &self.sinks {
sink.shutdown().await;
}
})
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
#[derive(Default)]
struct CountingSink {
deliveries: AtomicUsize,
flushes: AtomicUsize,
shutdowns: AtomicUsize,
}
impl std::fmt::Debug for CountingSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CountingSink").finish()
}
}
impl Sink for CountingSink {
fn deliver(&self, _env: ScrubbedEnvelope<'_>) {
self.deliveries.fetch_add(1, Ordering::Relaxed);
}
fn flush(&self) -> SinkFut<'_> {
self.flushes.fetch_add(1, Ordering::Relaxed);
Box::pin(async {})
}
fn shutdown(&self) -> SinkFut<'_> {
self.shutdowns.fetch_add(1, Ordering::Relaxed);
Box::pin(async {})
}
}
#[test]
#[should_panic(expected = "FanOutSink requires at least one child sink")]
fn test_should_panic_on_empty_sinks() {
let _ = FanOutSink::new(Vec::new());
}
#[tokio::test]
async fn test_should_flush_and_shutdown_every_child() {
let a: Arc<CountingSink> = Arc::new(CountingSink::default());
let b: Arc<CountingSink> = Arc::new(CountingSink::default());
let fan = FanOutSink::new(vec![a.clone() as Arc<dyn Sink>, b.clone() as Arc<dyn Sink>]);
assert_eq!(fan.children(), 2);
fan.flush().await;
fan.shutdown().await;
assert_eq!(a.flushes.load(Ordering::Relaxed), 1);
assert_eq!(a.shutdowns.load(Ordering::Relaxed), 1);
assert_eq!(b.flushes.load(Ordering::Relaxed), 1);
assert_eq!(b.shutdowns.load(Ordering::Relaxed), 1);
}
}