scepter 0.1.1

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use crate::key::LexicographicKey;
use crate::shard::RangeAssigner;

/// A single write: a byte key, a timestamp, and a caller-defined payload.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type; presumably `key` and `timestamp` are the values handed to
/// `IngestRouter::route` — confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteEnvelope<T> {
    /// Raw key bytes identifying the series this write belongs to.
    pub key: Vec<u8>,
    /// Timestamp of the write (unit/epoch not specified here — TODO confirm).
    pub timestamp: u64,
    /// Opaque payload carried alongside the key and timestamp.
    pub payload: T,
}

/// Policy deciding whether a write is rejected based on its timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
    /// Never reject a write because of its timestamp.
    AcceptAll,
    /// Reject writes whose timestamp is strictly less than `watermark`;
    /// a timestamp equal to `watermark` is still accepted.
    DropOlderThan { watermark: u64 },
}

/// Outcome of routing a single write through an `IngestRouter`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision<WorkerId> {
    /// The write should be sent to this worker.
    Route(WorkerId),
    /// The write was rejected by the drop policy as too old.
    DropStale,
    /// The range assigner returned no worker for the key.
    NoRoute,
}

/// Routes writes to workers by key, after applying a timestamp-based drop policy.
#[derive(Debug, Clone)]
pub struct IngestRouter<WorkerId> {
    /// Looks up the worker responsible for a given key.
    assigner: RangeAssigner<WorkerId>,
    /// Staleness policy checked before any key lookup is attempted.
    drop_policy: DropPolicy,
}

impl<WorkerId: Clone> IngestRouter<WorkerId> {
    /// Builds a router over `assigner` that accepts writes of any timestamp
    /// (`DropPolicy::AcceptAll`).
    pub fn new(assigner: RangeAssigner<WorkerId>) -> Self {
        let drop_policy = DropPolicy::AcceptAll;
        Self {
            assigner,
            drop_policy,
        }
    }

    /// Consumes the router and returns it with `drop_policy` installed,
    /// replacing whatever policy was set before.
    pub fn with_drop_policy(self, drop_policy: DropPolicy) -> Self {
        Self {
            drop_policy,
            ..self
        }
    }

    /// Decides what to do with a write for `key` stamped `timestamp`.
    ///
    /// The drop policy is evaluated first: under `DropOlderThan`, a timestamp
    /// strictly below the watermark yields `DropStale` without consulting the
    /// assigner. Otherwise the assigner is asked for the key's worker, giving
    /// `Route` with a clone of that worker, or `NoRoute` when it has none.
    pub fn route<K: LexicographicKey + ?Sized>(
        &self,
        key: &K,
        timestamp: u64,
    ) -> IngestDecision<WorkerId> {
        let is_stale = match self.drop_policy {
            DropPolicy::AcceptAll => false,
            DropPolicy::DropOlderThan { watermark } => timestamp < watermark,
        };
        if is_stale {
            return IngestDecision::DropStale;
        }

        match self.assigner.worker_for(key) {
            Some(worker) => IngestDecision::Route(worker.clone()),
            None => IngestDecision::NoRoute,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::shard::RangeAssigner;

    /// With a watermark of 10, a write at timestamp 9 is dropped as stale while
    /// a write at exactly 10 is routed to the worker owning the key's range.
    #[test]
    fn ingest_router_drops_stale_writes() {
        let range = b"a".to_vec()..b"z".to_vec();
        let mut ranges = RangeAssigner::new();
        ranges.assign(range, "leaf").unwrap();

        let policy = DropPolicy::DropOlderThan { watermark: 10 };
        let router = IngestRouter::new(ranges).with_drop_policy(policy);

        let below_watermark = router.route("abc", 9);
        let at_watermark = router.route("abc", 10);

        assert_eq!(below_watermark, IngestDecision::DropStale);
        assert_eq!(at_watermark, IngestDecision::Route("leaf"));
    }
}