Skip to main content

sinks_core/
fan_out.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use schema_core::{GenericValue, IndexMapping, IndexName};
5
6use crate::{FlushReport, Result, Sink};
7
8/// Dispatches every sink operation to a set of inner sinks in declaration order.
9///
10/// `ensure_index`, `upsert`, `delete`, and `flush` are sent to each sink
11/// sequentially; the first error short-circuits. `is_seeded` returns `true` only when **all**
12/// inner sinks report the index as seeded (AND semantics: every destination
13/// must hold the data before a backfill is considered complete). `mark_seeded`
14/// is called on all inner sinks.
15#[derive(Debug, Clone)]
16pub struct FanOutSink {
17    sinks: Vec<Arc<dyn Sink>>,
18}
19
20impl FanOutSink {
21    pub fn new(sinks: Vec<Arc<dyn Sink>>) -> Self {
22        Self { sinks }
23    }
24}
25
26#[async_trait]
27impl Sink for FanOutSink {
28    async fn ensure_index(&self, mapping: &IndexMapping) -> Result<()> {
29        for sink in &self.sinks {
30            sink.ensure_index(mapping).await?;
31        }
32        Ok(())
33    }
34
35    async fn upsert(&self, index: &IndexName, id: &str, document: &GenericValue) -> Result<()> {
36        for sink in &self.sinks {
37            sink.upsert(index, id, document).await?;
38        }
39        Ok(())
40    }
41
42    async fn delete(&self, index: &IndexName, id: &str) -> Result<()> {
43        for sink in &self.sinks {
44            sink.delete(index, id).await?;
45        }
46        Ok(())
47    }
48
49    async fn flush(&self, caught_up: bool) -> Result<FlushReport> {
50        let mut report = FlushReport::default();
51        for sink in &self.sinks {
52            report
53                .rejected
54                .extend(sink.flush(caught_up).await?.rejected);
55        }
56        Ok(report)
57    }
58
59    async fn is_seeded(&self, index: &IndexName) -> Result<bool> {
60        for sink in &self.sinks {
61            if !sink.is_seeded(index).await? {
62                return Ok(false);
63            }
64        }
65        Ok(true)
66    }
67
68    async fn mark_seeded(&self, index: &IndexName) -> Result<()> {
69        for sink in &self.sinks {
70            sink.mark_seeded(index).await?;
71        }
72        Ok(())
73    }
74}