use std::{sync::Arc, time::Duration};
/// Statistics snapshot handed to [`SealPolicy::evaluate`].
///
/// Every field defaults to zero / `None` via the derived `Default`. The threshold
/// policies in this file treat a `None` statistic as "condition not met".
#[derive(Clone, Debug, Default)]
pub struct MemStats {
/// Number of entries; multiplied by `entry_overhead` in [`BytesThreshold`]'s size estimate.
pub entries: usize,
/// Insert counter. NOTE(review): the exact counting rule lives in the stats provider — confirm there.
pub inserts: u64,
/// Replace counter. NOTE(review): the exact counting rule lives in the stats provider — confirm there.
pub replaces: u64,
/// Approximate key bytes; the additive term of [`BytesThreshold`]'s size estimate.
pub approx_key_bytes: usize,
/// Per-entry overhead; multiplied by `entries` in [`BytesThreshold`]'s size estimate.
pub entry_overhead: usize,
/// Open-row count, if available; compared against [`OpenRowsThreshold::rows`].
pub typed_open_rows: Option<usize>,
/// Batch count, if available; compared against [`BatchesThreshold::batches`].
pub dyn_batches: Option<usize>,
/// Approximate batch bytes, if available.
/// NOTE(review): no policy visible in this file reads this field.
pub dyn_approx_batch_bytes: Option<usize>,
/// Time since the last seal, if available; compared against [`TimeElapsedPolicy::min_interval`].
pub since_last_seal: Option<Duration>,
}
/// Result of evaluating a [`SealPolicy`] against a [`MemStats`] snapshot.
#[derive(Clone, Debug, PartialEq)]
pub enum SealDecision {
/// The policy's condition is not met.
NoOp,
/// The policy's condition is met; the payload says which condition fired.
Seal(SealReason),
}
/// Why a [`SealDecision::Seal`] was produced.
///
/// Only `PartialEq` (not `Eq`) is derived because `ReplaceRatio` carries an `f64`.
#[derive(Clone, Debug, PartialEq)]
pub enum SealReason {
/// Produced by [`BytesThreshold`]: the size estimate `approx` reached `limit`.
ApproxBytesReached { approx: usize, limit: usize },
/// Produced by [`OpenRowsThreshold`]: the open-row `count` reached `limit`.
OpenRowsReached { count: usize, limit: usize },
/// Produced by [`BatchesThreshold`]: the batch `count` reached `limit`.
BatchesReached { count: usize, limit: usize },
/// Produced by [`TimeElapsedPolicy`]: `elapsed` since the last seal reached `limit`.
TimeElapsed { elapsed: Duration, limit: Duration },
/// Carries replace/insert counters and a minimum ratio.
/// NOTE(review): no policy visible in this file constructs this variant;
/// presumably it means replaces/inserts met `min_ratio` — confirm with the producer.
ReplaceRatio {
replaces: u64,
inserts: u64,
min_ratio: f64,
},
/// NOTE(review): not constructed in this file; presumably an explicitly requested seal — confirm.
Manual,
}
/// A rule that inspects a [`MemStats`] snapshot and decides whether to seal.
pub trait SealPolicy {
/// Returns [`SealDecision::Seal`] with a reason when this policy's condition holds
/// for `stats`, and [`SealDecision::NoOp`] otherwise.
fn evaluate(&self, stats: &MemStats) -> SealDecision;
}
/// Policy that never asks for a seal.
#[derive(Clone, Debug, Default)]
pub struct NeverSeal;
impl SealPolicy for NeverSeal {
/// Ignores the stats entirely and always returns [`SealDecision::NoOp`].
fn evaluate(&self, _stats: &MemStats) -> SealDecision {
SealDecision::NoOp
}
}
pub fn default_policy() -> Arc<dyn SealPolicy + Send + Sync> {
use std::time::Duration;
Arc::new(AnyOf::new(vec![
Arc::new(BytesThreshold {
limit: 64 * 1024 * 1024, }),
Arc::new(TimeElapsedPolicy {
min_interval: Duration::from_secs(30),
}),
Arc::new(OpenRowsThreshold { rows: 16_384 }),
Arc::new(BatchesThreshold { batches: 64 }),
]))
}
/// Seals once the approximate size reaches `limit` bytes.
///
/// The size estimate is `approx_key_bytes + entries * entry_overhead`, all read from
/// [`MemStats`]. NOTE(review): `dyn_approx_batch_bytes` is not part of the estimate —
/// confirm that is intended.
#[derive(Clone, Debug)]
pub struct BytesThreshold {
    /// Estimated byte count at or above which the policy asks for a seal.
    pub limit: usize,
}

impl SealPolicy for BytesThreshold {
    /// Returns `Seal(ApproxBytesReached { approx, limit })` when the estimate is
    /// `>= limit`, otherwise `NoOp`.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        // Saturating arithmetic: with plain `+` / `*` an oversized estimate panics in
        // debug builds and wraps in release builds, where the wrapped value can fall
        // below `limit` and suppress the seal exactly when it is needed most.
        let approx = stats
            .entries
            .saturating_mul(stats.entry_overhead)
            .saturating_add(stats.approx_key_bytes);

        if approx >= self.limit {
            SealDecision::Seal(SealReason::ApproxBytesReached {
                approx,
                limit: self.limit,
            })
        } else {
            SealDecision::NoOp
        }
    }
}
/// Seals once the number of open rows reported in [`MemStats::typed_open_rows`]
/// reaches `rows`.
#[derive(Clone, Debug)]
pub struct OpenRowsThreshold {
    /// Open-row count at or above which the policy asks for a seal.
    pub rows: usize,
}

impl SealPolicy for OpenRowsThreshold {
    /// Returns `Seal(OpenRowsReached { count, limit })` when a row count is present and
    /// `>= rows`; returns `NoOp` when it is lower or absent.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        let limit = self.rows;
        stats
            .typed_open_rows
            .filter(|&count| count >= limit)
            .map_or(SealDecision::NoOp, |count| {
                SealDecision::Seal(SealReason::OpenRowsReached { count, limit })
            })
    }
}
/// Seals once the number of batches reported in [`MemStats::dyn_batches`]
/// reaches `batches`.
#[derive(Clone, Debug)]
pub struct BatchesThreshold {
    /// Batch count at or above which the policy asks for a seal.
    pub batches: usize,
}

impl SealPolicy for BatchesThreshold {
    /// Returns `Seal(BatchesReached { count, limit })` when a batch count is present and
    /// `>= batches`; returns `NoOp` when it is lower or absent.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        // Guard: no batch statistic means nothing to compare against.
        let count = match stats.dyn_batches {
            Some(count) => count,
            None => return SealDecision::NoOp,
        };
        // Guard: still under the configured limit.
        if count < self.batches {
            return SealDecision::NoOp;
        }
        SealDecision::Seal(SealReason::BatchesReached {
            count,
            limit: self.batches,
        })
    }
}
/// Seals once the time reported in [`MemStats::since_last_seal`] reaches `min_interval`.
#[derive(Clone, Debug)]
pub struct TimeElapsedPolicy {
    /// Elapsed time at or above which the policy asks for a seal.
    pub min_interval: Duration,
}

impl SealPolicy for TimeElapsedPolicy {
    /// Returns `Seal(TimeElapsed { elapsed, limit })` when an elapsed time is present and
    /// `>= min_interval`; returns `NoOp` when it is shorter or absent.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        let limit = self.min_interval;
        match stats.since_last_seal {
            None => SealDecision::NoOp,
            Some(elapsed) if elapsed < limit => SealDecision::NoOp,
            Some(elapsed) => SealDecision::Seal(SealReason::TimeElapsed { elapsed, limit }),
        }
    }
}
/// Composite policy that seals when at least one of its member policies seals.
///
/// An empty `AnyOf` (for example from `Default`) never seals.
#[derive(Default)]
pub struct AnyOf {
    inner: Vec<Arc<dyn SealPolicy + Send + Sync>>,
}

impl AnyOf {
    /// Wraps `inner`; members are evaluated in the order given.
    pub fn new(inner: Vec<Arc<dyn SealPolicy + Send + Sync>>) -> Self {
        AnyOf { inner }
    }
}

impl SealPolicy for AnyOf {
    /// Evaluates members in order and returns the first `Seal` decision unchanged.
    /// Members after the first `Seal` are not evaluated. Returns `NoOp` when no
    /// member seals.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        self.inner
            .iter()
            // `map` is lazy, so `find` stops evaluating members at the first `Seal`.
            .map(|policy| policy.evaluate(stats))
            .find(|decision| matches!(decision, SealDecision::Seal(_)))
            .unwrap_or(SealDecision::NoOp)
    }
}
/// Test-only composite policy that seals only when every member policy seals.
///
/// An empty `AllOf` never seals, because there is no reason to report.
#[cfg(test)]
#[derive(Default)]
pub struct AllOf {
    inner: Vec<Arc<dyn SealPolicy + Send + Sync>>,
}

#[cfg(test)]
impl AllOf {
    /// Wraps `inner`; members are evaluated in the order given.
    pub fn new(inner: Vec<Arc<dyn SealPolicy + Send + Sync>>) -> Self {
        AllOf { inner }
    }
}

#[cfg(test)]
impl SealPolicy for AllOf {
    /// Evaluates members in order and stops at the first `NoOp`, which becomes the
    /// result. When every member seals, the last member's `Seal` decision (and its
    /// reason) is returned.
    fn evaluate(&self, stats: &MemStats) -> SealDecision {
        // Starts as `NoOp` so that an empty member list yields `NoOp`.
        let mut outcome = SealDecision::NoOp;
        for policy in &self.inner {
            outcome = policy.evaluate(stats);
            if matches!(outcome, SealDecision::NoOp) {
                break;
            }
        }
        outcome
    }
}
/// Crate-internal source of [`MemStats`] snapshots.
pub(crate) trait StatsProvider {
/// Builds a [`MemStats`] snapshot for this provider.
///
/// `since_last_seal` is supplied by the caller; presumably it is copied into
/// `MemStats::since_last_seal` — confirm against the implementors.
fn build_stats(&self, since_last_seal: Option<Duration>) -> MemStats;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `TimeElapsedPolicy` stays quiet below its interval and seals once it is exceeded.
    #[test]
    fn time_elapsed_policy_triggers_correctly() {
        let policy = TimeElapsedPolicy {
            min_interval: Duration::from_millis(50),
        };
        // Builds stats where only the elapsed time is set.
        let stats_after = |millis: u64| MemStats {
            since_last_seal: Some(Duration::from_millis(millis)),
            ..Default::default()
        };
        let too_soon = stats_after(10);
        let overdue = stats_after(100);

        assert_eq!(policy.evaluate(&too_soon), SealDecision::NoOp);

        let decision = policy.evaluate(&overdue);
        match decision {
            SealDecision::Seal(SealReason::TimeElapsed { elapsed, limit }) => {
                assert!(elapsed >= limit);
            }
            other => panic!("unexpected decision: {other:?}"),
        }
    }

    /// `AnyOf` seals when just one member (here the row threshold) fires.
    #[test]
    fn anyof_triggers_on_any_inner() {
        let members: Vec<Arc<dyn SealPolicy + Send + Sync>> = vec![
            Arc::new(OpenRowsThreshold { rows: 5 }),
            Arc::new(BytesThreshold { limit: 10_000 }),
        ];
        let any = AnyOf::new(members);
        let rows_at_limit = MemStats {
            typed_open_rows: Some(5),
            ..Default::default()
        };

        let decision = any.evaluate(&rows_at_limit);
        assert!(matches!(
            decision,
            SealDecision::Seal(SealReason::OpenRowsReached { .. })
        ));
    }

    /// `AllOf` needs both the row threshold and the byte threshold to fire.
    #[test]
    fn allof_requires_all_inners() {
        let members: Vec<Arc<dyn SealPolicy + Send + Sync>> = vec![
            Arc::new(OpenRowsThreshold { rows: 2 }),
            Arc::new(BytesThreshold { limit: 10 }),
        ];
        let all = AllOf::new(members);
        // Rows are always over the limit; only the key-byte estimate varies.
        let stats_with_key_bytes = |approx_key_bytes: usize| MemStats {
            typed_open_rows: Some(3),
            entries: 0,
            approx_key_bytes,
            ..Default::default()
        };

        let only_rows = stats_with_key_bytes(5);
        assert_eq!(all.evaluate(&only_rows), SealDecision::NoOp);

        let rows_and_bytes = stats_with_key_bytes(20);
        assert!(matches!(
            all.evaluate(&rows_and_bytes),
            SealDecision::Seal(_)
        ));
    }
}