event_notification/
lib.rs

1mod adapter;
2mod bus;
3mod config;
4mod error;
5mod event;
6mod global;
7mod producer;
8mod store;
9
10pub use adapter::ChannelAdapter;
11pub use adapter::create_adapters;
12#[cfg(feature = "kafka")]
13pub use adapter::kafka::KafkaAdapter;
14#[cfg(feature = "mqtt")]
15pub use adapter::mqtt::MqttAdapter;
16#[cfg(feature = "webhook")]
17pub use adapter::webhook::WebhookAdapter;
18pub use bus::event_bus;
19#[cfg(feature = "kafka")]
20pub use config::KafkaConfig;
21#[cfg(feature = "mqtt")]
22pub use config::MqttConfig;
23#[cfg(feature = "webhook")]
24pub use config::WebhookConfig;
25pub use config::{AdapterConfig, NotificationConfig};
26pub use error::Error;
27
28pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
29pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
30pub use store::EventStore;
31
32#[cfg(feature = "http-producer")]
33pub use producer::EventProducer;
34#[cfg(feature = "http-producer")]
35pub use producer::http::HttpProducer;
36#[cfg(feature = "http-producer")]
37pub use producer::http::HttpProducerConfig;
38
39use std::sync::Arc;
40use tokio::sync::mpsc;
41use tokio_util::sync::CancellationToken;
42
43/// The `NotificationSystem` struct represents the notification system.
44/// It manages the event bus and the adapters.
45/// It is responsible for sending and receiving events.
46/// It also handles the shutdown process.
47pub struct NotificationSystem {
48    tx: mpsc::Sender<Event>,
49    rx: Option<mpsc::Receiver<Event>>,
50    store: Arc<EventStore>,
51    shutdown: CancellationToken,
52    #[cfg(feature = "http-producer")]
53    http_config: HttpProducerConfig,
54}
55
56impl NotificationSystem {
57    /// Creates a new `NotificationSystem` instance.
58    pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
59        let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
60        let store = Arc::new(EventStore::new(&config.store_path).await?);
61        let shutdown = CancellationToken::new();
62
63        let restored_logs = store.load_logs().await?;
64        for log in restored_logs {
65            for event in log.records {
66                // For example, where the send method may return a SendError when calling it
67                tx.send(event)
68                    .await
69                    .map_err(|e| Error::ChannelSend(Box::new(e)))?;
70            }
71        }
72
73        Ok(Self {
74            tx,
75            rx: Some(rx),
76            store,
77            shutdown,
78            #[cfg(feature = "http-producer")]
79            http_config: config.http,
80        })
81    }
82
83    /// Starts the notification system.
84    /// It initializes the event bus and the producer.
85    pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
86        let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
87
88        let shutdown_clone = self.shutdown.clone();
89        let store_clone = self.store.clone();
90        let bus_handle = tokio::spawn(async move {
91            if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
92                tracing::error!("Event bus failed: {}", e);
93            }
94        });
95
96        #[cfg(feature = "http-producer")]
97        {
98            let producer = HttpProducer::new(tx, self.http_config.port);
99            producer.start().await?;
100        }
101
102        tokio::select! {
103            result = bus_handle => {
104                result.map_err(Error::JoinError)?;
105                Ok(())
106            },
107            _ = self.shutdown.cancelled() => {
108                tracing::info!("System shutdown triggered");
109                Ok(())
110            }
111        }
112    }
113
114    /// Sends an event to the notification system.
115    /// This method is used to send events to the event bus.
116    pub async fn send_event(&self, event: Event) -> Result<(), Error> {
117        self.tx
118            .send(event)
119            .await
120            .map_err(|e| Error::ChannelSend(Box::new(e)))?;
121        Ok(())
122    }
123
124    /// Shuts down the notification system.
125    /// This method is used to cancel the event bus and producer tasks.
126    pub fn shutdown(&self) {
127        self.shutdown.cancel();
128    }
129
130    /// Sets the HTTP port for the notification system.
131    /// This method is used to change the port for the HTTP producer.
132    #[cfg(feature = "http-producer")]
133    pub fn set_http_port(&mut self, port: u16) {
134        self.http_config.port = port;
135    }
136}