event_notification/
lib.rs1mod adapter;
2mod bus;
3mod config;
4mod error;
5mod event;
6mod global;
7mod producer;
8mod store;
9
10pub use adapter::ChannelAdapter;
11pub use adapter::create_adapters;
12#[cfg(feature = "kafka")]
13pub use adapter::kafka::KafkaAdapter;
14#[cfg(feature = "mqtt")]
15pub use adapter::mqtt::MqttAdapter;
16#[cfg(feature = "webhook")]
17pub use adapter::webhook::WebhookAdapter;
18pub use bus::event_bus;
19#[cfg(feature = "http-producer")]
20pub use config::HttpProducerConfig;
21#[cfg(feature = "kafka")]
22pub use config::KafkaConfig;
23#[cfg(feature = "mqtt")]
24pub use config::MqttConfig;
25#[cfg(feature = "webhook")]
26pub use config::WebhookConfig;
27pub use config::{AdapterConfig, NotificationConfig};
28pub use error::Error;
29
30pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
31pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
32pub use store::EventStore;
33
34#[cfg(feature = "http-producer")]
35pub use producer::EventProducer;
36#[cfg(feature = "http-producer")]
37pub use producer::http::HttpProducer;
38
39use std::sync::Arc;
40use tokio::sync::mpsc;
41use tokio_util::sync::CancellationToken;
42
43pub struct NotificationSystem {
48 tx: mpsc::Sender<Event>,
49 rx: Option<mpsc::Receiver<Event>>,
50 store: Arc<EventStore>,
51 shutdown: CancellationToken,
52 #[cfg(feature = "http-producer")]
53 http_config: HttpProducerConfig,
54}
55
56impl NotificationSystem {
57 pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
59 let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
60 let store = Arc::new(EventStore::new(&config.store_path).await?);
61 let shutdown = CancellationToken::new();
62
63 let restored_logs = store.load_logs().await?;
64 for log in restored_logs {
65 for event in log.records {
66 tx.send(event)
68 .await
69 .map_err(|e| Error::ChannelSend(Box::new(e)))?;
70 }
71 }
72
73 Ok(Self {
74 tx,
75 rx: Some(rx),
76 store,
77 shutdown,
78 #[cfg(feature = "http-producer")]
79 http_config: config.http,
80 })
81 }
82
83 pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
86 let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
87
88 let shutdown_clone = self.shutdown.clone();
89 let store_clone = self.store.clone();
90 let bus_handle = tokio::spawn(async move {
91 if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
92 tracing::error!("Event bus failed: {}", e);
93 }
94 });
95
96 #[cfg(feature = "http-producer")]
97 {
98 let producer = HttpProducer::new(self.tx.clone(), self.http_config.port);
99 producer.start().await?;
100 }
101
102 tokio::select! {
103 result = bus_handle => {
104 result.map_err(Error::JoinError)?;
105 Ok(())
106 },
107 _ = self.shutdown.cancelled() => {
108 tracing::info!("System shutdown triggered");
109 Ok(())
110 }
111 }
112 }
113
114 pub async fn send_event(&self, event: Event) -> Result<(), Error> {
117 self.tx
118 .send(event)
119 .await
120 .map_err(|e| Error::ChannelSend(Box::new(e)))?;
121 Ok(())
122 }
123
124 pub fn shutdown(&self) {
127 self.shutdown.cancel();
128 }
129
130 #[cfg(feature = "http-producer")]
133 pub fn set_http_port(&mut self, port: u16) {
134 self.http_config.port = port;
135 }
136}