1use tokio::sync::mpsc;
2use tracing::{debug, info};
3
4use crate::types::KeelInput;
5
6#[derive(Clone)]
10pub struct EventQueueHandle {
11 tx: mpsc::UnboundedSender<KeelInput>,
12}
13
14impl EventQueueHandle {
15 pub fn push(&self, input: KeelInput) -> anyhow::Result<()> {
19 self.tx
20 .send(input)
21 .map_err(|e| anyhow::anyhow!("Event queue closed: {}", e))
22 }
23}
24
25pub struct EventQueue {
29 rx: mpsc::UnboundedReceiver<KeelInput>,
30}
31
32impl EventQueue {
33 pub fn new() -> (Self, EventQueueHandle) {
36 let (tx, rx) = mpsc::unbounded_channel();
37 let queue = Self { rx };
38 let handle = EventQueueHandle { tx };
39 (queue, handle)
40 }
41
42 pub async fn process<F, Fut>(mut self, handler: F)
47 where
48 F: Fn(KeelInput) -> Fut,
49 Fut: std::future::Future<Output = ()>,
50 {
51 info!("Event queue started");
52 while let Some(input) = self.rx.recv().await {
53 debug!(
54 input_id = %input.id,
55 input_type = ?input.input_type,
56 "Processing input"
57 );
58 handler(input).await;
59 }
60 info!("Event queue shut down (all producers dropped)");
61 }
62
63 pub async fn process_until<F, Fut>(
68 mut self,
69 handler: F,
70 shutdown: tokio::sync::watch::Receiver<bool>,
71 ) where
72 F: Fn(KeelInput) -> Fut,
73 Fut: std::future::Future<Output = ()>,
74 {
75 info!("Event queue started (with shutdown signal)");
76 let mut shutdown = shutdown;
77 loop {
78 tokio::select! {
79 maybe_input = self.rx.recv() => {
80 match maybe_input {
81 Some(input) => {
82 debug!(
83 input_id = %input.id,
84 input_type = ?input.input_type,
85 "Processing input"
86 );
87 handler(input).await;
88 }
89 None => {
90 info!("Event queue shut down (all producers dropped)");
91 break;
92 }
93 }
94 }
95 _ = shutdown.changed() => {
96 if *shutdown.borrow() {
97 info!("Event queue shut down (shutdown signal received)");
98 break;
99 }
100 }
101 }
102 }
103 }
104}