Struct StreamConfigurationOptions 

pub struct StreamConfigurationOptions {
    pub max_inflight_requests: usize,
    pub recovery: bool,
    pub recovery_timeout_ms: u64,
    pub recovery_backoff_ms: u64,
    pub recovery_retries: u32,
    pub server_lack_of_ack_timeout_ms: u64,
    pub flush_timeout_ms: u64,
    pub record_type: RecordType,
    pub stream_paused_max_wait_time_ms: Option<u64>,
    pub ack_callback: Option<Arc<dyn AckCallback>>,
    pub callback_max_wait_time_ms: Option<u64>,
}
Configuration options for stream creation, recovery of broken streams and flushing.

These options control the behavior of ingestion streams, including memory limits, recovery policies, and timeout settings.

§Examples

use databricks_zerobus_ingest_sdk::StreamConfigurationOptions;

let options = StreamConfigurationOptions {
    max_inflight_requests: 1_000_000,
    recovery: true,
    recovery_timeout_ms: 20_000,
    recovery_retries: 5,
    ..Default::default()
};

Fields§

§max_inflight_requests: usize

Maximum number of requests that can be in flight (being sent or awaiting acknowledgment) at any given time.

This limit bounds memory usage and applies backpressure. Once it is reached, ingest_record() and ingest_records() calls block until acknowledgments free up space.

Default: 1,000,000
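
The backpressure described above can be pictured as a counting limiter: each send claims a slot and each acknowledgment frees one. The following is only a minimal sketch built on the standard library. `InflightLimiter` and its methods are hypothetical names for illustration, and nothing here is taken from the SDK's actual internals, which may well use asynchronous primitives instead.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical model of a bounded in-flight counter (not the SDK's type).
pub struct InflightLimiter {
    max: usize,
    inflight: Mutex<usize>,
    freed: Condvar,
}

impl InflightLimiter {
    pub fn new(max: usize) -> Self {
        Self { max, inflight: Mutex::new(0), freed: Condvar::new() }
    }

    /// Claims a slot without blocking; returns false when the limit is reached.
    pub fn try_acquire(&self) -> bool {
        let mut n = self.inflight.lock().unwrap();
        if *n < self.max {
            *n += 1;
            true
        } else {
            false
        }
    }

    /// Blocks the calling thread until a slot is free, then claims it.
    pub fn acquire(&self) {
        let mut n = self.inflight.lock().unwrap();
        while *n >= self.max {
            n = self.freed.wait(n).unwrap();
        }
        *n += 1;
    }

    /// Frees one slot, as an acknowledgment would, and wakes one waiter.
    pub fn release(&self) {
        let mut n = self.inflight.lock().unwrap();
        *n = n.saturating_sub(1);
        self.freed.notify_one();
    }
}
```

In this model, a lower limit trades throughput for a smaller memory footprint, because fewer unacknowledged requests are held at once.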

§recovery: bool

Whether to enable automatic stream recovery on failure.

When enabled, the SDK will automatically attempt to reconnect and recover the stream when encountering retryable errors.

Default: true

§recovery_timeout_ms: u64

Timeout in milliseconds for each stream recovery attempt.

A recovery attempt that takes longer than this is treated as failed and retried.

Default: 15,000 (15 seconds)

§recovery_backoff_ms: u64

Backoff time in milliseconds between stream recovery retry attempts.

The SDK will wait this duration before attempting another recovery after a failure.

Default: 2,000 (2 seconds)

§recovery_retries: u32

Maximum number of recovery retry attempts before giving up.

After this many failed attempts, the stream will close and return an error.

Default: 4
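
To get a feel for how the three recovery settings interact, a rough upper bound on total recovery time can be computed. This sketch assumes every attempt runs to its full timeout and is followed by one backoff, and that `recovery_retries` counts all attempts. The SDK's exact accounting is not documented here, so treat the result as an estimate. `worst_case_recovery_ms` is a hypothetical helper, not part of the SDK.

```rust
/// Rough upper bound, in milliseconds, on time spent in recovery.
/// Assumes each attempt uses its full timeout and is followed by one backoff.
pub fn worst_case_recovery_ms(
    recovery_timeout_ms: u64,
    recovery_backoff_ms: u64,
    recovery_retries: u32,
) -> u64 {
    recovery_timeout_ms
        .saturating_add(recovery_backoff_ms)
        .saturating_mul(u64::from(recovery_retries))
}
```

Under these assumptions the documented defaults (15,000 ms timeout, 2,000 ms backoff, 4 retries) give 4 × 17,000 = 68,000 ms, a little over a minute before the stream gives up.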

§server_lack_of_ack_timeout_ms: u64

Timeout in milliseconds for waiting for server acknowledgments.

If no acknowledgment is received within this time while records are pending, the stream is considered failed and recovery is triggered.

Default: 60,000 (60 seconds)
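
The condition above has two parts: the timeout must have elapsed, and there must be records awaiting acknowledgment. A minimal sketch of that check follows. `ack_timed_out` and its parameters are hypothetical names, and the SDK may track elapsed time differently.

```rust
use std::time::Duration;

/// Returns true when records are pending and no acknowledgment has
/// arrived for at least the configured timeout (illustrative model only).
pub fn ack_timed_out(
    pending_records: usize,
    since_last_ack: Duration,
    server_lack_of_ack_timeout_ms: u64,
) -> bool {
    pending_records > 0
        && since_last_ack >= Duration::from_millis(server_lack_of_ack_timeout_ms)
}
```

An idle stream with nothing pending therefore never trips this timeout in the sketch, however long it stays quiet.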

§flush_timeout_ms: u64

Timeout in milliseconds for flush operations.

If a flush() call cannot complete within this time, it will return a timeout error.

Default: 300,000 (5 minutes)

§record_type: RecordType

Type of record to ingest.

Supported values:

  • RecordType::Proto
  • RecordType::Json
  • RecordType::Unspecified

Default: RecordType::Proto

§stream_paused_max_wait_time_ms: Option<u64>

Maximum time in milliseconds to wait during graceful stream close.

When the server sends a CloseStreamSignal indicating it will close the stream, the SDK can enter a “paused” state where it:

  • Continues accepting and buffering new ingest_record() calls
  • Stops sending buffered records to the server
  • Continues processing acknowledgments for in-flight records
  • Waits for either all in-flight records to be acknowledged or the timeout to expire

Configuration values:

  • None: Wait for the full server-specified duration (most graceful)
  • Some(0): Immediate recovery, close stream right away (current behavior)
  • Some(x): Wait up to min(x, server_duration) milliseconds

Default: None (wait for full server duration)
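
The three configuration cases reduce to a single rule, sketched below. `effective_pause_wait_ms` and `server_duration_ms` are hypothetical names used only for illustration. The server-specified duration is assumed to arrive with the close signal.

```rust
/// Effective wait, in milliseconds, during a graceful close.
/// `configured` mirrors `stream_paused_max_wait_time_ms`.
pub fn effective_pause_wait_ms(configured: Option<u64>, server_duration_ms: u64) -> u64 {
    match configured {
        // No cap configured: honor the full server-specified duration.
        None => server_duration_ms,
        // Cap configured: never wait longer than the server allows.
        Some(x) => x.min(server_duration_ms),
    }
}
```

Note that `Some(0)` falls out of the same rule: the minimum of 0 and any server duration is 0, which corresponds to immediate recovery.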

§ack_callback: Option<Arc<dyn AckCallback>>

Optional callback invoked when records are acknowledged or encounter errors.

When set, this callback will be invoked:

  • On successful acknowledgment: on_ack(offset_id) is called
  • On error: on_error(offset_id, error_message) is called

Default: None (no callbacks)

§Examples

use std::sync::Arc;
use databricks_zerobus_ingest_sdk::{AckCallback, StreamConfigurationOptions, OffsetId};

struct MyCallback;

impl AckCallback for MyCallback {
    fn on_ack(&self, offset_id: OffsetId) {
        println!("Acknowledged: {}", offset_id);
    }

    fn on_error(&self, offset_id: OffsetId, error_message: &str) {
        eprintln!("Error {}: {}", offset_id, error_message);
    }
}

let options = StreamConfigurationOptions {
    ack_callback: Some(Arc::new(MyCallback)),
    ..Default::default()
};

§callback_max_wait_time_ms: Option<u64>

Maximum time in milliseconds to wait for callbacks to finish after calling close() on the stream.

When the stream is closed, all tasks are shut down and the callback handler task is given a timeout to finish processing callbacks. After the timeout expires, or once all callbacks have been processed, the callback handler task is aborted and the stream is fully closed.

Configuration values:

  • None: Wait forever
  • Some(x): Wait up to x milliseconds

Default: Some(5000) (wait 5 seconds)
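
One way to read this option is as an optional deadline: `None` means no deadline, and `Some(x)` means a deadline of x milliseconds. The sketch below shows that mapping with the standard library's `Duration`. `callback_wait` is a hypothetical helper and not an SDK function.

```rust
use std::time::Duration;

/// Converts the configured value into an optional deadline.
/// `None` stands for "wait forever" in this illustrative model.
pub fn callback_wait(callback_max_wait_time_ms: Option<u64>) -> Option<Duration> {
    callback_max_wait_time_ms.map(Duration::from_millis)
}
```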

Trait Implementations§

impl Clone for StreamConfigurationOptions

fn clone(&self) -> StreamConfigurationOptions

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Default for StreamConfigurationOptions

fn default() -> Self

Returns the “default value” for a type.
