event_notification/adapter/
mod.rs

1use crate::AdapterConfig;
2use crate::Error;
3use crate::Event;
4use async_trait::async_trait;
5use std::sync::Arc;
6
7#[cfg(feature = "kafka")]
8pub(crate) mod kafka;
9#[cfg(feature = "mqtt")]
10pub(crate) mod mqtt;
11#[cfg(feature = "webhook")]
12pub(crate) mod webhook;
13
14/// The `ChannelAdapter` trait defines the interface for all channel adapters.
15#[async_trait]
16pub trait ChannelAdapter: Send + Sync + 'static {
17    /// Sends an event to the channel.
18    fn name(&self) -> String;
19    /// Sends an event to the channel.
20    async fn send(&self, event: &Event) -> Result<(), Error>;
21}
22
23/// Creates channel adapters based on the provided configuration.
24pub fn create_adapters(
25    configs: &[AdapterConfig],
26) -> Result<Vec<Arc<dyn ChannelAdapter>>, Box<Error>> {
27    let mut adapters: Vec<Arc<dyn ChannelAdapter>> = Vec::new();
28
29    for config in configs {
30        match config {
31            #[cfg(feature = "webhook")]
32            AdapterConfig::Webhook(webhook_config) => {
33                webhook_config
34                    .validate()
35                    .map_err(|e| Box::new(Error::ConfigError(e)))?;
36                adapters.push(Arc::new(webhook::WebhookAdapter::new(
37                    webhook_config.clone(),
38                )));
39            }
40            #[cfg(feature = "kafka")]
41            AdapterConfig::Kafka(kafka_config) => {
42                adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?));
43            }
44            #[cfg(feature = "mqtt")]
45            AdapterConfig::Mqtt(mqtt_config) => {
46                let (mqtt, mut event_loop) = mqtt::MqttAdapter::new(mqtt_config);
47                tokio::spawn(async move { while event_loop.poll().await.is_ok() {} });
48                adapters.push(Arc::new(mqtt));
49            }
50            #[cfg(not(feature = "webhook"))]
51            AdapterConfig::Webhook(_) => return Err(Box::new(Error::FeatureDisabled("webhook"))),
52            #[cfg(not(feature = "kafka"))]
53            AdapterConfig::Kafka(_) => return Err(Box::new(Error::FeatureDisabled("kafka"))),
54            #[cfg(not(feature = "mqtt"))]
55            AdapterConfig::Mqtt(_) => return Err(Box::new(Error::FeatureDisabled("mqtt"))),
56        }
57    }
58
59    Ok(adapters)
60}