use crate::key::LexicographicKey;
use crate::shard::RangeAssigner;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteEnvelope<T> {
pub key: Vec<u8>,
pub timestamp: u64,
pub payload: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
AcceptAll,
DropOlderThan { watermark: u64 },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision<WorkerId> {
Route(WorkerId),
DropStale,
NoRoute,
}
#[derive(Debug, Clone)]
pub struct IngestRouter<WorkerId> {
assigner: RangeAssigner<WorkerId>,
drop_policy: DropPolicy,
}
impl<WorkerId: Clone> IngestRouter<WorkerId> {
pub fn new(assigner: RangeAssigner<WorkerId>) -> Self {
Self {
assigner,
drop_policy: DropPolicy::AcceptAll,
}
}
pub fn with_drop_policy(mut self, drop_policy: DropPolicy) -> Self {
self.drop_policy = drop_policy;
self
}
pub fn route<K: LexicographicKey + ?Sized>(
&self,
key: &K,
timestamp: u64,
) -> IngestDecision<WorkerId> {
if matches!(self.drop_policy, DropPolicy::DropOlderThan { watermark } if timestamp < watermark)
{
return IngestDecision::DropStale;
}
self.assigner
.worker_for(key)
.cloned()
.map(IngestDecision::Route)
.unwrap_or(IngestDecision::NoRoute)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::shard::RangeAssigner;
#[test]
fn ingest_router_drops_stale_writes() {
let mut assigner = RangeAssigner::new();
assigner
.assign(b"a".to_vec()..b"z".to_vec(), "leaf")
.unwrap();
let router = IngestRouter::new(assigner)
.with_drop_policy(DropPolicy::DropOlderThan { watermark: 10 });
assert_eq!(router.route("abc", 9), IngestDecision::DropStale);
assert_eq!(router.route("abc", 10), IngestDecision::Route("leaf"));
}
}