// scepter 0.1.5
//
// Composable primitives for planet-scale time-series routing, indexing, and aggregation.
// Documentation
use crate::key::LexicographicKey;
use crate::shard::RangeAssigner;

/// Timestamped write payload routed by an encoded time-series key.
///
/// Generic over the payload type `T`; the envelope only adds the routing key
/// and the timestamp alongside it. The derived `Debug`, `Clone`, `PartialEq`,
/// and `Eq` implementations are available whenever `T` provides them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteEnvelope<T> {
    /// Encoded routing key, stored as raw bytes.
    pub key: Vec<u8>,
    /// Write timestamp.
    // NOTE(review): the unit and epoch are not fixed by this type — confirm
    // against whatever produces these envelopes.
    pub timestamp: u64,
    /// Application payload carried with the write.
    pub payload: T,
}

/// Policy for accepting or dropping incoming writes.
///
/// Consulted by [`IngestRouter::route`] before any key-range lookup is made.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
    /// Accept every write regardless of timestamp.
    AcceptAll,
    /// Drop writes whose timestamp is strictly less than `watermark`.
    ///
    /// A write stamped exactly at `watermark` is still accepted.
    DropOlderThan {
        /// Oldest accepted timestamp (inclusive).
        watermark: u64,
    },
}

/// Routing decision made by an ingest router.
///
/// Returned by [`IngestRouter::route`]; generic over the worker identifier
/// type used by the underlying range assignment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestDecision<WorkerId> {
    /// Send the write to this worker.
    ///
    /// Carries a clone of the worker identifier found for the write key.
    Route(WorkerId),
    /// Drop the write because it is older than the configured watermark.
    ///
    /// The staleness check runs before the range lookup, so this can be
    /// returned even for a key that no range covers.
    DropStale,
    /// No assigned range matched the write key.
    NoRoute,
}

/// Routes writes to workers using a lexicographic range assignment.
///
/// A write is first checked against `drop_policy`; only writes that survive
/// that check are resolved to a worker through `assigner`.
#[derive(Debug, Clone)]
pub struct IngestRouter<WorkerId> {
    /// Key-range to worker mapping consulted for every write that is not dropped.
    assigner: RangeAssigner<WorkerId>,
    /// Timestamp-based admission policy applied before the range lookup.
    drop_policy: DropPolicy,
}

impl<WorkerId: Clone> IngestRouter<WorkerId> {
    /// Builds a router over `assigner` that initially accepts every write
    /// (`DropPolicy::AcceptAll`).
    pub fn new(assigner: RangeAssigner<WorkerId>) -> Self {
        let drop_policy = DropPolicy::AcceptAll;
        Self {
            assigner,
            drop_policy,
        }
    }

    /// Replaces the router's drop policy and hands the router back, so the
    /// call can be chained directly after [`IngestRouter::new`].
    pub fn with_drop_policy(self, drop_policy: DropPolicy) -> Self {
        let mut router = self;
        router.drop_policy = drop_policy;
        router
    }

    /// Decides what to do with a write for `key` stamped at `timestamp`.
    ///
    /// Returns `DropStale` when the policy is `DropOlderThan` and `timestamp`
    /// is strictly below its watermark. Otherwise the key is looked up in the
    /// range assignment: a match yields `Route` with a clone of the worker
    /// identifier, and no match yields `NoRoute`.
    pub fn route<K: LexicographicKey + ?Sized>(
        &self,
        key: &K,
        timestamp: u64,
    ) -> IngestDecision<WorkerId> {
        // The staleness check comes first so stale writes never reach the
        // range lookup.
        let is_stale = match self.drop_policy {
            DropPolicy::AcceptAll => false,
            DropPolicy::DropOlderThan { watermark } => timestamp < watermark,
        };
        if is_stale {
            return IngestDecision::DropStale;
        }

        match self.assigner.worker_for(key).cloned() {
            Some(worker) => IngestDecision::Route(worker),
            None => IngestDecision::NoRoute,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::shard::RangeAssigner;

    /// A write one tick below the watermark is dropped as stale, while a
    /// write stamped exactly at the watermark is routed to the assigned worker.
    #[test]
    fn ingest_router_drops_stale_writes() {
        let watermark = 10;

        let mut ranges = RangeAssigner::new();
        ranges
            .assign(b"a".to_vec()..b"z".to_vec(), "leaf")
            .unwrap();

        let policy = DropPolicy::DropOlderThan { watermark };
        let router = IngestRouter::new(ranges).with_drop_policy(policy);

        let below_watermark = router.route("abc", watermark - 1);
        let at_watermark = router.route("abc", watermark);

        assert_eq!(below_watermark, IngestDecision::DropStale);
        assert_eq!(at_watermark, IngestDecision::Route("leaf"));
    }
}