sea-streamer-socket 0.5.2

🌊 SeaStreamer backend-agnostic Socket API
Documentation
use anyhow::{bail, Result};
use clap::Parser;
use sea_streamer_socket::{SeaConsumerOptions, SeaStreamer};
use sea_streamer_types::{
    Consumer, ConsumerMode, ConsumerOptions, Message, Producer, StreamUrl, Streamer,
};
use std::str::FromStr;

#[cfg(feature = "backend-kafka")]
use sea_streamer_kafka::AutoOffsetReset;
#[cfg(feature = "backend-redis")]
use sea_streamer_redis::AutoStreamReset;

#[derive(Debug, Parser)]
struct Args {
    #[clap(
        long,
        help = "Streamer Source Uri, i.e. try `kafka://localhost:9092/stream_key`"
    )]
    input: StreamUrl,
    #[clap(long, help = "Streamer Sink Uri, i.e. try `stdio:///stream_key`")]
    output: StreamUrl,
    #[clap(long, help = "Stream from `start` or `end`", default_value = "end")]
    offset: Offset,
}

#[derive(Debug)]
enum Offset {
    Start,
    End,
}

impl FromStr for Offset {
    type Err = &'static str;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "start" => Ok(Self::Start),
            "end" => Ok(Self::End),
            _ => Err("unknown Offset"),
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args {
        input,
        output,
        offset,
    } = Args::parse();

    if input == output && input.streamer().protocol() != Some("stdio") {
        bail!("input == output !!!");
    }

    let source = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    #[cfg(feature = "backend-kafka")]
    options.set_kafka_consumer_options(|options| {
        options.set_auto_offset_reset(match offset {
            Offset::Start => AutoOffsetReset::Earliest,
            Offset::End => AutoOffsetReset::Latest,
        });
    });
    #[cfg(feature = "backend-redis")]
    options.set_redis_consumer_options(|options| {
        options.set_auto_stream_reset(match offset {
            Offset::Start => AutoStreamReset::Earliest,
            Offset::End => AutoStreamReset::Latest,
        });
    });
    let consumer = source.create_consumer(input.stream_keys(), options).await?;

    let sink = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer = sink
        .create_producer(output.stream_key()?, Default::default())
        .await?;

    loop {
        let mess = consumer.next().await?;
        producer.send(mess.message())?;
    }
}