use futures_util::future::join_all;
use crate::core::{Event, Result};
use crate::sink::{BoxedSink, SinkAdapter, SinkDeliveryGuarantee};
/// Awaits every future in `futures` through `join_all`, then returns the
/// first `Err` found among the collected results, or `Ok(())` when none of
/// them failed.
///
/// Results are only inspected after `join_all` has resolved, so a failing
/// future does not prevent the remaining ones from running to completion.
async fn join_first_err<I, F>(futures: I) -> Result<()>
where
    I: IntoIterator<Item = F>,
    F: std::future::Future<Output = Result<()>>,
{
    let outcomes = join_all(futures).await;
    for outcome in outcomes {
        if outcome.is_err() {
            // Hand back the failing result itself; later results are dropped.
            return outcome;
        }
    }
    Ok(())
}
/// Sink adapter that owns a list of child sinks and forwards its
/// `SinkAdapter` operations to all of them.
pub struct FanOutSinkAdapter {
/// Child sinks; `FanOutSinkAdapter::new` refuses an empty list.
sinks: Vec<BoxedSink>,
}
/// Debug output shows the struct name and a `sinks` field holding the child
/// sink names (as returned by each child's `name()`), not the sinks themselves.
impl std::fmt::Debug for FanOutSinkAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Gather the child names in child order.
        let mut child_names: Vec<&str> = Vec::with_capacity(self.sinks.len());
        for sink in &self.sinks {
            child_names.push(sink.name());
        }

        let mut out = f.debug_struct("FanOutSinkAdapter");
        out.field("sinks", &child_names);
        out.finish()
    }
}
impl FanOutSinkAdapter {
    /// Creates a fan-out adapter that owns the given child sinks.
    ///
    /// # Panics
    ///
    /// Panics when `sinks` is empty.
    pub fn new(sinks: Vec<BoxedSink>) -> Self {
        if sinks.is_empty() {
            panic!("FanOutSinkAdapter requires at least one child sink");
        }
        Self { sinks }
    }

    /// Number of child sinks held by this adapter.
    pub fn len(&self) -> usize {
        let Self { sinks } = self;
        sinks.len()
    }

    /// Whether the adapter holds no child sinks.
    pub fn is_empty(&self) -> bool {
        let Self { sinks } = self;
        sinks.is_empty()
    }
}
/// `SinkAdapter` implementation that applies each operation to every child
/// sink and combines the children's answers into a single one.
impl SinkAdapter for FanOutSinkAdapter {
    /// Fixed adapter name; child names are not part of it.
    fn name(&self) -> &str {
        "fan-out"
    }

    /// Closed only when no child reports itself as still open.
    fn is_closed(&self) -> bool {
        !self.sinks.iter().any(|sink| !sink.is_closed())
    }

    /// Merges the metrics of every child that reports some.
    ///
    /// Returns `None` when no child reports metrics at all.
    fn delivery_metrics(&self) -> Option<crate::sink::SinkDeliveryMetrics> {
        let mut combined = crate::sink::SinkDeliveryMetrics::default();
        let mut saw_metrics = false;
        for metrics in self.sinks.iter().filter_map(|sink| sink.delivery_metrics()) {
            combined.merge(&metrics);
            saw_metrics = true;
        }
        match saw_metrics {
            true => Some(combined),
            false => None,
        }
    }

    /// Sends `event` to every child; yields the first child error, if any.
    async fn send(&mut self, event: &Event) -> Result<()> {
        let deliveries = self.sinks.iter_mut().map(|sink| sink.send(event));
        join_first_err(deliveries).await
    }

    /// Flushes every child; yields the first child error, if any.
    async fn flush(&mut self) -> Result<()> {
        let flushes = self.sinks.iter_mut().map(|sink| sink.flush());
        join_first_err(flushes).await
    }

    /// Closes every child; yields the first child error, if any.
    async fn close(&mut self) -> Result<()> {
        let closes = self.sinks.iter_mut().map(|sink| sink.close());
        join_first_err(closes).await
    }

    /// Combines the children's guarantees with `SinkDeliveryGuarantee::weakest`,
    /// starting from `EffectivelyOnce`.
    fn delivery_guarantee(&self) -> SinkDeliveryGuarantee {
        let mut overall = SinkDeliveryGuarantee::EffectivelyOnce;
        for sink in &self.sinks {
            overall = SinkDeliveryGuarantee::weakest(overall, sink.delivery_guarantee());
        }
        overall
    }

    /// True only when every child is idempotent-delivery capable.
    fn idempotent_delivery_capable(&self) -> bool {
        for sink in &self.sinks {
            if !sink.idempotent_delivery_capable() {
                return false;
            }
        }
        true
    }

    /// True only when every child supports transactional checkpoint barriers.
    fn transactional_checkpoint_barrier_capable(&self) -> bool {
        !self
            .sinks
            .iter()
            .any(|sink| !sink.transactional_checkpoint_barrier_capable())
    }

    /// Sum of the queue depths of the children that report one.
    ///
    /// Returns `None` when no child reports a depth.
    fn queue_depth(&self) -> Option<usize> {
        self.sinks
            .iter()
            .filter_map(|sink| sink.queue_depth())
            .fold(None, |total: Option<usize>, depth| {
                Some(total.unwrap_or(0) + depth)
            })
    }

    /// Shortest flush tick interval among the children that declare one.
    fn flush_tick_interval(&self) -> Option<std::time::Duration> {
        self.sinks
            .iter()
            .filter_map(|sink| sink.flush_tick_interval())
            .reduce(std::cmp::min)
    }

    /// Begins a checkpoint barrier on each child, one at a time, in order.
    ///
    /// On the first failure, the children that had already begun are aborted
    /// in reverse order (abort errors are ignored) and the begin error is
    /// returned. The failing child itself is not aborted.
    async fn begin_checkpoint_barrier(&mut self) -> Result<()> {
        for idx in 0..self.sinks.len() {
            let Err(begin_err) = self.sinks[idx].begin_checkpoint_barrier().await else {
                continue;
            };
            // Roll back the children before `idx`, most recent first.
            for begun in self.sinks[..idx].iter_mut().rev() {
                let _ = begun.abort_checkpoint_barrier().await;
            }
            return Err(begin_err);
        }
        Ok(())
    }

    /// Commits the barrier on every child; yields the first child error, if any.
    async fn commit_checkpoint_barrier(&mut self) -> Result<()> {
        let commits = self
            .sinks
            .iter_mut()
            .map(|sink| sink.commit_checkpoint_barrier());
        join_first_err(commits).await
    }

    /// Aborts the barrier on every child; yields the first child error, if any.
    async fn abort_checkpoint_barrier(&mut self) -> Result<()> {
        let aborts = self
            .sinks
            .iter_mut()
            .map(|sink| sink.abort_checkpoint_barrier());
        join_first_err(aborts).await
    }

    /// Runs the preflight check of every child; yields the first child error, if any.
    async fn preflight_check(&mut self) -> Result<()> {
        let checks = self.sinks.iter_mut().map(|sink| sink.preflight_check());
        join_first_err(checks).await
    }
}
// Unit tests for `FanOutSinkAdapter`, using `MemorySinkAdapter` children.
#[cfg(test)]
mod tests {
use crate::core::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
use crate::sink::{BoxedSink, FanOutSinkAdapter, MemorySinkAdapter, SinkAdapter};
// Builds an insert event for `table` with a fixed `{"id": 1}` after-image,
// fixed source metadata, and every optional field left unset.
fn make_event(table: &str) -> Event {
Event {
before: None,
after: Some(serde_json::json!({"id": 1})),
op: Operation::Insert,
source: SourceMetadata {
source_name: "test".into(),
offset: "0".into(),
timestamp: 1,
},
ts: 1,
schema: None,
table: table.into(),
primary_key: None,
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
}
}
// Builds a fan-out over `n` memory sinks named `mem0`, `mem1`, ... and
// returns it together with the child names.
fn make_fan(n: usize) -> (FanOutSinkAdapter, Vec<String>) {
let names: Vec<String> = (0..n).map(|i| format!("mem{i}")).collect();
let sinks = names
.iter()
.map(|name| BoxedSink::new(MemorySinkAdapter::new(name.as_str())))
.collect();
(FanOutSinkAdapter::new(sinks), names)
}
// Smoke test: send, flush, and close on a three-child fan-out all succeed.
// NOTE(review): the children are never inspected, so neither delivery to
// each child nor concurrency is actually asserted here.
#[tokio::test]
async fn delivers_to_all_children_concurrently() {
let (mut fan, _) = make_fan(3);
let event = make_event("orders");
fan.send(&event).await.expect("send should succeed");
fan.flush().await.expect("flush should succeed");
fan.close().await.expect("close should succeed");
}
// Sends five events (tables `tbl_0`..`tbl_4`) through a three-child fan-out
// and flushes.
// NOTE(review): only success is checked; the order seen by each child is
// not asserted.
#[tokio::test]
async fn per_sink_ordering_is_preserved() {
let n_sinks = 3;
let n_events = 5;
let (mut fan, _) = make_fan(n_sinks);
for i in 0..n_events {
let event = make_event(&format!("tbl_{i}"));
fan.send(&event).await.expect("send should succeed");
}
fan.flush().await.expect("flush should succeed");
}
// The adapter reports the fixed name "fan-out".
#[tokio::test]
async fn name_is_fan_out() {
let (fan, _) = make_fan(2);
assert_eq!(fan.name(), "fan-out");
}
// `len` equals the number of children and `is_empty` is false.
#[tokio::test]
async fn len_reflects_child_count() {
let (fan, _) = make_fan(4);
assert_eq!(fan.len(), 4);
assert!(!fan.is_empty());
}
// Begin followed by commit succeeds on two memory sinks.
// Presumably `MemorySinkAdapter` relies on the trait's default barrier
// methods, as the test name suggests; confirm against its definition.
#[tokio::test]
async fn begin_checkpoint_barrier_is_noop_for_default_sinks() {
let (mut fan, _) = make_fan(2);
fan.begin_checkpoint_barrier()
.await
.expect("begin should succeed");
fan.commit_checkpoint_barrier()
.await
.expect("commit should succeed");
}
// Begin followed by abort succeeds on two memory sinks.
#[tokio::test]
async fn abort_checkpoint_barrier_is_noop_for_default_sinks() {
let (mut fan, _) = make_fan(2);
fan.begin_checkpoint_barrier()
.await
.expect("begin should succeed");
fan.abort_checkpoint_barrier()
.await
.expect("abort should succeed");
}
// Preflight succeeds on two memory sinks.
// NOTE(review): concurrency is not observable from this test.
#[tokio::test]
async fn preflight_check_runs_all_children_concurrently() {
let (mut fan, _) = make_fan(2);
fan.preflight_check()
.await
.expect("preflight should succeed");
}
// The `Debug` output contains the name of every child sink.
#[test]
fn debug_impl_shows_child_names() {
let (fan, names) = make_fan(2);
let dbg = format!("{fan:?}");
for name in &names {
assert!(
dbg.contains(name.as_str()),
"debug should contain child name {name}"
);
}
}
// Constructing the adapter with no children panics with the expected message.
#[test]
#[should_panic(expected = "at least one child sink")]
fn panics_on_empty_sinks() {
FanOutSinkAdapter::new(vec![]);
}
// Three memory sinks yield `AtLeastOnce`.
// Presumably that is `MemorySinkAdapter`'s own guarantee; confirm.
#[test]
fn delivery_guarantee_is_weakest_of_children() {
use crate::sink::SinkDeliveryGuarantee;
let (fan, _) = make_fan(3);
assert_eq!(fan.delivery_guarantee(), SinkDeliveryGuarantee::AtLeastOnce);
}
// Two memory sinks yield `false`.
// NOTE(review): a mix of capable and non-capable children is not covered.
#[test]
fn idempotent_delivery_capable_requires_all_children() {
let (fan, _) = make_fan(2);
assert!(!fan.idempotent_delivery_capable());
}
// Two memory sinks yield `false`.
// NOTE(review): a mix of capable and non-capable children is not covered.
#[test]
fn transactional_barrier_capable_requires_all_children() {
let (fan, _) = make_fan(2);
assert!(!fan.transactional_checkpoint_barrier_capable());
}
}