1use std::sync::Arc;
2
3use async_trait::async_trait;
4use schema_core::{GenericValue, IndexMapping, IndexName};
5
6use crate::{FlushReport, Result, Sink};
7
8#[derive(Debug, Clone)]
16pub struct FanOutSink {
17 sinks: Vec<Arc<dyn Sink>>,
18}
19
20impl FanOutSink {
21 pub fn new(sinks: Vec<Arc<dyn Sink>>) -> Self {
22 Self { sinks }
23 }
24}
25
26#[async_trait]
27impl Sink for FanOutSink {
28 async fn ensure_index(&self, mapping: &IndexMapping) -> Result<()> {
29 for sink in &self.sinks {
30 sink.ensure_index(mapping).await?;
31 }
32 Ok(())
33 }
34
35 async fn upsert(&self, index: &IndexName, id: &str, document: &GenericValue) -> Result<()> {
36 for sink in &self.sinks {
37 sink.upsert(index, id, document).await?;
38 }
39 Ok(())
40 }
41
42 async fn delete(&self, index: &IndexName, id: &str) -> Result<()> {
43 for sink in &self.sinks {
44 sink.delete(index, id).await?;
45 }
46 Ok(())
47 }
48
49 async fn flush(&self, caught_up: bool) -> Result<FlushReport> {
50 let mut report = FlushReport::default();
51 for sink in &self.sinks {
52 report
53 .rejected
54 .extend(sink.flush(caught_up).await?.rejected);
55 }
56 Ok(report)
57 }
58
59 async fn is_seeded(&self, index: &IndexName) -> Result<bool> {
60 for sink in &self.sinks {
61 if !sink.is_seeded(index).await? {
62 return Ok(false);
63 }
64 }
65 Ok(true)
66 }
67
68 async fn mark_seeded(&self, index: &IndexName) -> Result<()> {
69 for sink in &self.sinks {
70 sink.mark_seeded(index).await?;
71 }
72 Ok(())
73 }
74}