event_notification/
bus.rs1use crate::ChannelAdapter;
2use crate::Error;
3use crate::EventStore;
4use crate::{Event, Log};
5use chrono::Utc;
6use std::sync::Arc;
7use tokio::sync::mpsc;
8use tokio_util::sync::CancellationToken;
9
10pub async fn event_bus(
15 mut rx: mpsc::Receiver<Event>,
16 adapters: Vec<Arc<dyn ChannelAdapter>>,
17 store: Arc<EventStore>,
18 shutdown: CancellationToken,
19) -> Result<(), Error> {
20 let mut pending_logs = Vec::new();
21 let mut current_log = Log {
22 event_name: crate::event::Name::Everything,
23 key: Utc::now().timestamp().to_string(),
24 records: Vec::new(),
25 };
26
27 loop {
28 tokio::select! {
29 Some(event) = rx.recv() => {
30 current_log.records.push(event.clone());
31 let mut send_tasks = Vec::new();
32 for adapter in &adapters {
33 if event.channels.contains(&adapter.name()) {
34 let adapter = adapter.clone();
35 let event = event.clone();
36 send_tasks.push(tokio::spawn(async move {
37 if let Err(e) = adapter.send(&event).await {
38 tracing::error!("Failed to send event to {}: {}", adapter.name(), e);
39 Err(e)
40 } else {
41 Ok(())
42 }
43 }));
44 }
45 }
46 for task in send_tasks {
47 if task.await?.is_err() {
48 current_log.records.retain(|e| e.id != event.id);
49 }
50 }
51 if !current_log.records.is_empty() {
52 pending_logs.push(current_log.clone());
53 }
54 current_log.records.clear();
55 }
56 _ = shutdown.cancelled() => {
57 tracing::info!("Shutting down event bus, saving pending logs...");
58 if !current_log.records.is_empty() {
59 pending_logs.push(current_log);
60 }
61 store.save_logs(&pending_logs).await?;
62 break;
63 }
64 else => break,
65 }
66 }
67 Ok(())
68}