Crate fluxus

Crate fluxus 

Source
Expand description

Fluxus - A powerful stream processing framework in Rust

Fluxus is a high-performance stream processing framework inspired by Flink, designed to provide a seamless experience for building and running data processing pipelines in Rust. It offers a rich set of APIs and components to handle various data sources, sinks, and transformations.

§Add Dependencies

To use Fluxus, you need to add the following dependencies to your Cargo.toml:

[dependencies]
fluxus = { version = "1", features = ["full"] }

§Word Count Example

Here is a word count example using Fluxus:

use anyhow::Result;
use fluxus::api::{
    DataStream,
    io::{CollectionSink, CollectionSource},
};
use fluxus::utils::window::WindowConfig;
use std::collections::HashMap;
use std::time::Duration;

pub type WordCount = HashMap<String, usize>;

#[tokio::main]
async fn main() -> Result<()> {
    // Sample input text
    let text = vec![
        "hello world",
        "hello stream processing",
        "world of streaming",
        "hello streaming world",
    ];

    // Create a source from the text collection
    let source = CollectionSource::new(text);
    let sink: CollectionSink<WordCount> = CollectionSink::new();

    // Build and execute the streaming pipeline
    DataStream::new(source)
        // Split text into words
        .map(|line| {
            line.split_whitespace()
                .map(|s| s.to_lowercase())
                .collect::<Vec<_>>()
        })
        // Parallelize the processing
        .parallel(2)
        // Create tumbling windows of 1 second
        .window(WindowConfig::tumbling(Duration::from_millis(1000)))
        // Count words in each window
        .aggregate(HashMap::new(), |mut counts, words| {
            for word in words {
                *counts.entry(word).or_insert(0) += 1;
            }
            counts
        })
        // Write results to sink
        .sink(sink.clone())
        .await?;

    // Print the results
    println!("\nWord count results:");
    for result in sink.get_data() {
        println!("\nWindow results:");
        let mut words: Vec<_> = result.iter().collect();
        words.sort_by(|a, b| b.1.cmp(a.1).then(a.0.cmp(b.0)));
        for (word, count) in words {
            println!("  {}: {}", word, count);
        }
    }

    Ok(())
}