event_notification/
lib.rs1mod adapter;
2mod bus;
3mod config;
4mod error;
5mod event;
6mod global;
7mod producer;
8mod store;
9
10pub use adapter::create_adapters;
11#[cfg(feature = "kafka")]
12pub use adapter::kafka::KafkaAdapter;
13#[cfg(feature = "mqtt")]
14pub use adapter::mqtt::MqttAdapter;
15#[cfg(feature = "webhook")]
16pub use adapter::webhook::WebhookAdapter;
17pub use adapter::ChannelAdapter;
18pub use bus::event_bus;
19#[cfg(feature = "kafka")]
20pub use config::KafkaConfig;
21#[cfg(feature = "mqtt")]
22pub use config::MqttConfig;
23#[cfg(feature = "webhook")]
24pub use config::WebhookConfig;
25pub use config::{AdapterConfig, NotificationConfig};
26pub use error::Error;
27
28pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
29pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
30pub use producer::{handle_event, start_producer};
31pub use store::EventStore;
32
33use std::sync::Arc;
34use tokio::sync::mpsc;
35use tokio_util::sync::CancellationToken;
36
37pub struct NotificationSystem {
42 tx: mpsc::Sender<Event>,
43 rx: Option<mpsc::Receiver<Event>>,
44 store: Arc<EventStore>,
45 shutdown: CancellationToken,
46}
47
48impl NotificationSystem {
49 pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
51 let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
52 let store = Arc::new(EventStore::new(&config.store_path).await?);
53 let shutdown = CancellationToken::new();
54
55 let restored_logs = store.load_logs().await?;
56 for log in restored_logs {
57 for event in log.records {
58 tx.send(event)
60 .await
61 .map_err(|e| Error::ChannelSend(Box::new(e)))?;
62 }
63 }
64
65 Ok(Self {
66 tx,
67 rx: Some(rx),
68 store,
69 shutdown,
70 })
71 }
72
73 pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
76 let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
77
78 let shutdown_clone = self.shutdown.clone();
79 let store_clone = self.store.clone();
80 let bus_handle = tokio::spawn(async move {
81 if let Err(e) = bus::event_bus(rx, adapters, store_clone, shutdown_clone).await {
82 tracing::error!("Event bus failed: {}", e);
83 }
84 });
85
86 let tx_clone = self.tx.clone();
87 let producer_handle = tokio::spawn(async move {
88 if let Err(e) = producer::start_producer(tx_clone).await {
89 tracing::error!("Producer failed: {}", e);
90 }
91 });
92 tokio::select! {
93 result = bus_handle => {
94 result.map_err(Error::JoinError)?;
95 Ok(())
96 },
97 result = producer_handle => {
98 result.map_err(Error::JoinError)?;
99 Ok(())
100 },
101 _ = self.shutdown.cancelled() => {
102 tracing::info!("System shutdown triggered");
103 Ok(())
104 }
105 }
106 }
107
108 pub async fn send_event(&self, event: Event) -> Result<(), Error> {
111 self.tx
112 .send(event)
113 .await
114 .map_err(|e| Error::ChannelSend(Box::new(e)))?;
115 Ok(())
116 }
117
118 pub fn shutdown(&self) {
121 self.shutdown.cancel();
122 }
123}