malstrom 0.1.0

Malstrom is a distributed, stateful stream processing framework written in Rust.
Documentation
use crate::{
    stream::{OperatorBuilder, StreamBuilder},
    types::{DataMessage, Key, MaybeKey},
};

use super::{
    distributed::{
        types::{DistData, DistKey, DistTimestamp, WorkerPartitioner},
        Distributor,
    },
    KeyLocal,
};

/// Key a stream and distribute message to workers according to their key
pub trait KeyDistribute<X, K: Key, V, T> {
    /// Turn a stream into a keyed stream and distribute
    /// messages across workers via the partitioning function.
    /// The keyed stream returned by this method is capable
    /// of redistributing state on cluster size changes
    /// with no downtime.
    fn key_distribute(
        self,
        name: &str,
        key_func: impl Fn(&DataMessage<X, V, T>) -> K + 'static,
        partitioner: WorkerPartitioner<K>,
    ) -> StreamBuilder<K, V, T>;
}

impl<X, K, V, T> KeyDistribute<X, K, V, T> for StreamBuilder<X, V, T>
where
    X: MaybeKey,
    K: DistKey,
    V: DistData,
    T: DistTimestamp,
{
    fn key_distribute(
        self,
        name: &str,
        key_func: impl Fn(&DataMessage<X, V, T>) -> K + 'static,
        partitioner: WorkerPartitioner<K>,
    ) -> StreamBuilder<K, V, T> {
        self.key_local(&format!("{name}-key"), key_func)
            .distribute(&format!("{name}-distribute"), partitioner)
    }
}

pub(crate) trait Distribute<K: Key, V, T> {
    /// Turn a stream into a keyed stream and distribute
    /// messages across workers via the partitioning function.
    /// The keyed stream returned by this method is capable
    /// of redistributing state on cluster size changes
    /// with no downtime.
    fn distribute(self, name: &str, partitioner: WorkerPartitioner<K>) -> StreamBuilder<K, V, T>;
}

impl<K, V, T> Distribute<K, V, T> for StreamBuilder<K, V, T>
where
    K: DistKey,
    V: DistData,
    T: DistTimestamp,
{
    fn distribute(self, name: &str, partitioner: WorkerPartitioner<K>) -> StreamBuilder<K, V, T> {
        self.then(OperatorBuilder::built_by(name, move |ctx| {
            let mut dist = Distributor::new(partitioner, ctx);
            move |input, output, op_ctx| dist.run(input, output, op_ctx)
        }))
    }
}