event_notification/adapter/
mod.rs1use crate::AdapterConfig;
2use crate::Error;
3use crate::Event;
4use async_trait::async_trait;
5use std::sync::Arc;
6
7#[cfg(feature = "kafka")]
8pub(crate) mod kafka;
9#[cfg(feature = "mqtt")]
10pub(crate) mod mqtt;
11#[cfg(feature = "webhook")]
12pub(crate) mod webhook;
13
14#[async_trait]
16pub trait ChannelAdapter: Send + Sync + 'static {
17 fn name(&self) -> String;
19 async fn send(&self, event: &Event) -> Result<(), Error>;
21}
22
23pub fn create_adapters(
25 configs: &[AdapterConfig],
26) -> Result<Vec<Arc<dyn ChannelAdapter>>, Box<Error>> {
27 let mut adapters: Vec<Arc<dyn ChannelAdapter>> = Vec::new();
28
29 for config in configs {
30 match config {
31 #[cfg(feature = "webhook")]
32 AdapterConfig::Webhook(webhook_config) => {
33 webhook_config
34 .validate()
35 .map_err(|e| Box::new(Error::ConfigError(e)))?;
36 adapters.push(Arc::new(webhook::WebhookAdapter::new(
37 webhook_config.clone(),
38 )));
39 }
40 #[cfg(feature = "kafka")]
41 AdapterConfig::Kafka(kafka_config) => {
42 adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?));
43 }
44 #[cfg(feature = "mqtt")]
45 AdapterConfig::Mqtt(mqtt_config) => {
46 let (mqtt, mut event_loop) = mqtt::MqttAdapter::new(mqtt_config);
47 tokio::spawn(async move { while event_loop.poll().await.is_ok() {} });
48 adapters.push(Arc::new(mqtt));
49 }
50 #[cfg(not(feature = "webhook"))]
51 AdapterConfig::Webhook(_) => return Err(Box::new(Error::FeatureDisabled("webhook"))),
52 #[cfg(not(feature = "kafka"))]
53 AdapterConfig::Kafka(_) => return Err(Box::new(Error::FeatureDisabled("kafka"))),
54 #[cfg(not(feature = "mqtt"))]
55 AdapterConfig::Mqtt(_) => return Err(Box::new(Error::FeatureDisabled("mqtt"))),
56 }
57 }
58
59 Ok(adapters)
60}