databricks_zerobus_ingest_sdk/stream_configuration.rs
1use std::sync::Arc;
2
3use crate::callbacks::AckCallback;
4use crate::databricks::zerobus::RecordType;
5use crate::stream_options::defaults;
6
7/// Configuration options for stream creation, recovery of broken streams and flushing.
8///
9/// These options control the behavior of ingestion streams, including memory limits,
10/// recovery policies, and timeout settings.
11///
12/// # Examples
13///
14/// ```
15/// use databricks_zerobus_ingest_sdk::StreamConfigurationOptions;
16///
17/// let options = StreamConfigurationOptions {
18/// max_inflight_requests: 1_000_000,
19/// recovery: true,
20/// recovery_timeout_ms: 20_000,
21/// recovery_retries: 5,
22/// ..Default::default()
23/// };
24/// ```
25#[derive(Clone)]
26pub struct StreamConfigurationOptions {
27 /// Maximum number of requests that can be sending or pending acknowledgement at any given time.
28 ///
29 /// This limit controls memory usage and backpressure. When this limit is reached,
30 /// `ingest_record()` and `ingest_records()` calls will block until acknowledgments free up space.
31 ///
32 /// Default: 1,000,000
33 pub max_inflight_requests: usize,
34
35 /// Whether to enable automatic stream recovery on failure.
36 ///
37 /// When enabled, the SDK will automatically attempt to reconnect and recover
38 /// the stream when encountering retryable errors.
39 ///
40 /// Default: `true`
41 pub recovery: bool,
42
43 /// Timeout in milliseconds for each stream recovery attempt.
44 ///
45 /// If a recovery attempt takes longer than this, it will be retried.
46 ///
47 /// Default: 15,000 (15 seconds)
48 pub recovery_timeout_ms: u64,
49
50 /// Backoff time in milliseconds between stream recovery retry attempts.
51 ///
52 /// The SDK will wait this duration before attempting another recovery after a failure.
53 ///
54 /// Default: 2,000 (2 seconds)
55 pub recovery_backoff_ms: u64,
56
57 /// Maximum number of recovery retry attempts before giving up.
58 ///
59 /// After this many failed attempts, the stream will close and return an error.
60 ///
61 /// Default: 4
62 pub recovery_retries: u32,
63
64 /// Timeout in milliseconds for waiting for server acknowledgements.
65 ///
66 /// If no acknowledgement is received within this time (and there are pending records),
67 /// the stream will be considered failed and recovery will be triggered.
68 ///
69 /// Default: 60,000 (60 seconds)
70 pub server_lack_of_ack_timeout_ms: u64,
71
72 /// Timeout in milliseconds for flush operations.
73 ///
74 /// If a flush() call cannot complete within this time, it will return a timeout error.
75 ///
76 /// Default: 300,000 (5 minutes)
77 pub flush_timeout_ms: u64,
78
79 /// Type of record to ingest.
80 ///
81 /// Supported values:
82 /// - RecordType::Proto
83 /// - RecordType::Json
84 /// - RecordType::Unspecified
85 ///
86 /// Default: RecordType::Proto
87 pub record_type: RecordType,
88
89 /// Maximum time in milliseconds to wait during graceful stream close.
90 ///
91 /// When the server sends a CloseStreamSignal indicating it will close the stream,
92 /// the SDK can enter a "paused" state where it:
93 /// - Continues accepting and buffering new ingest_record() calls
94 /// - Stops sending buffered records to the server
95 /// - Continues processing acknowledgments for in-flight records
96 /// - Waits for either all in-flight records to be acknowledged or the timeout to expire
97 ///
98 /// Configuration values:
99 /// - `None`: Wait for the full server-specified duration (most graceful)
100 /// - `Some(0)`: Immediate recovery, close stream right away (current behavior)
101 /// - `Some(x)`: Wait up to min(x, server_duration) milliseconds
102 ///
103 /// Default: `None` (wait for full server duration)
104 pub stream_paused_max_wait_time_ms: Option<u64>,
105
106 /// Optional callback invoked when records are acknowledged or encounter errors.
107 ///
108 /// When set, this callback will be invoked:
109 /// - On successful acknowledgment: `on_ack(offset_id)` is called
110 /// - On error: `on_error(offset_id, error_message)` is called
111 ///
112 ///
113 /// Default: `None` (no callbacks)
114 ///
115 /// # Examples
116 ///
117 /// ```
118 /// use std::sync::Arc;
119 /// use databricks_zerobus_ingest_sdk::{AckCallback, StreamConfigurationOptions, OffsetId};
120 ///
121 /// struct MyCallback;
122 ///
123 /// impl AckCallback for MyCallback {
124 /// fn on_ack(&self, offset_id: OffsetId) {
125 /// println!("Acknowledged: {}", offset_id);
126 /// }
127 ///
128 /// fn on_error(&self, offset_id: OffsetId, error_message: &str) {
129 /// eprintln!("Error {}: {}", offset_id, error_message);
130 /// }
131 /// }
132 ///
133 /// let options = StreamConfigurationOptions {
134 /// ack_callback: Some(Arc::new(MyCallback)),
135 /// ..Default::default()
136 /// };
137 /// ```
138 pub ack_callback: Option<Arc<dyn AckCallback>>,
139
140 /// Maximum time in milliseconds to wait for callbacks to finish after calling close() on the stream.
141 ///
142 /// When the stream is closed, all tasks are shut down and the callback handler task is
143 /// given a timeout to finish processing callbacks. After the timeout expires, or once all
144 /// callbacks have been processed, the callback handler task is aborted and the stream is
145 /// fully closed.
146 ///
147 /// Configuration values:
148 /// - `None`: Wait forever
149 /// - `Some(x)`: Wait up to x milliseconds
150 ///
151 /// Default: `Some(5000)` (wait 5 seconds)
152 pub callback_max_wait_time_ms: Option<u64>,
153}
154
155impl Default for StreamConfigurationOptions {
156 fn default() -> Self {
157 Self {
158 max_inflight_requests: 1_000_000,
159 recovery: defaults::RECOVERY,
160 recovery_timeout_ms: defaults::RECOVERY_TIMEOUT_MS,
161 recovery_backoff_ms: defaults::RECOVERY_BACKOFF_MS,
162 recovery_retries: defaults::RECOVERY_RETRIES,
163 server_lack_of_ack_timeout_ms: defaults::SERVER_LACK_OF_ACK_TIMEOUT_MS,
164 flush_timeout_ms: defaults::FLUSH_TIMEOUT_MS,
165 record_type: RecordType::Proto,
166 stream_paused_max_wait_time_ms: None,
167 ack_callback: None,
168 callback_max_wait_time_ms: Some(defaults::CALLBACK_MAX_WAIT_TIME_MS),
169 }
170 }
171}