// malstrom 0.1.0
//
// Malstrom is a distributed, stateful stream processing framework written in Rust.
// Documentation
//! Example job showcasing the role of epochs in tracking event time
use std::u8;

use chrono::{Datelike, NaiveDate, TimeDelta};
use indexmap::IndexMap;
use malstrom::{
    channels::operator_io::Output,
    keyed::partitioners::rendezvous_select,
    operators::*,
    runtime::SingleThreadRuntime,
    sinks::{StatelessSink, StdOutSink},
    snapshot::NoPersistence,
    sources::{SingleIteratorSource, StatelessSource},
    types::{DataMessage, Message, Timestamp},
    worker::StreamProvider,
};
use serde::{Deserialize, Serialize};

/// Fake transactions fed into the dataflow, listed in chronological order.
///
/// Amounts are signed: positive values raise a month's balance, negative values lower it.
static TRANSACTIONS: [Transaction; 10] = [
    Transaction { amount: 1000.0, transaction_time: TransactionTime::new(2025, 1, 1) },
    Transaction { amount: -20.0, transaction_time: TransactionTime::new(2025, 1, 5) },
    Transaction { amount: -150.0, transaction_time: TransactionTime::new(2025, 1, 17) },
    Transaction { amount: -300.0, transaction_time: TransactionTime::new(2025, 2, 4) },
    Transaction { amount: 60.0, transaction_time: TransactionTime::new(2025, 2, 16) },
    Transaction { amount: 75.0, transaction_time: TransactionTime::new(2025, 2, 25) },
    Transaction { amount: -55.0, transaction_time: TransactionTime::new(2025, 3, 16) },
    Transaction { amount: 200.0, transaction_time: TransactionTime::new(2025, 3, 31) },
    Transaction { amount: -10.0, transaction_time: TransactionTime::new(2025, 4, 5) },
    Transaction { amount: -5.0, transaction_time: TransactionTime::new(2025, 4, 19) },
];

/// Builds the single-threaded runtime (without persistence) around the dataflow
/// defined in `build_dataflow` and runs it, panicking if execution returns an error.
fn main() {
    let runtime = SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow);
    runtime.execute().unwrap();
}

/// Defines the dataflow: read the fake transactions, key them by `(year, month)`,
/// use each transaction's date as its event time, and issue an epoch whenever a
/// month is completed so `TransactionCounter` can emit that month's balance.
/// Results are printed to stdout.
///
/// The late-message stream returned by `generate_epochs` is discarded.
fn build_dataflow(provider: &mut dyn StreamProvider) {
    let (stream, _late) = provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(TRANSACTIONS.clone())),
        )
        // key transactions by (year, month) to create monthly balances
        .key_distribute(
            "key-year-month",
            |msg| {
                let ts = msg.value.transaction_time.0;
                (ts.year(), ts.month())
            },
            rendezvous_select,
        )
        .assign_timestamps("assign-time", |msg| msg.value.transaction_time.clone())
        .generate_epochs("end-of-month", move |msg, prev_epoch| {
            // Issue an epoch every time we advance into a new month, or if we have not
            // yet issued an epoch. The epoch is dated to the last day of the month that
            // precedes the current message, which closes that month.
            let ts = msg.timestamp.0;
            let prev_month_end = ts
                .with_day(1)
                .expect("day 1 exists in every month")
                - TimeDelta::days(1);

            // Compare against the month being closed, not the message's own month.
            // Epochs are always dated to a previous month, so comparing with the
            // message's month re-issued an epoch on every message. Using `>=` also
            // avoids moving the epoch backwards for out-of-order messages.
            match prev_epoch {
                Some(epoch) if epoch.0 >= prev_month_end => None,
                _ => Some(TransactionTime(prev_month_end)),
            }
        });

    stream
        .stateful_op("transaction-counter", TransactionCounter)
        .sink("stdout", StatelessSink::new(StdOutSink));
}

/// Stateful operator that keeps a running balance per `(year, month)` key.
struct TransactionCounter;

impl StatefulLogic<(i32, u32), Transaction, TransactionTime, f32, f32> for TransactionCounter {
    /// Adds the transaction's amount to its month's balance and keeps the result as
    /// the new state. Nothing is emitted here.
    fn on_data(
        &mut self,
        msg: DataMessage<(i32, u32), Transaction, TransactionTime>,
        mut key_state: f32,
        _output: &mut Output<(i32, u32), f32, TransactionTime>,
    ) -> Option<f32> {
        key_state += msg.value.amount;
        Some(key_state)
    }

    /// At the end of every month emit and reset the balance.
    ///
    /// Every key whose `(year, month)` is at or before the epoch's month is sent
    /// downstream with its balance, stamped with the epoch as its timestamp, and then
    /// dropped from state. Keys for later months are kept.
    fn on_epoch(
        &mut self,
        epoch: &TransactionTime,
        state: &mut IndexMap<(i32, u32), f32>,
        output: &mut Output<(i32, u32), f32, TransactionTime>,
    ) {
        let closed_through = (epoch.0.year(), epoch.0.month());
        state.retain(|&(year, month), balance| {
            // months strictly after the epoch's month are still open
            let still_open = (year, month) > closed_through;
            if !still_open {
                output.send(Message::Data(DataMessage::new(
                    (year, month),
                    *balance,
                    epoch.clone(),
                )));
            }
            still_open
        });
    }
}

/// Fake transaction to track
///
/// Holds a signed amount and the date on which the transaction happened.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
    /// Signed amount; it is added to the running balance of the transaction's month.
    amount: f32,
    /// Date of the transaction. It is used as the event-time timestamp and to derive
    /// the `(year, month)` key.
    transaction_time: TransactionTime,
}

/// Event-time timestamp for transactions, at day granularity (wraps a [`NaiveDate`]).
///
/// The derived `Ord` orders timestamps by their wrapped date.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
struct TransactionTime(NaiveDate);

impl TransactionTime {
    const fn new(year: i32, month: u32, day: u32) -> Self {
        Self(NaiveDate::from_ymd_opt(year, month, day).unwrap())
    }
}

impl Timestamp for TransactionTime {
    const MAX: Self = Self(NaiveDate::MAX);

    const MIN: Self = Self(NaiveDate::MIN);

    fn merge(&self, other: &Self) -> Self {
        Self(self.0.min(other.0))
    }
}