use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::{Interval, interval};
use crate::config::BetterStackConfig;
use crate::log_event::LogEvent;
use crate::sender::send_batch;
/// Background worker state that accumulates `LogEvent`s from a channel and
/// hands them to `send_batch` in groups.
///
/// A batch is sent when it reaches `config.batch_size` events or when the
/// flush timer ticks, whichever happens first.
pub(crate) struct BatchProcessor {
/// Receiving half of the unbounded channel that delivers log events.
receiver: UnboundedReceiver<LogEvent>,
/// Settings passed to `send_batch`; also supplies `batch_size` and `batch_timeout`.
config: BetterStackConfig,
/// Periodic timer (period = `config.batch_timeout`) that triggers time-based flushes.
interval: Interval,
/// Events collected since the last flush.
batch: Vec<LogEvent>,
}
impl BatchProcessor {
    /// Creates a processor that drains `receiver` and ships events in batches.
    ///
    /// The flush timer uses `config.batch_timeout` as its period and the batch
    /// buffer is pre-allocated for `config.batch_size` events.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` panics when given a zero period, so
    /// `config.batch_timeout` must be non-zero.
    pub(crate) fn new(receiver: UnboundedReceiver<LogEvent>, config: BetterStackConfig) -> Self {
        let interval = interval(config.batch_timeout);
        let batch = Vec::with_capacity(config.batch_size);
        Self {
            receiver,
            config,
            interval,
            batch,
        }
    }

    /// Runs the batching loop until the channel is closed and fully drained.
    ///
    /// Each received event is appended to the current batch; the batch is
    /// flushed as soon as it holds `config.batch_size` events, and also on
    /// every timer tick if it is non-empty. When `recv()` yields `None`
    /// (all senders dropped and no buffered events remain), any remaining
    /// events are flushed and the loop exits.
    pub(crate) async fn run(mut self) {
        loop {
            tokio::select! {
                // Bind the raw `Option` instead of pattern-matching `Some(..)`
                // in the branch head: with a `Some` pattern a closed channel
                // merely disables this branch, and because the timer branch
                // below can never be disabled, a `select!` `else` arm would
                // never run and the loop would spin on the timer forever.
                received = self.receiver.recv() => {
                    match received {
                        Some(event) => {
                            self.batch.push(event);
                            if self.batch.len() >= self.config.batch_size {
                                self.flush().await;
                            }
                        }
                        None => {
                            // Channel closed: send whatever is left, then stop.
                            self.flush().await;
                            break;
                        }
                    }
                }
                _ = self.interval.tick() => {
                    // `flush` is a no-op when the batch is empty.
                    self.flush().await;
                }
            }
        }
    }

    /// Sends the currently buffered events via `send_batch` and resets the buffer.
    ///
    /// Does nothing when the buffer is empty. A send failure is reported on
    /// stderr and the affected events are dropped (they are not re-queued).
    async fn flush(&mut self) {
        if self.batch.is_empty() {
            return;
        }
        // Swap in a fresh pre-sized buffer so the full batch can be moved out.
        let fresh = Vec::with_capacity(self.config.batch_size);
        let pending = std::mem::replace(&mut self.batch, fresh);
        if let Err(e) = send_batch(pending, &self.config).await {
            eprintln!("Failed to send batch to Better Stack: {}", e);
        }
    }
}