use std::time::Duration;
use futures::StreamExt;
use s2_sdk::{
S2,
append_session::AppendSessionConfig,
batching::BatchingConfig,
producer::ProducerConfig,
types::{
AppendInput, AppendRecord, AppendRecordBatch, BasinName, ReadFrom, ReadInput, ReadLimits,
ReadStart, ReadStop, S2Config, StreamName,
},
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_ACCESS_TOKEN")?;
let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
let client = S2::new(S2Config::new(token))?;
let basin = client.basin(basin_name);
let stream_name: StreamName = format!(
"docs-streams-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis()
)
.parse()?;
basin
.create_stream(s2_sdk::types::CreateStreamInput::new(stream_name.clone()))
.await?;
let stream = basin.stream(stream_name.clone());
let records = AppendRecordBatch::try_from_iter([
AppendRecord::new("first event")?,
AppendRecord::new("second event")?,
])?;
let ack = stream.append(AppendInput::new(records)).await?;
println!(
"Wrote records {} through {}",
ack.start.seq_num,
ack.end.seq_num - 1
);
let batch = stream
.read(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
)
.await?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
let session = stream.append_session(AppendSessionConfig::new());
let records = AppendRecordBatch::try_from_iter([
AppendRecord::new("event-1")?,
AppendRecord::new("event-2")?,
])?;
let ticket = session.submit(AppendInput::new(records)).await?;
let ack = ticket.await?;
println!("Durable at seqNum {}", ack.start.seq_num);
session.close().await?;
let producer = stream.producer(
ProducerConfig::new()
.with_batching(BatchingConfig::new().with_linger(Duration::from_millis(5))),
);
let ticket = producer.submit(AppendRecord::new("my event")?).await?;
let ack = ticket.await?;
println!("Record durable at seqNum {}", ack.seq_num);
producer.close().await?;
let tail = stream.check_tail().await?;
println!("Stream has {} records", tail.seq_num);
basin
.delete_stream(s2_sdk::types::DeleteStreamInput::new(stream_name))
.await?;
println!("Streams examples completed");
if std::env::var("RUN_READ_SESSIONS").is_err() {
return Ok(());
}
let mut session = stream
.read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
let mut session = stream
.read_session(
ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_until(..one_hour_ago)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_wait(30)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Ok(())
}