pub struct StreamConfigurationOptions {
pub max_inflight_requests: usize,
pub recovery: bool,
pub recovery_timeout_ms: u64,
pub recovery_backoff_ms: u64,
pub recovery_retries: u32,
pub server_lack_of_ack_timeout_ms: u64,
pub flush_timeout_ms: u64,
pub record_type: RecordType,
pub stream_paused_max_wait_time_ms: Option<u64>,
pub ack_callback: Option<Arc<dyn AckCallback>>,
pub callback_max_wait_time_ms: Option<u64>,
}Expand description
Configuration options for stream creation, recovery of broken streams and flushing.
These options control the behavior of ingestion streams, including memory limits, recovery policies, and timeout settings.
§Examples
use databricks_zerobus_ingest_sdk::StreamConfigurationOptions;
let options = StreamConfigurationOptions {
max_inflight_requests: 1_000_000,
recovery: true,
recovery_timeout_ms: 20_000,
recovery_retries: 5,
..Default::default()
};Fields§
§max_inflight_requests: usizeMaximum number of requests that can be sending or pending acknowledgement at any given time.
This limit controls memory usage and backpressure. When this limit is reached,
ingest_record() and ingest_records() calls will block until acknowledgments free up space.
Default: 1,000,000
recovery: boolWhether to enable automatic stream recovery on failure.
When enabled, the SDK will automatically attempt to reconnect and recover the stream when encountering retryable errors.
Default: true
recovery_timeout_ms: u64Timeout in milliseconds for each stream recovery attempt.
If a recovery attempt takes longer than this, it will be retried.
Default: 15,000 (15 seconds)
recovery_backoff_ms: u64Backoff time in milliseconds between stream recovery retry attempts.
The SDK will wait this duration before attempting another recovery after a failure.
Default: 2,000 (2 seconds)
recovery_retries: u32Maximum number of recovery retry attempts before giving up.
After this many failed attempts, the stream will close and return an error.
Default: 4
server_lack_of_ack_timeout_ms: u64Timeout in milliseconds for waiting for server acknowledgements.
If no acknowledgement is received within this time (and there are pending records), the stream will be considered failed and recovery will be triggered.
Default: 60,000 (60 seconds)
flush_timeout_ms: u64Timeout in milliseconds for flush operations.
If a flush() call cannot complete within this time, it will return a timeout error.
Default: 300,000 (5 minutes)
record_type: RecordTypeType of record to ingest.
Supported values:
- RecordType::Proto
- RecordType::Json
- RecordType::Unspecified
Default: RecordType::Proto
stream_paused_max_wait_time_ms: Option<u64>Maximum time in milliseconds to wait during graceful stream close.
When the server sends a CloseStreamSignal indicating it will close the stream, the SDK can enter a “paused” state where it:
- Continues accepting and buffering new ingest_record() calls
- Stops sending buffered records to the server
- Continues processing acknowledgments for in-flight records
- Waits for either all in-flight records to be acknowledged or the timeout to expire
Configuration values:
None: Wait for the full server-specified duration (most graceful)Some(0): Immediate recovery, close stream right away (current behavior)Some(x): Wait up to min(x, server_duration) milliseconds
Default: None (wait for full server duration)
ack_callback: Option<Arc<dyn AckCallback>>Optional callback invoked when records are acknowledged or encounter errors.
When set, this callback will be invoked:
- On successful acknowledgment:
on_ack(offset_id)is called - On error:
on_error(offset_id, error_message)is called
Default: None (no callbacks)
§Examples
use std::sync::Arc;
use databricks_zerobus_ingest_sdk::{AckCallback, StreamConfigurationOptions, OffsetId};
struct MyCallback;
impl AckCallback for MyCallback {
fn on_ack(&self, offset_id: OffsetId) {
println!("Acknowledged: {}", offset_id);
}
fn on_error(&self, offset_id: OffsetId, error_message: &str) {
eprintln!("Error {}: {}", offset_id, error_message);
}
}
let options = StreamConfigurationOptions {
ack_callback: Some(Arc::new(MyCallback)),
..Default::default()
};callback_max_wait_time_ms: Option<u64>Maximum time in milliseconds to wait for callbacks to finish after calling close() on the stream.
When the stream is closed, all tasks are shut down and the callback handler task is given a timeout to finish processing callbacks. After the timeout expires, or once all callbacks have been processed, the callback handler task is aborted and the stream is fully closed.
Configuration values:
None: Wait foreverSome(x): Wait up to x milliseconds
Default: Some(5000) (wait 5 seconds)
Trait Implementations§
Source§impl Clone for StreamConfigurationOptions
impl Clone for StreamConfigurationOptions
Source§fn clone(&self) -> StreamConfigurationOptions
fn clone(&self) -> StreamConfigurationOptions
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for StreamConfigurationOptions
impl !RefUnwindSafe for StreamConfigurationOptions
impl Send for StreamConfigurationOptions
impl Sync for StreamConfigurationOptions
impl Unpin for StreamConfigurationOptions
impl UnsafeUnpin for StreamConfigurationOptions
impl !UnwindSafe for StreamConfigurationOptions
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request