pub struct StreamClient { /* private fields */ }
Expand description
Client for stream-level operations.
Implementations§
Source§impl StreamClient
impl StreamClient
Source pub fn new(
config: ClientConfig,
basin: BasinName,
stream: impl Into<String>,
) -> Self
pub fn new( config: ClientConfig, basin: BasinName, stream: impl Into<String>, ) -> Self
Create a new stream client.
Examples found in repository?
examples/get_latest_record.rs (line 12)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let basin: BasinName = "my-favorite-basin".parse()?;
11 let stream = "my-favorite-stream";
12 let stream_client = StreamClient::new(config, basin, stream);
13
14 let read_limit = ReadLimit::new().with_count(1);
15 let read_request = ReadRequest::new(ReadStart::TailOffset(1)).with_limit(read_limit);
16 let latest_record = stream_client.read(read_request).await?;
17
18 println!("{latest_record:#?}");
19
20 Ok(())
21}
More examples
examples/consumer.rs (line 14)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let start_seq_num = 0;
17 let read_session_request = ReadSessionRequest::new(ReadStart::SeqNum(start_seq_num));
18 let mut read_stream = stream_client.read_session(read_session_request).await?;
19
20 loop {
21 select! {
22 next_batch = read_stream.next() => {
23 let Some(next_batch) = next_batch else { break };
24 let next_batch = next_batch?;
25 println!("{next_batch:?}");
26 }
27 _ = tokio::signal::ctrl_c() => break,
28 }
29 }
30
31 Ok(())
32}
examples/explicit_trim.rs (line 12)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let basin: BasinName = "my-favorite-basin".parse()?;
11 let stream = "my-favorite-stream";
12 let stream_client = StreamClient::new(config, basin, stream);
13
14 let tail = stream_client.check_tail().await?;
15 if tail.seq_num == 0 {
16 println!("Empty stream");
17 return Ok(());
18 }
19
20 let latest_seq_num = tail.seq_num - 1;
21 let trim_request = CommandRecord::trim(latest_seq_num);
22
23 let append_record_batch = AppendRecordBatch::try_from_iter([trim_request])
24 .expect("valid batch with 1 command record");
25 let append_input = AppendInput::new(append_record_batch);
26 let _ = stream_client.append(append_input).await?;
27
28 println!("Trim requested");
29
30 Ok(())
31}
examples/producer.rs (line 14)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}
Source pub fn new_with_connector<C>(
config: ClientConfig,
basin: BasinName,
stream: impl Into<String>,
connector: C,
) -> Self
pub fn new_with_connector<C>( config: ClientConfig, basin: BasinName, stream: impl Into<String>, connector: C, ) -> Self
Create a new stream client using a custom connector.
Source pub async fn check_tail(&self) -> Result<StreamPosition, ClientError>
pub async fn check_tail(&self) -> Result<StreamPosition, ClientError>
Check the tail of the stream.
Examples found in repository?
examples/explicit_trim.rs (line 14)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let basin: BasinName = "my-favorite-basin".parse()?;
11 let stream = "my-favorite-stream";
12 let stream_client = StreamClient::new(config, basin, stream);
13
14 let tail = stream_client.check_tail().await?;
15 if tail.seq_num == 0 {
16 println!("Empty stream");
17 return Ok(());
18 }
19
20 let latest_seq_num = tail.seq_num - 1;
21 let trim_request = CommandRecord::trim(latest_seq_num);
22
23 let append_record_batch = AppendRecordBatch::try_from_iter([trim_request])
24 .expect("valid batch with 1 command record");
25 let append_input = AppendInput::new(append_record_batch);
26 let _ = stream_client.append(append_input).await?;
27
28 println!("Trim requested");
29
30 Ok(())
31}
Source pub async fn read(&self, req: ReadRequest) -> Result<ReadOutput, ClientError>
pub async fn read(&self, req: ReadRequest) -> Result<ReadOutput, ClientError>
Retrieve a batch of records from a stream.
Examples found in repository?
examples/get_latest_record.rs (line 16)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let basin: BasinName = "my-favorite-basin".parse()?;
11 let stream = "my-favorite-stream";
12 let stream_client = StreamClient::new(config, basin, stream);
13
14 let read_limit = ReadLimit::new().with_count(1);
15 let read_request = ReadRequest::new(ReadStart::TailOffset(1)).with_limit(read_limit);
16 let latest_record = stream_client.read(read_request).await?;
17
18 println!("{latest_record:#?}");
19
20 Ok(())
21}
Source pub async fn read_session(
&self,
req: ReadSessionRequest,
) -> Result<Streaming<ReadOutput>, ClientError>
pub async fn read_session( &self, req: ReadSessionRequest, ) -> Result<Streaming<ReadOutput>, ClientError>
Retrieve batches of records from a stream continuously.
Examples found in repository?
examples/consumer.rs (line 18)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let start_seq_num = 0;
17 let read_session_request = ReadSessionRequest::new(ReadStart::SeqNum(start_seq_num));
18 let mut read_stream = stream_client.read_session(read_session_request).await?;
19
20 loop {
21 select! {
22 next_batch = read_stream.next() => {
23 let Some(next_batch) = next_batch else { break };
24 let next_batch = next_batch?;
25 println!("{next_batch:?}");
26 }
27 _ = tokio::signal::ctrl_c() => break,
28 }
29 }
30
31 Ok(())
32}
Source pub async fn append(&self, req: AppendInput) -> Result<AppendAck, ClientError>
pub async fn append(&self, req: AppendInput) -> Result<AppendAck, ClientError>
Append a batch of records to a stream.
Examples found in repository?
examples/explicit_trim.rs (line 26)
7async fn main() -> Result<(), Box<dyn std::error::Error>> {
8 let token = std::env::var("S2_ACCESS_TOKEN")?;
9 let config = ClientConfig::new(token);
10 let basin: BasinName = "my-favorite-basin".parse()?;
11 let stream = "my-favorite-stream";
12 let stream_client = StreamClient::new(config, basin, stream);
13
14 let tail = stream_client.check_tail().await?;
15 if tail.seq_num == 0 {
16 println!("Empty stream");
17 return Ok(());
18 }
19
20 let latest_seq_num = tail.seq_num - 1;
21 let trim_request = CommandRecord::trim(latest_seq_num);
22
23 let append_record_batch = AppendRecordBatch::try_from_iter([trim_request])
24 .expect("valid batch with 1 command record");
25 let append_input = AppendInput::new(append_record_batch);
26 let _ = stream_client.append(append_input).await?;
27
28 println!("Trim requested");
29
30 Ok(())
31}
More examples
examples/producer.rs (line 23)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}
Source pub async fn append_session<S>(
&self,
req: S,
) -> Result<Streaming<AppendAck>, ClientError>
pub async fn append_session<S>( &self, req: S, ) -> Result<Streaming<AppendAck>, ClientError>
Append batches of records to a stream continuously, while guaranteeing pipelined requests are processed in order. If any request fails, the session is terminated.
Examples found in repository?
examples/producer.rs (line 40)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}
Trait Implementations§
Source§impl Clone for StreamClient
impl Clone for StreamClient
Source§fn clone(&self) -> StreamClient
fn clone(&self) -> StreamClient
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Auto Trait Implementations§
impl !Freeze for StreamClient
impl !RefUnwindSafe for StreamClient
impl Send for StreamClient
impl Sync for StreamClient
impl Unpin for StreamClient
impl !UnwindSafe for StreamClient
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`