docs_streams/
docs_streams.rs1use std::time::Duration;
6
7use futures::StreamExt;
8use s2_sdk::{
9 S2,
10 append_session::AppendSessionConfig,
11 batching::BatchingConfig,
12 producer::ProducerConfig,
13 types::{
14 AppendInput, AppendRecord, AppendRecordBatch, BasinName, ReadFrom, ReadInput, ReadLimits,
15 ReadStart, ReadStop, S2Config, StreamName,
16 },
17};
18
19#[tokio::main]
20async fn main() -> Result<(), Box<dyn std::error::Error>> {
21 let token = std::env::var("S2_ACCESS_TOKEN")?;
22 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
23
24 let client = S2::new(S2Config::new(token))?;
25 let basin = client.basin(basin_name);
26
27 let stream_name: StreamName = format!(
29 "docs-streams-{}",
30 std::time::SystemTime::now()
31 .duration_since(std::time::UNIX_EPOCH)?
32 .as_millis()
33 )
34 .parse()?;
35 basin
36 .create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
37 .await?;
38
39 let stream = basin.stream(stream_name.clone());
41
42 let records = AppendRecordBatch::try_from_iter([
43 AppendRecord::new("first event")?,
44 AppendRecord::new("second event")?,
45 ])?;
46
47 let ack = stream.append(AppendInput::new(records)).await?;
48
49 println!(
51 "Wrote records {} through {}",
52 ack.start.seq_num,
53 ack.end.seq_num - 1
54 );
55 let batch = stream
59 .read(
60 ReadInput::new()
61 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
62 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
63 )
64 .await?;
65
66 for record in batch.records {
67 println!("[{}] {:?}", record.seq_num, record.body);
68 }
69 let session = stream.append_session(AppendSessionConfig::new());
73
74 let records = AppendRecordBatch::try_from_iter([
76 AppendRecord::new("event-1")?,
77 AppendRecord::new("event-2")?,
78 ])?;
79 let ticket = session.submit(AppendInput::new(records)).await?;
80
81 let ack = ticket.await?;
83 println!("Durable at seqNum {}", ack.start.seq_num);
84
85 session.close().await?;
86 let producer = stream.producer(
90 ProducerConfig::new()
91 .with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
92 );
93
94 let ticket = producer.submit(AppendRecord::new("my event")?).await?;
96
97 let ack = ticket.await?;
99 println!("Record durable at seqNum {}", ack.seq_num);
100
101 producer.close().await?;
102 let tail = stream.check_tail().await?;
106 println!("Stream has {} records", tail.seq_num);
107 basin
111 .delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
112 .await?;
113
114 println!("Streams examples completed");
115
116 if std::env::var("RUN_READ_SESSIONS").is_err() {
119 return Ok(());
120 }
121
122 let mut session = stream
124 .read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
125 .await?;
126
127 while let Some(batch) = session.next().await {
128 let batch = batch?;
129 for record in batch.records {
130 println!("[{}] {:?}", record.seq_num, record.body);
131 }
132 }
133 let mut session = stream
138 .read_session(
139 ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
140 )
141 .await?;
142
143 while let Some(batch) = session.next().await {
144 let batch = batch?;
145 for record in batch.records {
146 println!("[{}] {:?}", record.seq_num, record.body);
147 }
148 }
149 let one_hour_ago = std::time::SystemTime::now()
154 .duration_since(std::time::UNIX_EPOCH)?
155 .as_millis() as u64
156 - 3600 * 1000;
157 let mut session = stream
158 .read_session(
159 ReadInput::new()
160 .with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
161 )
162 .await?;
163
164 while let Some(batch) = session.next().await {
165 let batch = batch?;
166 for record in batch.records {
167 println!("[{}] {:?}", record.seq_num, record.body);
168 }
169 }
170 let one_hour_ago = std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)?
176 .as_millis() as u64
177 - 3600 * 1000;
178 let mut session = stream
179 .read_session(
180 ReadInput::new()
181 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
182 .with_stop(ReadStop::new().with_until(..one_hour_ago)),
183 )
184 .await?;
185
186 while let Some(batch) = session.next().await {
187 let batch = batch?;
188 for record in batch.records {
189 println!("[{}] {:?}", record.seq_num, record.body);
190 }
191 }
192 let mut session = stream
198 .read_session(
199 ReadInput::new()
200 .with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
201 .with_stop(ReadStop::new().with_wait(30)),
202 )
203 .await?;
204
205 while let Some(batch) = session.next().await {
206 let batch = batch?;
207 for record in batch.records {
208 println!("[{}] {:?}", record.seq_num, record.body);
209 }
210 }
211 Ok(())
214}