Skip to main content

keel_events/
queue.rs

1use tokio::sync::mpsc;
2use tracing::{debug, info};
3
4use crate::types::KeelInput;
5
/// Handle used by producers to push events into the queue.
///
/// Clone-friendly — distribute to any number of producers. Every clone
/// shares the same underlying channel, so all producers feed the single
/// [`EventQueue`] consumer created alongside this handle.
#[derive(Clone)]
pub struct EventQueueHandle {
    /// Sending half of the unbounded channel carrying [`KeelInput`] values.
    tx: mpsc::UnboundedSender<KeelInput>,
}
13
14impl EventQueueHandle {
15    /// Push an input into the event queue.
16    ///
17    /// Returns an error if the queue has been shut down.
18    pub fn push(&self, input: KeelInput) -> anyhow::Result<()> {
19        self.tx
20            .send(input)
21            .map_err(|e| anyhow::anyhow!("Event queue closed: {}", e))
22    }
23}
24
/// Async mpsc-based ordered event queue.
///
/// Inputs are processed strictly in the order they arrive. This is the
/// consumer side of the channel; producers feed it through the
/// [`EventQueueHandle`] returned by [`EventQueue::new`].
pub struct EventQueue {
    /// Receiving half of the unbounded channel carrying [`KeelInput`] values.
    rx: mpsc::UnboundedReceiver<KeelInput>,
}
31
32impl EventQueue {
33    /// Create a new event queue, returning both the queue (consumer) and a
34    /// cloneable handle (producer).
35    pub fn new() -> (Self, EventQueueHandle) {
36        let (tx, rx) = mpsc::unbounded_channel();
37        let queue = Self { rx };
38        let handle = EventQueueHandle { tx };
39        (queue, handle)
40    }
41
42    /// Process inputs in order, calling `handler` for each one.
43    ///
44    /// Runs until the queue is closed (all handles dropped) or the provided
45    /// `shutdown` future resolves.
46    pub async fn process<F, Fut>(mut self, handler: F)
47    where
48        F: Fn(KeelInput) -> Fut,
49        Fut: std::future::Future<Output = ()>,
50    {
51        info!("Event queue started");
52        while let Some(input) = self.rx.recv().await {
53            debug!(
54                input_id = %input.id,
55                input_type = ?input.input_type,
56                "Processing input"
57            );
58            handler(input).await;
59        }
60        info!("Event queue shut down (all producers dropped)");
61    }
62
63    /// Process inputs with a shutdown signal.
64    ///
65    /// Stops when either all producer handles are dropped or `shutdown`
66    /// resolves — whichever comes first.
67    pub async fn process_until<F, Fut>(
68        mut self,
69        handler: F,
70        shutdown: tokio::sync::watch::Receiver<bool>,
71    ) where
72        F: Fn(KeelInput) -> Fut,
73        Fut: std::future::Future<Output = ()>,
74    {
75        info!("Event queue started (with shutdown signal)");
76        let mut shutdown = shutdown;
77        loop {
78            tokio::select! {
79                maybe_input = self.rx.recv() => {
80                    match maybe_input {
81                        Some(input) => {
82                            debug!(
83                                input_id = %input.id,
84                                input_type = ?input.input_type,
85                                "Processing input"
86                            );
87                            handler(input).await;
88                        }
89                        None => {
90                            info!("Event queue shut down (all producers dropped)");
91                            break;
92                        }
93                    }
94                }
95                _ = shutdown.changed() => {
96                    if *shutdown.borrow() {
97                        info!("Event queue shut down (shutdown signal received)");
98                        break;
99                    }
100                }
101            }
102        }
103    }
104}