producer/
producer.rs

1use futures::StreamExt;
2use s2::{
3    batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
4    client::{ClientConfig, StreamClient},
5    types::{AppendInput, AppendRecord, AppendRecordBatch, BasinName, CommandRecord, FencingToken},
6};
7
8#[tokio::main]
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10    let token = std::env::var("S2_ACCESS_TOKEN")?;
11    let config = ClientConfig::new(token);
12    let basin: BasinName = "my-favorite-basin".parse()?;
13    let stream = "my-favorite-stream";
14    let stream_client = StreamClient::new(config, basin, stream);
15
16    let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18    // Set the fencing token.
19    let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20    let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21        .expect("valid batch with 1 append record");
22    let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23    let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25    let match_seq_num = set_fencing_token.tail.seq_num;
26
27    // Stream of records
28    let append_stream = futures::stream::iter([
29        AppendRecord::new("record_1")?,
30        AppendRecord::new("record_2")?,
31    ]);
32
33    let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34        .with_fencing_token(Some(fencing_token))
35        .with_match_seq_num(Some(match_seq_num));
36
37    let append_session_request =
38        AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40    let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42    while let Some(next) = append_session_stream.next().await {
43        let next = next?;
44        println!("{next:#?}");
45    }
46
47    Ok(())
48}