Crate sea_streamer
SeaStreamer is a stream processing toolkit to help you build stream processors in Rust.
Features
- Async
SeaStreamer provides an async API, and it supports both tokio and async-std. In tandem with other async Rust libraries, you can build highly concurrent stream processors.
- Generic
We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.
- Testable
SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.
- Micro-service Oriented
Let’s build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
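To make the "Generic" point above concrete, here is a minimal, standalone sketch of writing application logic once against a trait and running it on different backends. All names here (`Sink`, `MemorySink`, `StdoutSink`, `emit_ticks`) are hypothetical and are not part of sea-streamer; the crate's own traits are listed under Traits below.

```rust
// Hypothetical names throughout: `Sink`, `MemorySink`, `StdoutSink` and
// `emit_ticks` are illustrative only and are not part of sea-streamer.

/// A minimal producer-like interface that application code depends on.
trait Sink {
    fn send(&mut self, payload: &str);
}

/// Backend 1: collects messages in memory, which is handy in tests.
#[derive(Default)]
struct MemorySink {
    messages: Vec<String>,
}

impl Sink for MemorySink {
    fn send(&mut self, payload: &str) {
        self.messages.push(payload.to_owned());
    }
}

/// Backend 2: writes each message to stdout, one per line.
struct StdoutSink;

impl Sink for StdoutSink {
    fn send(&mut self, payload: &str) {
        println!("{payload}");
    }
}

/// Application logic is written once against the trait, not a backend.
fn emit_ticks<S: Sink>(sink: &mut S, count: u32) {
    for tick in 0..count {
        sink.send(&format!("tick {tick}"));
    }
}

fn main() {
    // The same function runs unchanged on either backend.
    let mut memory = MemorySink::default();
    emit_ticks(&mut memory, 3);
    assert_eq!(memory.messages.len(), 3);

    let mut stdout = StdoutSink;
    emit_ticks(&mut stdout, 3);
}
```

Swapping the backend only changes the value passed in, which is the property that makes a processor testable without a cluster.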
Quick Start
Add the following to your Cargo.toml:
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
Here is a basic stream consumer:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::from_args();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(SeaStreamReset::Earliest);
    let consumer: SeaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;
    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}
Here is a basic stream producer:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::from_args();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(stream.stream_key()?, Default::default())
        .await?;
    for tick in 0..100 {
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    producer.end().await?; // flush
    Ok(())
}
Here is a basic stream processor. See also other advanced stream processors.
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { input, output } = Args::from_args();
    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;
    let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output.stream_key()?, Default::default())
        .await?;
    loop {
        let message: SeaMessage = consumer.next().await?;
        let message = process(message).await?;
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
    }
}
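The processor above calls `process`, which is not shown on this page. As a rough sketch of what such a step might do, the function below transforms a payload that has already been read as text. The name `transform`, its signature, and its error type are assumptions made so the snippet runs on its own. In a real processor the text would presumably come from `message.message().as_str()?`, as in the consumer example, and the function would be async.

```rust
/// Hypothetical transformation step: takes a message payload as text and
/// returns the payload to forward downstream. Empty payloads are rejected
/// here purely to show where error handling would go.
fn transform(payload: &str) -> Result<String, String> {
    if payload.is_empty() {
        return Err("empty payload".to_owned());
    }
    Ok(format!("{payload} processed"))
}

fn main() {
    match transform("tick 1") {
        Ok(output) => println!("{output}"),
        Err(err) => eprintln!("skipping message: {err}"),
    }
}
```

Keeping the transformation free of any streamer types makes it easy to unit test separately from the consume/produce loop.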
Now, let’s put them into action.
With Redis / Kafka:
STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"
# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2
With Stdio:
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
Architecture
sea-streamer is composed of a number of sub-crates.
All crates share the same major version, so 0.1 of sea-streamer depends on 0.1 of sea-streamer-socket.
Re-exports
pub use sea_streamer_kafka as kafka;
pub use sea_streamer_stdio as stdio;
Modules
- Re-export types from related libraries
Structs
- Used to identify a group of consumers.
- Used to identify a consumer within a group.
- Header data associated with a message.
- The payload of a message.
- sea-streamer-socket: concrete type of ConnectOptions.
- sea-streamer-socket: concrete type of Consumer.
- sea-streamer-socket: concrete type of ConsumerOptions.
- sea-streamer-socket: concrete type of Producer.
- sea-streamer-socket: concrete type of ProducerOptions.
- sea-streamer-socket: concrete type of Streamer.
- Identifies a shard, also known as a partition.
- It uses an Arc to hold the bytes, so it is cheap to clone.
- Identifies a stream, also known as a topic.
- Streamer URI with stream key(s).
- URI of Streaming Server. If this is a cluster, there can be multiple nodes.
- A PrimitiveDateTime with a UtcOffset.
Enums
- sea-streamer-socket: enum for identifying the underlying backend.
- sea-streamer-socket: the concrete backend error.
- Bytes or Str. Being a str means the data is valid UTF-8.
- Mode of stream consumption.
- Errors that may happen when processing JSON.
- sea-streamer-socket: concrete type of Future that will yield the next message.
- sea-streamer-socket: concrete type of Message.
- sea-streamer-socket: concrete type of Stream that will yield the next messages.
- sea-streamer-socket: concrete type of a Future that will yield a send receipt.
- Identifies a position in a stream.
- Common errors that may occur.
- Errors that may happen when handling StreamKey.
- Errors that may happen when parsing stream URL.
Constants
- Maximum string length of a stream key.
Traits
- Common interface of byte containers.
- Common options when connecting to a streamer.
- Common interface of consumers, to be implemented by all backends.
- Common options of a Consumer.
- Types that can be converted into BytesOrStr.
- Common interface of messages, to be implemented by all backends.
- Common interface of producers, to be implemented by all backends.
- Common options of a Producer.
- sea-streamer-socket: methods shared by Sea* types.
- Common interface of streamer clients.
Functions
- Returns true if this character can be used in a stream key.
- Function to construct a StreamErr::Runtime error variant.
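The stream key length constant and the character check above suggest how a key could be validated. The sketch below is a standalone illustration only. The limit of 249 and the allowed character set (ASCII alphanumerics plus `.`, `_`, `-`) are assumptions, and `MAX_KEY_LEN`, `is_valid_key_char`, `KeyError` and `validate_key` are hypothetical names, not the crate's API. Consult the crate's own constant and function for the real rules.

```rust
/// Assumed limit, for illustration only; the crate defines the real value.
const MAX_KEY_LEN: usize = 249;

/// Assumed character set, for illustration only.
fn is_valid_key_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')
}

#[derive(Debug, PartialEq)]
enum KeyError {
    TooLong,
    InvalidChar(char),
}

/// Checks the length first, then reports the first disallowed character.
fn validate_key(key: &str) -> Result<(), KeyError> {
    if key.len() > MAX_KEY_LEN {
        return Err(KeyError::TooLong);
    }
    match key.chars().find(|c| !is_valid_key_char(*c)) {
        Some(c) => Err(KeyError::InvalidChar(c)),
        None => Ok(()),
    }
}

fn main() {
    for key in ["hello1", "not a key"] {
        println!("{key:?}: {:?}", validate_key(key));
    }
}
```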
Type Definitions
- sea-streamer-socket: the concrete error type.
- Delivery receipt.
- The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message, also known as an offset.
- Type alias of the Result type specific to sea-streamer.
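Because the tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message, it can serve as a deduplication key. The sketch below uses stand-in type aliases (`String` and `u64`) rather than the crate's own types, and `Deduplicator` is a hypothetical helper, so treat it as an illustration of the idea rather than sea-streamer code.

```rust
use std::collections::HashSet;

// Stand-in types for illustration; sea-streamer defines its own
// StreamKey, ShardId and SeqNo.
type StreamKey = String;
type ShardId = u64;
type SeqNo = u64;
type MessageId = (StreamKey, ShardId, SeqNo);

/// Hypothetical helper that remembers which messages were already handled.
struct Deduplicator {
    seen: HashSet<MessageId>,
}

impl Deduplicator {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true the first time an identifier is observed,
    /// and false on every later occurrence.
    fn first_time(&mut self, id: MessageId) -> bool {
        self.seen.insert(id)
    }
}

fn main() {
    let mut dedup = Deduplicator::new();
    let id: MessageId = ("hello1".to_owned(), 0, 42);
    assert!(dedup.first_time(id.clone()));
    assert!(!dedup.first_time(id)); // a replayed message is detected
}
```

The same sequence number on a different shard counts as a different message, which is why all three components are part of the key.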