Crate sea_streamer

source ·
Expand description

SeaStreamer

🌊 A real-time stream processing toolkit for Rust

crate docs build status

SeaStreamer is a toolkit to help you build real-time stream processors in Rust.

Features

  1. Async

SeaStreamer provides an async API, and it supports both tokio and async-std. In tandem with other async Rust libraries, you can build highly concurrent stream processors.

  1. Generic

We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.

  1. Testable

SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.

  1. Micro-service Oriented

Let’s build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!

Quick Start

Add the following to your Cargo.toml

sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }

Here is a basic stream consumer:

ⓘ
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { stream } = Args::from_args();

    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(SeaStreamReset::Earliest);

    let consumer: SeaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}

Here is a basic stream producer:

ⓘ
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { stream } = Args::from_args();

    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

    let producer: SeaProducer = streamer
        .create_producer(stream.stream_key()?, Default::default())
        .await?;

    for tick in 0..100 {
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    producer.end().await?; // flush

    Ok(())
}

Here is a basic stream processor. See also other advanced stream processors.

ⓘ
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { input, output } = Args::from_args();

    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;

    let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output.stream_key()?, Default::default())
        .await?;

    loop {
        let message: SeaMessage = consumer.next().await?;
        let message = process(message).await?;
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
    }
}

Now, let’s put them into action.

With Redis / Kafka:

STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"

# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2

With Stdio:

# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2

Architecture

The architecture of sea-streamer is constructed by a number of sub-crates:

All crates share the same major version. So 0.1 of sea-streamer depends on 0.1 of sea-streamer-socket.

Re-exports

Modules

  • Re-export types from related libraries

Structs

Enums

  • Backendsea-streamer-socket
    sea-streamer-socket Enum for identifying the underlying backend.
  • BackendErrsea-streamer-socket
    sea-streamer-socket the concrete backend error.
  • Bytes or Str. Being an str means the data is UTF-8 valid.
  • Mode of stream consumption.
  • Errors that may happen when processing JSON
  • NextFuturesea-streamer-socket
    sea-streamer-socket concrete type of Future that will yield the next message.
  • SeaMessagesea-streamer-socket
    sea-streamer-socket concrete type of Message.
  • SeaMessageStreamsea-streamer-socket
    sea-streamer-socket concrete type of Stream that will yield the next messages.
  • SeaStreamResetsea-streamer-socket
  • SendFuturesea-streamer-socket
    sea-streamer-socket concrete type of a Future that will yield a send receipt.
  • Identifies a position in a stream.
  • Common errors that may occur.
  • Errors that may happen when handling StreamKey
  • Errors that may happen when parsing stream URL

Constants

Traits

Functions

Type Aliases

  • Errorsea-streamer-socket
    sea-streamer-socket the concrete error type.
  • Delivery receipt.
  • The tuple (StreamKey, ShardId, SeqNo) uniquely identifies a message. Aka. offset.
  • Type alias of the Result type specific to sea-streamer.