1use futures::StreamExt;
2use s2::{
3 batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
4 client::{ClientConfig, StreamClient},
5 types::{AppendInput, AppendRecord, AppendRecordBatch, BasinName, CommandRecord, FencingToken},
6};
7
8#[tokio::main]
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}