ferrokinesis 0.7.0

A local AWS Kinesis mock server for testing, written in Rust
Documentation
use crate::store::Store;
use crate::types::StreamStatus;

pub async fn run_reaper(store: Store, interval_secs: u64) {
    let interval_ms = interval_secs.saturating_mul(1000);
    sweep_once(&store).await;
    loop {
        crate::runtime::sleep_ms(interval_ms).await;
        sweep_once(&store).await;
    }
}

pub async fn sweep_once(store: &Store) {
    let stream_names = store.list_stream_names().await;
    for name in stream_names {
        let stream = match store.get_stream(&name).await {
            Ok(s) => s,
            Err(_) => continue,
        };
        if stream.stream_status != StreamStatus::Active {
            continue;
        }
        let deleted = store
            .delete_expired_records(&name, stream.retention_period_hours)
            .await;
        if deleted > 0 {
            tracing::debug!(stream = %name, deleted, "retention: trimmed expired records");
        }
    }
}