event_notification/
lib.rs

1mod adapter;
2mod bus;
3mod config;
4mod error;
5mod event;
6mod global;
7mod producer;
8mod store;
9
10pub use adapter::create_adapters;
11#[cfg(feature = "kafka")]
12pub use adapter::kafka::KafkaAdapter;
13#[cfg(feature = "mqtt")]
14pub use adapter::mqtt::MqttAdapter;
15#[cfg(feature = "webhook")]
16pub use adapter::webhook::WebhookAdapter;
17pub use adapter::ChannelAdapter;
18pub use bus::event_bus;
19#[cfg(feature = "kafka")]
20pub use config::KafkaConfig;
21#[cfg(feature = "mqtt")]
22pub use config::MqttConfig;
23#[cfg(feature = "webhook")]
24pub use config::WebhookConfig;
25pub use config::{AdapterConfig, NotificationConfig};
26pub use error::Error;
27
28pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
29pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
30pub use producer::{handle_event, start_producer};
31pub use store::EventStore;
32
33use std::sync::Arc;
34use tokio::sync::mpsc;
35use tokio_util::sync::CancellationToken;
36
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
/// It is responsible for sending and receiving events.
/// It also handles the shutdown process.
pub struct NotificationSystem {
    /// Sending half of the event channel; used by `send_event` and cloned for the
    /// producer task spawned in `start`.
    tx: mpsc::Sender<Event>,
    /// Receiving half of the event channel. `start` takes it out and hands it to the
    /// event bus, so it is `None` afterwards and a second `start` call fails with
    /// `Error::EventBusStarted`.
    rx: Option<mpsc::Receiver<Event>>,
    /// Event store from which `new` loads persisted logs; a clone of the `Arc` is
    /// passed to the event bus task.
    store: Arc<EventStore>,
    /// Token cancelled by `shutdown`; a clone is passed to the event bus task and
    /// `start` waits on it.
    shutdown: CancellationToken,
}
47
48impl NotificationSystem {
49    /// Creates a new `NotificationSystem` instance.
50    pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
51        let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
52        let store = Arc::new(EventStore::new(&config.store_path).await?);
53        let shutdown = CancellationToken::new();
54
55        let restored_logs = store.load_logs().await?;
56        for log in restored_logs {
57            for event in log.records {
58                // For example, where the send method may return a SendError when calling it
59                tx.send(event)
60                    .await
61                    .map_err(|e| Error::ChannelSend(Box::new(e)))?;
62            }
63        }
64
65        Ok(Self {
66            tx,
67            rx: Some(rx),
68            store,
69            shutdown,
70        })
71    }
72
73    /// Starts the notification system.
74    /// It initializes the event bus and the producer.
75    pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
76        let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
77
78        let shutdown_clone = self.shutdown.clone();
79        let store_clone = self.store.clone();
80        let bus_handle = tokio::spawn(async move {
81            if let Err(e) = bus::event_bus(rx, adapters, store_clone, shutdown_clone).await {
82                tracing::error!("Event bus failed: {}", e);
83            }
84        });
85
86        let tx_clone = self.tx.clone();
87        let producer_handle = tokio::spawn(async move {
88            if let Err(e) = producer::start_producer(tx_clone).await {
89                tracing::error!("Producer failed: {}", e);
90            }
91        });
92        tokio::select! {
93            result = bus_handle => {
94                result.map_err(Error::JoinError)?;
95                Ok(())
96            },
97            result = producer_handle => {
98                result.map_err(Error::JoinError)?;
99                Ok(())
100            },
101            _ = self.shutdown.cancelled() => {
102                tracing::info!("System shutdown triggered");
103                Ok(())
104            }
105        }
106    }
107
108    /// Sends an event to the notification system.
109    /// This method is used to send events to the event bus.
110    pub async fn send_event(&self, event: Event) -> Result<(), Error> {
111        self.tx
112            .send(event)
113            .await
114            .map_err(|e| Error::ChannelSend(Box::new(e)))?;
115        Ok(())
116    }
117
118    /// Shuts down the notification system.
119    /// This method is used to cancel the event bus and producer tasks.
120    pub fn shutdown(&self) {
121        self.shutdown.cancel();
122    }
123}