sea_streamer/
lib.rs

1//! <div align="center">
2//!
3//!   <img src="https://raw.githubusercontent.com/SeaQL/sea-streamer/master/docs/SeaStreamer Banner.png"/>
4//!
5//!   <h1>SeaStreamer</h1>
6//!
7//!   <p>
8//!     <strong>🌊 A real-time stream processing toolkit for Rust</strong>
9//!   </p>
10//!
11//!   [![crate](https://img.shields.io/crates/v/sea-streamer.svg)](https://crates.io/crates/sea-streamer)
12//!   [![docs](https://docs.rs/sea-streamer/badge.svg)](https://docs.rs/sea-streamer)
13//!   [![build status](https://github.com/SeaQL/sea-streamer/actions/workflows/rust.yml/badge.svg)](https://github.com/SeaQL/sea-streamer/actions/workflows/rust.yml)
14//!
15//! </div>
16//!
17//! SeaStreamer is a toolkit to help you build real-time stream processors in Rust.
18//!
19//! ## Features
20//!
21//! 1. Async
22//!
23//! SeaStreamer provides an async API, and it supports both `tokio` and `async-std`. In tandem with other async Rust libraries,
24//! you can build highly concurrent stream processors.
25//!
26//! 2. Generic
27//!
28//! We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.
29//!
30//! 3. Testable
31//!
32//! SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster,
33//! and extremely handy when working locally.
34//!
35//! 4. Micro-service Oriented
36//!
37//! Let's build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
38//!
39//! ## Quick Start
40//!
41//! Add the following to your `Cargo.toml`
42//!
43//! ```toml
44//! sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
45//! ```
46//!
47//! Here is a basic [stream consumer](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/consumer.rs):
48//!
49//! ```ignore
50//! #[tokio::main]
51//! async fn main() -> Result<()> {
52//!     env_logger::init();
53//!
54//!     let Args { stream } = Args::parse();
55//!
56//!     let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
57//!
58//!     let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
59//!     options.set_auto_stream_reset(SeaStreamReset::Earliest);
60//!
61//!     let consumer: SeaConsumer = streamer
62//!         .create_consumer(stream.stream_keys(), options)
63//!         .await?;
64//!
65//!     loop {
66//!         let mess: SeaMessage = consumer.next().await?;
67//!         println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
68//!     }
69//! }
70//! ```
71//!
72//! Here is a basic [stream producer](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/producer.rs):
73//!
74//! ```ignore
75//! #[tokio::main]
76//! async fn main() -> Result<()> {
77//!     env_logger::init();
78//!
79//!     let Args { stream } = Args::parse();
80//!
81//!     let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
82//!
83//!     let producer: SeaProducer = streamer
84//!         .create_producer(stream.stream_key()?, Default::default())
85//!         .await?;
86//!
87//!     for tick in 0..100 {
88//!         let message = format!(r#""tick {tick}""#);
89//!         eprintln!("{message}");
90//!         producer.send(message)?;
91//!         tokio::time::sleep(Duration::from_secs(1)).await;
92//!     }
93//!
94//!     producer.end().await?; // flush
95//!
96//!     Ok(())
97//! }
98//! ```
99//!
100//! Here is a [basic stream processor](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/processor.rs).
101//! See also other [advanced stream processors](https://github.com/SeaQL/sea-streamer/tree/main/examples/).
102//!
103//! ```ignore
104//! #[tokio::main]
105//! async fn main() -> Result<()> {
106//!     env_logger::init();
107//!
108//!     let Args { input, output } = Args::parse();
109//!
110//!     let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
111//!     let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
112//!     let consumer: SeaConsumer = streamer
113//!         .create_consumer(input.stream_keys(), options)
114//!         .await?;
115//!
116//!     let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
117//!     let producer: SeaProducer = streamer
118//!         .create_producer(output.stream_key()?, Default::default())
119//!         .await?;
120//!
121//!     loop {
122//!         let message: SeaMessage = consumer.next().await?;
123//!         let message = process(message).await?;
124//!         eprintln!("{message}");
125//!         producer.send(message)?; // send is non-blocking
126//!     }
127//! }
128//! ```
129//!
130//! Now, let's put them into action.
131//!
132//! With Redis / Kafka:
133//!
134//! ```shell
135//! STREAMER_URI="redis://localhost:6379" # or
136//! STREAMER_URI="kafka://localhost:9092"
137//!
138//! # Produce some input
139//! cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
140//! # Start the processor, producing some output
141//! cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
142//! # Replay the output
143//! cargo run --bin consumer -- --stream $STREAMER_URI/hello2
144//! # Remember to stop the processes
145//! kill %1 %2
146//! ```
147//!
148//! With Stdio:
149//!
150//! ```shell
151//! # Pipe the producer to the processor
152//! cargo run --bin producer -- --stream stdio:///hello1 | \
153//! cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
154//! ```
155//!
156//! ## Architecture
157//!
158//! The architecture of [`sea-streamer`](https://docs.rs/sea-streamer) is constructed by a number of sub-crates:
159//!
160//! + [`sea-streamer-types`](https://docs.rs/sea-streamer-types)
161//! + [`sea-streamer-socket`](https://docs.rs/sea-streamer-socket)
162//!     + [`sea-streamer-kafka`](https://docs.rs/sea-streamer-kafka)
163//!     + [`sea-streamer-redis`](https://docs.rs/sea-streamer-redis)
164//!     + [`sea-streamer-stdio`](https://docs.rs/sea-streamer-stdio)
165//!     + [`sea-streamer-file`](https://docs.rs/sea-streamer-file)
166//! + [`sea-streamer-runtime`](https://docs.rs/sea-streamer-runtime)
167//!
168//! All crates share the same major version. So `0.1` of `sea-streamer` depends on `0.1` of `sea-streamer-socket`.
169
170#![cfg_attr(docsrs, feature(doc_cfg))]
171#![doc(
172    html_logo_url = "https://raw.githubusercontent.com/SeaQL/sea-streamer/main/docs/SeaQL icon.png"
173)]
174
175pub use sea_streamer_types::*;
176
177#[cfg(feature = "sea-streamer-kafka")]
178#[cfg_attr(docsrs, doc(cfg(feature = "sea-streamer-kafka")))]
179pub use sea_streamer_kafka as kafka;
180
181#[cfg(feature = "sea-streamer-redis")]
182#[cfg_attr(docsrs, doc(cfg(feature = "sea-streamer-redis")))]
183pub use sea_streamer_redis as redis;
184
185#[cfg(feature = "sea-streamer-stdio")]
186#[cfg_attr(docsrs, doc(cfg(feature = "sea-streamer-stdio")))]
187pub use sea_streamer_stdio as stdio;
188
189#[cfg(feature = "sea-streamer-file")]
190#[cfg_attr(docsrs, doc(cfg(feature = "sea-streamer-file")))]
191pub use sea_streamer_file as file;
192
193#[cfg(feature = "sea-streamer-socket")]
194#[cfg_attr(docsrs, doc(cfg(feature = "sea-streamer-socket")))]
195pub use sea_streamer_socket::*;
196
197#[cfg(any(
198    feature = "runtime",
199    feature = "runtime-tokio",
200    feature = "runtime-async-std"
201))]
202pub use sea_streamer_runtime as runtime;