SeaStreamer is a toolkit to help you build real-time stream processors in Rust.
§Features
- Async
SeaStreamer provides an async API, and it supports both tokio and async-std. In tandem with other async Rust libraries, you can build highly concurrent stream processors.
- Generic
We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.
- Testable
SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.
- Micro-service Oriented
Let’s build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
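The "Generic" point above can be pictured with a small std-only sketch. The names below (`MiniProducer`, `MemoryBackend`, `WriterBackend`, `produce_ticks`) are hypothetical and are not part of SeaStreamer's API; they only illustrate how application code written against a trait stays independent of the backend behind it.

```rust
use std::io::Write;

/// Hypothetical stand-in for a backend-agnostic producer interface.
/// This is NOT SeaStreamer's `Producer` trait, only an illustration.
trait MiniProducer {
    fn send(&mut self, payload: &str) -> std::io::Result<()>;
}

/// In-memory backend: convenient in tests, no server required.
#[derive(Default)]
struct MemoryBackend {
    messages: Vec<String>,
}

impl MiniProducer for MemoryBackend {
    fn send(&mut self, payload: &str) -> std::io::Result<()> {
        self.messages.push(payload.to_owned());
        Ok(())
    }
}

/// Writer-backed backend: writes one message per line to any `Write`.
struct WriterBackend<W: Write> {
    out: W,
}

impl<W: Write> MiniProducer for WriterBackend<W> {
    fn send(&mut self, payload: &str) -> std::io::Result<()> {
        writeln!(self.out, "{payload}")
    }
}

/// Application logic depends only on the trait, not on a concrete backend.
fn produce_ticks<P: MiniProducer>(producer: &mut P, count: u32) -> std::io::Result<()> {
    for tick in 0..count {
        producer.send(&format!("tick {tick}"))?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Same function, two different backends.
    let mut memory = MemoryBackend::default();
    produce_ticks(&mut memory, 3)?;
    assert_eq!(memory.messages.len(), 3);

    let mut stdout = WriterBackend { out: std::io::stdout() };
    produce_ticks(&mut stdout, 3)
}
```

Swapping the backend changes only the value passed to `produce_ticks`, which is the property the generic trait interface is meant to give you.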
§Quick Start
Add the following to your Cargo.toml:
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
Here is a basic stream consumer:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::parse();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(SeaStreamReset::Earliest);
    let consumer: SeaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;
    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}
Here is a basic stream producer:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::parse();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(stream.stream_key()?, Default::default())
        .await?;
    for tick in 0..100 {
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    producer.end().await?; // flush
    Ok(())
}
Here is a basic stream processor. See also other advanced stream processors.
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { input, output } = Args::parse();
    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;
    let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output.stream_key()?, Default::default())
        .await?;
    loop {
        let message: SeaMessage = consumer.next().await?;
        let message = process(message).await?;
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
    }
}
Now, let’s put them into action.
With Redis / Kafka:
STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"
# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2
With Stdio:
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
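The commands above pass values such as `redis://localhost:6379/hello1` and `stdio:///hello1`: a streamer URI followed by a stream key. As a rough illustration of that shape, here is a std-only sketch. The helper `split_stream_url` is hypothetical; in the crate the parsing is done by `StreamUrl`, and its exact rules are not reproduced here. The sketch assumes the key(s) form the last path segment and that multiple keys are comma-separated, which is an assumption rather than something stated on this page.

```rust
/// Hypothetical helper: splits `<streamer-uri>/<key>[,<key>...]` into its parts.
/// Assumption: keys are the last path segment, separated by commas.
fn split_stream_url(url: &str) -> Option<(&str, Vec<&str>)> {
    // Everything before the last '/' is treated as the streamer URI.
    let (streamer, keys) = url.rsplit_once('/')?;
    // If the last '/' belonged to "://", there was no stream key at all.
    if !streamer.contains("://") {
        return None;
    }
    let keys: Vec<&str> = keys.split(',').collect();
    // Reject empty keys, e.g. a trailing '/' or "a,,b".
    if keys.iter().any(|key| key.is_empty()) {
        return None;
    }
    Some((streamer, keys))
}

fn main() {
    for url in [
        "redis://localhost:6379/hello1",
        "stdio:///hello1",
        "kafka://localhost:9092",
    ] {
        match split_stream_url(url) {
            Some((streamer, keys)) => println!("{url}: streamer={streamer} keys={keys:?}"),
            None => println!("{url}: no stream key found"),
        }
    }
}
```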
§Architecture
The architecture of sea-streamer is made up of a number of sub-crates.
All crates share the same major version. So 0.1 of sea-streamer depends on 0.1 of sea-streamer-socket.
Re-exports§
pub use sea_streamer_kafka as kafka; [sea-streamer-kafka]
pub use sea_streamer_stdio as stdio; [sea-streamer-stdio]
Modules§
- export: Re-export types from related libraries
Structs§
- ConsumerGroup: Used to identify a group of consumers.
- ConsumerId: Used to identify a consumer within a group.
- MessageHeader: Metadata associated with a message.
- OwnedMessage
- Payload: The payload of a message.
- SeaConnectOptions [sea-streamer-socket]: sea-streamer-socket concrete type of ConnectOptions.
- SeaConsumer [sea-streamer-socket]: sea-streamer-socket concrete type of Consumer.
- SeaConsumerOptions [sea-streamer-socket]: sea-streamer-socket concrete type of ConsumerOptions.
- SeaProducer [sea-streamer-socket]: sea-streamer-socket concrete type of Producer.
- SeaProducerOptions [sea-streamer-socket]: sea-streamer-socket concrete type of ProducerOptions.
- SeaStreamer [sea-streamer-socket]: sea-streamer-socket concrete type of Streamer.
- ShardId: Identifies a shard. Aka. partition.
- SharedMessage: It uses an Arc to hold the bytes, so is cheap to clone.
- StreamKey: Identifies a stream. Aka. topic.
- StreamUrl: Streamer URI with stream key(s).
- StreamerUri: URI of Streaming Server. If this is a cluster, there can be multiple nodes.
- Timestamp: A PrimitiveDateTime with a UtcOffset.
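The SharedMessage entry notes that holding the bytes in an `Arc` makes cloning cheap. A std-only sketch of that idea follows. `CheapMessage` is a hypothetical type and does not reflect the real layout of `SharedMessage`; it only shows that cloning an `Arc`-backed value copies a pointer and bumps a reference count instead of copying the payload.

```rust
use std::sync::Arc;

/// Hypothetical message type; NOT the real `SharedMessage` layout.
#[derive(Clone)]
struct CheapMessage {
    bytes: Arc<[u8]>,
}

impl CheapMessage {
    /// Copies the payload once, into a reference-counted allocation.
    fn new(payload: &[u8]) -> Self {
        Self { bytes: Arc::from(payload) }
    }

    fn payload(&self) -> &[u8] {
        &self.bytes
    }
}

fn main() {
    let original = CheapMessage::new(b"tick 1");
    // Cloning does not copy the bytes; it only increments the reference count.
    let copy = original.clone();

    assert!(Arc::ptr_eq(&original.bytes, &copy.bytes));
    assert_eq!(Arc::strong_count(&original.bytes), 2);
    println!("{}", String::from_utf8_lossy(copy.payload()));
}
```

This matters when the same message is handed to several consumers or tasks: each gets its own handle without duplicating the payload.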
Enums§
- Backend [sea-streamer-socket]: sea-streamer-socket Enum for identifying the underlying backend.
- BackendErr [sea-streamer-socket]: sea-streamer-socket the concrete backend error.
- BytesOrStr: Bytes or Str. Being an str means the data is UTF-8 valid.
- ConsumerMode: Mode of stream consumption.
- JsonErr: Errors that may happen when processing JSON
- NextFuture [sea-streamer-socket]: sea-streamer-socket concrete type of Future that will yield the next message.
- SeaMessage [sea-streamer-socket]: sea-streamer-socket concrete type of Message.
- SeaMessageStream [sea-streamer-socket]: sea-streamer-socket concrete type of Stream that will yield the next messages.
- SeaStreamReset [sea-streamer-socket]
- SendFuture [sea-streamer-socket]: sea-streamer-socket concrete type of a Future that will yield a send receipt.
- SeqPos: Identifies a position in a stream.
- StreamErr: Common errors that may occur.
- StreamKeyErr: Errors that may happen when handling StreamKey
- StreamUrlErr: Errors that may happen when parsing stream URL
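The BytesOrStr entry distinguishes raw bytes from data already known to be valid UTF-8. The distinction can be sketched with std only. `BytesOrText` below is a hypothetical enum, not the crate's `BytesOrStr`; it shows why reading a payload as a string is fallible in the bytes case and free in the string case.

```rust
use std::str::Utf8Error;

/// Hypothetical two-variant payload; NOT the crate's `BytesOrStr`.
#[derive(Clone, Copy)]
enum BytesOrText<'a> {
    /// Arbitrary bytes; may or may not be valid UTF-8.
    Bytes(&'a [u8]),
    /// Already validated as UTF-8 by construction.
    Text(&'a str),
}

impl<'a> BytesOrText<'a> {
    /// Validates only when holding raw bytes; `Text` is returned as-is.
    fn as_str(&self) -> Result<&'a str, Utf8Error> {
        match *self {
            Self::Bytes(bytes) => std::str::from_utf8(bytes),
            Self::Text(text) => Ok(text),
        }
    }
}

fn main() {
    let text = BytesOrText::Text("tick 1");
    let valid = BytesOrText::Bytes(b"tick 2");
    let invalid = BytesOrText::Bytes(&[0xff, 0xfe]); // 0xff never occurs in UTF-8

    assert_eq!(text.as_str(), Ok("tick 1"));
    assert_eq!(valid.as_str(), Ok("tick 2"));
    assert!(invalid.as_str().is_err());
}
```

This is consistent with the `mess.message().as_str()?` call in the consumer example, where the `?` handles the case of a payload that is not valid UTF-8.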
Constants§
- MAX_STREAM_KEY_LEN: Maximum string length of a stream key.
- SEA_STREAMER_INTERNAL: Reserved by SeaStreamer. Avoid using this as StreamKey.
- TIMESTAMP_FORMAT: Canonical display format for Timestamp.
Traits§
- Buffer: Common interface of byte containers.
- ConnectOptions: Common options when connecting to a streamer.
- Consumer: Common interface of consumers, to be implemented by all backends.
- ConsumerOptions: Common options of a Consumer.
- IntoBytesOrStr: Types that can be converted into BytesOrStr.
- Message: Common interface of messages, to be implemented by all backends.
- Producer: Common interface of producers, to be implemented by all backends.
- ProducerOptions: Common options of a Producer.
- SeaStreamerBackend [sea-streamer-socket]: sea-streamer-socket methods shared by Sea* types.
- Streamer: Common interface of streamer clients.
Functions§
- is_valid_stream_key
- is_valid_stream_key_char: Returns true if this character can be used in a stream key.
- runtime_error: Function to construct a StreamErr::Runtime error variant.
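A stream key check combines a length limit (see MAX_STREAM_KEY_LEN) with a per-character test (see is_valid_stream_key_char). The sketch below shows that combination using std only. Both the limit of 249 and the allowed character set (ASCII alphanumerics plus `.`, `_`, `-`) are assumptions made for this example, and the function names are hypothetical; consult the crate's own constant and functions for the real rules.

```rust
/// Assumed limit for this sketch only; the real value is `MAX_STREAM_KEY_LEN`.
const ASSUMED_MAX_KEY_LEN: usize = 249;

/// Assumed character set for this sketch: ASCII alphanumerics, '.', '_' and '-'.
fn sketch_is_valid_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')
}

/// A key passes if it is within the length limit and every character is allowed.
fn sketch_is_valid_key(key: &str) -> bool {
    key.len() <= ASSUMED_MAX_KEY_LEN && key.chars().all(sketch_is_valid_char)
}

fn main() {
    for key in ["hello1", "my_topic.v2", "hello world", "a/b"] {
        println!("{key:?}: {}", sketch_is_valid_key(key));
    }
}
```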
Type Aliases§
- Error [sea-streamer-socket]: sea-streamer-socket the concrete error type.
- Receipt: Delivery receipt.
- SeqNo: The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message. Aka. offset.
- StreamResult: Type alias of the Result type specific to sea-streamer.
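Because the tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message, it can serve as a key for bookkeeping such as duplicate detection. The std-only sketch below illustrates this. `Deduplicator` is hypothetical, and the plain `String` and `u64` types are stand-ins chosen for the example, not the crate's own `StreamKey`, `ShardId` and `SeqNo` definitions.

```rust
use std::collections::HashSet;

/// Stand-in for (StreamKey, ShardId, SeqNo); the concrete types are assumptions.
type MessageIdentity = (String, u64, u64);

/// Hypothetical helper that remembers which message identities were seen.
#[derive(Default)]
struct Deduplicator {
    seen: HashSet<MessageIdentity>,
}

impl Deduplicator {
    /// Returns true the first time an identity is observed, false on repeats.
    fn first_seen(&mut self, stream_key: &str, shard: u64, seq: u64) -> bool {
        self.seen.insert((stream_key.to_owned(), shard, seq))
    }
}

fn main() {
    let mut dedup = Deduplicator::default();
    // (stream key, shard, sequence number); the third entry repeats the first.
    let deliveries = [("hello1", 0, 1), ("hello1", 0, 2), ("hello1", 0, 1), ("hello2", 0, 1)];
    for (key, shard, seq) in deliveries {
        if dedup.first_seen(key, shard, seq) {
            println!("process {key}/{shard}/{seq}");
        } else {
            println!("skip duplicate {key}/{shard}/{seq}");
        }
    }
}
```

Note that the same sequence number on a different stream key or shard counts as a different message, which is why all three components are part of the key.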