#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BufferConfig {
max_records: Option<usize>,
max_bytes: Option<usize>,
emit_early: bool,
}
impl BufferConfig {
#[must_use]
pub fn unbounded() -> Self {
Self {
max_records: None,
max_bytes: None,
emit_early: false,
}
}
#[must_use]
pub fn max_records(n: usize) -> Self {
assert!(n >= 1, "max_records must be >= 1");
Self {
max_records: Some(n),
max_bytes: None,
emit_early: true,
}
}
#[must_use]
pub fn max_bytes(n: usize) -> Self {
assert!(n >= 1, "max_bytes must be >= 1");
Self {
max_records: None,
max_bytes: Some(n),
emit_early: true,
}
}
#[must_use]
pub fn with_max_records(self, n: usize) -> Self {
assert!(n >= 1, "max_records must be >= 1");
Self {
max_records: Some(n),
..self
}
}
#[must_use]
pub fn with_max_bytes(self, n: usize) -> Self {
assert!(n >= 1, "max_bytes must be >= 1");
Self {
max_bytes: Some(n),
..self
}
}
#[must_use]
pub fn emit_early_when_full(self) -> Self {
Self {
emit_early: true,
..self
}
}
#[must_use]
pub fn shut_down_when_full(self) -> Self {
Self {
emit_early: false,
..self
}
}
pub(crate) fn record_cap(&self) -> Option<usize> {
self.max_records
}
pub(crate) fn byte_cap(&self) -> Option<usize> {
self.max_bytes
}
pub(crate) fn is_emit_early(&self) -> bool {
self.emit_early
}
}
#[derive(Debug)]
pub struct Suppressed<K> {
pub(crate) buffer: BufferConfig,
pub(crate) buffer_time: fn(&K, i64) -> i64,
pub(crate) wait: WaitKind,
pub(crate) logging: bool,
}
impl<K> Clone for Suppressed<K> {
fn clone(&self) -> Self {
*self
}
}
impl<K> Copy for Suppressed<K> {}
impl<K> Suppressed<K> {
#[must_use]
pub fn with_logging_disabled(self) -> Self {
Self {
logging: false,
..self
}
}
#[must_use]
pub fn with_logging_enabled(self) -> Self {
Self {
logging: true,
..self
}
}
}
#[derive(Clone, Copy, Debug)]
pub(crate) enum WaitKind {
UpstreamGrace,
Fixed(i64),
}
impl<KInner> Suppressed<crate::dsl::windows::Windowed<KInner>> {
#[must_use]
pub fn until_window_closes(buffer: BufferConfig) -> Self {
assert!(
!buffer.is_emit_early(),
"untilWindowCloses requires a strict (shutDownWhenFull) buffer config"
);
Self {
buffer,
buffer_time: |k, _ts| k.window.end,
wait: WaitKind::UpstreamGrace,
logging: true,
}
}
}
impl<K> Suppressed<K> {
#[must_use]
pub fn until_time_limit(wait_ms: i64, buffer: BufferConfig) -> Self {
assert!(wait_ms >= 0, "time limit must be >= 0");
Self {
buffer,
buffer_time: |_k, ts| ts,
wait: WaitKind::Fixed(wait_ms),
logging: true,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn buffer_config_caps_and_overflow() {
assert_eq!(BufferConfig::unbounded().record_cap(), None);
assert!(!BufferConfig::unbounded().is_emit_early()); let strict = BufferConfig::unbounded().with_max_records(3);
assert_eq!(strict.record_cap(), Some(3));
assert!(!strict.is_emit_early());
let eager = BufferConfig::max_records(5); assert_eq!(eager.record_cap(), Some(5));
assert!(eager.is_emit_early());
assert!(!eager.shut_down_when_full().is_emit_early());
assert!(
BufferConfig::unbounded()
.emit_early_when_full()
.is_emit_early()
);
}
#[test]
fn buffer_config_byte_caps() {
assert_eq!(BufferConfig::unbounded().byte_cap(), None);
let eager = BufferConfig::max_bytes(1024); assert_eq!(eager.byte_cap(), Some(1024));
assert_eq!(eager.record_cap(), None);
assert!(eager.is_emit_early());
let strict = BufferConfig::unbounded().with_max_bytes(512);
assert_eq!(strict.byte_cap(), Some(512));
assert!(!strict.is_emit_early());
let both = BufferConfig::unbounded()
.with_max_records(3)
.with_max_bytes(99);
assert_eq!(both.record_cap(), Some(3));
assert_eq!(both.byte_cap(), Some(99));
}
#[test]
fn logging_toggles() {
use crate::dsl::windows::{Window, Windowed};
let on = Suppressed::until_window_closes(BufferConfig::unbounded());
assert!(on.logging); let off = on.with_logging_disabled();
assert!(!off.logging);
assert!(off.with_logging_enabled().logging);
let wk = Windowed {
key: "k".to_string(),
window: Window { start: 0, end: 7 },
};
assert_eq!((off.buffer_time)(&wk, 1), 7);
}
#[test]
fn suppressed_constructors() {
use crate::dsl::windows::{Window, Windowed};
let wc = Suppressed::until_window_closes(BufferConfig::unbounded());
let wk = Windowed {
key: "k".to_string(),
window: Window { start: 0, end: 99 },
};
assert_eq!((wc.buffer_time)(&wk, 5), 99); let tl = Suppressed::<String>::until_time_limit(50, BufferConfig::max_records(2));
assert_eq!((tl.buffer_time)(&"k".to_string(), 5), 5); }
}