pub struct S2Basin { /* private fields */ }
A basin in an S2 account.
See S2::basin.
Implementations§
Source§impl S2Basin
impl S2Basin
pub fn stream(&self, name: StreamName) -> S2Stream
Get an S2Stream.
Examples found in repository?
examples/consumer.rs (line 15)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token = std::env::var("S2_ACCESS_TOKEN")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")?.parse()?;
12 let stream_name: StreamName = std::env::var("S2_STREAM")?.parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let stream = s2.basin(basin_name).stream(stream_name);
16
17 let input = ReadInput::new();
18 let mut batches = stream.read_session(input).await?;
19 loop {
20 select! {
21 batch = batches.next() => {
22 let Some(batch) = batch else { break };
23 let batch = batch?;
24 println!("{batch:?}");
25 }
26 _ = tokio::signal::ctrl_c() => break,
27 }
28 }
29
30 Ok(())
31}
More examples
examples/get_latest_record.rs (line 20)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let access_token =
11 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
12 let basin_name: BasinName = std::env::var("S2_BASIN")
13 .map_err(|_| "S2_BASIN env var not set")?
14 .parse()?;
15 let stream_name: StreamName = std::env::var("S2_STREAM")
16 .map_err(|_| "S2_STREAM env var not set")?
17 .parse()?;
18
19 let s2 = S2::new(S2Config::new(access_token))?;
20 let stream = s2.basin(basin_name).stream(stream_name);
21
22 let input = ReadInput::new()
23 .with_start(ReadStart::new().with_from(ReadFrom::TailOffset(1)))
24 .with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(1)));
25 let batch = stream.read(input).await?;
26 println!("{batch:#?}");
27
28 Ok(())
29}
examples/explicit_trim.rs (line 18)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let stream = s2.basin(basin_name).stream(stream_name);
19
20 let tail = stream.check_tail().await?;
21 if tail.seq_num == 0 {
22 println!("Empty stream");
23 return Ok(());
24 }
25
26 let input = AppendInput::new(AppendRecordBatch::try_from_iter([CommandRecord::trim(
27 tail.seq_num - 1,
28 )
29 .into()])?);
30 stream.append(input).await?;
31 println!("Trim requested");
32
33 Ok(())
34}
examples/producer.rs (line 19)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 let access_token =
10 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
11 let basin_name: BasinName = std::env::var("S2_BASIN")
12 .map_err(|_| "S2_BASIN env var not set")?
13 .parse()?;
14 let stream_name: StreamName = std::env::var("S2_STREAM")
15 .map_err(|_| "S2_STREAM env var not set")?
16 .parse()?;
17
18 let s2 = S2::new(S2Config::new(access_token))?;
19 let stream = s2.basin(basin_name).stream(stream_name);
20
21 let producer = stream.producer(ProducerConfig::new());
22
23 let ticket1 = producer.submit(AppendRecord::new("lorem")?).await?;
24 let ticket2 = producer.submit(AppendRecord::new("ipsum")?).await?;
25
26 let ack1 = ticket1.await?;
27 let ack2 = ticket2.await?;
28 println!("Record 1 seq_num: {}", ack1.seq_num);
29 println!("Record 2 seq_num: {}", ack2.seq_num);
30
31 producer.close().await?;
32
33 Ok(())
34}
pub async fn list_streams(
    &self,
    input: ListStreamsInput,
) -> Result<Page<StreamInfo>, S2Error>
List a page of streams.
See list_all_streams for automatic pagination.
Examples found in repository?
examples/list_streams.rs (line 18)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13
14 let s2 = S2::new(S2Config::new(access_token))?;
15 let basin = s2.basin(basin_name);
16
17 let input = ListStreamsInput::new().with_prefix("my-".parse()?);
18 let page = basin.list_streams(input).await?;
19 println!("{page:#?}");
20
21 Ok(())
22}
pub fn list_all_streams(
    &self,
    input: ListAllStreamsInput,
) -> Streaming<StreamInfo>
List all streams, paginating automatically.
pub async fn create_stream(
    &self,
    input: CreateStreamInput,
) -> Result<StreamInfo, S2Error>
Create a stream.
Examples found in repository?
examples/create_stream.rs (line 28)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16 let stream_name: StreamName = std::env::var("S2_STREAM")
17 .map_err(|_| "S2_STREAM env var not set")?
18 .parse()?;
19
20 let s2 = S2::new(S2Config::new(access_token))?;
21 let basin = s2.basin(basin_name);
22
23 let input = CreateStreamInput::new(stream_name.clone()).with_config(
24 StreamConfig::new().with_timestamping(
25 TimestampingConfig::new().with_mode(TimestampingMode::ClientRequire),
26 ),
27 );
28 let stream_info = basin.create_stream(input).await?;
29 println!("{stream_info:#?}");
30
31 let stream_config = basin.get_stream_config(stream_name).await?;
32 println!("{stream_config:#?}");
33
34 Ok(())
35}
pub async fn get_stream_config(
    &self,
    name: StreamName,
) -> Result<StreamConfig, S2Error>
Get stream configuration.
Examples found in repository?
examples/create_stream.rs (line 31)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16 let stream_name: StreamName = std::env::var("S2_STREAM")
17 .map_err(|_| "S2_STREAM env var not set")?
18 .parse()?;
19
20 let s2 = S2::new(S2Config::new(access_token))?;
21 let basin = s2.basin(basin_name);
22
23 let input = CreateStreamInput::new(stream_name.clone()).with_config(
24 StreamConfig::new().with_timestamping(
25 TimestampingConfig::new().with_mode(TimestampingMode::ClientRequire),
26 ),
27 );
28 let stream_info = basin.create_stream(input).await?;
29 println!("{stream_info:#?}");
30
31 let stream_config = basin.get_stream_config(stream_name).await?;
32 println!("{stream_config:#?}");
33
34 Ok(())
35}
pub async fn delete_stream(
    &self,
    input: DeleteStreamInput,
) -> Result<(), S2Error>
Delete a stream.
Examples found in repository?
examples/delete_stream.rs (line 21)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let access_token =
9 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
10 let basin_name: BasinName = std::env::var("S2_BASIN")
11 .map_err(|_| "S2_BASIN env var not set")?
12 .parse()?;
13 let stream_name: StreamName = std::env::var("S2_STREAM")
14 .map_err(|_| "S2_STREAM env var not set")?
15 .parse()?;
16
17 let s2 = S2::new(S2Config::new(access_token))?;
18 let basin = s2.basin(basin_name);
19
20 let input = DeleteStreamInput::new(stream_name);
21 basin.delete_stream(input).await?;
22 println!("Deletion requested");
23
24 Ok(())
25}
pub async fn reconfigure_stream(
    &self,
    input: ReconfigureStreamInput,
) -> Result<StreamConfig, S2Error>
Reconfigure a stream.
Examples found in repository?
examples/reconfigure_stream.rs (line 27)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 let access_token =
12 std::env::var("S2_ACCESS_TOKEN").map_err(|_| "S2_ACCESS_TOKEN env var not set")?;
13 let basin_name: BasinName = std::env::var("S2_BASIN")
14 .map_err(|_| "S2_BASIN env var not set")?
15 .parse()?;
16 let stream_name: StreamName = std::env::var("S2_STREAM")
17 .map_err(|_| "S2_STREAM env var not set")?
18 .parse()?;
19
20 let s2 = S2::new(S2Config::new(access_token))?;
21 let basin = s2.basin(basin_name);
22
23 let input = ReconfigureStreamInput::new(
24 stream_name,
25 StreamReconfiguration::new().with_retention_policy(RetentionPolicy::Age(10 * 24 * 60 * 60)),
26 );
27 let config = basin.reconfigure_stream(input).await?;
28 println!("{config:#?}");
29
30 Ok(())
31}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for S2Basin
impl !RefUnwindSafe for S2Basin
impl Send for S2Basin
impl Sync for S2Basin
impl Unpin for S2Basin
impl !UnwindSafe for S2Basin
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> CloneToUninit for T
where
    T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more