use std::time::Duration;
use tokio::{sync::mpsc, time::Instant};
use super::BatchedMessage;
pub(super) async fn auto_flusher<T>(
mut uplink: mpsc::Receiver<BatchedMessage<T>>,
downlink: mpsc::Sender<BatchedMessage<T>>,
period: Duration,
) -> Option<()> {
'flushed: loop {
let msg = uplink.recv().await?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;
if flushed {
continue;
}
let flush_at = Instant::now() + period;
loop {
tokio::select! {
_ = tokio::time::sleep_until(flush_at) => {
downlink.send(BatchedMessage { batch: Vec::new(), flush: Some(()) }).await.ok()?;
continue 'flushed;
},
msg = uplink.recv() => {
let msg = msg?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;
if flushed {
continue 'flushed;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use crate::event_ingestion::auto_flusher;
    use crate::event_ingestion::batched_message::BatchedMessage;
    use tokio::sync::mpsc;
    use tokio::time::Duration;

    /// Builds a message carrying `items` with no flush marker.
    fn data(items: &[i32]) -> BatchedMessage<i32> {
        BatchedMessage {
            batch: items.to_vec(),
            flush: None,
        }
    }

    /// Builds an empty message whose flush marker is set.
    fn flush_only() -> BatchedMessage<i32> {
        BatchedMessage {
            batch: Vec::new(),
            flush: Some(()),
        }
    }

    /// Drives `auto_flusher` through: two unflushed batches, a timer-injected
    /// flush, an explicit flush passed through, and one more batch.
    #[tokio::test(start_paused = true)]
    async fn test_auto_flusher() {
        let (uplink_tx, uplink_rx) = mpsc::channel(10);
        let (downlink_tx, mut downlink_rx) = mpsc::channel(10);
        let flush_period = Duration::from_millis(100);

        let flusher = auto_flusher::auto_flusher(uplink_rx, downlink_tx, flush_period);
        tokio::spawn(flusher);

        // Two batches without a flush marker are relayed as-is, in order.
        uplink_tx.send(data(&[1, 2, 3])).await.unwrap();
        uplink_tx.send(data(&[4, 5, 6])).await.unwrap();
        assert_eq!(downlink_rx.recv().await, Some(data(&[1, 2, 3])));
        assert_eq!(downlink_rx.recv().await, Some(data(&[4, 5, 6])));

        // Once the flush period has elapsed, an empty flush message shows up.
        tokio::time::advance(flush_period * 2).await;
        assert_eq!(downlink_rx.recv().await, Some(flush_only()));

        // An explicit flush from upstream is passed through.
        uplink_tx.send(flush_only()).await.unwrap();
        assert_eq!(downlink_rx.recv().await, Some(flush_only()));

        // Data sent after a flush is still relayed.
        uplink_tx.send(data(&[7, 8, 9])).await.unwrap();
        assert_eq!(downlink_rx.recv().await, Some(data(&[7, 8, 9])));
    }
}