consumer/
consumer.rs

1use futures::StreamExt;
2use s2::{
3    client::{ClientConfig, StreamClient},
4    types::{BasinName, ReadSessionRequest, ReadStart},
5};
6use tokio::select;
7
8#[tokio::main]
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10    let token = std::env::var("S2_ACCESS_TOKEN")?;
11    let config = ClientConfig::new(token);
12    let basin: BasinName = "my-favorite-basin".parse()?;
13    let stream = "my-favorite-stream";
14    let stream_client = StreamClient::new(config, basin, stream);
15
16    let start_seq_num = 0;
17    let read_session_request = ReadSessionRequest::new(ReadStart::SeqNum(start_seq_num));
18    let mut read_stream = stream_client.read_session(read_session_request).await?;
19
20    loop {
21        select! {
22            next_batch = read_stream.next() => {
23                let Some(next_batch) = next_batch else { break };
24                let next_batch = next_batch?;
25                println!("{next_batch:?}");
26            }
27            _ = tokio::signal::ctrl_c() => break,
28        }
29    }
30
31    Ok(())
32}