use tokio::sync::mpsc;
use tracing::{debug, info};
use crate::types::KeelInput;
#[derive(Clone)]
pub struct EventQueueHandle {
tx: mpsc::UnboundedSender<KeelInput>,
}
impl EventQueueHandle {
pub fn push(&self, input: KeelInput) -> anyhow::Result<()> {
self.tx
.send(input)
.map_err(|e| anyhow::anyhow!("Event queue closed: {}", e))
}
}
pub struct EventQueue {
rx: mpsc::UnboundedReceiver<KeelInput>,
}
impl EventQueue {
pub fn new() -> (Self, EventQueueHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let queue = Self { rx };
let handle = EventQueueHandle { tx };
(queue, handle)
}
pub async fn process<F, Fut>(mut self, handler: F)
where
F: Fn(KeelInput) -> Fut,
Fut: std::future::Future<Output = ()>,
{
info!("Event queue started");
while let Some(input) = self.rx.recv().await {
debug!(
input_id = %input.id,
input_type = ?input.input_type,
"Processing input"
);
handler(input).await;
}
info!("Event queue shut down (all producers dropped)");
}
pub async fn process_until<F, Fut>(
mut self,
handler: F,
shutdown: tokio::sync::watch::Receiver<bool>,
) where
F: Fn(KeelInput) -> Fut,
Fut: std::future::Future<Output = ()>,
{
info!("Event queue started (with shutdown signal)");
let mut shutdown = shutdown;
loop {
tokio::select! {
maybe_input = self.rx.recv() => {
match maybe_input {
Some(input) => {
debug!(
input_id = %input.id,
input_type = ?input.input_type,
"Processing input"
);
handler(input).await;
}
None => {
info!("Event queue shut down (all producers dropped)");
break;
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!("Event queue shut down (shutdown signal received)");
break;
}
}
}
}
}
}