pub struct AppendRecordsBatchingOpts { /* private fields */ }๐Deprecated since 0.21.1: This crate has been renamed to
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Expand description
Options to configure batching scheme for AppendRecordsBatchingStream.
Implementationsยง
Sourceยงimpl AppendRecordsBatchingOpts
impl AppendRecordsBatchingOpts
Sourcepub fn new() -> Self
๐Deprecated since 0.21.1: This crate has been renamed to s2-sdk. Please update your Cargo.toml to use s2-sdk instead.
pub fn new() -> Self
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Construct an options struct with defaults.
Examples found in repository?
examples/producer.rs (line 33)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}Sourcepub fn with_max_batch_records(self, max_batch_records: usize) -> Self
๐Deprecated since 0.21.1: This crate has been renamed to s2-sdk. Please update your Cargo.toml to use s2-sdk instead.
pub fn with_max_batch_records(self, max_batch_records: usize) -> Self
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Maximum number of records in a batch.
Sourcepub fn with_match_seq_num(self, match_seq_num: Option<u64>) -> Self
๐Deprecated since 0.21.1: This crate has been renamed to s2-sdk. Please update your Cargo.toml to use s2-sdk instead.
pub fn with_match_seq_num(self, match_seq_num: Option<u64>) -> Self
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Enforce that the sequence number issued to the first record matches.
This is incremented automatically for each batch.
Examples found in repository?
examples/producer.rs (line 35)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}Sourcepub fn with_fencing_token(self, fencing_token: Option<FencingToken>) -> Self
๐Deprecated since 0.21.1: This crate has been renamed to s2-sdk. Please update your Cargo.toml to use s2-sdk instead.
pub fn with_fencing_token(self, fencing_token: Option<FencingToken>) -> Self
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Enforce a fencing token.
Examples found in repository?
examples/producer.rs (line 34)
9async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48}Sourcepub fn with_linger(self, linger_duration: impl Into<Duration>) -> Self
๐Deprecated since 0.21.1: This crate has been renamed to s2-sdk. Please update your Cargo.toml to use s2-sdk instead.
pub fn with_linger(self, linger_duration: impl Into<Duration>) -> Self
s2-sdk. Please update your Cargo.toml to use s2-sdk instead.Linger duration for records before flushing.
A linger duration of 5ms is set by default. Set to Duration::ZERO
to disable.
Trait Implementationsยง
Sourceยงimpl Clone for AppendRecordsBatchingOpts
impl Clone for AppendRecordsBatchingOpts
Sourceยงfn clone(&self) -> AppendRecordsBatchingOpts
fn clone(&self) -> AppendRecordsBatchingOpts
Returns a duplicate of the value. Read more
1.0.0 ยท Sourceยงfn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreSourceยงimpl Debug for AppendRecordsBatchingOpts
impl Debug for AppendRecordsBatchingOpts
Auto Trait Implementationsยง
impl Freeze for AppendRecordsBatchingOpts
impl RefUnwindSafe for AppendRecordsBatchingOpts
impl Send for AppendRecordsBatchingOpts
impl Sync for AppendRecordsBatchingOpts
impl Unpin for AppendRecordsBatchingOpts
impl UnwindSafe for AppendRecordsBatchingOpts
Blanket Implementationsยง
Sourceยงimpl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Sourceยงfn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Sourceยงimpl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Sourceยงimpl<T> Instrument for T
impl<T> Instrument for T
Sourceยงfn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Sourceยงfn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Sourceยงimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Sourceยงfn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T in a tonic::Request