event_notification/
bus.rs

1use crate::ChannelAdapter;
2use crate::Error;
3use crate::EventStore;
4use crate::{Event, Log};
5use chrono::Utc;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio_util::sync::CancellationToken;
9
10/// Handles incoming events from the producer.
11///
12/// This function is responsible for receiving events from the producer and sending them to the appropriate adapters.
13/// It also handles the shutdown process and saves any pending logs to the event store.
14pub async fn event_bus(
15    mut rx: mpsc::Receiver<Event>,
16    adapters: Vec<Arc<dyn ChannelAdapter>>,
17    store: Arc<EventStore>,
18    shutdown: CancellationToken,
19) -> Result<(), Error> {
20    let mut pending_logs = Vec::new();
21    let mut current_log = Log {
22        event_name: crate::event::Name::Everything,
23        key: Utc::now().timestamp().to_string(),
24        records: Vec::new(),
25    };
26
27    loop {
28        tokio::select! {
29            Some(event) = rx.recv() => {
30                current_log.records.push(event.clone());
31                let mut send_tasks = Vec::new();
32                for adapter in &adapters {
33                    if event.channels.contains(&adapter.name()) {
34                        let adapter = adapter.clone();
35                        let event = event.clone();
36                        send_tasks.push(tokio::spawn(async move {
37                            if let Err(e) = adapter.send(&event).await {
38                                tracing::error!("Failed to send event to {}: {}", adapter.name(), e);
39                                Err(e)
40                            } else {
41                                Ok(())
42                            }
43                        }));
44                    }
45                }
46                for task in send_tasks {
47                    if task.await?.is_err() {
48                        current_log.records.retain(|e| e.id != event.id);
49                    }
50                }
51                if !current_log.records.is_empty() {
52                    pending_logs.push(current_log.clone());
53                }
54                current_log.records.clear();
55            }
56            _ = shutdown.cancelled() => {
57                tracing::info!("Shutting down event bus, saving pending logs...");
58                if !current_log.records.is_empty() {
59                    pending_logs.push(current_log);
60                }
61                store.save_logs(&pending_logs).await?;
62                break;
63            }
64            else => break,
65        }
66    }
67    Ok(())
68}