Skip to main content

databricks_zerobus_ingest_sdk/
stream_configuration.rs

1use std::sync::Arc;
2
3use crate::callbacks::AckCallback;
4use crate::databricks::zerobus::RecordType;
5use crate::stream_options::defaults;
6
7/// Configuration options for stream creation, recovery of broken streams and flushing.
8///
9/// These options control the behavior of ingestion streams, including memory limits,
10/// recovery policies, and timeout settings.
11///
12/// # Examples
13///
14/// ```
15/// use databricks_zerobus_ingest_sdk::StreamConfigurationOptions;
16///
17/// let options = StreamConfigurationOptions {
18///     max_inflight_requests: 1_000_000,
19///     recovery: true,
20///     recovery_timeout_ms: 20_000,
21///     recovery_retries: 5,
22///     ..Default::default()
23/// };
24/// ```
25#[derive(Clone)]
26pub struct StreamConfigurationOptions {
27    /// Maximum number of requests that can be sending or pending acknowledgement at any given time.
28    ///
29    /// This limit controls memory usage and backpressure. When this limit is reached,
30    /// `ingest_record()` and `ingest_records()` calls will block until acknowledgments free up space.
31    ///
32    /// Default: 1,000,000
33    pub max_inflight_requests: usize,
34
35    /// Whether to enable automatic stream recovery on failure.
36    ///
37    /// When enabled, the SDK will automatically attempt to reconnect and recover
38    /// the stream when encountering retryable errors.
39    ///
40    /// Default: `true`
41    pub recovery: bool,
42
43    /// Timeout in milliseconds for each stream recovery attempt.
44    ///
45    /// If a recovery attempt takes longer than this, it will be retried.
46    ///
47    /// Default: 15,000 (15 seconds)
48    pub recovery_timeout_ms: u64,
49
50    /// Backoff time in milliseconds between stream recovery retry attempts.
51    ///
52    /// The SDK will wait this duration before attempting another recovery after a failure.
53    ///
54    /// Default: 2,000 (2 seconds)
55    pub recovery_backoff_ms: u64,
56
57    /// Maximum number of recovery retry attempts before giving up.
58    ///
59    /// After this many failed attempts, the stream will close and return an error.
60    ///
61    /// Default: 4
62    pub recovery_retries: u32,
63
64    /// Timeout in milliseconds for waiting for server acknowledgements.
65    ///
66    /// If no acknowledgement is received within this time (and there are pending records),
67    /// the stream will be considered failed and recovery will be triggered.
68    ///
69    /// Default: 60,000 (60 seconds)
70    pub server_lack_of_ack_timeout_ms: u64,
71
72    /// Timeout in milliseconds for flush operations.
73    ///
74    /// If a flush() call cannot complete within this time, it will return a timeout error.
75    ///
76    /// Default: 300,000 (5 minutes)
77    pub flush_timeout_ms: u64,
78
79    /// Type of record to ingest.
80    ///
81    /// Supported values:
82    /// - RecordType::Proto
83    /// - RecordType::Json
84    /// - RecordType::Unspecified
85    ///
86    /// Default: RecordType::Proto
87    pub record_type: RecordType,
88
89    /// Maximum time in milliseconds to wait during graceful stream close.
90    ///
91    /// When the server sends a CloseStreamSignal indicating it will close the stream,
92    /// the SDK can enter a "paused" state where it:
93    /// - Continues accepting and buffering new ingest_record() calls
94    /// - Stops sending buffered records to the server
95    /// - Continues processing acknowledgments for in-flight records
96    /// - Waits for either all in-flight records to be acknowledged or the timeout to expire
97    ///
98    /// Configuration values:
99    /// - `None`: Wait for the full server-specified duration (most graceful)
100    /// - `Some(0)`: Immediate recovery, close stream right away (current behavior)
101    /// - `Some(x)`: Wait up to min(x, server_duration) milliseconds
102    ///
103    /// Default: `None` (wait for full server duration)
104    pub stream_paused_max_wait_time_ms: Option<u64>,
105
106    /// Optional callback invoked when records are acknowledged or encounter errors.
107    ///
108    /// When set, this callback will be invoked:
109    /// - On successful acknowledgment: `on_ack(offset_id)` is called
110    /// - On error: `on_error(offset_id, error_message)` is called
111    ///
112    ///
113    /// Default: `None` (no callbacks)
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use std::sync::Arc;
119    /// use databricks_zerobus_ingest_sdk::{AckCallback, StreamConfigurationOptions, OffsetId};
120    ///
121    /// struct MyCallback;
122    ///
123    /// impl AckCallback for MyCallback {
124    ///     fn on_ack(&self, offset_id: OffsetId) {
125    ///         println!("Acknowledged: {}", offset_id);
126    ///     }
127    ///
128    ///     fn on_error(&self, offset_id: OffsetId, error_message: &str) {
129    ///         eprintln!("Error {}: {}", offset_id, error_message);
130    ///     }
131    /// }
132    ///
133    /// let options = StreamConfigurationOptions {
134    ///     ack_callback: Some(Arc::new(MyCallback)),
135    ///     ..Default::default()
136    /// };
137    /// ```
138    pub ack_callback: Option<Arc<dyn AckCallback>>,
139
140    /// Maximum time in milliseconds to wait for callbacks to finish after calling close() on the stream.
141    ///
142    /// When the stream is closed, all tasks are shut down and the callback handler task is
143    /// given a timeout to finish processing callbacks. After the timeout expires, or once all
144    /// callbacks have been processed, the callback handler task is aborted and the stream is
145    /// fully closed.
146    ///
147    /// Configuration values:
148    /// - `None`: Wait forever
149    /// - `Some(x)`: Wait up to x milliseconds
150    ///
151    /// Default: `Some(5000)` (wait 5 seconds)
152    pub callback_max_wait_time_ms: Option<u64>,
153}
154
155impl Default for StreamConfigurationOptions {
156    fn default() -> Self {
157        Self {
158            max_inflight_requests: 1_000_000,
159            recovery: defaults::RECOVERY,
160            recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS,
161            recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS,
162            recovery_retries: defaults::RECOVERY_RETRIES,
163            server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS,
164            flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS,
165            record_type: RecordType::Proto,
166            stream_paused_max_wait_time_ms: None,
167            ack_callback: None,
168            callback_max_wait_time_ms: Some(defaults::CALLBACK_MAX_WAIT_TIME_MS),
169        }
170    }
171}