Crate sea_streamer


SeaStreamer

🌊 A real-time stream processing toolkit for Rust


SeaStreamer is a toolkit to help you build real-time stream processors in Rust.

Features

  1. Async

SeaStreamer provides an async API, and it supports both tokio and async-std. In tandem with other async Rust libraries, you can build highly concurrent stream processors.

  2. Generic

We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.

  3. Testable

SeaStreamer also provides a set of tools to work with streams via Unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.

  4. Micro-service Oriented

Let's build real-time (multi-threaded, no GC), self-contained (a.k.a. easy to deploy), low-resource-usage, long-running stream processors in Rust!
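To make the "Generic" point concrete, here is a minimal std-only sketch of writing logic once against a trait and plugging in different backends. `MessageSource`, `MemorySource`, `LineSource` and `count_messages` are hypothetical names for illustration; SeaStreamer's real `Consumer` trait is async and richer than this synchronous stand-in.

```rust
use std::collections::VecDeque;
use std::io::BufRead;

/// Hypothetical, synchronous stand-in for a backend-agnostic consumer trait.
pub trait MessageSource {
    /// Returns the next message, or `None` once the source is exhausted.
    fn next_message(&mut self) -> Option<String>;
}

/// Hypothetical in-memory backend, handy in tests.
pub struct MemorySource {
    queue: VecDeque<String>,
}

impl MemorySource {
    pub fn new(messages: &[&str]) -> Self {
        Self {
            queue: messages.iter().map(|m| m.to_string()).collect(),
        }
    }
}

impl MessageSource for MemorySource {
    fn next_message(&mut self) -> Option<String> {
        self.queue.pop_front()
    }
}

/// Hypothetical line-oriented backend: one message per line, e.g. from a pipe.
pub struct LineSource<R: BufRead> {
    reader: R,
}

impl<R: BufRead> LineSource<R> {
    pub fn new(reader: R) -> Self {
        Self { reader }
    }
}

impl<R: BufRead> MessageSource for LineSource<R> {
    fn next_message(&mut self) -> Option<String> {
        let mut line = String::new();
        match self.reader.read_line(&mut line) {
            // 0 bytes read means end of input; I/O errors also end the stream (sketch only)
            Ok(0) | Err(_) => None,
            Ok(_) => Some(line.trim_end_matches(|c: char| c == '\n' || c == '\r').to_string()),
        }
    }
}

/// Logic written once against the trait works with any backend.
pub fn count_messages<S: MessageSource>(source: &mut S) -> usize {
    let mut count = 0;
    while source.next_message().is_some() {
        count += 1;
    }
    count
}
```

Swapping `MemorySource` for `LineSource` requires no change to `count_messages`, which is the property a generic interface is meant to give you.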

Quick Start

Add the following to your Cargo.toml:

[dependencies]
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }

Here is a basic stream consumer:

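use anyhow::Result; // assumes the anyhow crate
use clap::Parser; // assumes clap 4 with the "derive" feature
use sea_streamer::{
    Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer, SeaConsumerOptions,
    SeaMessage, SeaStreamReset, SeaStreamer, StreamUrl, Streamer,
};

#[derive(Debug, Parser)]
struct Args {
    /// Streamer URI with stream key(s), e.g. kafka://localhost:9092/hello
    #[arg(long)]
    stream: StreamUrl,
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { stream } = Args::parse();

    // The URL scheme (kafka://, redis://, stdio://, ...) selects the backend
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    // Start from the earliest available message
    options.set_auto_stream_reset(SeaStreamReset::Earliest);

    let consumer: SeaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}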
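The `SeaStreamReset::Earliest` option above decides where a consumer begins when it has no earlier position to resume from. The sketch below models that decision over an in-memory log, assuming semantics similar to Kafka's `auto.offset.reset`; `Reset`, `Log`, `start_position` and `read_from` are hypothetical, and each real backend applies the rule in its own way.

```rust
/// Hypothetical reset policy, in the spirit of `SeaStreamReset`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Reset {
    Earliest,
    Latest,
}

/// Hypothetical append-only log for one shard; a message's index is its sequence number.
pub struct Log {
    messages: Vec<String>,
}

impl Log {
    pub fn new(messages: &[&str]) -> Self {
        Self {
            messages: messages.iter().map(|m| m.to_string()).collect(),
        }
    }

    /// Where reading starts: a previously saved position wins,
    /// otherwise the reset policy picks the oldest message or the next new one.
    pub fn start_position(&self, saved: Option<usize>, reset: Reset) -> usize {
        match (saved, reset) {
            (Some(pos), _) => pos,
            (None, Reset::Earliest) => 0,
            (None, Reset::Latest) => self.messages.len(),
        }
    }

    /// The messages a consumer starting at `start` would see, in order.
    pub fn read_from(&self, start: usize) -> &[String] {
        &self.messages[start.min(self.messages.len())..]
    }
}
```

With `Latest`, a fresh consumer sees nothing already in the log and only receives messages appended afterwards; with `Earliest`, it replays the whole log first.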

Here is a basic stream producer:

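use anyhow::Result; // assumes the anyhow crate
use clap::Parser; // assumes clap 4 with the "derive" feature
use sea_streamer::{Producer, SeaProducer, SeaStreamer, StreamUrl, Streamer};
use std::time::Duration;

#[derive(Debug, Parser)]
struct Args {
    /// Streamer URI with a single stream key, e.g. kafka://localhost:9092/hello
    #[arg(long)]
    stream: StreamUrl,
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { stream } = Args::parse();

    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

    // A producer writes to one stream key; stream_key() is fallible
    let producer: SeaProducer = streamer
        .create_producer(stream.stream_key()?, Default::default())
        .await?;

    for tick in 0..100 {
        // A JSON string payload, e.g. "tick 0" including the quotes
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    producer.end().await?; // flush pending messages before exiting

    Ok(())
}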
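The producer calls `send` without awaiting it and relies on `end` to flush. A rough std-only model of that pattern: `send` only enqueues onto a channel drained by a background thread, and `end` closes the channel and joins the thread, so every queued message has been delivered when it returns. `BackgroundProducer` and its shared `Vec` sink are hypothetical stand-ins, not how any SeaStreamer backend is implemented.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical producer: `send` enqueues and returns immediately.
pub struct BackgroundProducer {
    tx: Sender<String>,
    worker: JoinHandle<()>,
}

impl BackgroundProducer {
    /// `sink` stands in for the remote stream; the worker appends delivered messages to it.
    pub fn new(sink: Arc<Mutex<Vec<String>>>) -> Self {
        let (tx, rx) = mpsc::channel::<String>();
        let worker = thread::spawn(move || {
            // Runs until every Sender is dropped and the queue is drained.
            for message in rx {
                sink.lock().unwrap().push(message);
            }
        });
        Self { tx, worker }
    }

    /// Non-blocking: only fails if the worker thread has gone away.
    pub fn send(&self, message: impl Into<String>) -> Result<(), String> {
        self.tx.send(message.into()).map_err(|e| e.to_string())
    }

    /// Flush: close the queue, then wait until the worker has delivered everything.
    pub fn end(self) {
        let BackgroundProducer { tx, worker } = self;
        drop(tx);
        worker.join().expect("worker thread panicked");
    }
}
```

Skipping `end` here could let the program exit while messages are still queued, which is why the example above ends with an explicit flush.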

Here is a basic stream processor. See also the other, more advanced stream processors.

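use anyhow::Result; // assumes the anyhow crate
use clap::Parser; // assumes clap 4 with the "derive" feature
use sea_streamer::{
    Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
    SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, StreamUrl, Streamer,
};

#[derive(Debug, Parser)]
struct Args {
    #[arg(long)]
    input: StreamUrl,
    #[arg(long)]
    output: StreamUrl,
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { input, output } = Args::parse();

    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;

    // Input and output may live on different streamers, even different backends
    let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output.stream_key()?, Default::default())
        .await?;

    loop {
        let message: SeaMessage = consumer.next().await?;
        let message = process(message).await?;
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
    }
}

// Placeholder transformation; a real processor would do its work here
async fn process(message: SeaMessage<'_>) -> Result<String> {
    Ok(format!("{} processed", message.message().as_str()?))
}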
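The processor loop is consume, transform, produce. When both ends are stdio, that loop is simply a filter in a Unix pipeline. This std-only sketch shows the shape with a generic reader and writer so it can be tested without a terminal. It assumes one bare payload per line, a simplification of whatever framing the real stdio backend uses, and `process` / `run_processor` are hypothetical.

```rust
use std::io::{self, BufRead, Write};

/// Hypothetical per-message transformation, standing in for real processing.
pub fn process(message: &str) -> String {
    format!("{message} processed")
}

/// Consume lines, transform each one, produce lines; returns how many were handled.
pub fn run_processor<R: BufRead, W: Write>(input: R, mut output: W) -> io::Result<usize> {
    let mut count = 0;
    for line in input.lines() {
        let line = line?;
        writeln!(output, "{}", process(&line))?;
        count += 1;
    }
    // Make sure everything written so far reaches the next stage of the pipe
    output.flush()?;
    Ok(count)
}
```

In a binary, `run_processor(std::io::stdin().lock(), std::io::stdout().lock())` would let it sit between two commands in a shell pipe.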

Now, let’s put them into action.

With Redis / Kafka:

STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"

# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2

With Stdio:

# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
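The commands above build each stream URL by appending a stream key to a streamer URI (`$STREAMER_URI/hello1`, `stdio:///hello1`). A minimal std-only sketch of splitting such a URL back into its two parts follows. `split_stream_url` is hypothetical and the comma-separated multi-key form it accepts is an assumption, so rely on `StreamUrl`'s own parsing in real code.

```rust
/// Hypothetical helper: splits `scheme://host[:port]/key1[,key2...]` into the
/// streamer URI and its stream keys. Returns `None` if no stream key is present.
/// The comma-separated multi-key form is an assumption of this sketch.
pub fn split_stream_url(url: &str) -> Option<(String, Vec<String>)> {
    let scheme_end = url.find("://")? + 3; // index just past "://"
    let rest = &url[scheme_end..];
    // The first '/' after the host part separates the streamer URI from the keys;
    // for stdio:///hello1 the host part is empty.
    let slash = rest.find('/')?;
    let streamer = url[..scheme_end + slash].to_string();
    let keys: Vec<String> = rest[slash + 1..]
        .split(',')
        .filter(|k| !k.is_empty())
        .map(str::to_string)
        .collect();
    if keys.is_empty() {
        None
    } else {
        Some((streamer, keys))
    }
}
```

For example, a bare `kafka://localhost:9092` names no stream, so this sketch returns `None` for it.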

Architecture

SeaStreamer is made up of a number of sub-crates, including sea-streamer-socket, sea-streamer-kafka and sea-streamer-stdio.

All crates share the same major version, so version 0.1 of sea-streamer depends on version 0.1 of sea-streamer-socket.

Re-exports

pub use sea_streamer_kafka as kafka; (feature sea-streamer-kafka)
pub use sea_streamer_stdio as stdio; (feature sea-streamer-stdio)

Modules

export
Re-export types from related libraries.

Structs

ConsumerGroup
Used to identify a group of consumers.
ConsumerId
Used to identify a consumer within a group.
MessageHeader
Metadata associated with a message.
OwnedMessage
Payload
The payload of a message.
SeaConnectOptions (feature sea-streamer-socket)
The sea-streamer-socket concrete type of ConnectOptions.
SeaConsumer (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Consumer.
SeaConsumerOptions (feature sea-streamer-socket)
The sea-streamer-socket concrete type of ConsumerOptions.
SeaProducer (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Producer.
SeaProducerOptions (feature sea-streamer-socket)
The sea-streamer-socket concrete type of ProducerOptions.
SeaStreamer (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Streamer.
ShardId
Identifies a shard, a.k.a. partition.
SharedMessage
Uses an Arc to hold the bytes, so it is cheap to clone.
StreamKey
Identifies a stream, a.k.a. topic.
StreamUrl
Streamer URI with stream key(s).
StreamerUri
URI of a streaming server. If the server is a cluster, there can be multiple nodes.
Timestamp
A PrimitiveDateTime with a UtcOffset.
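SharedMessage's note that an Arc makes it cheap to clone can be seen in a few lines of std Rust. `SharedPayload` is a hypothetical type that keeps only the bytes (the real `SharedMessage` also carries a `MessageHeader`): cloning it bumps a reference count instead of copying the payload.

```rust
use std::sync::Arc;

/// Hypothetical payload holder in the spirit of `SharedMessage`:
/// cloning copies a pointer and bumps a counter, never the bytes themselves.
#[derive(Clone, Debug)]
pub struct SharedPayload {
    bytes: Arc<[u8]>,
}

impl SharedPayload {
    pub fn new(bytes: Vec<u8>) -> Self {
        // One allocation, shared by every clone from now on
        Self { bytes: bytes.into() }
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.bytes
    }

    /// Number of handles currently sharing the same allocation.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.bytes)
    }

    /// True if both handles point at the same underlying buffer.
    pub fn shares_buffer_with(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.bytes, &other.bytes)
    }
}
```

Because `Arc` is thread-safe, such clones can also be handed to other threads or tasks without copying the payload.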

Enums

Backend (feature sea-streamer-socket)
The sea-streamer-socket enum for identifying the underlying backend.
BackendErr (feature sea-streamer-socket)
The sea-streamer-socket concrete backend error.
BytesOrStr
Bytes or str. Being a str means the data is valid UTF-8.
ConsumerMode
Mode of stream consumption.
JsonErr
Errors that may happen when processing JSON.
NextFuture (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Future that will yield the next message.
SeaMessage (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Message.
SeaMessageStream (feature sea-streamer-socket)
The sea-streamer-socket concrete type of Stream that will yield the next messages.
SeaStreamReset (feature sea-streamer-socket)
SendFuture (feature sea-streamer-socket)
The sea-streamer-socket concrete type of a Future that will yield a send receipt.
SeqPos
Identifies a position in a stream.
StreamErr
Common errors that may occur.
StreamKeyErr
Errors that may happen when handling a StreamKey.
StreamUrlErr
Errors that may happen when parsing a stream URL.
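BytesOrStr encodes an invariant in the type: the str form is only used for data known to be valid UTF-8. A minimal sketch of that idea with std types follows; `BytesOrText` is hypothetical, and it validates eagerly at construction, which may differ from when the real type performs its check.

```rust
/// Hypothetical type mirroring the idea behind `BytesOrStr`:
/// the `Str` variant is only ever built from data known to be valid UTF-8.
#[derive(Debug, PartialEq)]
pub enum BytesOrText {
    Bytes(Vec<u8>),
    Str(String),
}

impl BytesOrText {
    /// Validates once, up front; invalid UTF-8 is kept as raw bytes.
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        match String::from_utf8(bytes) {
            Ok(text) => BytesOrText::Str(text),
            // `into_bytes` hands back the original buffer without copying
            Err(err) => BytesOrText::Bytes(err.into_bytes()),
        }
    }

    /// No re-validation needed: the variant already records the check's result.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            BytesOrText::Str(text) => Some(text.as_str()),
            BytesOrText::Bytes(_) => None,
        }
    }
}
```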

Constants

MAX_STREAM_KEY_LEN
Maximum string length of a stream key.
SEA_STREAMER_INTERNAL
Reserved by SeaStreamer. Avoid using this as a StreamKey.
TIMESTAMP_FORMAT
Canonical display format for Timestamp.

Traits

Buffer
Common interface of byte containers.
ConnectOptions
Common options when connecting to a streamer.
Consumer
Common interface of consumers, to be implemented by all backends.
ConsumerOptions
Common options of a Consumer.
IntoBytesOrStr
Types that can be converted into BytesOrStr.
Message
Common interface of messages, to be implemented by all backends.
Producer
Common interface of producers, to be implemented by all backends.
ProducerOptions
Common options of a Producer.
SeaStreamerBackend (feature sea-streamer-socket)
Methods shared by the sea-streamer-socket Sea* types.
Streamer
Common interface of streamer clients.

Functions

is_valid_stream_key
is_valid_stream_key_char
Returns true if this character can be used in a stream key.
runtime_error
Function to construct a StreamErr::Runtime error variant.

Type Aliases

Error (feature sea-streamer-socket)
The sea-streamer-socket concrete error type.
Receipt
Delivery receipt.
SeqNo
The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message. SeqNo is a.k.a. offset.
StreamResult
Type alias of the Result type specific to sea-streamer.
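Because (StreamKey, ShardId, SeqNo) uniquely identifies a message, it works as a key for spotting redelivered messages. This std-only sketch uses a plain `String` and `u64`s in place of the real `StreamKey`, `ShardId` and `SeqNo` types, and `dedup` is a hypothetical helper, not part of the crate.

```rust
use std::collections::HashSet;

/// Hypothetical message identity mirroring the (StreamKey, ShardId, SeqNo) tuple;
/// plain String/u64 stand in for the real types.
pub type MessageId = (String, u64, u64);

/// Keeps the first occurrence of each identity and drops any redelivery of it.
pub fn dedup(messages: Vec<(MessageId, String)>) -> Vec<String> {
    let mut seen: HashSet<MessageId> = HashSet::new();
    messages
        .into_iter()
        // `insert` returns false if this identity was already recorded
        .filter(|(id, _)| seen.insert(id.clone()))
        .map(|(_, payload)| payload)
        .collect()
}
```

Note that the same sequence number on a different shard or stream is a different message, so all three parts of the tuple are needed in the key.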