databricks_zerobus_ingest_sdk/lib.rs
1//! # Databricks Zerobus Ingest SDK
2//!
3//! A high-performance Rust client for streaming data ingestion into Databricks Delta tables.
4//!
5//! ## Quick Start
6//!
7//! ```rust,ignore
8//! use databricks_zerobus_ingest_sdk::{ZerobusSdk, TableProperties, ProtoMessage};
9//!
10//! let sdk = ZerobusSdk::builder()
11//! .endpoint(zerobus_endpoint)
12//! .unity_catalog_url(uc_endpoint)
13//! .build()?;
14//! let stream = sdk.create_stream(table_properties, client_id, client_secret, None).await?;
15//!
16//! // Ingest a record and wait for acknowledgment
17//! let offset = stream.ingest_record_offset(ProtoMessage(my_message)).await?;
18//! stream.wait_for_offset(offset).await?;
19//!
20//! stream.close().await?;
21//! ```
22//!
23//! See the `examples/` directory for complete working examples.
24
/// Generated protobuf/gRPC types for the Zerobus API.
/// The code is generated at build time into `OUT_DIR` and pulled in via
/// `include!` (presumably by a prost/tonic build script — see `build.rs`).
pub mod databricks {
    pub mod zerobus {
        include!(concat!(env!("OUT_DIR"), "/databricks.zerobus.rs"));
    }
}
30
31#[cfg(feature = "arrow-flight")]
32mod arrow_configuration;
33#[cfg(feature = "arrow-flight")]
34mod arrow_metadata;
35#[cfg(feature = "arrow-flight")]
36mod arrow_stream;
37mod builder;
38mod callbacks;
39mod default_token_factory;
40mod errors;
41mod headers_provider;
42mod landing_zone;
43mod offset_generator;
44mod proxy;
45mod record_types;
46mod stream_configuration;
47mod stream_options;
48mod tls_config;
49
50use std::collections::HashMap;
51use std::fmt::Debug;
52use std::future::Future;
53use std::sync::atomic::{AtomicBool, Ordering};
54use std::sync::Arc;
55
56use prost::Message;
57use tokio::sync::RwLock;
58use tokio::time::Duration;
59use tokio_retry::strategy::FixedInterval;
60use tokio_retry::RetryIf;
61use tokio_stream::wrappers::ReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tonic::metadata::MetadataValue;
64use tonic::transport::{Channel, Endpoint};
65use tracing::{debug, error, info, instrument, span, warn, Level};
66
67use databricks::zerobus::ephemeral_stream_request::Payload as RequestPayload;
68use databricks::zerobus::ephemeral_stream_response::Payload as ResponsePayload;
69use databricks::zerobus::zerobus_client::ZerobusClient;
70use databricks::zerobus::{
71 CloseStreamSignal, CreateIngestStreamRequest, EphemeralStreamRequest, EphemeralStreamResponse,
72 IngestRecordResponse, RecordType,
73};
74use landing_zone::LandingZone;
75
76/// **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet
77/// supported for production use. The API may change in future releases.
78#[cfg(feature = "arrow-flight")]
79pub use arrow_configuration::ArrowStreamConfigurationOptions;
80#[cfg(feature = "arrow-flight")]
81pub use arrow_stream::{
82 ArrowSchema, ArrowTableProperties, DataType, Field, RecordBatch, ZerobusArrowStream,
83};
84pub use builder::ZerobusSdkBuilder;
85pub use callbacks::AckCallback;
86pub use default_token_factory::DefaultTokenFactory;
87pub use errors::ZerobusError;
88pub use headers_provider::{HeadersProvider, OAuthHeadersProvider, DEFAULT_X_ZEROBUS_SDK};
89pub use offset_generator::{OffsetId, OffsetIdGenerator};
90pub use record_types::{
91 EncodedBatch, EncodedBatchIter, EncodedRecord, JsonEncodedRecord, JsonString, JsonValue,
92 ProtoBytes, ProtoEncodedRecord, ProtoMessage,
93};
94pub use stream_configuration::StreamConfigurationOptions;
95#[cfg(feature = "testing")]
96pub use tls_config::NoTlsConfig;
97pub use tls_config::{SecureTlsConfig, TlsConfig};
98
/// Timeout (in seconds) applied during stream shutdown.
/// NOTE(review): the usage site is outside this chunk — presumably it bounds
/// how long close/drop waits for background tasks; confirm at the call site.
const SHUTDOWN_TIMEOUT_SECS: u64 = 2;
100
/// The type of the stream connection created with the server.
/// Currently we only support ephemeral streams on the server side, so we support only that in the SDK as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    /// Ephemeral streams exist only for the duration of the connection.
    /// They are not persisted and are not recoverable.
    Ephemeral,
    /// UNSUPPORTED: Persistent streams are durable and recoverable.
    /// Present for forward compatibility only; stream creation in this SDK
    /// always sets `StreamType::Ephemeral`.
    Persistent,
}
111
/// The properties of the table to ingest to.
///
/// Used when creating streams via `ZerobusSdk::create_stream()` to specify
/// which table to write to and the schema of records being ingested.
///
/// # Common errors
/// - `InvalidTableName`: table_name contains invalid characters or doesn't exist
/// - `PermissionDenied`: insufficient permissions to write to the specified table
/// - `InvalidArgument`: invalid or missing descriptor_proto or auth token
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Fully qualified Unity Catalog table name (e.g. `catalog.schema.table`).
    pub table_name: String,
    /// Protobuf descriptor describing the record schema.
    /// Required when the stream's record type is `Proto`; ignored (with a
    /// warning) when the record type is `Json`.
    pub descriptor_proto: Option<prost_types::DescriptorProto>,
}
126
/// Convenience alias for results returned by SDK operations.
pub type ZerobusResult<T> = Result<T, ZerobusError>;
128
/// A single in-flight ingest request: an encoded batch paired with the
/// logical offset assigned to it at submission time.
#[derive(Debug, Clone)]
struct IngestRequest {
    /// The encoded records to send over the wire.
    payload: EncodedBatch,
    /// Logical offset used to correlate server acknowledgments with this request.
    offset_id: OffsetId,
}
134
/// Map of logical offset to oneshot sender used to send acknowledgments back to the client.
type OneshotMap = HashMap<OffsetId, tokio::sync::oneshot::Sender<ZerobusResult<OffsetId>>>;
/// Landing zone for ingest records: a shared, bounded buffer (sized by
/// `max_inflight_requests`) of requests awaiting transmission.
type RecordLandingZone = Arc<LandingZone<Box<IngestRequest>>>;
139
/// Messages sent to the callback handler task.
///
/// The handler task is spawned only when an `ack_callback` is configured and
/// receives these over an unbounded mpsc channel.
#[derive(Debug, Clone)]
enum CallbackMessage {
    /// Acknowledgment callback with logical offset ID.
    Ack(OffsetId),
    /// Error callback with logical offset ID and error message.
    Error(OffsetId, String),
}
148
/// Represents an active ingestion stream to a Databricks Delta table.
///
/// A `ZerobusStream` manages a bidirectional gRPC stream for ingesting records into
/// a Unity Catalog table. It handles authentication, automatic recovery, acknowledgment
/// tracking, and graceful shutdown.
///
/// # Lifecycle
///
/// 1. Create a stream via `ZerobusSdk::create_stream()`
/// 2. Ingest records with `ingest_record()` and await acknowledgments
/// 3. Optionally call `flush()` to ensure all records are persisted
/// 4. Close the stream with `close()` to release resources
///
/// # Examples
///
/// ```no_run
/// # use databricks_zerobus_ingest_sdk::*;
/// # async fn example(mut stream: ZerobusStream, data: Vec<u8>) -> Result<(), ZerobusError> {
/// // Ingest a single record
/// let offset = stream.ingest_record_offset(data).await?;
/// println!("Record sent with offset: {}", offset);
///
/// // Wait for acknowledgment
/// stream.wait_for_offset(offset).await?;
/// println!("Record acknowledged at offset: {}", offset);
///
/// // Close the stream gracefully
/// stream.close().await?;
/// # Ok(())
/// # }
/// ```
pub struct ZerobusStream {
    /// Server-assigned stream identifier.
    /// This is a 128-bit UUID that is unique across all streams in the system,
    /// not just within a single table. The server returns this ID in the CreateStreamResponse
    /// after validating the table properties and establishing the gRPC connection.
    stream_id: Option<String>,
    /// Type of gRPC stream that is used when sending records.
    /// Always `StreamType::Ephemeral` in the current SDK.
    pub stream_type: StreamType,
    /// Gets headers which are used in the first request to establish connection with the server.
    pub headers_provider: Arc<dyn HeadersProvider>,
    /// The stream configuration options related to recovery, fetching OAuth tokens, etc.
    pub options: StreamConfigurationOptions,
    /// The table properties - table name and descriptor of the table.
    pub table_properties: TableProperties,
    /// Logical landing zone that is used to store records that have been sent by user but not yet sent over the network.
    landing_zone: RecordLandingZone,
    /// Map of logical offset to oneshot sender.
    /// Used to deliver per-record acknowledgments (or errors) back to callers.
    oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
    /// Supervisor task that manages the stream lifecycle such as stream creation, recovery, etc.
    /// It orchestrates the receiver and sender tasks.
    supervisor_task: tokio::task::JoinHandle<Result<(), ZerobusError>>,
    /// The generator of logical offset IDs. Used to generate monotonically increasing offset IDs, even if the stream recovers.
    logical_offset_id_generator: OffsetIdGenerator,
    /// Signal that the stream is caught up to the given offset.
    logical_last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
    /// Persistent offset ID receiver to ensure at least one receiver exists, preventing SendError
    _logical_last_received_offset_id_rx: tokio::sync::watch::Receiver<Option<OffsetId>>,
    /// A vector of records that have failed to be acknowledged.
    failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
    /// Flag indicating if the stream has been closed.
    is_closed: Arc<AtomicBool>,
    /// Sync mutex to ensure that offset generation and record ingestion happen atomically.
    sync_mutex: Arc<tokio::sync::Mutex<()>>,
    /// Watch channel for last error received from the server.
    server_error_rx: tokio::sync::watch::Receiver<Option<ZerobusError>>,
    /// Cancellation token to signal receiver and sender tasks to abort. It is sent either when stream is closed or dropped.
    cancellation_token: CancellationToken,
    /// Callback handler task that executes callbacks in a separate thread.
    /// `None` when no `ack_callback` was configured.
    callback_handler_task: Option<tokio::task::JoinHandle<()>>,
}
219
/// The main interface for interacting with the Zerobus API.
/// # Examples
/// ```no_run
/// # use std::error::Error;
/// # use std::sync::Arc;
/// # use databricks_zerobus_ingest_sdk::{ZerobusSdk, StreamConfigurationOptions, TableProperties, ZerobusError, ZerobusResult};
/// #
/// # async fn write_single_row(row: impl prost::Message) -> Result<(), ZerobusError> {
///
/// // Create SDK using the builder
/// let sdk = ZerobusSdk::builder()
///     .endpoint("https://your-workspace.zerobus.region.cloud.databricks.com")
///     .unity_catalog_url("https://your-workspace.cloud.databricks.com")
///     .build()?;
///
/// // Define the arguments for the ephemeral stream.
/// let table_properties = TableProperties {
///     table_name: "test_table".to_string(),
///     descriptor_proto: Default::default(),
/// };
/// let options = StreamConfigurationOptions {
///     max_inflight_requests: 100,
///     ..Default::default()
/// };
/// let client_id = "your-client-id".to_string();
/// let client_secret = "your-client-secret".to_string();
///
/// // Create a stream
/// let stream = sdk.create_stream(table_properties, client_id, client_secret, Some(options)).await?;
///
/// // Ingest a single record
/// let offset_id = stream.ingest_record_offset(row.encode_to_vec()).await?;
/// println!("Record sent with offset Id: {}", offset_id);
///
/// // Wait for acknowledgment
/// stream.wait_for_offset(offset_id).await?;
/// println!("Record acknowledged with offset Id: {}", offset_id);
/// # Ok(())
/// # }
/// ```
pub struct ZerobusSdk {
    /// The Zerobus gRPC endpoint URL used to create the shared channel.
    pub zerobus_endpoint: String,
    /// Deprecated: This field is no longer used. TLS is now controlled via `tls_config`.
    #[deprecated(
        since = "0.5.0",
        note = "This field is no longer used. TLS is controlled via tls_config."
    )]
    pub use_tls: bool,
    /// The Unity Catalog endpoint URL, passed to the OAuth headers provider.
    pub unity_catalog_url: String,
    /// Lazily-created gRPC client shared by all streams; clones share the same
    /// underlying TCP connection via HTTP/2 multiplexing.
    shared_channel: tokio::sync::Mutex<Option<ZerobusClient<Channel>>>,
    /// Workspace ID extracted from the Zerobus endpoint host.
    workspace_id: String,
    /// TLS configuration applied to the endpoint when the channel is created.
    tls_config: Arc<dyn TlsConfig>,
}
273
274impl ZerobusSdk {
    /// Creates a new SDK builder for fluent configuration.
    ///
    /// This is the recommended way to create a `ZerobusSdk` instance.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use databricks_zerobus_ingest_sdk::ZerobusSdk;
    ///
    /// let sdk = ZerobusSdk::builder()
    ///     .endpoint("https://workspace.zerobus.databricks.com")
    ///     .unity_catalog_url("https://workspace.cloud.databricks.com")
    ///     .build()?;
    /// # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(())
    /// ```
    pub fn builder() -> ZerobusSdkBuilder {
        // Thin wrapper so callers don't need to name the builder type directly.
        ZerobusSdkBuilder::new()
    }
293
294 /// Creates a new Zerobus SDK instance.
295 ///
296 /// # Deprecated
297 ///
298 /// Use [`ZerobusSdk::builder()`] instead for more flexible configuration:
299 ///
300 /// ```no_run
301 /// use databricks_zerobus_ingest_sdk::ZerobusSdk;
302 ///
303 /// let sdk = ZerobusSdk::builder()
304 /// .endpoint("https://workspace.zerobus.databricks.com")
305 /// .unity_catalog_url("https://workspace.cloud.databricks.com")
306 /// .build()?;
307 /// # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(())
308 /// ```
309 ///
310 /// # Arguments
311 ///
312 /// * `zerobus_endpoint` - The Zerobus API endpoint URL (e.g., "https://workspace-id.cloud.databricks.com")
313 /// * `unity_catalog_url` - The Unity Catalog endpoint URL (e.g., "https://workspace.cloud.databricks.com")
314 ///
315 /// # Returns
316 ///
317 /// A new `ZerobusSdk` instance configured to use TLS.
318 ///
319 /// # Errors
320 ///
321 /// * `ChannelCreationError` - If the workspace ID cannot be extracted from the Zerobus endpoint
322 #[deprecated(since = "0.5.0", note = "Use ZerobusSdk::builder() instead")]
323 #[allow(clippy::result_large_err)]
324 pub fn new(zerobus_endpoint: String, unity_catalog_url: String) -> ZerobusResult<Self> {
325 let zerobus_endpoint = if !zerobus_endpoint.starts_with("https://")
326 && !zerobus_endpoint.starts_with("http://")
327 {
328 format!("https://{}", zerobus_endpoint)
329 } else {
330 zerobus_endpoint
331 };
332
333 let workspace_id = zerobus_endpoint
334 .strip_prefix("https://")
335 .or_else(|| zerobus_endpoint.strip_prefix("http://"))
336 .and_then(|s| s.split('.').next())
337 .map(|s| s.to_string())
338 .ok_or_else(|| {
339 ZerobusError::InvalidArgument(
340 "Failed to extract workspace ID from zerobus_endpoint".to_string(),
341 )
342 })?;
343
344 #[allow(deprecated)]
345 Ok(ZerobusSdk {
346 zerobus_endpoint,
347 use_tls: true,
348 unity_catalog_url,
349 workspace_id,
350 shared_channel: tokio::sync::Mutex::new(None),
351 tls_config: Arc::new(SecureTlsConfig::new()),
352 })
353 }
354
    /// Creates a new SDK instance with explicit configuration.
    ///
    /// This is used internally by the builder pattern. Unlike [`ZerobusSdk::new`],
    /// the workspace ID and TLS configuration are supplied by the caller rather
    /// than derived from the endpoint.
    pub(crate) fn new_with_config(
        zerobus_endpoint: String,
        unity_catalog_url: String,
        workspace_id: String,
        tls_config: Arc<dyn TlsConfig>,
    ) -> Self {
        // The deprecated `use_tls` field must still be populated; TLS behavior
        // itself is governed by `tls_config`.
        #[allow(deprecated)]
        ZerobusSdk {
            zerobus_endpoint,
            use_tls: true,
            unity_catalog_url,
            workspace_id,
            shared_channel: tokio::sync::Mutex::new(None),
            tls_config,
        }
    }
374
375 /// Creates a new ingestion stream to a Unity Catalog table.
376 ///
377 /// This establishes a bidirectional gRPC stream for ingesting records. Authentication
378 /// is handled automatically using the provided OAuth credentials.
379 ///
380 /// # Arguments
381 ///
382 /// * `table_properties` - Table name and protobuf descriptor
383 /// * `client_id` - OAuth client ID for authentication
384 /// * `client_secret` - OAuth client secret for authentication
385 /// * `options` - Optional stream configuration (uses defaults if `None`)
386 ///
387 /// # Returns
388 ///
389 /// A `ZerobusStream` ready for ingesting records.
390 ///
391 /// # Errors
392 ///
393 /// * `CreateStreamError` - If stream creation fails
394 /// * `InvalidTableName` - If the table name is invalid or table doesn't exist
395 /// * `InvalidUCTokenError` - If OAuth authentication fails
396 /// * `PermissionDenied` - If credentials lack required permissions
397 ///
398 /// # Examples
399 ///
400 /// ```no_run
401 /// # use databricks_zerobus_ingest_sdk::*;
402 /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
403 /// let table_props = TableProperties {
404 /// table_name: "catalog.schema.table".to_string(),
405 /// descriptor_proto: Default::default(), // Load from generated files
406 /// };
407 ///
408 /// let stream = sdk.create_stream(
409 /// table_props,
410 /// "client-id".to_string(),
411 /// "client-secret".to_string(),
412 /// None,
413 /// ).await?;
414 /// # Ok(())
415 /// # }
416 /// ```
417 #[instrument(level = "debug", skip_all)]
418 pub async fn create_stream(
419 &self,
420 table_properties: TableProperties,
421 client_id: String,
422 client_secret: String,
423 options: Option<StreamConfigurationOptions>,
424 ) -> ZerobusResult<ZerobusStream> {
425 let headers_provider = OAuthHeadersProvider::new(
426 client_id,
427 client_secret,
428 table_properties.table_name.clone(),
429 self.workspace_id.clone(),
430 self.unity_catalog_url.clone(),
431 );
432 self.create_stream_with_headers_provider(
433 table_properties,
434 Arc::new(headers_provider),
435 options,
436 )
437 .await
438 }
439
440 /// Creates a new ingestion stream with a custom headers provider.
441 ///
442 /// This is an advanced method that allows you to implement your own authentication
443 /// logic by providing a custom implementation of the `HeadersProvider` trait.
444 ///
445 /// # Arguments
446 ///
447 /// * `table_properties` - Table name and protobuf descriptor
448 /// * `headers_provider` - An `Arc` holding your custom `HeadersProvider` implementation
449 /// * `options` - Optional stream configuration (uses defaults if `None`)
450 ///
451 /// # Returns
452 ///
453 /// A `ZerobusStream` ready for ingesting records.
454 ///
455 /// # Examples
456 ///
457 /// ```no_run
458 /// # use databricks_zerobus_ingest_sdk::*;
459 /// # use std::collections::HashMap;
460 /// # use std::sync::Arc;
461 /// # use async_trait::async_trait;
462 /// #
463 /// # struct MyHeadersProvider;
464 /// #
465 /// # #[async_trait]
466 /// # impl HeadersProvider for MyHeadersProvider {
467 /// # async fn get_headers(&self) -> ZerobusResult<HashMap<&'static str, String>> {
468 /// # let mut headers = HashMap::new();
469 /// # headers.insert("some_key", "some_value".to_string());
470 /// # Ok(headers)
471 /// # }
472 /// # }
473 /// #
474 /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
475 /// let table_props = TableProperties {
476 /// table_name: "catalog.schema.table".to_string(),
477 /// descriptor_proto: Default::default(),
478 /// };
479 ///
480 /// let headers_provider = Arc::new(MyHeadersProvider);
481 ///
482 /// let stream = sdk.create_stream_with_headers_provider(
483 /// table_props,
484 /// headers_provider,
485 /// None,
486 /// ).await?;
487 /// # Ok(())
488 /// # }
489 /// ```
490 #[instrument(level = "debug", skip_all)]
491 pub async fn create_stream_with_headers_provider(
492 &self,
493 table_properties: TableProperties,
494 headers_provider: Arc<dyn HeadersProvider>,
495 options: Option<StreamConfigurationOptions>,
496 ) -> ZerobusResult<ZerobusStream> {
497 let options = options.unwrap_or_default();
498
499 match options.record_type {
500 RecordType::Proto => {
501 if table_properties.descriptor_proto.is_none() {
502 return Err(ZerobusError::InvalidArgument(
503 "Proto descriptor is required for Proto record type".to_string(),
504 ));
505 }
506 }
507 RecordType::Json => {
508 if table_properties.descriptor_proto.is_some() {
509 warn!("JSON descriptor is not supported for Proto record type");
510 }
511 }
512 RecordType::Unspecified => {
513 return Err(ZerobusError::InvalidArgument(
514 "Record type is not specified".to_string(),
515 ));
516 }
517 }
518
519 let channel = self.get_or_create_channel_zerobus_client().await?;
520 let stream = ZerobusStream::new_stream(
521 channel,
522 table_properties,
523 Arc::clone(&headers_provider),
524 options,
525 )
526 .await;
527 match stream {
528 Ok(stream) => {
529 if let Some(stream_id) = stream.stream_id.as_ref() {
530 info!(stream_id = %stream_id, "Successfully created new ephemeral stream");
531 } else {
532 error!("Successfully created a stream but stream_id is None");
533 }
534 return Ok(stream);
535 }
536 Err(e) => {
537 error!("Stream initialization failed with error: {}", e);
538 return Err(e);
539 }
540 }
541 }
542
543 /// Recreates a failed stream and re-ingests unacknowledged records.
544 ///
545 /// This is useful when a stream encounters an error and you want to preserve
546 /// unacknowledged records. The method creates a new stream with the same
547 /// configuration and automatically re-ingests all records that weren't acknowledged.
548 ///
549 /// # Arguments
550 ///
551 /// * `stream` - The failed stream to recreate
552 ///
553 /// # Returns
554 ///
555 /// A new `ZerobusStream` with unacknowledged records already submitted.
556 ///
557 /// # Errors
558 ///
559 /// Returns any errors from stream creation or re-ingestion.
560 ///
561 /// # Examples
562 ///
563 /// ```no_run
564 /// # use databricks_zerobus_ingest_sdk::*;
565 /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusStream) -> Result<(), ZerobusError> {
566 /// match stream.close().await {
567 /// Err(_) => {
568 /// // Stream failed, recreate it
569 /// let new_stream = sdk.recreate_stream(&stream).await?;
570 /// // Continue using new_stream
571 /// }
572 /// Ok(_) => println!("Stream closed successfully"),
573 /// }
574 /// # Ok(())
575 /// # }
576 /// ```
577 #[instrument(level = "debug", skip_all)]
578 pub async fn recreate_stream(&self, stream: &ZerobusStream) -> ZerobusResult<ZerobusStream> {
579 let batches = stream.get_unacked_batches().await?;
580 let new_stream = self
581 .create_stream_with_headers_provider(
582 stream.table_properties.clone(),
583 Arc::clone(&stream.headers_provider),
584 Some(stream.options.clone()),
585 )
586 .await?;
587 for batch in batches {
588 let ack = new_stream.ingest_internal(batch).await?;
589 tokio::spawn(ack);
590 }
591 return Ok(new_stream);
592 }
593
594 /// Creates a new Arrow Flight ingestion stream to a Unity Catalog table.
595 ///
596 /// This establishes an Arrow Flight stream for high-performance ingestion of
597 /// Arrow RecordBatches. Authentication is handled automatically using the
598 /// provided OAuth credentials.
599 ///
600 /// # Arguments
601 ///
602 /// * `table_properties` - Table name and Arrow schema
603 /// * `client_id` - OAuth client ID for authentication
604 /// * `client_secret` - OAuth client secret for authentication
605 /// * `options` - Optional Arrow stream configuration (uses defaults if `None`)
606 ///
607 /// # Returns
608 ///
609 /// A `ZerobusArrowStream` ready for ingesting Arrow RecordBatches.
610 ///
611 /// # Errors
612 ///
613 /// * `CreateStreamError` - If stream creation fails
614 /// * `InvalidTableName` - If the table name is invalid or table doesn't exist
615 /// * `InvalidUCTokenError` - If OAuth authentication fails
616 /// * `PermissionDenied` - If credentials lack required permissions
617 ///
618 /// # Examples
619 ///
620 /// ```no_run
621 /// # use databricks_zerobus_ingest_sdk::*;
622 /// # use std::sync::Arc;
623 /// # use arrow_schema::{Schema as ArrowSchema, Field, DataType};
624 /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
625 /// let schema = Arc::new(ArrowSchema::new(vec![
626 /// Field::new("id", DataType::Int32, false),
627 /// Field::new("name", DataType::Utf8, true),
628 /// ]));
629 ///
630 /// let table_props = ArrowTableProperties {
631 /// table_name: "catalog.schema.table".to_string(),
632 /// schema,
633 /// };
634 ///
635 /// let stream = sdk.create_arrow_stream(
636 /// table_props,
637 /// "client-id".to_string(),
638 /// "client-secret".to_string(),
639 /// None,
640 /// ).await?;
641 /// # Ok(())
642 /// # }
643 /// ```
644 #[cfg(feature = "arrow-flight")]
645 #[instrument(level = "debug", skip_all)]
646 pub async fn create_arrow_stream(
647 &self,
648 table_properties: ArrowTableProperties,
649 client_id: String,
650 client_secret: String,
651 options: Option<ArrowStreamConfigurationOptions>,
652 ) -> ZerobusResult<ZerobusArrowStream> {
653 let headers_provider = OAuthHeadersProvider::new(
654 client_id,
655 client_secret,
656 table_properties.table_name.clone(),
657 self.workspace_id.clone(),
658 self.unity_catalog_url.clone(),
659 );
660 self.create_arrow_stream_with_headers_provider(
661 table_properties,
662 Arc::new(headers_provider),
663 options,
664 )
665 .await
666 }
667
668 /// Creates a new Arrow Flight stream with a custom headers provider.
669 ///
670 /// This is an advanced method that allows you to implement your own authentication
671 /// logic by providing a custom implementation of the `HeadersProvider` trait.
672 ///
673 /// # Arguments
674 ///
675 /// * `table_properties` - Table name and Arrow schema
676 /// * `headers_provider` - An `Arc` holding your custom `HeadersProvider` implementation
677 /// * `options` - Optional Arrow stream configuration (uses defaults if `None`)
678 ///
679 /// # Returns
680 ///
681 /// A `ZerobusArrowStream` ready for ingesting Arrow RecordBatches.
682 ///
683 /// # Examples
684 ///
685 /// ```no_run
686 /// # use databricks_zerobus_ingest_sdk::*;
687 /// # use std::collections::HashMap;
688 /// # use std::sync::Arc;
689 /// # use async_trait::async_trait;
690 /// # use arrow_schema::{Schema as ArrowSchema, Field, DataType};
691 /// #
692 /// # struct MyHeadersProvider;
693 /// #
694 /// # #[async_trait]
695 /// # impl HeadersProvider for MyHeadersProvider {
696 /// # async fn get_headers(&self) -> ZerobusResult<HashMap<&'static str, String>> {
697 /// # let mut headers = HashMap::new();
698 /// # headers.insert("authorization", "Bearer my-token".to_string());
699 /// # Ok(headers)
700 /// # }
701 /// # }
702 /// #
703 /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
704 /// let schema = Arc::new(ArrowSchema::new(vec![
705 /// Field::new("id", DataType::Int32, false),
706 /// ]));
707 ///
708 /// let table_props = ArrowTableProperties {
709 /// table_name: "catalog.schema.table".to_string(),
710 /// schema,
711 /// };
712 ///
713 /// let headers_provider = Arc::new(MyHeadersProvider);
714 ///
715 /// let stream = sdk.create_arrow_stream_with_headers_provider(
716 /// table_props,
717 /// headers_provider,
718 /// None,
719 /// ).await?;
720 /// # Ok(())
721 /// # }
722 /// ```
723 #[cfg(feature = "arrow-flight")]
724 #[instrument(level = "debug", skip_all)]
725 pub async fn create_arrow_stream_with_headers_provider(
726 &self,
727 table_properties: ArrowTableProperties,
728 headers_provider: Arc<dyn HeadersProvider>,
729 options: Option<ArrowStreamConfigurationOptions>,
730 ) -> ZerobusResult<ZerobusArrowStream> {
731 let options = options.unwrap_or_default();
732
733 let stream = ZerobusArrowStream::new(
734 &self.zerobus_endpoint,
735 Arc::clone(&self.tls_config),
736 table_properties,
737 headers_provider,
738 options,
739 )
740 .await;
741
742 match stream {
743 Ok(stream) => {
744 info!(
745 table_name = %stream.table_name(),
746 "Successfully created new Arrow Flight stream"
747 );
748 Ok(stream)
749 }
750 Err(e) => {
751 error!("Arrow Flight stream initialization failed: {}", e);
752 Err(e)
753 }
754 }
755 }
756
757 /// Recreates an Arrow Flight stream from a failed or closed stream, replaying any
758 /// unacknowledged batches.
759 ///
760 /// This method is useful when you want to manually recover from a stream failure
761 /// or continue ingestion after closing a stream with unacknowledged batches.
762 /// It creates a new stream with the same configuration and automatically ingests
763 /// any batches that were not acknowledged in the original stream.
764 ///
765 /// # Arguments
766 ///
767 /// * `stream` - A reference to the failed or closed Arrow Flight stream
768 ///
769 /// # Returns
770 ///
771 /// A new `ZerobusArrowStream` with the same configuration, with unacked batches
772 /// already queued for ingestion.
773 ///
774 /// # Errors
775 ///
776 /// * `InvalidStateError` - If the source stream is still active
777 /// * `CreateStreamError` - If stream creation fails
778 ///
779 /// # Examples
780 ///
781 /// ```no_run
782 /// # use databricks_zerobus_ingest_sdk::*;
783 /// # use arrow_array::RecordBatch;
784 /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusArrowStream) -> Result<(), ZerobusError> {
785 /// // Ingest some batches
786 /// // ...
787 ///
788 /// // Stream fails for some reason
789 /// match stream.flush().await {
790 /// Err(_) => {
791 /// // Close the failed stream
792 /// stream.close().await.ok();
793 ///
794 /// // Recreate and retry
795 /// let new_stream = sdk.recreate_arrow_stream(&stream).await?;
796 /// new_stream.flush().await?;
797 /// }
798 /// Ok(_) => {}
799 /// }
800 /// # Ok(())
801 /// # }
802 /// ```
803 #[cfg(feature = "arrow-flight")]
804 #[instrument(level = "debug", skip_all)]
805 pub async fn recreate_arrow_stream(
806 &self,
807 stream: &ZerobusArrowStream,
808 ) -> ZerobusResult<ZerobusArrowStream> {
809 let batches = stream.get_unacked_batches().await?;
810
811 let new_stream = self
812 .create_arrow_stream_with_headers_provider(
813 stream.table_properties().clone(),
814 stream.headers_provider(),
815 Some(stream.options().clone()),
816 )
817 .await?;
818
819 // Replay unacked batches.
820 for batch in batches {
821 let _offset = new_stream.ingest_batch(batch).await?;
822 }
823
824 info!(
825 table_name = %new_stream.table_name(),
826 "Successfully recreated Arrow Flight stream"
827 );
828
829 Ok(new_stream)
830 }
831
832 /// Gets or creates the shared Channel for all streams.
833 /// The first call creates the Channel, subsequent calls clone it.
834 /// All clones share the same underlying TCP connection via HTTP/2 multiplexing.
835 async fn get_or_create_channel_zerobus_client(&self) -> ZerobusResult<ZerobusClient<Channel>> {
836 let mut guard = self.shared_channel.lock().await;
837
838 if guard.is_none() {
839 // Create the channel for the first time.
840 let endpoint = Endpoint::from_shared(self.zerobus_endpoint.clone())
841 .map_err(|err| ZerobusError::ChannelCreationError(err.to_string()))?;
842
843 let endpoint = self.tls_config.configure_endpoint(endpoint)?;
844
845 // Check for HTTP proxy env vars (https_proxy, HTTPS_PROXY, etc.)
846 // and use a proxy connector if one is configured.
847 let host = endpoint.uri().host().unwrap_or_default().to_string();
848
849 let channel = if !proxy::is_no_proxy(&host) {
850 if let Some(proxy_connector) = proxy::create_proxy_connector() {
851 endpoint.connect_with_connector_lazy(proxy_connector)
852 } else {
853 endpoint.connect_lazy()
854 }
855 } else {
856 endpoint.connect_lazy()
857 };
858
859 let client = ZerobusClient::new(channel)
860 .max_decoding_message_size(usize::MAX)
861 .max_encoding_message_size(usize::MAX);
862
863 *guard = Some(client);
864 }
865
866 Ok(guard
867 .as_ref()
868 .expect("Channel was just initialized")
869 .clone())
870 }
871}
872
873impl ZerobusStream {
    /// Creates a new ephemeral stream for ingesting records.
    ///
    /// Wires up all shared state (landing zone, ack map, watch channels),
    /// spawns the supervisor task that owns the stream lifecycle, and waits
    /// for it to report the server-assigned stream ID before returning.
    #[instrument(level = "debug", skip_all)]
    async fn new_stream(
        channel: ZerobusClient<Channel>,
        table_properties: TableProperties,
        headers_provider: Arc<dyn HeadersProvider>,
        options: StreamConfigurationOptions,
    ) -> ZerobusResult<Self> {
        // One-shot used by the supervisor to report the outcome of initial
        // stream creation: the server-assigned stream ID, or an error.
        let (stream_init_result_tx, stream_init_result_rx) =
            tokio::sync::oneshot::channel::<ZerobusResult<String>>();

        // Watch channel broadcasting the last offset received from the server.
        // The receiver is stored on the struct so at least one receiver always
        // exists and sends never fail.
        let (logical_last_received_offset_id_tx, _logical_last_received_offset_id_rx) =
            tokio::sync::watch::channel(None);
        // Bounded buffer for records accepted from the user but not yet sent
        // over the network; capacity comes from the stream options.
        let landing_zone = Arc::new(LandingZone::<Box<IngestRequest>>::new(
            options.max_inflight_requests,
        ));

        let oneshot_map = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
        let is_closed = Arc::new(AtomicBool::new(false));
        let failed_records = Arc::new(RwLock::new(Vec::new()));
        let logical_offset_id_generator = OffsetIdGenerator::default();

        let (server_error_tx, server_error_rx) = tokio::sync::watch::channel(None);
        let cancellation_token = CancellationToken::new();
        // Create callback channel and spawn callback handler task only if callback is defined
        let (callback_tx, callback_handler_task) = if options.ack_callback.is_some() {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            let task = Self::spawn_callback_handler_task(
                rx,
                options.ack_callback.clone(),
                cancellation_token.clone(),
            );
            (Some(tx), Some(task))
        } else {
            (None, None)
        };

        // The supervisor owns stream creation/recovery and orchestrates the
        // sender/receiver tasks; it gets clones of all shared state.
        let supervisor_task = tokio::task::spawn(Self::supervisor_task(
            channel,
            table_properties.clone(),
            Arc::clone(&headers_provider),
            options.clone(),
            Arc::clone(&landing_zone),
            Arc::clone(&oneshot_map),
            logical_last_received_offset_id_tx.clone(),
            Arc::clone(&is_closed),
            Arc::clone(&failed_records),
            stream_init_result_tx,
            server_error_tx,
            cancellation_token.clone(),
            callback_tx.clone(),
        ));
        // Block until the supervisor reports the creation result. A RecvError
        // here means the supervisor died before sending anything.
        let stream_id = Some(stream_init_result_rx.await.map_err(|_| {
            ZerobusError::UnexpectedStreamResponseError(
                "Supervisor task died before stream creation".to_string(),
            )
        })??);

        let stream = Self {
            stream_type: StreamType::Ephemeral,
            headers_provider,
            options: options.clone(),
            table_properties,
            stream_id,
            landing_zone,
            oneshot_map,
            supervisor_task,
            logical_offset_id_generator,
            logical_last_received_offset_id_tx,
            _logical_last_received_offset_id_rx,
            failed_records,
            is_closed,
            sync_mutex: Arc::new(tokio::sync::Mutex::new(())),
            server_error_rx,
            cancellation_token,
            callback_handler_task,
        };

        Ok(stream)
    }
954
    /// Supervisor task is responsible for managing the stream lifecycle.
    /// It handles stream creation, recovery, and error handling.
    ///
    /// Runs as a loop: each iteration (re)creates the gRPC stream (with retry),
    /// spawns a sender and a receiver task, waits for either to finish, and then
    /// either recovers (loops again) or fails all pending records and exits.
    #[allow(clippy::too_many_arguments)]
    #[instrument(level = "debug", skip_all, fields(table_name = %table_properties.table_name))]
    async fn supervisor_task(
        channel: ZerobusClient<Channel>,
        table_properties: TableProperties,
        headers_provider: Arc<dyn HeadersProvider>,
        options: StreamConfigurationOptions,
        landing_zone: RecordLandingZone,
        oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
        logical_last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
        is_closed: Arc<AtomicBool>,
        failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
        stream_init_result_tx: tokio::sync::oneshot::Sender<ZerobusResult<String>>,
        server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
        cancellation_token: CancellationToken,
        callback_tx: Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
    ) -> ZerobusResult<()> {
        // `stream_init_result_tx` is a oneshot, so it is wrapped in Option and
        // `take()`n exactly once on the first (initial) creation attempt.
        let mut initial_stream_creation = true;
        let mut stream_init_result_tx = Some(stream_init_result_tx);

        loop {
            debug!("Supervisor task loop");

            if cancellation_token.is_cancelled() {
                debug!("Supervisor task cancelled, exiting");
                return Ok(());
            }

            let landing_zone_sender = Arc::clone(&landing_zone);
            let landing_zone_receiver = Arc::clone(&landing_zone);
            let landing_zone_recovery = Arc::clone(&landing_zone);

            // 1. Create a stream (retrying with a fixed backoff when recovery is
            //    enabled and the error is retryable).
            let strategy = FixedInterval::from_millis(options.recovery_backoff_ms)
                .take(options.recovery_retries as usize);

            let create_attempt = || {
                let channel = channel.clone();
                let table_properties = table_properties.clone();
                let headers_provider = Arc::clone(&headers_provider);
                let record_type = options.record_type;

                async move {
                    // Each attempt is bounded by `recovery_timeout_ms`; a timeout is
                    // surfaced as a deadline-exceeded CreateStreamError.
                    tokio::time::timeout(
                        Duration::from_millis(options.recovery_timeout_ms),
                        Self::create_stream_connection(
                            channel,
                            &table_properties,
                            &headers_provider,
                            record_type,
                        ),
                    )
                    .await
                    .map_err(|_| {
                        ZerobusError::CreateStreamError(tonic::Status::deadline_exceeded(
                            "Stream creation timed out",
                        ))
                    })?
                }
            };
            let should_retry = |e: &ZerobusError| options.recovery && e.is_retryable();
            let creation = RetryIf::spawn(strategy, create_attempt, should_retry).await;

            let (tx, response_grpc_stream, stream_id) = match creation {
                Ok((tx, response_grpc_stream, stream_id)) => (tx, response_grpc_stream, stream_id),
                Err(e) => {
                    if initial_stream_creation {
                        // Initial creation failed: report the error to `new_stream`.
                        if let Some(tx) = stream_init_result_tx.take() {
                            let _ = tx.send(Err(e.clone()));
                        }
                    } else {
                        // Recovery failed: the stream is permanently dead; fail all
                        // in-flight records so callers are unblocked.
                        is_closed.store(true, Ordering::Relaxed);
                        Self::fail_all_pending_records(
                            landing_zone.clone(),
                            oneshot_map.clone(),
                            failed_records.clone(),
                            &e,
                            &callback_tx,
                        )
                        .await;
                    }
                    return Err(e);
                }
            };
            if initial_stream_creation {
                if let Some(stream_init_result_tx_inner) = stream_init_result_tx.take() {
                    let _ = stream_init_result_tx_inner.send(Ok(stream_id.clone()));
                }
                initial_stream_creation = false;
                info!(stream_id = %stream_id, "Successfully created stream");
            } else {
                info!(stream_id = %stream_id, "Successfully recovered stream");
                // Clear any previously broadcast server error after a recovery.
                let _ = server_error_tx.send(None);
            }

            // 2. Reset landing zone so unacked records are re-observed (re-sent).
            landing_zone_recovery.reset_observe();

            // 3. Spawn receiver and sender task.
            let is_paused = Arc::new(AtomicBool::new(false));
            let mut recv_task = Self::spawn_receiver_task(
                response_grpc_stream,
                logical_last_received_offset_id_tx.clone(),
                landing_zone_receiver,
                oneshot_map.clone(),
                Arc::clone(&is_paused),
                options.clone(),
                server_error_tx.clone(),
                cancellation_token.clone(),
                callback_tx.clone(),
            );
            let mut send_task = Self::spawn_sender_task(
                tx,
                landing_zone_sender,
                Arc::clone(&is_paused),
                server_error_tx.clone(),
                cancellation_token.clone(),
            );

            // 4. Wait for any of the two tasks to end; the survivor is aborted.
            let result = tokio::select! {
                recv_result = &mut recv_task => {
                    send_task.abort();
                    match recv_result {
                        Ok(Err(e)) => Err(e),
                        Err(e) => Err(ZerobusError::UnexpectedStreamResponseError(
                            format!("Receiver task panicked: {}", e)
                        )),
                        Ok(Ok(())) => {
                            info!("Receiver task completed successfully");
                            Ok(())
                        }
                    }
                }
                send_result = &mut send_task => {
                    recv_task.abort();
                    match send_result {
                        Ok(Err(e)) => Err(e),
                        Err(e) => Err(ZerobusError::UnexpectedStreamResponseError(
                            format!("Sender task panicked: {}", e)
                        )),
                        Ok(Ok(())) => Ok(()) // This only happens when the sender task receives a cancellation signal.
                    }
                }
            };

            // 5. Handle errors: retryable errors (with recovery enabled) loop back
            //    to step 1; everything else fails all pending records and exits.
            if let Err(error) = result {
                error!(stream_id = %stream_id, "Stream failure detected: {}", error);
                let error = match &error {
                    // Mapping this to pass certain e2e tests.
                    // TODO: Remove this once we fix tests.
                    ZerobusError::StreamClosedError(status)
                        if status.code() == tonic::Code::InvalidArgument =>
                    {
                        ZerobusError::InvalidArgument(status.message().to_string())
                    }
                    _ => error,
                };
                let _ = server_error_tx.send(Some(error.clone()));
                if !error.is_retryable() || !options.recovery {
                    is_closed.store(true, Ordering::Relaxed);
                    Self::fail_all_pending_records(
                        landing_zone.clone(),
                        oneshot_map.clone(),
                        failed_records.clone(),
                        &error,
                        &callback_tx,
                    )
                    .await;
                    return Err(error);
                }
            }
        }
    }
1132
    /// Creates a stream connection to the Zerobus API.
    /// Returns a tuple containing the sender, response gRPC stream, and stream ID.
    /// If the stream creation fails, it returns an error.
    ///
    /// Performs the full handshake: opens the bidirectional RPC with the
    /// provider-supplied headers, sends a `CreateStream` request, and waits for
    /// the server's `CreateStreamResponse` carrying the stream ID.
    #[instrument(level = "debug", skip_all, fields(table_name = %table_properties.table_name))]
    async fn create_stream_connection(
        mut channel: ZerobusClient<Channel>,
        table_properties: &TableProperties,
        headers_provider: &Arc<dyn HeadersProvider>,
        record_type: RecordType,
    ) -> ZerobusResult<(
        tokio::sync::mpsc::Sender<EphemeralStreamRequest>,
        tonic::Streaming<EphemeralStreamResponse>,
        String,
    )> {
        // Bounded buffer between the sender task and the gRPC transport.
        const CHANNEL_BUFFER_SIZE: usize = 2048;
        let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
        let mut request_stream = tonic::Request::new(ReceiverStream::new(rx));

        // Copy provider-supplied headers onto the request metadata, validating
        // each value; known keys get dedicated error types.
        let stream_metadata = request_stream.metadata_mut();
        let headers = headers_provider.get_headers().await?;

        for (key, value) in headers {
            match key {
                "x-databricks-zerobus-table-name" => {
                    let table_name = MetadataValue::try_from(value.as_str())
                        .map_err(|e| ZerobusError::InvalidTableName(e.to_string()))?;
                    stream_metadata.insert("x-databricks-zerobus-table-name", table_name);
                }
                "authorization" => {
                    let mut auth_value = MetadataValue::try_from(value.as_str()).map_err(|_| {
                        error!(table_name = %table_properties.table_name, "Invalid token: {}", value);
                        ZerobusError::InvalidUCTokenError(value)
                    })?;
                    // Keeps the token out of debug output/log formatting.
                    auth_value.set_sensitive(true);
                    stream_metadata.insert("authorization", auth_value);
                }
                other_key => {
                    let header_value = MetadataValue::try_from(value.as_str())
                        .map_err(|_| ZerobusError::InvalidArgument(other_key.to_string()))?;
                    stream_metadata.insert(other_key, header_value);
                }
            }
        }

        let mut response_grpc_stream = channel
            .ephemeral_stream(request_stream)
            .await
            .map_err(ZerobusError::CreateStreamError)?
            .into_inner();

        // Proto streams must carry the table's message descriptor so the server
        // can decode records; it is absent for other record types.
        let descriptor_proto = if record_type == RecordType::Proto {
            Some(
                table_properties
                    .descriptor_proto
                    .as_ref()
                    .ok_or_else(|| {
                        ZerobusError::InvalidArgument(
                            "Descriptor proto is required for Proto record type".to_string(),
                        )
                    })?
                    .encode_to_vec(),
            )
        } else {
            None
        };

        let create_stream_request = RequestPayload::CreateStream(CreateIngestStreamRequest {
            table_name: Some(table_properties.table_name.to_string()),
            descriptor_proto,
            record_type: Some(record_type.into()),
        });

        debug!("Sending CreateStream request.");
        tx.send(EphemeralStreamRequest {
            payload: Some(create_stream_request),
        })
        .await
        .map_err(|_| {
            error!(table_name = %table_properties.table_name, "Failed to send CreateStream request");
            ZerobusError::StreamClosedError(tonic::Status::internal(
                "Failed to send CreateStream request",
            ))
        })?;
        debug!("Waiting for CreateStream response.");
        let create_stream_response = response_grpc_stream.message().await;

        // The first message from the server must be a CreateStreamResponse with a
        // stream ID; anything else is treated as a failed handshake.
        match create_stream_response {
            Ok(Some(create_stream_response)) => match create_stream_response.payload {
                Some(ResponsePayload::CreateStreamResponse(resp)) => {
                    if let Some(stream_id) = resp.stream_id {
                        info!(stream_id = %stream_id, "Successfully created stream");
                        Ok((tx, response_grpc_stream, stream_id))
                    } else {
                        error!("Successfully created a stream but stream_id is None");
                        Err(ZerobusError::CreateStreamError(tonic::Status::internal(
                            "Successfully created a stream but stream_id is None",
                        )))
                    }
                }
                unexpected_message => {
                    error!("Unexpected response from server {unexpected_message:?}");
                    Err(ZerobusError::CreateStreamError(tonic::Status::internal(
                        "Unexpected response from server",
                    )))
                }
            },
            Ok(None) => {
                info!("Server closed the stream gracefully before sending CreateStream response");
                Err(ZerobusError::CreateStreamError(tonic::Status::ok(
                    "Stream closed gracefully by server",
                )))
            }
            Err(status) => {
                error!("CreateStream RPC failed: {status:?}");
                Err(ZerobusError::CreateStreamError(status))
            }
        }
    }
1251
1252 /// Ingests a single record into the stream.
1253 ///
1254 /// This method is non-blocking and returns immediately with a future. The record is
1255 /// queued for transmission and the returned future resolves when the server acknowledges
1256 /// the record has been durably written.
1257 ///
1258 /// # Arguments
1259 ///
1260 /// * `payload` - A record that can be converted to `EncodedRecord` (either JSON string or protobuf bytes)
1261 ///
1262 /// # Returns
1263 ///
1264 /// A future that resolves to the logical offset ID of the acknowledged record.
1265 ///
1266 /// # Errors
1267 ///
1268 /// * `InvalidArgument` - If the record type doesn't match stream configuration
1269 /// * `StreamClosedError` - If the stream has been closed
1270 /// * Other errors may be returned via the acknowledgment future
1271 ///
1272 /// # Examples
1273 ///
1274 /// ```no_run
1275 /// # use databricks_zerobus_ingest_sdk::*;
1276 /// # use prost::Message;
1277 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1278 /// # let my_record = vec![1, 2, 3]; // Example protobuf-encoded data
1279 /// // Ingest and immediately await acknowledgment
1280 /// let ack = stream.ingest_record(my_record).await?;
1281 /// let offset = ack.await?;
1282 /// println!("Record written at offset: {}", offset);
1283 /// # Ok(())
1284 /// # }
1285 /// ```
1286 ///
1287 /// # Deprecation Note
1288 ///
1289 /// This method is deprecated. Use [`ingest_record_offset()`](Self::ingest_record_offset) instead,
1290 /// which returns the offset directly (after queuing) without Future wrapping. You can then use
1291 /// [`wait_for_offset()`](Self::wait_for_offset) to explicitly wait for acknowledgment when needed.
1292 #[deprecated(
1293 since = "0.4.0",
1294 note = "Use `ingest_record_offset()` instead which returns the offset directly after queuing"
1295 )]
1296 pub async fn ingest_record(
1297 &self,
1298 payload: impl Into<EncodedRecord>,
1299 ) -> ZerobusResult<impl Future<Output = ZerobusResult<OffsetId>>> {
1300 let encoded_batch = EncodedBatch::try_from_record(payload, self.options.record_type)
1301 .ok_or_else(|| {
1302 ZerobusError::InvalidArgument(
1303 "Record type does not match stream configuration".to_string(),
1304 )
1305 })?;
1306
1307 self.ingest_internal(encoded_batch).await
1308 }
1309
1310 /// Ingests a single record and returns its logical offset directly.
1311 ///
1312 /// This is an alternative to `ingest_record()` that returns the logical offset directly
1313 /// as an integer (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()`
1314 /// to explicitly wait for server acknowledgment of this offset when needed.
1315 ///
1316 /// # Arguments
1317 ///
1318 /// * `payload` - A record that can be converted to `EncodedRecord` (either JSON string or protobuf bytes)
1319 ///
1320 /// # Returns
1321 ///
1322 /// The logical offset ID assigned to this record.
1323 ///
1324 /// # Errors
1325 ///
1326 /// * `InvalidArgument` - If the record type doesn't match stream configuration
1327 /// * `StreamClosedError` - If the stream has been closed
1328 ///
1329 /// # Examples
1330 ///
1331 /// ```no_run
1332 /// # use databricks_zerobus_ingest_sdk::*;
1333 /// # use prost::Message;
1334 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1335 /// # let my_record = vec![1, 2, 3]; // Example protobuf-encoded data
1336 /// // Ingest and get offset immediately
1337 /// let offset = stream.ingest_record_offset(my_record).await?;
1338 ///
1339 /// // Later, wait for acknowledgment
1340 /// stream.wait_for_offset(offset).await?;
1341 /// println!("Record at offset {} has been acknowledged", offset);
1342 /// # Ok(())
1343 /// # }
1344 /// ```
1345 pub async fn ingest_record_offset(
1346 &self,
1347 payload: impl Into<EncodedRecord>,
1348 ) -> ZerobusResult<OffsetId> {
1349 let encoded_batch = EncodedBatch::try_from_record(payload, self.options.record_type)
1350 .ok_or_else(|| {
1351 ZerobusError::InvalidArgument(
1352 "Record type does not match stream configuration".to_string(),
1353 )
1354 })?;
1355
1356 self.ingest_internal_v2(encoded_batch).await
1357 }
1358
1359 /// Ingests a batch of records into the stream.
1360 ///
1361 /// This method is non-blocking and returns immediately with a future. The records are
1362 /// queued for transmission and the returned future resolves when the server acknowledges
1363 /// the entire batch has been durably written.
1364 ///
1365 /// # Arguments
1366 ///
1367 /// * `payload` - An iterator of protobuf-encoded records (each item should be convertible to `EncodedRecord`)
1368 ///
1369 /// # Returns
1370 ///
1371 /// A future that resolves to the logical offset ID of the last acknowledged batch.
1372 /// If the batch is empty, the future resoles to None.
1373 ///
1374 /// # Errors
1375 ///
1376 /// * `InvalidArgument` - If record types don't match stream configuration
1377 /// * `StreamClosedError` - If the stream has been closed
1378 /// * Other errors may be returned via the acknowledgment future
1379 ///
1380 /// # Examples
1381 ///
1382 /// ```no_run
1383 /// # use databricks_zerobus_ingest_sdk::*;
1384 /// # use prost::Message;
1385 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1386 /// let records = vec![vec![1, 2, 3], vec![4, 5, 6]]; // Example protobuf-encoded data
1387 /// // Ingest batch and await acknowledgment
1388 /// let ack = stream.ingest_records(records).await?;
1389 /// let offset = ack.await?;
1390 /// match offset {
1391 /// Some(offset) => println!("Batch written at offset: {}", offset),
1392 /// None => println!("Empty batch - no records written"),
1393 /// }
1394 /// # Ok(())
1395 /// # }
1396 /// ```
1397 ///
1398 /// # Deprecation Note
1399 ///
1400 /// This method is deprecated. Use [`ingest_records_offset()`](Self::ingest_records_offset) instead,
1401 /// which returns the offset directly (after queuing) without Future wrapping. You can then use
1402 /// [`wait_for_offset()`](Self::wait_for_offset) to explicitly wait for acknowledgment when needed.
1403 #[deprecated(
1404 since = "0.4.0",
1405 note = "Use `ingest_records_offset()` instead which returns the offset directly after queuing"
1406 )]
1407 pub async fn ingest_records<I, T>(
1408 &self,
1409 payload: I,
1410 ) -> ZerobusResult<impl Future<Output = ZerobusResult<Option<OffsetId>>>>
1411 where
1412 I: IntoIterator<Item = T>,
1413 T: Into<EncodedRecord>,
1414 {
1415 let encoded_batch = EncodedBatch::try_from_batch(payload, self.options.record_type)
1416 .ok_or_else(|| {
1417 ZerobusError::InvalidArgument(
1418 "Record type does not match stream configuration".to_string(),
1419 )
1420 })?;
1421
1422 // For non-empty batches, get the future from ingest_internal
1423 let ingest_future = if encoded_batch.is_empty() {
1424 None
1425 } else {
1426 Some(self.ingest_internal(encoded_batch).await?)
1427 };
1428
1429 Ok(async move {
1430 match ingest_future {
1431 Some(fut) => fut.await.map(Option::Some),
1432 None => Ok(None),
1433 }
1434 })
1435 }
1436
1437 /// Ingests a batch of records and returns the logical offset directly.
1438 ///
1439 /// This is an alternative to `ingest_records()` that returns the logical offset directly
1440 /// (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()` to explicitly
1441 /// wait for server acknowledgment when needed.
1442 ///
1443 /// # Arguments
1444 ///
1445 /// * `payload` - An iterator of records (each item should be convertible to `EncodedRecord`)
1446 ///
1447 /// # Returns
1448 ///
1449 /// `Some(offset_id)` for non-empty batches, or `None` if the batch is empty.
1450 ///
1451 /// # Errors
1452 ///
1453 /// * `InvalidArgument` - If record types don't match stream configuration
1454 /// * `StreamClosedError` - If the stream has been closed
1455 ///
1456 /// # Examples
1457 ///
1458 /// ```no_run
1459 /// # use databricks_zerobus_ingest_sdk::*;
1460 /// # use prost::Message;
1461 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1462 /// let records = vec![vec![1, 2, 3], vec![4, 5, 6]]; // Example protobuf-encoded data
1463 ///
1464 /// // Ingest batch and get offset immediately
1465 /// if let Some(offset) = stream.ingest_records_offset(records).await? {
1466 /// // Later, wait for batch acknowledgment
1467 /// stream.wait_for_offset(offset).await?;
1468 /// println!("Batch at offset {} has been acknowledged", offset);
1469 /// }
1470 /// # Ok(())
1471 /// # }
1472 /// ```
1473 pub async fn ingest_records_offset<I, T>(&self, payload: I) -> ZerobusResult<Option<OffsetId>>
1474 where
1475 I: IntoIterator<Item = T>,
1476 T: Into<EncodedRecord>,
1477 {
1478 let encoded_batch = EncodedBatch::try_from_batch(payload, self.options.record_type)
1479 .ok_or_else(|| {
1480 ZerobusError::InvalidArgument(
1481 "Record type does not match stream configuration".to_string(),
1482 )
1483 })?;
1484
1485 if encoded_batch.is_empty() {
1486 Ok(None)
1487 } else {
1488 self.ingest_internal_v2(encoded_batch)
1489 .await
1490 .map(Option::Some)
1491 }
1492 }
1493 /// Internal unified method for ingesting records and batches
1494 async fn ingest_internal(
1495 &self,
1496 encoded_batch: EncodedBatch,
1497 ) -> ZerobusResult<impl Future<Output = ZerobusResult<OffsetId>>> {
1498 if self.is_closed.load(Ordering::Relaxed) {
1499 error!(table_name = %self.table_properties.table_name, "Stream closed");
1500 return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1501 "Stream closed",
1502 )));
1503 }
1504
1505 let _guard = self.sync_mutex.lock().await;
1506
1507 let offset_id = self.logical_offset_id_generator.next();
1508 debug!(
1509 offset_id = offset_id,
1510 record_count = encoded_batch.get_record_count(),
1511 "Ingesting record(s)"
1512 );
1513
1514 if let Some(stream_id) = self.stream_id.as_ref() {
1515 let (tx, rx) = tokio::sync::oneshot::channel();
1516 {
1517 let mut map = self.oneshot_map.lock().await;
1518 map.insert(offset_id, tx);
1519 }
1520 self.landing_zone
1521 .add(Box::new(IngestRequest {
1522 payload: encoded_batch,
1523 offset_id,
1524 }))
1525 .await;
1526 let stream_id = stream_id.to_string();
1527 Ok(async move {
1528 rx.await.map_err(|err| {
1529 error!(stream_id = %stream_id, "Failed to receive ack: {}", err);
1530 ZerobusError::StreamClosedError(tonic::Status::internal(
1531 "Failed to receive ack",
1532 ))
1533 })?
1534 })
1535 } else {
1536 error!("Stream ID is None");
1537 Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1538 "Stream ID is None",
1539 )))
1540 }
1541 }
1542
1543 /// Internal unified method for ingesting records and batches
1544 async fn ingest_internal_v2(&self, encoded_batch: EncodedBatch) -> ZerobusResult<OffsetId> {
1545 if self.is_closed.load(Ordering::Relaxed) {
1546 error!(table_name = %self.table_properties.table_name, "Stream closed");
1547 return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1548 "Stream closed",
1549 )));
1550 }
1551
1552 let _guard = self.sync_mutex.lock().await;
1553
1554 let offset_id = self.logical_offset_id_generator.next();
1555 debug!(
1556 offset_id = offset_id,
1557 record_count = encoded_batch.get_record_count(),
1558 "Ingesting record(s)"
1559 );
1560 self.landing_zone
1561 .add(Box::new(IngestRequest {
1562 payload: encoded_batch,
1563 offset_id,
1564 }))
1565 .await;
1566 Ok(offset_id)
1567 }
1568
1569 /// Spawns a task that handles callback execution in a separate thread.
1570 /// This task receives callback messages via a channel and executes them
1571 /// without blocking the receiver task.
1572 #[instrument(level = "debug", skip_all)]
1573 fn spawn_callback_handler_task(
1574 mut callback_rx: tokio::sync::mpsc::UnboundedReceiver<CallbackMessage>,
1575 ack_callback: Option<Arc<dyn AckCallback>>,
1576 cancellation_token: CancellationToken,
1577 ) -> tokio::task::JoinHandle<()> {
1578 tokio::spawn(async move {
1579 let span = span!(Level::DEBUG, "callback_handler");
1580 let _guard = span.enter();
1581 loop {
1582 tokio::select! {
1583 biased;
1584 message = callback_rx.recv() => {
1585 match message {
1586 Some(message) => {
1587 match message {
1588 CallbackMessage::Ack(logical_offset) => {
1589 if let Some(ref callback) = ack_callback {
1590 callback.on_ack(logical_offset);
1591 }
1592 }
1593 CallbackMessage::Error(logical_offset, error_message) => {
1594 if let Some(ref callback) = ack_callback {
1595 callback.on_error(logical_offset, &error_message);
1596 }
1597 }
1598 }
1599 }
1600 None => { // This happens when all senders are dropped.
1601 debug!("Callback handler task shutting down");
1602 return;
1603 }
1604 }
1605 }
1606 _ = cancellation_token.cancelled() => {
1607 debug!("Callback handler task cancelled");
1608 return;
1609 }
1610
1611 }
1612 }
1613 })
1614 }
1615
    /// Spawns a task that continuously reads from `response_grpc_stream`
    /// and propagates the received durability acknowledgements to the
    /// corresponding pending acks promises.
    ///
    /// Also handles the server's `CloseStreamSignal` by entering a bounded
    /// "graceful close" period (pausing the sender, draining in-flight acks)
    /// before returning `Ok(())` to let the supervisor recover the stream.
    #[instrument(level = "debug", skip_all)]
    #[allow(clippy::too_many_arguments)]
    fn spawn_receiver_task(
        mut response_grpc_stream: tonic::Streaming<EphemeralStreamResponse>,
        last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
        landing_zone: RecordLandingZone,
        oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
        is_paused: Arc<AtomicBool>,
        options: StreamConfigurationOptions,
        server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
        cancellation_token: CancellationToken,
        callback_tx: Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
    ) -> tokio::task::JoinHandle<ZerobusResult<()>> {
        tokio::spawn(async move {
            // NOTE(review): holding `span.enter()` across the `.await`s below is
            // flagged by tracing's docs as producing incorrect span attribution in
            // async code; consider instrumenting the future instead — confirm.
            let span = span!(Level::DEBUG, "inbound_stream_processor");
            let _guard = span.enter();
            // Physical offset of the last server ack; -1 means nothing acked yet.
            let mut last_acked_offset = -1;
            // Set while in the graceful-close window requested by the server.
            let mut pause_deadline: Option<tokio::time::Instant> = None;

            loop {
                // During graceful close, exit (triggering supervisor recovery) as
                // soon as the deadline passes or all in-flight records are acked.
                if let Some(deadline) = pause_deadline {
                    let now = tokio::time::Instant::now();
                    let all_acked = landing_zone.is_observed_empty();

                    if now >= deadline {
                        info!("Graceful close timeout reached. Triggering recovery.");
                        return Ok(());
                    } else if all_acked {
                        info!("All in-flight records acknowledged during graceful close. Triggering recovery.");
                        return Ok(());
                    }
                }

                // Wait for the next server message, bounded by the ack timeout;
                // while paused, also wake up at the graceful-close deadline.
                let message_result = if let Some(deadline) = pause_deadline {
                    tokio::select! {
                        biased;
                        _ = cancellation_token.cancelled() => return Ok(()),
                        _ = tokio::time::sleep_until(deadline) => {
                            continue;
                        }
                        res = tokio::time::timeout(
                            Duration::from_millis(options.server_lack_of_ack_timeout_ms),
                            response_grpc_stream.message(),
                        ) => res,
                    }
                } else {
                    tokio::select! {
                        biased;
                        _ = cancellation_token.cancelled() => return Ok(()),
                        res = tokio::time::timeout(
                            Duration::from_millis(options.server_lack_of_ack_timeout_ms),
                            response_grpc_stream.message(),
                        ) => res,
                    }
                };

                match message_result {
                    Ok(Ok(Some(ingest_record_response))) => match ingest_record_response.payload {
                        Some(ResponsePayload::IngestRecordResponse(IngestRecordResponse {
                            durability_ack_up_to_offset,
                        })) => {
                            let durability_ack_up_to_offset = match durability_ack_up_to_offset {
                                Some(offset) => offset,
                                None => {
                                    error!("Missing ack offset in server response");
                                    let error =
                                        ZerobusError::StreamClosedError(tonic::Status::internal(
                                            "Missing ack offset in server response",
                                        ));
                                    let _ = server_error_tx.send(Some(error.clone()));
                                    return Err(error);
                                }
                            };
                            // Acks are cumulative: resolve every pending record up to
                            // the acked physical offset. -2 is a "nothing resolved"
                            // sentinel (logical offsets start above it).
                            let mut last_logical_acked_offset = -2;
                            let mut map = oneshot_map.lock().await;
                            for _offset_to_ack in
                                (last_acked_offset + 1)..=durability_ack_up_to_offset
                            {
                                if let Ok(record) = landing_zone.remove_observed() {
                                    let logical_offset = record.offset_id;
                                    last_logical_acked_offset = logical_offset;

                                    if let Some(sender) = map.remove(&logical_offset) {
                                        let _ = sender.send(Ok(logical_offset));
                                    }

                                    if let Some(ref tx) = callback_tx {
                                        let _ = tx.send(CallbackMessage::Ack(logical_offset));
                                    }
                                }
                            }
                            drop(map);
                            last_acked_offset = durability_ack_up_to_offset;
                            if last_logical_acked_offset != -2 {
                                let _ignore_on_channel_break = last_received_offset_id_tx
                                    .send(Some(last_logical_acked_offset));
                            }
                        }
                        Some(ResponsePayload::CloseStreamSignal(CloseStreamSignal {
                            duration,
                        })) => {
                            // Only meaningful with recovery enabled: pause sending and
                            // wait (bounded) for outstanding acks before reconnecting.
                            if options.recovery {
                                let server_duration_ms = duration
                                    .as_ref()
                                    .map(|d| d.seconds as u64 * 1000 + d.nanos as u64 / 1_000_000)
                                    .unwrap_or(0);

                                // The client-side cap takes precedence over the
                                // server-advertised grace period when smaller.
                                let wait_duration_ms = match options.stream_paused_max_wait_time_ms
                                {
                                    None => server_duration_ms,
                                    Some(0) => {
                                        // Immediate recovery
                                        info!("Server will close the stream in {}ms. Triggering stream recovery.", server_duration_ms);
                                        return Ok(());
                                    }
                                    Some(max_wait) => std::cmp::min(max_wait, server_duration_ms),
                                };

                                if wait_duration_ms == 0 {
                                    info!("Server will close the stream. Triggering immediate recovery.");
                                    return Ok(());
                                }

                                is_paused.store(true, Ordering::Relaxed);
                                pause_deadline = Some(
                                    tokio::time::Instant::now()
                                        + Duration::from_millis(wait_duration_ms),
                                );
                                info!(
                                    "Server will close the stream in {}ms. Entering graceful close period (waiting up to {}ms for in-flight acks).",
                                    server_duration_ms, wait_duration_ms
                                );
                            }
                        }
                        unexpected_message => {
                            error!("Unexpected response from server {unexpected_message:?}");
                            let error = ZerobusError::StreamClosedError(tonic::Status::internal(
                                "Unexpected response from server",
                            ));
                            let _ = server_error_tx.send(Some(error.clone()));
                            return Err(error);
                        }
                    },
                    Ok(Ok(None)) => {
                        info!("Server closed the stream without errors.");
                        let error = ZerobusError::StreamClosedError(tonic::Status::ok(
                            "Stream closed by server without errors.",
                        ));
                        let _ = server_error_tx.send(Some(error.clone()));
                        return Err(error);
                    }
                    Ok(Err(status)) => {
                        error!("Unexpected response from server {status:?}");
                        let error = ZerobusError::StreamClosedError(status);
                        let _ = server_error_tx.send(Some(error.clone()));
                        return Err(error);
                    }
                    Err(_timeout) => {
                        // No message received for server_lack_of_ack_timeout_ms.
                        // Only an error when records are actually awaiting acks and
                        // we are not in a graceful-close pause.
                        if pause_deadline.is_none() && !landing_zone.is_observed_empty() {
                            error!(
                                "Server ack timeout: no response for {}ms",
                                options.server_lack_of_ack_timeout_ms
                            );
                            let error = ZerobusError::StreamClosedError(
                                tonic::Status::deadline_exceeded("Server ack timeout"),
                            );
                            let _ = server_error_tx.send(Some(error.clone()));
                            return Err(error);
                        }
                    }
                }
            }
        })
    }
1794
1795 /// Spawns a task that continuously sends records to the Zerobus API by observing the landing zone
1796 /// to get records and sending them through the outbound stream to the gRPC stream.
1797 fn spawn_sender_task(
1798 outbound_stream: tokio::sync::mpsc::Sender<EphemeralStreamRequest>,
1799 landing_zone: RecordLandingZone,
1800 is_paused: Arc<AtomicBool>,
1801 server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
1802 cancellation_token: CancellationToken,
1803 ) -> tokio::task::JoinHandle<ZerobusResult<()>> {
1804 tokio::spawn(async move {
1805 let physical_offset_id_generator = OffsetIdGenerator::default();
1806 loop {
1807 let item = tokio::select! {
1808 biased;
1809 _ = cancellation_token.cancelled() => return Ok(()),
1810 item = async {
1811 if is_paused.load(Ordering::Relaxed) {
1812 std::future::pending().await // Wait until supervisor task aborts this task.
1813 } else {
1814 landing_zone.observe().await
1815 }
1816 } => item.clone(),
1817 };
1818 let offset_id = physical_offset_id_generator.next();
1819 let request_payload = item.payload.into_request_payload(offset_id);
1820
1821 let send_result = outbound_stream
1822 .send(EphemeralStreamRequest {
1823 payload: Some(request_payload),
1824 })
1825 .await;
1826
1827 if let Err(err) = send_result {
1828 error!("Failed to send record: {}", err);
1829 let error = ZerobusError::StreamClosedError(tonic::Status::internal(
1830 "Failed to send record",
1831 ));
1832 let _ = server_error_tx.send(Some(error.clone()));
1833 return Err(error);
1834 }
1835 }
1836 })
1837 }
1838
1839 /// Fails all pending records by removing them from the landing zone and sending error to all pending acks promises.
1840 async fn fail_all_pending_records(
1841 landing_zone: RecordLandingZone,
1842 oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
1843 failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
1844 error: &ZerobusError,
1845 callback_tx: &Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
1846 ) {
1847 let mut failed_payloads = Vec::with_capacity(landing_zone.len());
1848 let records = landing_zone.remove_all();
1849 let mut map = oneshot_map.lock().await;
1850 let error_message = error.to_string();
1851 for record in records {
1852 failed_payloads.push(record.payload);
1853 if let Some(sender) = map.remove(&record.offset_id) {
1854 let _ = sender.send(Err(error.clone()));
1855 }
1856 if let Some(tx) = callback_tx {
1857 let _ = tx.send(CallbackMessage::Error(
1858 record.offset_id,
1859 error_message.clone(),
1860 ));
1861 }
1862 }
1863 *failed_records.write().await = failed_payloads;
1864 }
1865
    /// Internal method to wait for a specific offset to be acknowledged.
    /// Used by both `flush()` and `wait_for_offset()`.
    ///
    /// # Arguments
    ///
    /// * `offset_to_wait` - Logical offset the server must acknowledge (acks at
    ///   or beyond it count as success).
    /// * `operation_name` - Human-readable label ("Flush", "Waiting for
    ///   acknowledgement") used in log and error messages only.
    ///
    /// # Errors
    ///
    /// * `StreamClosedError` - stream closed before the offset was acked, or the
    ///   wait exceeded `options.flush_timeout_ms`; a server-reported error is
    ///   surfaced in preference to the generic "closed" message.
    async fn wait_for_offset_internal(
        &self,
        offset_to_wait: OffsetId,
        operation_name: &str,
    ) -> ZerobusResult<()> {
        let wait_operation = async {
            // Subscribe to the latest-acked-offset watch channel and clone the
            // server-error receiver; both are polled together below.
            let mut offset_receiver = self.logical_last_received_offset_id_tx.subscribe();
            let mut error_rx = self.server_error_rx.clone();

            loop {
                let offset = *offset_receiver.borrow_and_update();

                // Logging-only: fall back to the literal "None" so the
                // info!/error! lines below always have a stream_id field.
                let stream_id = match self.stream_id.as_deref() {
                    Some(stream_id) => stream_id,
                    None => {
                        error!("Stream ID is None during {}", operation_name.to_lowercase());
                        "None"
                    }
                };
                if let Some(offset) = offset {
                    if offset >= offset_to_wait {
                        info!(stream_id = %stream_id, "Stream is caught up to the given offset. {} completed.", operation_name);
                        return Ok(());
                    } else {
                        info!(
                            stream_id = %stream_id,
                            "Stream is caught up to offset {}. Waiting for offset {}.",
                            offset, offset_to_wait
                        );
                    }
                } else {
                    info!(
                        stream_id = %stream_id,
                        "Stream is not caught up to any offset yet. Waiting for the first offset."
                    );
                }
                if self.is_closed.load(Ordering::Relaxed) {
                    // Re-check offset before failing, it might have been updated.
                    let offset = *offset_receiver.borrow_and_update();
                    if let Some(offset) = offset {
                        if offset >= offset_to_wait {
                            return Ok(());
                        }
                    }
                    // The supervisor always sends the real error to server_error_tx
                    // before setting is_closed=true, so check error_rx first to
                    // return the actual error instead of a generic one.
                    if let Some(server_error) = error_rx.borrow().clone() {
                        return Err(server_error);
                    }
                    return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
                        format!("Stream closed during {}", operation_name.to_lowercase()),
                    )));
                }
                // Race between offset updates and server errors.
                tokio::select! {
                    result = offset_receiver.changed() => {
                        // If offset_receiver channel is closed, break the loop.
                        if result.is_err() {
                            break;
                        }
                        // Loop continues to check new offset value.
                    }
                    _ = error_rx.changed() => {
                        // Server error occurred, return it immediately if stream is closed.
                        // If the stream is NOT closed yet, fall through and keep looping:
                        // the supervisor may still recover/reconnect and ack the offset.
                        if let Some(server_error) = error_rx.borrow().clone() {
                            if self.is_closed.load(Ordering::Relaxed) {
                                // Re-check offset before failing, it might have been updated.
                                let offset = *offset_receiver.borrow_and_update();
                                if let Some(offset) = offset {
                                    if offset >= offset_to_wait {
                                        return Ok(());
                                    }
                                }
                                return Err(server_error);
                            }
                        }
                    }
                }
            }

            // Reached only when the offset watch channel closed (sender dropped):
            // prefer a recorded server error over the generic "closed" status.
            if let Some(server_error) = error_rx.borrow().clone() {
                if self.is_closed.load(Ordering::Relaxed) {
                    return Err(server_error);
                }
            }

            Err(ZerobusError::StreamClosedError(tonic::Status::internal(
                format!("Stream closed during {}", operation_name.to_lowercase()),
            )))
        };

        // The whole wait is bounded by the configured flush timeout.
        match tokio::time::timeout(
            Duration::from_millis(self.options.flush_timeout_ms),
            wait_operation,
        )
        .await
        {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(e),
            Err(_) => {
                if let Some(stream_id) = self.stream_id.as_deref() {
                    error!(stream_id = %stream_id, table_name = %self.table_properties.table_name, "{} timed out", operation_name);
                } else {
                    error!(table_name = %self.table_properties.table_name, "{} timed out", operation_name);
                }
                Err(ZerobusError::StreamClosedError(
                    tonic::Status::deadline_exceeded(format!("{} timed out", operation_name)),
                ))
            }
        }
    }
1980
1981 /// Flushes all currently pending records and waits for their acknowledgments.
1982 ///
1983 /// This method captures the current highest offset and waits until all records up to
1984 /// that offset have been acknowledged by the server. Records ingested during the flush
1985 /// operation are not included in this flush.
1986 ///
1987 /// # Returns
1988 ///
1989 /// `Ok(())` when all pending records at the time of the call have been acknowledged.
1990 ///
1991 /// # Errors
1992 ///
1993 /// * `StreamClosedError` - If the stream is closed or times out
1994 ///
1995 /// # Examples
1996 ///
1997 /// ```no_run
1998 /// # use databricks_zerobus_ingest_sdk::*;
1999 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
2000 /// // Ingest many records
2001 /// for i in 0..1000 {
2002 /// let _offset = stream.ingest_record_offset(vec![i as u8]).await?;
2003 /// }
2004 ///
2005 /// // Wait for all to be acknowledged
2006 /// stream.flush().await?;
2007 /// println!("All 1000 records have been acknowledged");
2008 /// # Ok(())
2009 /// # }
2010 /// ```
2011 #[instrument(level = "debug", skip_all, fields(table_name = %self.table_properties.table_name))]
2012 pub async fn flush(&self) -> ZerobusResult<()> {
2013 let offset_to_wait = match self.logical_offset_id_generator.last() {
2014 Some(offset) => offset,
2015 None => return Ok(()), // Nothing to flush.
2016 };
2017 self.wait_for_offset_internal(offset_to_wait, "Flush").await
2018 }
2019
2020 /// Waits for server acknowledgment of a specific logical offset.
2021 ///
2022 /// This method blocks until the server has acknowledged the record or batch at the
2023 /// specified offset. Use this with offsets returned from `ingest_record_offset()` or
2024 /// `ingest_records_offset()` to explicitly control when to wait for acknowledgments.
2025 ///
2026 /// # Arguments
2027 ///
2028 /// * `offset` - The logical offset ID to wait for (returned from `ingest_record_offset()` or `ingest_records_offset()`)
2029 ///
2030 /// # Returns
2031 ///
2032 /// `Ok(())` when the record/batch at the specified offset has been acknowledged.
2033 ///
2034 /// # Errors
2035 ///
2036 /// * `StreamClosedError` - If the stream is closed or times out while waiting
2037 ///
2038 /// # Examples
2039 ///
2040 /// ```no_run
2041 /// # use databricks_zerobus_ingest_sdk::*;
2042 /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
2043 /// # let my_record = vec![1, 2, 3];
2044 /// // Ingest multiple records and collect their offsets
2045 /// let mut offsets = Vec::new();
2046 /// for i in 0..100 {
2047 /// let offset = stream.ingest_record_offset(vec![i as u8]).await?;
2048 /// offsets.push(offset);
2049 /// }
2050 ///
2051 /// // Wait for specific offsets
2052 /// for offset in offsets {
2053 /// stream.wait_for_offset(offset).await?;
2054 /// }
2055 /// println!("All records acknowledged");
2056 /// # Ok(())
2057 /// # }
2058 /// ```
2059 pub async fn wait_for_offset(&self, offset: OffsetId) -> ZerobusResult<()> {
2060 self.wait_for_offset_internal(offset, "Waiting for acknowledgement")
2061 .await
2062 }
2063
2064 /// Closes the stream gracefully after flushing all pending records.
2065 ///
2066 /// This method first calls `flush()` to ensure all pending records are acknowledged,
2067 /// then shuts down the stream and releases all resources. Always call this method
2068 /// when you're done with a stream to ensure data integrity.
2069 ///
2070 /// # Returns
2071 ///
2072 /// `Ok(())` if the stream was closed successfully after flushing all records.
2073 ///
2074 /// # Errors
2075 ///
2076 /// Returns any errors from the flush operation. If flush fails, some records
2077 /// may not have been acknowledged. Use `get_unacked_records()` to retrieve them.
2078 ///
2079 /// # Examples
2080 ///
2081 /// ```no_run
2082 /// # use databricks_zerobus_ingest_sdk::*;
2083 /// # async fn example(mut stream: ZerobusStream) -> Result<(), ZerobusError> {
2084 /// // After ingesting records...
2085 /// stream.close().await?;
2086 /// # Ok(())
2087 /// # }
2088 /// ```
2089 pub async fn close(&mut self) -> ZerobusResult<()> {
2090 if self.is_closed.load(Ordering::Relaxed) {
2091 return Ok(());
2092 }
2093 if let Some(stream_id) = self.stream_id.as_deref() {
2094 info!(stream_id = %stream_id, "Closing stream");
2095 } else {
2096 error!("Stream ID is None during closing");
2097 }
2098 let flush_result = self.flush().await;
2099 self.is_closed.store(true, Ordering::Relaxed);
2100 self.shutdown_all_tasks_gracefully().await;
2101 flush_result
2102 }
2103
2104 /// Gracefully shuts down the supervisor task.
2105 ///
2106 /// Signals cancellation and waits for the task to exit. If the timeout
2107 /// is provided and expires, forcefully aborts the task.
2108 async fn shutdown_all_tasks_gracefully(&mut self) {
2109 self.cancellation_token.cancel();
2110
2111 // Shutdown supervisor task.
2112 match tokio::time::timeout(
2113 Duration::from_secs(SHUTDOWN_TIMEOUT_SECS),
2114 &mut self.supervisor_task,
2115 )
2116 .await
2117 {
2118 Ok(_) => {
2119 debug!("Supervisor task exited gracefully");
2120 }
2121 Err(_) => {
2122 warn!("Supervisor task did not exit within timeout, aborting");
2123 self.supervisor_task.abort();
2124 }
2125 }
2126 // Shutdown callback handler task, if there are any callbacks.
2127 if let Some(mut task) = self.callback_handler_task.take() {
2128 if let Some(callback_max_wait_time_ms) = self.options.callback_max_wait_time_ms {
2129 match tokio::time::timeout(
2130 Duration::from_millis(callback_max_wait_time_ms),
2131 &mut task,
2132 )
2133 .await
2134 {
2135 Ok(_) => {
2136 debug!("Callback handler task exited gracefully");
2137 }
2138 Err(_) => {
2139 debug!("Callback handler task did not exit within timeout, aborting");
2140 task.abort();
2141 }
2142 }
2143 } else {
2144 debug!("Callback max wait time is not set, waiting indefinitely");
2145 let _ = (&mut task).await;
2146 }
2147 }
2148 }
2149
2150 /// Returns all records that were ingested but not acknowledged by the server.
2151 ///
2152 /// This method should only be called after a stream has failed or been closed.
2153 /// It's useful for implementing custom retry logic or persisting failed records.
2154 ///
2155 /// **Note:** This method flattens all unacknowledged records into a single iterator,
2156 /// losing the original batch grouping.
2157 /// If you want to preserve the batch grouping, use `ZerobusStream::get_unacked_batches()` instead.
2158 /// If you want to re-ingest unacknowledged records while preserving their batch
2159 /// structure, use `ZerobusSdk::recreate_stream()` instead.
2160 ///
2161 ///
2162 /// # Returns
2163 ///
2164 /// An iterator over individual `EncodedRecord` items. All unacknowledged records are
2165 /// flattened into a single sequence, regardless of how they were originally ingested
2166 /// (via `ingest_record()` or `ingest_records()`).
2167 ///
2168 /// # Errors
2169 ///
2170 /// * `InvalidStateError` - If called on an active (not closed) stream
2171 ///
2172 /// # Examples
2173 ///
2174 /// ```no_run
2175 /// # use databricks_zerobus_ingest_sdk::*;
2176 /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusStream) -> Result<(), ZerobusError> {
2177 /// match stream.close().await {
2178 /// Err(e) => {
2179 /// // Stream failed, get unacked records
2180 /// let unacked = stream.get_unacked_records().await?;
2181 /// let total_records = unacked.into_iter().count();
2182 /// println!("Failed to acknowledge {} records", total_records);
2183 ///
2184 /// // For re-ingestion with preserved batch structure, use recreate_stream
2185 /// let new_stream = sdk.recreate_stream(&stream).await?;
2186 /// }
2187 /// Ok(_) => println!("All records acknowledged"),
2188 /// }
2189 /// # Ok(())
2190 /// # }
2191 /// ```
2192 pub async fn get_unacked_records(&self) -> ZerobusResult<impl Iterator<Item = EncodedRecord>> {
2193 Ok(self
2194 .get_unacked_batches()
2195 .await?
2196 .into_iter()
2197 .flat_map(|batch| batch.into_iter()))
2198 }
2199
2200 /// Returns all records that were ingested but not acknowledged by the server, grouped by batch.
2201 ///
2202 /// This method should only be called after a stream has failed or been closed.
2203 /// It's useful for implementing custom retry logic or persisting failed records.
2204 ///
2205 /// **Note:** This method returns the unacknowledged records as a vector of `EncodedBatch` items,
2206 /// where each batch corresponds to how records were ingested:
2207 /// - Each `ingest_record()` call creates a single batch containing one record
2208 /// - Each `ingest_records()` call creates a single batch containing multiple records
2209 ///
2210 /// For alternatives, see `ZerobusStream::get_unacked_records()` and `ZerobusSdk::recreate_stream()`.
2211 ///
2212 /// # Returns
2213 ///
2214 /// A vector of `EncodedBatch` items. Records are grouped by their original ingestion call.
2215 pub async fn get_unacked_batches(&self) -> ZerobusResult<Vec<EncodedBatch>> {
2216 if self.is_closed.load(Ordering::Relaxed) {
2217 let failed = self.failed_records.read().await.clone();
2218 return Ok(failed);
2219 }
2220 if let Some(stream_id) = self.stream_id.as_deref() {
2221 error!(stream_id = %stream_id, "Cannot get unacked records from an active stream. Stream must be closed first.");
2222 } else {
2223 error!(
2224 "Cannot get unacked records from an active stream. Stream must be closed first."
2225 );
2226 }
2227 Err(ZerobusError::InvalidStateError(
2228 "Cannot get unacked records from an active stream. Stream must be closed first."
2229 .to_string(),
2230 ))
2231 }
2232}
2233
impl Drop for ZerobusStream {
    /// Best-effort synchronous teardown: marks the stream closed, signals
    /// cooperative cancellation, and aborts any tasks still running.
    ///
    /// `drop` cannot `.await`, so no flush happens here — callers who need
    /// pending records acknowledged must call `close()` before dropping.
    fn drop(&mut self) {
        // Mark closed first so concurrent waiters observe the shutdown.
        self.is_closed.store(true, Ordering::Relaxed);
        self.cancellation_token.cancel();
        // Hard-abort: there is no async context to wait for graceful exit.
        self.supervisor_task.abort();
        if let Some(callback_handler_task) = self.callback_handler_task.take() {
            callback_handler_task.abort();
        }
    }
}