pub struct S2Stream { /* private fields */ }
A stream in an S2 basin.
See S2Basin::stream.
Implementations§
impl S2Stream
pub async fn check_tail(&self) -> Result<StreamPosition, S2Error>
Check tail position.
Examples found in repository?
examples/explicit_trim.rs (line 20)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let stream = s2.basin(basin_name).stream(stream_name);
19
20 let tail = stream.check_tail().await?;
21 if tail.seq_num == 0 {
22 println!("Empty stream");
23 return Ok(());
24 }
25
26 let input = AppendInput::new(AppendRecordBatch::try_from_iter([CommandRecord::trim(
27 tail.seq_num - 1,
28 )
29 .into()])?);
30 stream.append(input).await?;
31 println!("Trim requested");
32
33 Ok(())
34}
pub async fn append(&self, input: AppendInput) -> Result<AppendAck, S2Error>
Append records.
Examples found in repository?
examples/explicit_trim.rs (line 30)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let stream = s2.basin(basin_name).stream(stream_name);
19
20 let tail = stream.check_tail().await?;
21 if tail.seq_num == 0 {
22 println!("Empty stream");
23 return Ok(());
24 }
25
26 let input = AppendInput::new(AppendRecordBatch::try_from_iter([CommandRecord::trim(
27 tail.seq_num - 1,
28 )
29 .into()])?);
30 stream.append(input).await?;
31 println!("Trim requested");
32
33 Ok(())
34}
pub async fn read(&self, input: ReadInput) -> Result<ReadBatch, S2Error>
Read records.
Examples found in repository?
examples/get_latest_record.rs (line 25)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token =
11 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
12 let basin_name: BasinName = std::env::var("S2_BASIN")
13 .map_err(|_| "S2_BASIN env var not set")?
14 .parse()?;
15 let stream_name: StreamName = std::env::var("S2_STREAM")
16 .map_err(|_| "S2_STREAM env var not set")?
17 .parse()?;
18
19 let s2 = S2::new(S2Config::new(access_token))?;
20 let stream = s2.basin(basin_name).stream(stream_name);
21
22 let input = ReadInput::new()
23 .with_start(ReadStart::new().with_from(ReadFrom::TailOffset(1)))
24 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(1)));
25 let batch = stream.read(input).await?;
26 println!("{batch:#?}");
27
28 Ok(())
29}
pub fn append_session(&self, config: AppendSessionConfig) -> AppendSession
Create an append session for submitting AppendInputs.
pub fn producer(&self, config: ProducerConfig) -> Producer
Create a producer for submitting individual AppendRecords.
Examples found in repository?
examples/producer.rs (line 21)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 let access_token =
10 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")
12 .map_err(|_| "S2_BASIN env var not set")?
13 .parse()?;
14 let stream_name: StreamName = std::env::var("S2_STREAM")
15 .map_err(|_| "S2_STREAM env var not set")?
16 .parse()?;
17
18 let s2 = S2::new(S2Config::new(access_token))?;
19 let stream = s2.basin(basin_name).stream(stream_name);
20
21 let producer = stream.producer(ProducerConfig::new());
22
23 let ticket1 = producer.submit(AppendRecord::new("lorem")?).await?;
24 let ticket2 = producer.submit(AppendRecord::new("ipsum")?).await?;
25
26 let ack1 = ticket1.await?;
27 let ack2 = ticket2.await?;
28 println!("Record 1 seq_num: {}", ack1.seq_num);
29 println!("Record 2 seq_num: {}", ack2.seq_num);
30
31 producer.close().await?;
32
33 Ok(())
34}
pub async fn read_session(&self, input: ReadInput) -> Result<Streaming<ReadBatch>, S2Error>
Create a read session.
Examples found in repository?
examples/consumer.rs (line 18)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
12 let stream_name: StreamName = std::env::var("S2_STREAM")?.parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let stream = s2.basin(basin_name).stream(stream_name);
16
17 let input = ReadInput::new();
18 let mut batches = stream.read_session(input).await?;
19 loop {
20 select! {
21 batch = batches.next() => {
22 let Some(batch) = batch else { break };
23 let batch = batch?;
24 println!("{batch:?}");
25 }
26 _ = tokio::signal::ctrl_c() => break,
27 }
28 }
29
30 Ok(())
31}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for S2Stream
impl !RefUnwindSafe for S2Stream
impl Send for S2Stream
impl Sync for S2Stream
impl Unpin for S2Stream
impl !UnwindSafe for S2Stream
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.