pub struct AppendRecordsBatchingOpts { /* private fields */ }
Expand description
Options to configure the batching scheme for `AppendRecordsBatchingStream`.
Implementations§
Source§impl AppendRecordsBatchingOpts
impl AppendRecordsBatchingOpts
Source pub fn new() -> Self
pub fn new() -> Self
Construct an options struct with defaults.
Examples found in repository?
examples/producer.rs (line 33)
9 async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48 }
Source pub fn with_max_batch_records(self, max_batch_records: usize) -> Self
pub fn with_max_batch_records(self, max_batch_records: usize) -> Self
Maximum number of records in a batch.
Source pub fn with_match_seq_num(self, match_seq_num: Option<u64>) -> Self
pub fn with_match_seq_num(self, match_seq_num: Option<u64>) -> Self
Enforce that the sequence number issued to the first record matches the provided `match_seq_num`.
This value is incremented automatically for each batch.
Examples found in repository?
examples/producer.rs (line 35)
9 async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48 }
Source pub fn with_fencing_token(self, fencing_token: Option<FencingToken>) -> Self
pub fn with_fencing_token(self, fencing_token: Option<FencingToken>) -> Self
Enforce a fencing token.
Examples found in repository?
examples/producer.rs (line 34)
9 async fn main() -> Result<(), Box<dyn std::error::Error>> {
10 let token = std::env::var("S2_ACCESS_TOKEN")?;
11 let config = ClientConfig::new(token);
12 let basin: BasinName = "my-favorite-basin".parse()?;
13 let stream = "my-favorite-stream";
14 let stream_client = StreamClient::new(config, basin, stream);
15
16 let fencing_token = FencingToken::generate(32).expect("valid fencing token");
17
18 // Set the fencing token.
19 let fencing_token_record: AppendRecord = CommandRecord::fence(fencing_token.clone()).into();
20 let fencing_token_batch = AppendRecordBatch::try_from_iter([fencing_token_record])
21 .expect("valid batch with 1 append record");
22 let fencing_token_append_input = AppendInput::new(fencing_token_batch);
23 let set_fencing_token = stream_client.append(fencing_token_append_input).await?;
24
25 let match_seq_num = set_fencing_token.tail.seq_num;
26
27 // Stream of records
28 let append_stream = futures::stream::iter([
29 AppendRecord::new("record_1")?,
30 AppendRecord::new("record_2")?,
31 ]);
32
33 let append_records_batching_opts = AppendRecordsBatchingOpts::new()
34 .with_fencing_token(Some(fencing_token))
35 .with_match_seq_num(Some(match_seq_num));
36
37 let append_session_request =
38 AppendRecordsBatchingStream::new(append_stream, append_records_batching_opts);
39
40 let mut append_session_stream = stream_client.append_session(append_session_request).await?;
41
42 while let Some(next) = append_session_stream.next().await {
43 let next = next?;
44 println!("{next:#?}");
45 }
46
47 Ok(())
48 }
Source pub fn with_linger(self, linger_duration: impl Into<Duration>) -> Self
pub fn with_linger(self, linger_duration: impl Into<Duration>) -> Self
Linger duration for records before flushing.
A linger duration of 5ms is set by default. Set to `Duration::ZERO` to disable.
Trait Implementations§
Source§impl Clone for AppendRecordsBatchingOpts
impl Clone for AppendRecordsBatchingOpts
Source§fn clone(&self) -> AppendRecordsBatchingOpts
fn clone(&self) -> AppendRecordsBatchingOpts
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Source§impl Debug for AppendRecordsBatchingOpts
impl Debug for AppendRecordsBatchingOpts
Auto Trait Implementations§
impl Freeze for AppendRecordsBatchingOpts
impl RefUnwindSafe for AppendRecordsBatchingOpts
impl Send for AppendRecordsBatchingOpts
impl Sync for AppendRecordsBatchingOpts
impl Unpin for AppendRecordsBatchingOpts
impl UnwindSafe for AppendRecordsBatchingOpts
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> CloneToUninit for T
where
    T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`.