use std::num::NonZeroUsize;
use std::time::Duration;
use coarsetime::Instant;
use crate::network::{Coord, NetworkMessage, NetworkSender};
use crate::operator::{ExchangeData, StreamElement};
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BatchMode {
Fixed(NonZeroUsize),
Adaptive(NonZeroUsize, Duration),
Single,
}
impl BatchMode {
pub fn max_size(&self) -> usize {
match self {
BatchMode::Fixed(s) => s.get(),
BatchMode::Adaptive(s, _) => s.get(),
BatchMode::Single => 1,
}
}
pub fn interval(&self) -> Option<Duration> {
match self {
BatchMode::Adaptive(_, ts) => Some(*ts),
_ => None,
}
}
}
pub(crate) struct Batcher<Out: Send + 'static> {
remote_sender: NetworkSender<Out>,
mode: BatchMode,
buffer: Vec<StreamElement<Out>>,
last_send: Instant,
coord: Coord,
}
impl<Out: ExchangeData> Batcher<Out> {
pub(crate) fn new(remote_sender: NetworkSender<Out>, mode: BatchMode, coord: Coord) -> Self {
Self {
remote_sender,
mode,
buffer: Default::default(),
last_send: Instant::now(),
coord,
}
}
pub(crate) fn enqueue(&mut self, message: StreamElement<Out>) {
match self.mode {
BatchMode::Adaptive(n, max_delay) => {
self.buffer.push(message);
let timeout_elapsed = self.last_send.elapsed() > max_delay.into();
if self.buffer.len() >= n.get() || timeout_elapsed {
self.flush()
}
}
BatchMode::Fixed(n) => {
self.buffer.push(message);
if self.buffer.len() >= n.get() {
self.flush()
}
}
BatchMode::Single => {
let message = NetworkMessage::new_single(message, self.coord);
self.remote_sender.send(message).unwrap();
}
}
}
pub(crate) fn flush(&mut self) {
if !self.buffer.is_empty() {
let cap = self.buffer.capacity();
let new_cap = if self.buffer.len() < cap / 4 {
cap / 2
} else {
cap
};
let mut batch = Vec::with_capacity(new_cap);
std::mem::swap(&mut self.buffer, &mut batch);
let message = NetworkMessage::new_batch(batch, self.coord);
self.remote_sender.send(message).unwrap();
self.last_send = Instant::now();
}
}
pub(crate) fn end(self) {
if !self.buffer.is_empty() {
let message = NetworkMessage::new_batch(self.buffer, self.coord);
self.remote_sender.send(message).unwrap();
}
}
}
impl BatchMode {
pub fn fixed(size: usize) -> BatchMode {
BatchMode::Fixed(NonZeroUsize::new(size).expect("The batch size must be positive"))
}
pub fn adaptive(size: usize, max_delay: Duration) -> BatchMode {
BatchMode::Adaptive(
NonZeroUsize::new(size).expect("The batch size must be positive"),
max_delay,
)
}
pub fn single() -> BatchMode {
BatchMode::Single
}
pub fn max_delay(&self) -> Option<Duration> {
match &self {
BatchMode::Adaptive(_, max_delay) => Some(*max_delay),
BatchMode::Fixed(_) | BatchMode::Single => None,
}
}
}
impl Default for BatchMode {
fn default() -> Self {
BatchMode::adaptive(1024, Duration::from_millis(50))
}
}