1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use std::{fmt::Debug, time::Duration};

use crate::batch::Batch;

/// When to process a batch.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchingStrategy {
    /// Process the batch when it reaches a given size.
    Size(usize),
    /// Process the batch a given duration after it was created.
    Duration(Duration),
    /// Process the batch a given duration after the most recent item was added.
    Debounce(Duration),
    /// Process the batch after the previous batch for the same key has finished.
    Sequential,
    // TODO: Duration/Debounce+Size
}

pub enum BatchingResult {
    Process,
    ProcessAfter(Duration),
    DoNothing,
}

impl BatchingStrategy {
    pub(crate) fn is_sequential(&self) -> bool {
        matches!(self, Self::Sequential)
    }

    pub(crate) fn apply<K, I, O, E>(&self, batch: &Batch<K, I, O, E>) -> BatchingResult
    where
        K: 'static + Send + Clone,
    {
        match self {
            Self::Size(size) if batch.len() >= *size => BatchingResult::Process,

            Self::Duration(dur) if batch.is_new_batch() => BatchingResult::ProcessAfter(*dur),

            Self::Debounce(dur) => BatchingResult::ProcessAfter(*dur),

            Self::Sequential if !batch.is_running() => BatchingResult::Process,
            _ => BatchingResult::DoNothing,
        }
    }
}