Crate sea_streamer
source ·Expand description
SeaStreamer is a toolkit to help you build real-time stream processors in Rust.
§Features
- Async
SeaStreamer provides an async API, and it supports both tokio
and async-std
. In tandem with other async Rust libraries,
you can build highly concurrent stream processors.
- Generic
We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.
- Testable
SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.
- Micro-service Oriented
Let’s build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
§Quick Start
Add the following to your Cargo.toml
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
Here is a basic stream consumer:
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { stream } = Args::parse();
let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
options.set_auto_stream_reset(SeaStreamReset::Earliest);
let consumer: SeaConsumer = streamer
.create_consumer(stream.stream_keys(), options)
.await?;
loop {
let mess: SeaMessage = consumer.next().await?;
println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
}
}
Here is a basic stream producer:
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { stream } = Args::parse();
let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
let producer: SeaProducer = streamer
.create_producer(stream.stream_key()?, Default::default())
.await?;
for tick in 0..100 {
let message = format!(r#""tick {tick}""#);
eprintln!("{message}");
producer.send(message)?;
tokio::time::sleep(Duration::from_secs(1)).await;
}
producer.end().await?; // flush
Ok(())
}
Here is a basic stream processor. See also other advanced stream processors.
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { input, output } = Args::parse();
let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
let consumer: SeaConsumer = streamer
.create_consumer(input.stream_keys(), options)
.await?;
let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
let producer: SeaProducer = streamer
.create_producer(output.stream_key()?, Default::default())
.await?;
loop {
let message: SeaMessage = consumer.next().await?;
let message = process(message).await?;
eprintln!("{message}");
producer.send(message)?; // send is non-blocking
}
}
Now, let’s put them into action.
With Redis / Kafka:
STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"
# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2
With Stdio:
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
§Architecture
The architecture of sea-streamer
is constructed by a number of sub-crates:
All crates share the same major version. So 0.1
of sea-streamer
depends on 0.1
of sea-streamer-socket
.
Re-exports§
pub use sea_streamer_kafka as kafka;
sea-streamer-kafka
pub use sea_streamer_stdio as stdio;
sea-streamer-stdio
Modules§
- Re-export types from related libraries
Structs§
- Used to identify a group of consumers.
- Used to identify a consumer within a group.
- Metadata associated with a message.
- The payload of a message.
- SeaConnectOptions
sea-streamer-socket
sea-streamer-socket
concrete type of ConnectOptions. - SeaConsumer
sea-streamer-socket
sea-streamer-socket
concrete type of Consumer. - SeaConsumerOptions
sea-streamer-socket
sea-streamer-socket
concrete type of ConsumerOptions. - SeaProducer
sea-streamer-socket
sea-streamer-socket
concrete type of Producer. - SeaProducerOptions
sea-streamer-socket
sea-streamer-socket
concrete type of ProducerOptions. - SeaStreamer
sea-streamer-socket
sea-streamer-socket
concrete type of Streamer. - Identifies a shard. Aka. partition.
- It uses an
Arc
to hold the bytes, so is cheap to clone. - Identifies a stream. Aka. topic.
- Streamer URI with stream key(s).
- URI of Streaming Server. If this is a cluster, there can be multiple nodes.
- A
PrimitiveDateTime
with aUtcOffset
.
Enums§
- Backend
sea-streamer-socket
sea-streamer-socket
Enum for identifying the underlying backend. - BackendErr
sea-streamer-socket
sea-streamer-socket
the concrete backend error. - Bytes or Str. Being an
str
means the data is UTF-8 valid. - Mode of stream consumption.
- Errors that may happen when processing JSON
- NextFuture
sea-streamer-socket
sea-streamer-socket
concrete type of Future that will yield the next message. - SeaMessage
sea-streamer-socket
sea-streamer-socket
concrete type of Message. - SeaMessageStream
sea-streamer-socket
sea-streamer-socket
concrete type of Stream that will yield the next messages. - SeaStreamReset
sea-streamer-socket
- SendFuture
sea-streamer-socket
sea-streamer-socket
concrete type of a Future that will yield a send receipt. - Identifies a position in a stream.
- Common errors that may occur.
- Errors that may happen when handling StreamKey
- Errors that may happen when parsing stream URL
Constants§
- Maximum string length of a stream key.
- Reserved by SeaStreamer. Avoid using this as StreamKey.
- Canonical display format for Timestamp.
Traits§
- Common interface of byte containers.
- Common options when connecting to a streamer.
- Common interface of consumers, to be implemented by all backends.
- Common options of a Consumer.
- Types that be converted into
BytesOrStr
. - Common interface of messages, to be implemented by all backends.
- Common interface of producers, to be implemented by all backends.
- Common options of a Producer.
- SeaStreamerBackend
sea-streamer-socket
sea-streamer-socket
methods shared bySea*
types. - Common interface of streamer clients.
Functions§
- Returns true if this character can be used in a stream key.
- Function to construct a
StreamErr::Runtime
error variant.