use crate::traits::{EndpointStatus, MessagePublisher, PublisherError, Sent, SentBatch};
use crate::CanonicalMessage;
use async_trait::async_trait;
use std::any::Any;
use std::sync::Arc;
pub struct FanoutPublisher {
publishers: Vec<Arc<dyn MessagePublisher>>,
}
impl FanoutPublisher {
pub fn new(publishers: Vec<Arc<dyn MessagePublisher>>) -> Self {
Self { publishers }
}
}
#[async_trait]
impl MessagePublisher for FanoutPublisher {
async fn send(&self, message: CanonicalMessage) -> Result<Sent, PublisherError> {
for publisher in &self.publishers {
publisher.send(message.clone()).await?;
}
Ok(Sent::Ack)
}
async fn send_batch(
&self,
messages: Vec<CanonicalMessage>,
) -> Result<SentBatch, PublisherError> {
use futures::future::join_all;
if messages.is_empty() {
return Ok(SentBatch::Ack);
}
let batch_sends = self.publishers.iter().map(|p| {
p.send_batch(messages.clone())
});
let results = join_all(batch_sends).await;
for result in results {
result?;
}
Ok(SentBatch::Ack)
}
async fn status(&self) -> EndpointStatus {
use futures::future::join_all;
let status_futs = self.publishers.iter().map(|p| p.status());
let results = join_all(status_futs).await;
let mut healthy = true;
let mut pending = 0;
let mut capacity = 0;
let mut error: Option<String> = None;
let mut details = Vec::new();
for status in results {
if !status.healthy {
healthy = false;
if error.is_none() {
error = status.error.clone();
}
}
pending += status.pending.unwrap_or(0);
capacity += status.capacity.unwrap_or(0);
details.push(status);
}
EndpointStatus {
healthy,
pending: Some(pending),
capacity: Some(capacity),
error,
details: serde_json::json!({ "destinations": details }),
..Default::default()
}
}
fn as_any(&self) -> &dyn Any {
self
}
}