//! Fluxus - A powerful stream processing framework in Rust
//!
//! Fluxus is a high-performance stream processing framework inspired by Apache Flink,
//! designed for building and running data processing pipelines in Rust.
//! It provides APIs and components for composing data sources, transformations, windows, and sinks.
//!
//! ## Add Dependencies
//! To use Fluxus, add the following to your `Cargo.toml`:
//! ```toml
//! [dependencies]
//! fluxus = { version = "1", features = ["full"] }
//! # The example below uses #[tokio::main], so Tokio is required as well
//! tokio = { version = "1", features = ["full"] }
//! ```
//!
//! ## Word Count Example
//! Here is a word count example using Fluxus:
//! ```rust
//! use anyhow::Result;
//! use fluxus::api::{
//!     DataStream,
//!     io::{CollectionSink, CollectionSource},
//! };
//! use fluxus::utils::window::WindowConfig;
//! use std::collections::HashMap;
//! use std::time::Duration;
//!
//! pub type WordCount = HashMap<String, usize>;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Sample input text
//!     let text = vec![
//!         "hello world",
//!         "hello stream processing",
//!         "world of streaming",
//!         "hello streaming world",
//!     ];
//!
//!     // Create a source from the text collection
//!     let source = CollectionSource::new(text);
//!     let sink: CollectionSink<WordCount> = CollectionSink::new();
//!
//!     // Build and execute the streaming pipeline
//!     DataStream::new(source)
//!         // Split text into lowercase words
//!         .map(|line| {
//!             line.split_whitespace()
//!                 .map(|s| s.to_lowercase())
//!                 .collect::<Vec<_>>()
//!         })
//!         // Parallelize the processing
//!         .parallel(2)
//!         // Create tumbling windows of 1 second
//!         .window(WindowConfig::tumbling(Duration::from_millis(1000)))
//!         // Count words in each window
//!         .aggregate(HashMap::new(), |mut counts, words| {
//!             for word in words {
//!                 *counts.entry(word).or_insert(0) += 1;
//!             }
//!             counts
//!         })
//!         // Write results to sink
//!         .sink(sink.clone())
//!         .await?;
//!
//!     // Print the results
//!     println!("\nWord count results:");
//!     for result in sink.get_data() {
//!         println!("\nWindow results:");
//!         let mut words: Vec<_> = result.iter().collect();
//!         words.sort_by(|a, b| b.1.cmp(a.1).then(a.0.cmp(b.0)));
//!         for (word, count) in words {
//!             println!(" {}: {}", word, count);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
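//! With the input above, and assuming all four lines land in the same 1-second
//! tumbling window (window assignment is timing-dependent, so counts may be
//! split across several windows), the printed output would be:
//!
//! ```text
//! Word count results:
//!
//! Window results:
//!  hello: 3
//!  world: 3
//!  streaming: 2
//!  of: 1
//!  processing: 1
//!  stream: 1
//! ```
//!
//! Ties in count are broken alphabetically by the sort closure, which orders by
//! count descending and then by word ascending.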