Skip to main content

databricks_zerobus_ingest_sdk/
lib.rs

1//! # Databricks Zerobus Ingest SDK
2//!
3//! A high-performance Rust client for streaming data ingestion into Databricks Delta tables.
4//!
5//! ## Quick Start
6//!
7//! ```rust,ignore
8//! use databricks_zerobus_ingest_sdk::{ZerobusSdk, TableProperties, ProtoMessage};
9//!
10//! let sdk = ZerobusSdk::builder()
11//!     .endpoint(zerobus_endpoint)
12//!     .unity_catalog_url(uc_endpoint)
13//!     .build()?;
14//! let stream = sdk.create_stream(table_properties, client_id, client_secret, None).await?;
15//!
16//! // Ingest a record and wait for acknowledgment
17//! let offset = stream.ingest_record_offset(ProtoMessage(my_message)).await?;
18//! stream.wait_for_offset(offset).await?;
19//!
20//! stream.close().await?;
21//! ```
22//!
23//! See the `examples/` directory for complete working examples.
24
// gRPC message and client types for the Zerobus service.
// `databricks.zerobus.rs` is generated at build time (presumably by the
// prost/tonic codegen in build.rs — confirm) and included from OUT_DIR.
pub mod databricks {
    pub mod zerobus {
        include!(concat!(env!("OUT_DIR"), "/databricks.zerobus.rs"));
    }
}
30
// Optional Arrow Flight support; compiled only with the "arrow-flight" feature.
#[cfg(feature = "arrow-flight")]
mod arrow_configuration;
#[cfg(feature = "arrow-flight")]
mod arrow_metadata;
#[cfg(feature = "arrow-flight")]
mod arrow_stream;
// Core SDK internals (selected items are re-exported further below).
mod builder;
mod callbacks;
mod default_token_factory;
mod errors;
mod headers_provider;
mod landing_zone;
mod offset_generator;
mod proxy;
mod record_types;
mod stream_configuration;
mod stream_options;
mod tls_config;
49
50use std::collections::HashMap;
51use std::fmt::Debug;
52use std::future::Future;
53use std::sync::atomic::{AtomicBool, Ordering};
54use std::sync::Arc;
55
56use prost::Message;
57use tokio::sync::RwLock;
58use tokio::time::Duration;
59use tokio_retry::strategy::FixedInterval;
60use tokio_retry::RetryIf;
61use tokio_stream::wrappers::ReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tonic::metadata::MetadataValue;
64use tonic::transport::{Channel, Endpoint};
65use tracing::{debug, error, info, instrument, span, warn, Level};
66
67use databricks::zerobus::ephemeral_stream_request::Payload as RequestPayload;
68use databricks::zerobus::ephemeral_stream_response::Payload as ResponsePayload;
69use databricks::zerobus::zerobus_client::ZerobusClient;
70use databricks::zerobus::{
71    CloseStreamSignal, CreateIngestStreamRequest, EphemeralStreamRequest, EphemeralStreamResponse,
72    IngestRecordResponse, RecordType,
73};
74use landing_zone::LandingZone;
75
76/// **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet
77/// supported for production use. The API may change in future releases.
78#[cfg(feature = "arrow-flight")]
79pub use arrow_configuration::ArrowStreamConfigurationOptions;
80#[cfg(feature = "arrow-flight")]
81pub use arrow_stream::{
82    ArrowSchema, ArrowTableProperties, DataType, Field, RecordBatch, ZerobusArrowStream,
83};
84pub use builder::ZerobusSdkBuilder;
85pub use callbacks::AckCallback;
86pub use default_token_factory::DefaultTokenFactory;
87pub use errors::ZerobusError;
88pub use headers_provider::{HeadersProvider, OAuthHeadersProvider, DEFAULT_X_ZEROBUS_SDK};
89pub use offset_generator::{OffsetId, OffsetIdGenerator};
90pub use record_types::{
91    EncodedBatch, EncodedBatchIter, EncodedRecord, JsonEncodedRecord, JsonString, JsonValue,
92    ProtoBytes, ProtoEncodedRecord, ProtoMessage,
93};
94pub use stream_configuration::StreamConfigurationOptions;
95#[cfg(feature = "testing")]
96pub use tls_config::NoTlsConfig;
97pub use tls_config::{SecureTlsConfig, TlsConfig};
98
/// Seconds allowed for background tasks to wind down during shutdown.
/// NOTE(review): the use site is not visible in this chunk — confirm exact semantics there.
const SHUTDOWN_TIMEOUT_SECS: u64 = 2;
100
/// The type of the stream connection created with the server.
/// Currently we only support ephemeral streams on the server side, so we support only that in the SDK as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    /// Ephemeral streams exist only for the duration of the connection.
    /// They are not persisted and are not recoverable.
    Ephemeral,
    /// UNSUPPORTED: Persistent streams are durable and recoverable.
    /// Present for forward compatibility only; do not use.
    Persistent,
}
111
/// The properties of the table to ingest to.
///
/// Used when creating streams via `ZerobusSdk::create_stream()` to specify
/// which table to write to and the schema of records being ingested.
///
/// # Common errors:
/// - `InvalidTableName`: table_name contains invalid characters or doesn't exist
/// - `PermissionDenied`: insufficient permissions to write to the specified table
/// - `InvalidArgument`: invalid or missing descriptor_proto or auth token
#[derive(Debug, Clone)]
pub struct TableProperties {
    /// Fully-qualified Unity Catalog table name (e.g. `catalog.schema.table`).
    pub table_name: String,
    /// Protobuf descriptor for the record schema. Required when the stream's
    /// record type is `Proto`; ignored (with a warning) for `Json` ingestion.
    pub descriptor_proto: Option<prost_types::DescriptorProto>,
}
126
/// Convenience alias: every fallible SDK operation yields `ZerobusError` on failure.
pub type ZerobusResult<T> = Result<T, ZerobusError>;
128
/// A single unit of work queued for transmission: an encoded batch plus the
/// logical offset the SDK assigned to it.
#[derive(Debug, Clone)]
struct IngestRequest {
    /// Encoded record batch to send over the wire.
    payload: EncodedBatch,
    /// Logical offset identifying this batch for acknowledgment tracking.
    offset_id: OffsetId,
}

/// Map of logical offset to oneshot sender used to send acknowledgments back to the client.
type OneshotMap = HashMap<OffsetId, tokio::sync::oneshot::Sender<ZerobusResult<OffsetId>>>;
/// Landing zone for ingest records.
type RecordLandingZone = Arc<LandingZone<Box<IngestRequest>>>;

/// Messages sent to the callback handler task.
#[derive(Debug, Clone)]
enum CallbackMessage {
    /// Acknowledgment callback with logical offset ID.
    Ack(OffsetId),
    /// Error callback with logical offset ID and error message.
    Error(OffsetId, String),
}
148
/// Represents an active ingestion stream to a Databricks Delta table.
///
/// A `ZerobusStream` manages a bidirectional gRPC stream for ingesting records into
/// a Unity Catalog table. It handles authentication, automatic recovery, acknowledgment
/// tracking, and graceful shutdown.
///
/// # Lifecycle
///
/// 1. Create a stream via `ZerobusSdk::create_stream()`
/// 2. Ingest records with `ingest_record()` and await acknowledgments
/// 3. Optionally call `flush()` to ensure all records are persisted
/// 4. Close the stream with `close()` to release resources
///
/// # Examples
///
/// ```no_run
/// # use databricks_zerobus_ingest_sdk::*;
/// # async fn example(mut stream: ZerobusStream, data: Vec<u8>) -> Result<(), ZerobusError> {
/// // Ingest a single record
/// let offset = stream.ingest_record_offset(data).await?;
/// println!("Record sent with offset: {}", offset);
///
/// // Wait for acknowledgment
/// stream.wait_for_offset(offset).await?;
/// println!("Record acknowledged at offset: {}", offset);
///
/// // Close the stream gracefully
/// stream.close().await?;
/// # Ok(())
/// # }
/// ```
pub struct ZerobusStream {
    /// This is a 128-bit UUID that is unique across all streams in the system,
    /// not just within a single table. The server returns this ID in the CreateStreamResponse
    /// after validating the table properties and establishing the gRPC connection.
    /// `None` if the server did not supply an ID.
    stream_id: Option<String>,
    /// Type of gRPC stream that is used when sending records.
    pub stream_type: StreamType,
    /// Gets headers which are used in the first request to establish connection with the server.
    pub headers_provider: Arc<dyn HeadersProvider>,
    /// The stream configuration options related to recovery, fetching OAuth tokens, etc.
    pub options: StreamConfigurationOptions,
    /// The table properties - table name and descriptor of the table.
    pub table_properties: TableProperties,
    /// Logical landing zone that is used to store records that have been sent by user but not yet sent over the network.
    landing_zone: RecordLandingZone,
    /// Map of logical offset to oneshot sender.
    oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
    /// Supervisor task that manages the stream lifecycle such as stream creation, recovery, etc.
    /// It orchestrates the receiver and sender tasks.
    supervisor_task: tokio::task::JoinHandle<Result<(), ZerobusError>>,
    /// The generator of logical offset IDs. Used to generate monotonically increasing offset IDs, even if the stream recovers.
    logical_offset_id_generator: OffsetIdGenerator,
    /// Signal that the stream is caught up to the given offset.
    logical_last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
    /// Persistent offset ID receiver to ensure at least one receiver exists, preventing SendError
    _logical_last_received_offset_id_rx: tokio::sync::watch::Receiver<Option<OffsetId>>,
    /// A vector of records that have failed to be acknowledged.
    failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
    /// Flag indicating if the stream has been closed.
    is_closed: Arc<AtomicBool>,
    /// Sync mutex to ensure that offset generation and record ingestion happen atomically.
    sync_mutex: Arc<tokio::sync::Mutex<()>>,
    /// Watch channel for last error received from the server.
    server_error_rx: tokio::sync::watch::Receiver<Option<ZerobusError>>,
    /// Cancellation token to signal receiver and sender tasks to abort. It is sent either when stream is closed or dropped.
    cancellation_token: CancellationToken,
    /// Callback handler task that executes callbacks in a separate thread.
    callback_handler_task: Option<tokio::task::JoinHandle<()>>,
}
219
/// The main interface for interacting with the Zerobus API.
/// # Examples
/// ```no_run
/// # use std::error::Error;
/// # use std::sync::Arc;
/// # use databricks_zerobus_ingest_sdk::{ZerobusSdk, StreamConfigurationOptions, TableProperties, ZerobusError, ZerobusResult};
/// #
/// # async fn write_single_row(row: impl prost::Message) -> Result<(), ZerobusError> {
///
/// // Create SDK using the builder
/// let sdk = ZerobusSdk::builder()
///     .endpoint("https://your-workspace.zerobus.region.cloud.databricks.com")
///     .unity_catalog_url("https://your-workspace.cloud.databricks.com")
///     .build()?;
///
/// // Define the arguments for the ephemeral stream.
/// let table_properties = TableProperties {
///     table_name: "test_table".to_string(),
///     descriptor_proto: Default::default(),
/// };
/// let options = StreamConfigurationOptions {
///     max_inflight_requests: 100,
///     ..Default::default()
/// };
/// let client_id = "your-client-id".to_string();
/// let client_secret = "your-client-secret".to_string();
///
/// // Create a stream
/// let stream = sdk.create_stream(table_properties, client_id, client_secret, Some(options)).await?;
///
/// // Ingest a single record
/// let offset_id = stream.ingest_record_offset(row.encode_to_vec()).await?;
/// println!("Record sent with offset Id: {}", offset_id);
///
/// // Wait for acknowledgment
/// stream.wait_for_offset(offset_id).await?;
/// println!("Record acknowledged with offset Id: {}", offset_id);
/// # Ok(())
/// # }
/// ```
pub struct ZerobusSdk {
    /// Zerobus gRPC endpoint URL (normalized to include a scheme by the constructors).
    pub zerobus_endpoint: String,
    /// Deprecated: This field is no longer used. TLS is now controlled via `tls_config`.
    #[deprecated(
        since = "0.5.0",
        note = "This field is no longer used. TLS is controlled via tls_config."
    )]
    pub use_tls: bool,
    /// Unity Catalog endpoint URL, used for OAuth token acquisition.
    pub unity_catalog_url: String,
    /// Lazily-created gRPC client shared by all streams; clones multiplex over
    /// one underlying HTTP/2 connection.
    shared_channel: tokio::sync::Mutex<Option<ZerobusClient<Channel>>>,
    /// Workspace ID extracted from the endpoint host (first dot-separated label).
    workspace_id: String,
    /// TLS configuration applied when building the channel endpoint.
    tls_config: Arc<dyn TlsConfig>,
}
273
274impl ZerobusSdk {
    /// Creates a new SDK builder for fluent configuration.
    ///
    /// This is the recommended way to create a `ZerobusSdk` instance.
    /// Equivalent to `ZerobusSdkBuilder::new()`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use databricks_zerobus_ingest_sdk::ZerobusSdk;
    ///
    /// let sdk = ZerobusSdk::builder()
    ///     .endpoint("https://workspace.zerobus.databricks.com")
    ///     .unity_catalog_url("https://workspace.cloud.databricks.com")
    ///     .build()?;
    /// # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(())
    /// ```
    pub fn builder() -> ZerobusSdkBuilder {
        ZerobusSdkBuilder::new()
    }
293
294    /// Creates a new Zerobus SDK instance.
295    ///
296    /// # Deprecated
297    ///
298    /// Use [`ZerobusSdk::builder()`] instead for more flexible configuration:
299    ///
300    /// ```no_run
301    /// use databricks_zerobus_ingest_sdk::ZerobusSdk;
302    ///
303    /// let sdk = ZerobusSdk::builder()
304    ///     .endpoint("https://workspace.zerobus.databricks.com")
305    ///     .unity_catalog_url("https://workspace.cloud.databricks.com")
306    ///     .build()?;
307    /// # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(())
308    /// ```
309    ///
310    /// # Arguments
311    ///
312    /// * `zerobus_endpoint` - The Zerobus API endpoint URL (e.g., "https://workspace-id.cloud.databricks.com")
313    /// * `unity_catalog_url` - The Unity Catalog endpoint URL (e.g., "https://workspace.cloud.databricks.com")
314    ///
315    /// # Returns
316    ///
317    /// A new `ZerobusSdk` instance configured to use TLS.
318    ///
319    /// # Errors
320    ///
321    /// * `ChannelCreationError` - If the workspace ID cannot be extracted from the Zerobus endpoint
322    #[deprecated(since = "0.5.0", note = "Use ZerobusSdk::builder() instead")]
323    #[allow(clippy::result_large_err)]
324    pub fn new(zerobus_endpoint: String, unity_catalog_url: String) -> ZerobusResult<Self> {
325        let zerobus_endpoint = if !zerobus_endpoint.starts_with("https://")
326            && !zerobus_endpoint.starts_with("http://")
327        {
328            format!("https://{}", zerobus_endpoint)
329        } else {
330            zerobus_endpoint
331        };
332
333        let workspace_id = zerobus_endpoint
334            .strip_prefix("https://")
335            .or_else(|| zerobus_endpoint.strip_prefix("http://"))
336            .and_then(|s| s.split('.').next())
337            .map(|s| s.to_string())
338            .ok_or_else(|| {
339                ZerobusError::InvalidArgument(
340                    "Failed to extract workspace ID from zerobus_endpoint".to_string(),
341                )
342            })?;
343
344        #[allow(deprecated)]
345        Ok(ZerobusSdk {
346            zerobus_endpoint,
347            use_tls: true,
348            unity_catalog_url,
349            workspace_id,
350            shared_channel: tokio::sync::Mutex::new(None),
351            tls_config: Arc::new(SecureTlsConfig::new()),
352        })
353    }
354
    /// Creates a new SDK instance with explicit configuration.
    ///
    /// This is used internally by the builder pattern.
    ///
    /// # Arguments
    ///
    /// * `zerobus_endpoint` - Zerobus gRPC endpoint URL (already normalized by the builder)
    /// * `unity_catalog_url` - Unity Catalog endpoint URL
    /// * `workspace_id` - Workspace ID (already extracted/validated by the builder)
    /// * `tls_config` - TLS configuration to apply when creating the channel
    pub(crate) fn new_with_config(
        zerobus_endpoint: String,
        unity_catalog_url: String,
        workspace_id: String,
        tls_config: Arc<dyn TlsConfig>,
    ) -> Self {
        // `use_tls` is deprecated but must still be initialized; TLS behavior
        // is actually governed by `tls_config`.
        #[allow(deprecated)]
        ZerobusSdk {
            zerobus_endpoint,
            use_tls: true,
            unity_catalog_url,
            workspace_id,
            shared_channel: tokio::sync::Mutex::new(None),
            tls_config,
        }
    }
374
375    /// Creates a new ingestion stream to a Unity Catalog table.
376    ///
377    /// This establishes a bidirectional gRPC stream for ingesting records. Authentication
378    /// is handled automatically using the provided OAuth credentials.
379    ///
380    /// # Arguments
381    ///
382    /// * `table_properties` - Table name and protobuf descriptor
383    /// * `client_id` - OAuth client ID for authentication
384    /// * `client_secret` - OAuth client secret for authentication
385    /// * `options` - Optional stream configuration (uses defaults if `None`)
386    ///
387    /// # Returns
388    ///
389    /// A `ZerobusStream` ready for ingesting records.
390    ///
391    /// # Errors
392    ///
393    /// * `CreateStreamError` - If stream creation fails
394    /// * `InvalidTableName` - If the table name is invalid or table doesn't exist
395    /// * `InvalidUCTokenError` - If OAuth authentication fails
396    /// * `PermissionDenied` - If credentials lack required permissions
397    ///
398    /// # Examples
399    ///
400    /// ```no_run
401    /// # use databricks_zerobus_ingest_sdk::*;
402    /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
403    /// let table_props = TableProperties {
404    ///     table_name: "catalog.schema.table".to_string(),
405    ///     descriptor_proto: Default::default(), // Load from generated files
406    /// };
407    ///
408    /// let stream = sdk.create_stream(
409    ///     table_props,
410    ///     "client-id".to_string(),
411    ///     "client-secret".to_string(),
412    ///     None,
413    /// ).await?;
414    /// # Ok(())
415    /// # }
416    /// ```
417    #[instrument(level = "debug", skip_all)]
418    pub async fn create_stream(
419        &self,
420        table_properties: TableProperties,
421        client_id: String,
422        client_secret: String,
423        options: Option<StreamConfigurationOptions>,
424    ) -> ZerobusResult<ZerobusStream> {
425        let headers_provider = OAuthHeadersProvider::new(
426            client_id,
427            client_secret,
428            table_properties.table_name.clone(),
429            self.workspace_id.clone(),
430            self.unity_catalog_url.clone(),
431        );
432        self.create_stream_with_headers_provider(
433            table_properties,
434            Arc::new(headers_provider),
435            options,
436        )
437        .await
438    }
439
440    /// Creates a new ingestion stream with a custom headers provider.
441    ///
442    /// This is an advanced method that allows you to implement your own authentication
443    /// logic by providing a custom implementation of the `HeadersProvider` trait.
444    ///
445    /// # Arguments
446    ///
447    /// * `table_properties` - Table name and protobuf descriptor
448    /// * `headers_provider` - An `Arc` holding your custom `HeadersProvider` implementation
449    /// * `options` - Optional stream configuration (uses defaults if `None`)
450    ///
451    /// # Returns
452    ///
453    /// A `ZerobusStream` ready for ingesting records.
454    ///
455    /// # Examples
456    ///
457    /// ```no_run
458    /// # use databricks_zerobus_ingest_sdk::*;
459    /// # use std::collections::HashMap;
460    /// # use std::sync::Arc;
461    /// # use async_trait::async_trait;
462    /// #
463    /// # struct MyHeadersProvider;
464    /// #
465    /// # #[async_trait]
466    /// # impl HeadersProvider for MyHeadersProvider {
467    /// #     async fn get_headers(&self) -> ZerobusResult<HashMap<&'static str, String>> {
468    /// #         let mut headers = HashMap::new();
469    /// #         headers.insert("some_key", "some_value".to_string());
470    /// #         Ok(headers)
471    /// #     }
472    /// # }
473    /// #
474    /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
475    /// let table_props = TableProperties {
476    ///     table_name: "catalog.schema.table".to_string(),
477    ///     descriptor_proto: Default::default(),
478    /// };
479    ///
480    /// let headers_provider = Arc::new(MyHeadersProvider);
481    ///
482    /// let stream = sdk.create_stream_with_headers_provider(
483    ///     table_props,
484    ///     headers_provider,
485    ///     None,
486    /// ).await?;
487    /// # Ok(())
488    /// # }
489    /// ```
490    #[instrument(level = "debug", skip_all)]
491    pub async fn create_stream_with_headers_provider(
492        &self,
493        table_properties: TableProperties,
494        headers_provider: Arc<dyn HeadersProvider>,
495        options: Option<StreamConfigurationOptions>,
496    ) -> ZerobusResult<ZerobusStream> {
497        let options = options.unwrap_or_default();
498
499        match options.record_type {
500            RecordType::Proto => {
501                if table_properties.descriptor_proto.is_none() {
502                    return Err(ZerobusError::InvalidArgument(
503                        "Proto descriptor is required for Proto record type".to_string(),
504                    ));
505                }
506            }
507            RecordType::Json => {
508                if table_properties.descriptor_proto.is_some() {
509                    warn!("JSON descriptor is not supported for Proto record type");
510                }
511            }
512            RecordType::Unspecified => {
513                return Err(ZerobusError::InvalidArgument(
514                    "Record type is not specified".to_string(),
515                ));
516            }
517        }
518
519        let channel = self.get_or_create_channel_zerobus_client().await?;
520        let stream = ZerobusStream::new_stream(
521            channel,
522            table_properties,
523            Arc::clone(&headers_provider),
524            options,
525        )
526        .await;
527        match stream {
528            Ok(stream) => {
529                if let Some(stream_id) = stream.stream_id.as_ref() {
530                    info!(stream_id = %stream_id, "Successfully created new ephemeral stream");
531                } else {
532                    error!("Successfully created a stream but stream_id is None");
533                }
534                return Ok(stream);
535            }
536            Err(e) => {
537                error!("Stream initialization failed with error: {}", e);
538                return Err(e);
539            }
540        }
541    }
542
543    /// Recreates a failed stream and re-ingests unacknowledged records.
544    ///
545    /// This is useful when a stream encounters an error and you want to preserve
546    /// unacknowledged records. The method creates a new stream with the same
547    /// configuration and automatically re-ingests all records that weren't acknowledged.
548    ///
549    /// # Arguments
550    ///
551    /// * `stream` - The failed stream to recreate
552    ///
553    /// # Returns
554    ///
555    /// A new `ZerobusStream` with unacknowledged records already submitted.
556    ///
557    /// # Errors
558    ///
559    /// Returns any errors from stream creation or re-ingestion.
560    ///
561    /// # Examples
562    ///
563    /// ```no_run
564    /// # use databricks_zerobus_ingest_sdk::*;
565    /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusStream) -> Result<(), ZerobusError> {
566    /// match stream.close().await {
567    ///     Err(_) => {
568    ///         // Stream failed, recreate it
569    ///         let new_stream = sdk.recreate_stream(&stream).await?;
570    ///         // Continue using new_stream
571    ///     }
572    ///     Ok(_) => println!("Stream closed successfully"),
573    /// }
574    /// # Ok(())
575    /// # }
576    /// ```
577    #[instrument(level = "debug", skip_all)]
578    pub async fn recreate_stream(&self, stream: &ZerobusStream) -> ZerobusResult<ZerobusStream> {
579        let batches = stream.get_unacked_batches().await?;
580        let new_stream = self
581            .create_stream_with_headers_provider(
582                stream.table_properties.clone(),
583                Arc::clone(&stream.headers_provider),
584                Some(stream.options.clone()),
585            )
586            .await?;
587        for batch in batches {
588            let ack = new_stream.ingest_internal(batch).await?;
589            tokio::spawn(ack);
590        }
591        return Ok(new_stream);
592    }
593
594    /// Creates a new Arrow Flight ingestion stream to a Unity Catalog table.
595    ///
596    /// This establishes an Arrow Flight stream for high-performance ingestion of
597    /// Arrow RecordBatches. Authentication is handled automatically using the
598    /// provided OAuth credentials.
599    ///
600    /// # Arguments
601    ///
602    /// * `table_properties` - Table name and Arrow schema
603    /// * `client_id` - OAuth client ID for authentication
604    /// * `client_secret` - OAuth client secret for authentication
605    /// * `options` - Optional Arrow stream configuration (uses defaults if `None`)
606    ///
607    /// # Returns
608    ///
609    /// A `ZerobusArrowStream` ready for ingesting Arrow RecordBatches.
610    ///
611    /// # Errors
612    ///
613    /// * `CreateStreamError` - If stream creation fails
614    /// * `InvalidTableName` - If the table name is invalid or table doesn't exist
615    /// * `InvalidUCTokenError` - If OAuth authentication fails
616    /// * `PermissionDenied` - If credentials lack required permissions
617    ///
618    /// # Examples
619    ///
620    /// ```no_run
621    /// # use databricks_zerobus_ingest_sdk::*;
622    /// # use std::sync::Arc;
623    /// # use arrow_schema::{Schema as ArrowSchema, Field, DataType};
624    /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
625    /// let schema = Arc::new(ArrowSchema::new(vec![
626    ///     Field::new("id", DataType::Int32, false),
627    ///     Field::new("name", DataType::Utf8, true),
628    /// ]));
629    ///
630    /// let table_props = ArrowTableProperties {
631    ///     table_name: "catalog.schema.table".to_string(),
632    ///     schema,
633    /// };
634    ///
635    /// let stream = sdk.create_arrow_stream(
636    ///     table_props,
637    ///     "client-id".to_string(),
638    ///     "client-secret".to_string(),
639    ///     None,
640    /// ).await?;
641    /// # Ok(())
642    /// # }
643    /// ```
644    #[cfg(feature = "arrow-flight")]
645    #[instrument(level = "debug", skip_all)]
646    pub async fn create_arrow_stream(
647        &self,
648        table_properties: ArrowTableProperties,
649        client_id: String,
650        client_secret: String,
651        options: Option<ArrowStreamConfigurationOptions>,
652    ) -> ZerobusResult<ZerobusArrowStream> {
653        let headers_provider = OAuthHeadersProvider::new(
654            client_id,
655            client_secret,
656            table_properties.table_name.clone(),
657            self.workspace_id.clone(),
658            self.unity_catalog_url.clone(),
659        );
660        self.create_arrow_stream_with_headers_provider(
661            table_properties,
662            Arc::new(headers_provider),
663            options,
664        )
665        .await
666    }
667
668    /// Creates a new Arrow Flight stream with a custom headers provider.
669    ///
670    /// This is an advanced method that allows you to implement your own authentication
671    /// logic by providing a custom implementation of the `HeadersProvider` trait.
672    ///
673    /// # Arguments
674    ///
675    /// * `table_properties` - Table name and Arrow schema
676    /// * `headers_provider` - An `Arc` holding your custom `HeadersProvider` implementation
677    /// * `options` - Optional Arrow stream configuration (uses defaults if `None`)
678    ///
679    /// # Returns
680    ///
681    /// A `ZerobusArrowStream` ready for ingesting Arrow RecordBatches.
682    ///
683    /// # Examples
684    ///
685    /// ```no_run
686    /// # use databricks_zerobus_ingest_sdk::*;
687    /// # use std::collections::HashMap;
688    /// # use std::sync::Arc;
689    /// # use async_trait::async_trait;
690    /// # use arrow_schema::{Schema as ArrowSchema, Field, DataType};
691    /// #
692    /// # struct MyHeadersProvider;
693    /// #
694    /// # #[async_trait]
695    /// # impl HeadersProvider for MyHeadersProvider {
696    /// #     async fn get_headers(&self) -> ZerobusResult<HashMap<&'static str, String>> {
697    /// #         let mut headers = HashMap::new();
698    /// #         headers.insert("authorization", "Bearer my-token".to_string());
699    /// #         Ok(headers)
700    /// #     }
701    /// # }
702    /// #
703    /// # async fn example(sdk: ZerobusSdk) -> Result<(), ZerobusError> {
704    /// let schema = Arc::new(ArrowSchema::new(vec![
705    ///     Field::new("id", DataType::Int32, false),
706    /// ]));
707    ///
708    /// let table_props = ArrowTableProperties {
709    ///     table_name: "catalog.schema.table".to_string(),
710    ///     schema,
711    /// };
712    ///
713    /// let headers_provider = Arc::new(MyHeadersProvider);
714    ///
715    /// let stream = sdk.create_arrow_stream_with_headers_provider(
716    ///     table_props,
717    ///     headers_provider,
718    ///     None,
719    /// ).await?;
720    /// # Ok(())
721    /// # }
722    /// ```
723    #[cfg(feature = "arrow-flight")]
724    #[instrument(level = "debug", skip_all)]
725    pub async fn create_arrow_stream_with_headers_provider(
726        &self,
727        table_properties: ArrowTableProperties,
728        headers_provider: Arc<dyn HeadersProvider>,
729        options: Option<ArrowStreamConfigurationOptions>,
730    ) -> ZerobusResult<ZerobusArrowStream> {
731        let options = options.unwrap_or_default();
732
733        let stream = ZerobusArrowStream::new(
734            &self.zerobus_endpoint,
735            Arc::clone(&self.tls_config),
736            table_properties,
737            headers_provider,
738            options,
739        )
740        .await;
741
742        match stream {
743            Ok(stream) => {
744                info!(
745                    table_name = %stream.table_name(),
746                    "Successfully created new Arrow Flight stream"
747                );
748                Ok(stream)
749            }
750            Err(e) => {
751                error!("Arrow Flight stream initialization failed: {}", e);
752                Err(e)
753            }
754        }
755    }
756
757    /// Recreates an Arrow Flight stream from a failed or closed stream, replaying any
758    /// unacknowledged batches.
759    ///
760    /// This method is useful when you want to manually recover from a stream failure
761    /// or continue ingestion after closing a stream with unacknowledged batches.
762    /// It creates a new stream with the same configuration and automatically ingests
763    /// any batches that were not acknowledged in the original stream.
764    ///
765    /// # Arguments
766    ///
767    /// * `stream` - A reference to the failed or closed Arrow Flight stream
768    ///
769    /// # Returns
770    ///
771    /// A new `ZerobusArrowStream` with the same configuration, with unacked batches
772    /// already queued for ingestion.
773    ///
774    /// # Errors
775    ///
776    /// * `InvalidStateError` - If the source stream is still active
777    /// * `CreateStreamError` - If stream creation fails
778    ///
779    /// # Examples
780    ///
781    /// ```no_run
782    /// # use databricks_zerobus_ingest_sdk::*;
783    /// # use arrow_array::RecordBatch;
784    /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusArrowStream) -> Result<(), ZerobusError> {
785    /// // Ingest some batches
786    /// // ...
787    ///
788    /// // Stream fails for some reason
789    /// match stream.flush().await {
790    ///     Err(_) => {
791    ///         // Close the failed stream
792    ///         stream.close().await.ok();
793    ///
794    ///         // Recreate and retry
795    ///         let new_stream = sdk.recreate_arrow_stream(&stream).await?;
796    ///         new_stream.flush().await?;
797    ///     }
798    ///     Ok(_) => {}
799    /// }
800    /// # Ok(())
801    /// # }
802    /// ```
803    #[cfg(feature = "arrow-flight")]
804    #[instrument(level = "debug", skip_all)]
805    pub async fn recreate_arrow_stream(
806        &self,
807        stream: &ZerobusArrowStream,
808    ) -> ZerobusResult<ZerobusArrowStream> {
809        let batches = stream.get_unacked_batches().await?;
810
811        let new_stream = self
812            .create_arrow_stream_with_headers_provider(
813                stream.table_properties().clone(),
814                stream.headers_provider(),
815                Some(stream.options().clone()),
816            )
817            .await?;
818
819        // Replay unacked batches.
820        for batch in batches {
821            let _offset = new_stream.ingest_batch(batch).await?;
822        }
823
824        info!(
825            table_name = %new_stream.table_name(),
826            "Successfully recreated Arrow Flight stream"
827        );
828
829        Ok(new_stream)
830    }
831
832    /// Gets or creates the shared Channel for all streams.
833    /// The first call creates the Channel, subsequent calls clone it.
834    /// All clones share the same underlying TCP connection via HTTP/2 multiplexing.
835    async fn get_or_create_channel_zerobus_client(&self) -> ZerobusResult<ZerobusClient<Channel>> {
836        let mut guard = self.shared_channel.lock().await;
837
838        if guard.is_none() {
839            // Create the channel for the first time.
840            let endpoint = Endpoint::from_shared(self.zerobus_endpoint.clone())
841                .map_err(|err| ZerobusError::ChannelCreationError(err.to_string()))?;
842
843            let endpoint = self.tls_config.configure_endpoint(endpoint)?;
844
845            // Check for HTTP proxy env vars (https_proxy, HTTPS_PROXY, etc.)
846            // and use a proxy connector if one is configured.
847            let host = endpoint.uri().host().unwrap_or_default().to_string();
848
849            let channel = if !proxy::is_no_proxy(&host) {
850                if let Some(proxy_connector) = proxy::create_proxy_connector() {
851                    endpoint.connect_with_connector_lazy(proxy_connector)
852                } else {
853                    endpoint.connect_lazy()
854                }
855            } else {
856                endpoint.connect_lazy()
857            };
858
859            let client = ZerobusClient::new(channel)
860                .max_decoding_message_size(usize::MAX)
861                .max_encoding_message_size(usize::MAX);
862
863            *guard = Some(client);
864        }
865
866        Ok(guard
867            .as_ref()
868            .expect("Channel was just initialized")
869            .clone())
870    }
871}
872
873impl ZerobusStream {
    /// Creates a new ephemeral stream for ingesting records.
    ///
    /// Wires up all shared stream state (landing zone, ack map, error and
    /// cancellation channels), spawns the supervisor task that owns the gRPC
    /// stream lifecycle, and waits for the supervisor to report the outcome of
    /// the initial stream creation before returning.
    ///
    /// # Arguments
    ///
    /// * `channel` - gRPC client used to open the ephemeral stream
    /// * `table_properties` - Properties of the target table
    /// * `headers_provider` - Supplies auth/routing headers for the stream
    /// * `options` - Stream configuration (inflight limit, recovery, callback)
    ///
    /// # Errors
    ///
    /// * `UnexpectedStreamResponseError` - If the supervisor task dies before
    ///   reporting the stream-creation result
    /// * Any error produced by the initial stream creation itself
    #[instrument(level = "debug", skip_all)]
    async fn new_stream(
        channel: ZerobusClient<Channel>,
        table_properties: TableProperties,
        headers_provider: Arc<dyn HeadersProvider>,
        options: StreamConfigurationOptions,
    ) -> ZerobusResult<Self> {
        // One-shot channel on which the supervisor reports the result of the
        // very first stream creation: Ok(stream_id) or the creation error.
        let (stream_init_result_tx, stream_init_result_rx) =
            tokio::sync::oneshot::channel::<ZerobusResult<String>>();

        // Watch channel carrying the last offset acknowledged by the server.
        // The receiver half is kept on the stream struct so sends never fail.
        let (logical_last_received_offset_id_tx, _logical_last_received_offset_id_rx) =
            tokio::sync::watch::channel(None);
        // Staging area for outgoing requests, bounded by max_inflight_requests.
        let landing_zone = Arc::new(LandingZone::<Box<IngestRequest>>::new(
            options.max_inflight_requests,
        ));

        // Maps an offset id to the oneshot sender that delivers its ack
        // (populated by ingest_internal, drained by the receiver side).
        let oneshot_map = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
        let is_closed = Arc::new(AtomicBool::new(false));
        // Batches that could not be acknowledged before a terminal failure.
        let failed_records = Arc::new(RwLock::new(Vec::new()));
        let logical_offset_id_generator = OffsetIdGenerator::default();

        // Watch channel broadcasting the latest server-side error, if any.
        let (server_error_tx, server_error_rx) = tokio::sync::watch::channel(None);
        let cancellation_token = CancellationToken::new();
        // Create callback channel and spawn callback handler task only if callback is defined
        let (callback_tx, callback_handler_task) = if options.ack_callback.is_some() {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            let task = Self::spawn_callback_handler_task(
                rx,
                options.ack_callback.clone(),
                cancellation_token.clone(),
            );
            (Some(tx), Some(task))
        } else {
            (None, None)
        };

        // The supervisor owns stream creation, recovery, and error handling.
        let supervisor_task = tokio::task::spawn(Self::supervisor_task(
            channel,
            table_properties.clone(),
            Arc::clone(&headers_provider),
            options.clone(),
            Arc::clone(&landing_zone),
            Arc::clone(&oneshot_map),
            logical_last_received_offset_id_tx.clone(),
            Arc::clone(&is_closed),
            Arc::clone(&failed_records),
            stream_init_result_tx,
            server_error_tx,
            cancellation_token.clone(),
            callback_tx.clone(),
        ));
        // Block until the supervisor reports the initial creation result; a
        // dropped sender means the supervisor died before it could report.
        let stream_id = Some(stream_init_result_rx.await.map_err(|_| {
            ZerobusError::UnexpectedStreamResponseError(
                "Supervisor task died before stream creation".to_string(),
            )
        })??);

        let stream = Self {
            stream_type: StreamType::Ephemeral,
            headers_provider,
            options: options.clone(),
            table_properties,
            stream_id,
            landing_zone,
            oneshot_map,
            supervisor_task,
            logical_offset_id_generator,
            logical_last_received_offset_id_tx,
            _logical_last_received_offset_id_rx,
            failed_records,
            is_closed,
            sync_mutex: Arc::new(tokio::sync::Mutex::new(())),
            server_error_rx,
            cancellation_token,
            callback_handler_task,
        };

        Ok(stream)
    }
954
    /// Supervisor task is responsible for managing the stream lifecycle.
    /// It handles stream creation, recovery, and error handling.
    ///
    /// Each loop iteration: (re)creates the gRPC stream (with retries when
    /// recovery is enabled), spawns a sender and a receiver task, then waits
    /// for either to finish. Retryable failures restart the loop; terminal
    /// failures fail all pending records and exit the task with the error.
    #[allow(clippy::too_many_arguments)]
    #[instrument(level = "debug", skip_all, fields(table_name = %table_properties.table_name))]
    async fn supervisor_task(
        channel: ZerobusClient<Channel>,
        table_properties: TableProperties,
        headers_provider: Arc<dyn HeadersProvider>,
        options: StreamConfigurationOptions,
        landing_zone: RecordLandingZone,
        oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
        logical_last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
        is_closed: Arc<AtomicBool>,
        failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
        stream_init_result_tx: tokio::sync::oneshot::Sender<ZerobusResult<String>>,
        server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
        cancellation_token: CancellationToken,
        callback_tx: Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
    ) -> ZerobusResult<()> {
        let mut initial_stream_creation = true;
        // Wrapped in Option so the one-shot sender can be take()n exactly once.
        let mut stream_init_result_tx = Some(stream_init_result_tx);

        loop {
            debug!("Supervisor task loop");

            // Exit cleanly when cancellation has been requested.
            if cancellation_token.is_cancelled() {
                debug!("Supervisor task cancelled, exiting");
                return Ok(());
            }

            // One clone per consumer: sender task, receiver task, and the
            // recovery reset below.
            let landing_zone_sender = Arc::clone(&landing_zone);
            let landing_zone_receiver = Arc::clone(&landing_zone);
            let landing_zone_recovery = Arc::clone(&landing_zone);

            // 1. Create a stream.
            // Fixed-interval retry schedule, capped at recovery_retries attempts.
            let strategy = FixedInterval::from_millis(options.recovery_backoff_ms)
                .take(options.recovery_retries as usize);

            let create_attempt = || {
                let channel = channel.clone();
                let table_properties = table_properties.clone();
                let headers_provider = Arc::clone(&headers_provider);
                let record_type = options.record_type;

                async move {
                    // Bound each individual attempt by recovery_timeout_ms so
                    // a hung connection attempt cannot stall recovery forever.
                    tokio::time::timeout(
                        Duration::from_millis(options.recovery_timeout_ms),
                        Self::create_stream_connection(
                            channel,
                            &table_properties,
                            &headers_provider,
                            record_type,
                        ),
                    )
                    .await
                    .map_err(|_| {
                        ZerobusError::CreateStreamError(tonic::Status::deadline_exceeded(
                            "Stream creation timed out",
                        ))
                    })?
                }
            };
            // Only retry when recovery is enabled and the error is retryable.
            let should_retry = |e: &ZerobusError| options.recovery && e.is_retryable();
            let creation = RetryIf::spawn(strategy, create_attempt, should_retry).await;

            let (tx, response_grpc_stream, stream_id) = match creation {
                Ok((tx, response_grpc_stream, stream_id)) => (tx, response_grpc_stream, stream_id),
                Err(e) => {
                    if initial_stream_creation {
                        // Report the failure to the constructor waiting on the
                        // init oneshot; the stream was never usable.
                        if let Some(tx) = stream_init_result_tx.take() {
                            let _ = tx.send(Err(e.clone()));
                        }
                    } else {
                        // Recovery failed: the stream is done for — mark it
                        // closed and fail everything still in flight.
                        is_closed.store(true, Ordering::Relaxed);
                        Self::fail_all_pending_records(
                            landing_zone.clone(),
                            oneshot_map.clone(),
                            failed_records.clone(),
                            &e,
                            &callback_tx,
                        )
                        .await;
                    }
                    return Err(e);
                }
            };
            if initial_stream_creation {
                // First successful creation: hand the stream id back to the
                // constructor, then switch into recovery mode for future loops.
                if let Some(stream_init_result_tx_inner) = stream_init_result_tx.take() {
                    let _ = stream_init_result_tx_inner.send(Ok(stream_id.clone()));
                }
                initial_stream_creation = false;
                info!(stream_id = %stream_id, "Successfully created stream");
            } else {
                info!(stream_id = %stream_id, "Successfully recovered stream");
                // Clear any previously broadcast server error.
                let _ = server_error_tx.send(None);
            }

            // 2. Reset landing zone.
            landing_zone_recovery.reset_observe();

            // 3. Spawn receiver and sender task.
            // Shared flag the receiver uses to pause/resume the sender.
            let is_paused = Arc::new(AtomicBool::new(false));
            let mut recv_task = Self::spawn_receiver_task(
                response_grpc_stream,
                logical_last_received_offset_id_tx.clone(),
                landing_zone_receiver,
                oneshot_map.clone(),
                Arc::clone(&is_paused),
                options.clone(),
                server_error_tx.clone(),
                cancellation_token.clone(),
                callback_tx.clone(),
            );
            let mut send_task = Self::spawn_sender_task(
                tx,
                landing_zone_sender,
                Arc::clone(&is_paused),
                server_error_tx.clone(),
                cancellation_token.clone(),
            );

            // 4. Wait for any of the two tasks to end.
            // Whichever task finishes first, abort its sibling before
            // inspecting the result; a JoinError means the task panicked.
            let result = tokio::select! {
                recv_result = &mut recv_task => {
                    send_task.abort();
                    match recv_result {
                        Ok(Err(e)) => Err(e),
                        Err(e) => Err(ZerobusError::UnexpectedStreamResponseError(
                            format!("Receiver task panicked: {}", e)
                        )),
                        Ok(Ok(())) => {
                            info!("Receiver task completed successfully");
                            Ok(())
                        }
                    }
                }
                send_result = &mut send_task => {
                    recv_task.abort();
                    match send_result {
                        Ok(Err(e)) => Err(e),
                        Err(e) => Err(ZerobusError::UnexpectedStreamResponseError(
                            format!("Sender task panicked: {}", e)
                        )),
                        Ok(Ok(())) => Ok(()) // This only happens when the sender task receives a cancellation signal.
                    }
                }
            };

            // 5. Handle errors.
            if let Err(error) = result {
                error!(stream_id = %stream_id, "Stream failure detected: {}", error);
                let error = match &error {
                    // Mapping this to pass certain e2e tests.
                    // TODO: Remove this once we fix tests.
                    ZerobusError::StreamClosedError(status)
                        if status.code() == tonic::Code::InvalidArgument =>
                    {
                        ZerobusError::InvalidArgument(status.message().to_string())
                    }
                    _ => error,
                };
                // Broadcast the error so ingest callers can observe it.
                let _ = server_error_tx.send(Some(error.clone()));
                // Terminal error (or recovery disabled): fail all pending
                // records and exit. Otherwise fall through and retry the loop.
                if !error.is_retryable() || !options.recovery {
                    is_closed.store(true, Ordering::Relaxed);
                    Self::fail_all_pending_records(
                        landing_zone.clone(),
                        oneshot_map.clone(),
                        failed_records.clone(),
                        &error,
                        &callback_tx,
                    )
                    .await;
                    return Err(error);
                }
            }
        }
    }
1132
1133    /// Creates a stream connection to the Zerobus API.
1134    /// Returns a tuple containing the sender, response gRPC stream, and stream ID.
1135    /// If the stream creation fails, it returns an error.
1136    #[instrument(level = "debug", skip_all, fields(table_name = %table_properties.table_name))]
1137    async fn create_stream_connection(
1138        mut channel: ZerobusClient<Channel>,
1139        table_properties: &TableProperties,
1140        headers_provider: &Arc<dyn HeadersProvider>,
1141        record_type: RecordType,
1142    ) -> ZerobusResult<(
1143        tokio::sync::mpsc::Sender<EphemeralStreamRequest>,
1144        tonic::Streaming<EphemeralStreamResponse>,
1145        String,
1146    )> {
1147        const CHANNEL_BUFFER_SIZE: usize = 2048;
1148        let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
1149        let mut request_stream = tonic::Request::new(ReceiverStream::new(rx));
1150
1151        let stream_metadata = request_stream.metadata_mut();
1152        let headers = headers_provider.get_headers().await?;
1153
1154        for (key, value) in headers {
1155            match key {
1156                "x-databricks-zerobus-table-name" => {
1157                    let table_name = MetadataValue::try_from(value.as_str())
1158                        .map_err(|e| ZerobusError::InvalidTableName(e.to_string()))?;
1159                    stream_metadata.insert("x-databricks-zerobus-table-name", table_name);
1160                }
1161                "authorization" => {
1162                    let mut auth_value = MetadataValue::try_from(value.as_str()).map_err(|_| {
1163                        error!(table_name = %table_properties.table_name, "Invalid token: {}", value);
1164                        ZerobusError::InvalidUCTokenError(value)
1165                    })?;
1166                    auth_value.set_sensitive(true);
1167                    stream_metadata.insert("authorization", auth_value);
1168                }
1169                other_key => {
1170                    let header_value = MetadataValue::try_from(value.as_str())
1171                        .map_err(|_| ZerobusError::InvalidArgument(other_key.to_string()))?;
1172                    stream_metadata.insert(other_key, header_value);
1173                }
1174            }
1175        }
1176
1177        let mut response_grpc_stream = channel
1178            .ephemeral_stream(request_stream)
1179            .await
1180            .map_err(ZerobusError::CreateStreamError)?
1181            .into_inner();
1182
1183        let descriptor_proto = if record_type == RecordType::Proto {
1184            Some(
1185                table_properties
1186                    .descriptor_proto
1187                    .as_ref()
1188                    .ok_or_else(|| {
1189                        ZerobusError::InvalidArgument(
1190                            "Descriptor proto is required for Proto record type".to_string(),
1191                        )
1192                    })?
1193                    .encode_to_vec(),
1194            )
1195        } else {
1196            None
1197        };
1198
1199        let create_stream_request = RequestPayload::CreateStream(CreateIngestStreamRequest {
1200            table_name: Some(table_properties.table_name.to_string()),
1201            descriptor_proto,
1202            record_type: Some(record_type.into()),
1203        });
1204
1205        debug!("Sending CreateStream request.");
1206        tx.send(EphemeralStreamRequest {
1207            payload: Some(create_stream_request),
1208        })
1209        .await
1210        .map_err(|_| {
1211            error!(table_name = %table_properties.table_name, "Failed to send CreateStream request");
1212            ZerobusError::StreamClosedError(tonic::Status::internal(
1213                "Failed to send CreateStream request",
1214            ))
1215        })?;
1216        debug!("Waiting for CreateStream response.");
1217        let create_stream_response = response_grpc_stream.message().await;
1218
1219        match create_stream_response {
1220            Ok(Some(create_stream_response)) => match create_stream_response.payload {
1221                Some(ResponsePayload::CreateStreamResponse(resp)) => {
1222                    if let Some(stream_id) = resp.stream_id {
1223                        info!(stream_id = %stream_id, "Successfully created stream");
1224                        Ok((tx, response_grpc_stream, stream_id))
1225                    } else {
1226                        error!("Successfully created a stream but stream_id is None");
1227                        Err(ZerobusError::CreateStreamError(tonic::Status::internal(
1228                            "Successfully created a stream but stream_id is None",
1229                        )))
1230                    }
1231                }
1232                unexpected_message => {
1233                    error!("Unexpected response from server {unexpected_message:?}");
1234                    Err(ZerobusError::CreateStreamError(tonic::Status::internal(
1235                        "Unexpected response from server",
1236                    )))
1237                }
1238            },
1239            Ok(None) => {
1240                info!("Server closed the stream gracefully before sending CreateStream response");
1241                Err(ZerobusError::CreateStreamError(tonic::Status::ok(
1242                    "Stream closed gracefully by server",
1243                )))
1244            }
1245            Err(status) => {
1246                error!("CreateStream RPC failed: {status:?}");
1247                Err(ZerobusError::CreateStreamError(status))
1248            }
1249        }
1250    }
1251
1252    /// Ingests a single record into the stream.
1253    ///
1254    /// This method is non-blocking and returns immediately with a future. The record is
1255    /// queued for transmission and the returned future resolves when the server acknowledges
1256    /// the record has been durably written.
1257    ///
1258    /// # Arguments
1259    ///
1260    /// * `payload` - A record that can be converted to `EncodedRecord` (either JSON string or protobuf bytes)
1261    ///
1262    /// # Returns
1263    ///
1264    /// A future that resolves to the logical offset ID of the acknowledged record.
1265    ///
1266    /// # Errors
1267    ///
1268    /// * `InvalidArgument` - If the record type doesn't match stream configuration
1269    /// * `StreamClosedError` - If the stream has been closed
1270    /// * Other errors may be returned via the acknowledgment future
1271    ///
1272    /// # Examples
1273    ///
1274    /// ```no_run
1275    /// # use databricks_zerobus_ingest_sdk::*;
1276    /// # use prost::Message;
1277    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1278    /// # let my_record = vec![1, 2, 3]; // Example protobuf-encoded data
1279    /// // Ingest and immediately await acknowledgment
1280    /// let ack = stream.ingest_record(my_record).await?;
1281    /// let offset = ack.await?;
1282    /// println!("Record written at offset: {}", offset);
1283    /// # Ok(())
1284    /// # }
1285    /// ```
1286    ///
1287    /// # Deprecation Note
1288    ///
1289    /// This method is deprecated. Use [`ingest_record_offset()`](Self::ingest_record_offset) instead,
1290    /// which returns the offset directly (after queuing) without Future wrapping. You can then use
1291    /// [`wait_for_offset()`](Self::wait_for_offset) to explicitly wait for acknowledgment when needed.
1292    #[deprecated(
1293        since = "0.4.0",
1294        note = "Use `ingest_record_offset()` instead which returns the offset directly after queuing"
1295    )]
1296    pub async fn ingest_record(
1297        &self,
1298        payload: impl Into<EncodedRecord>,
1299    ) -> ZerobusResult<impl Future<Output = ZerobusResult<OffsetId>>> {
1300        let encoded_batch = EncodedBatch::try_from_record(payload, self.options.record_type)
1301            .ok_or_else(|| {
1302                ZerobusError::InvalidArgument(
1303                    "Record type does not match stream configuration".to_string(),
1304                )
1305            })?;
1306
1307        self.ingest_internal(encoded_batch).await
1308    }
1309
1310    /// Ingests a single record and returns its logical offset directly.
1311    ///
1312    /// This is an alternative to `ingest_record()` that returns the logical offset directly
1313    /// as an integer (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()`
1314    /// to explicitly wait for server acknowledgment of this offset when needed.
1315    ///
1316    /// # Arguments
1317    ///
1318    /// * `payload` - A record that can be converted to `EncodedRecord` (either JSON string or protobuf bytes)
1319    ///
1320    /// # Returns
1321    ///
1322    /// The logical offset ID assigned to this record.
1323    ///
1324    /// # Errors
1325    ///
1326    /// * `InvalidArgument` - If the record type doesn't match stream configuration
1327    /// * `StreamClosedError` - If the stream has been closed
1328    ///
1329    /// # Examples
1330    ///
1331    /// ```no_run
1332    /// # use databricks_zerobus_ingest_sdk::*;
1333    /// # use prost::Message;
1334    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1335    /// # let my_record = vec![1, 2, 3]; // Example protobuf-encoded data
1336    /// // Ingest and get offset immediately
1337    /// let offset = stream.ingest_record_offset(my_record).await?;
1338    ///
1339    /// // Later, wait for acknowledgment
1340    /// stream.wait_for_offset(offset).await?;
1341    /// println!("Record at offset {} has been acknowledged", offset);
1342    /// # Ok(())
1343    /// # }
1344    /// ```
1345    pub async fn ingest_record_offset(
1346        &self,
1347        payload: impl Into<EncodedRecord>,
1348    ) -> ZerobusResult<OffsetId> {
1349        let encoded_batch = EncodedBatch::try_from_record(payload, self.options.record_type)
1350            .ok_or_else(|| {
1351                ZerobusError::InvalidArgument(
1352                    "Record type does not match stream configuration".to_string(),
1353                )
1354            })?;
1355
1356        self.ingest_internal_v2(encoded_batch).await
1357    }
1358
1359    /// Ingests a batch of records into the stream.
1360    ///
1361    /// This method is non-blocking and returns immediately with a future. The records are
1362    /// queued for transmission and the returned future resolves when the server acknowledges
1363    /// the entire batch has been durably written.
1364    ///
1365    /// # Arguments
1366    ///
1367    /// * `payload` - An iterator of protobuf-encoded records (each item should be convertible to `EncodedRecord`)
1368    ///
1369    /// # Returns
1370    ///
1371    /// A future that resolves to the logical offset ID of the last acknowledged batch.
1372    /// If the batch is empty, the future resoles to None.
1373    ///
1374    /// # Errors
1375    ///
1376    /// * `InvalidArgument` - If record types don't match stream configuration
1377    /// * `StreamClosedError` - If the stream has been closed
1378    /// * Other errors may be returned via the acknowledgment future
1379    ///
1380    /// # Examples
1381    ///
1382    /// ```no_run
1383    /// # use databricks_zerobus_ingest_sdk::*;
1384    /// # use prost::Message;
1385    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1386    /// let records = vec![vec![1, 2, 3], vec![4, 5, 6]]; // Example protobuf-encoded data
1387    /// // Ingest batch and await acknowledgment
1388    /// let ack = stream.ingest_records(records).await?;
1389    /// let offset = ack.await?;
1390    /// match offset {
1391    ///     Some(offset) => println!("Batch written at offset: {}", offset),
1392    ///     None => println!("Empty batch - no records written"),
1393    /// }
1394    /// # Ok(())
1395    /// # }
1396    /// ```
1397    ///
1398    /// # Deprecation Note
1399    ///
1400    /// This method is deprecated. Use [`ingest_records_offset()`](Self::ingest_records_offset) instead,
1401    /// which returns the offset directly (after queuing) without Future wrapping. You can then use
1402    /// [`wait_for_offset()`](Self::wait_for_offset) to explicitly wait for acknowledgment when needed.
1403    #[deprecated(
1404        since = "0.4.0",
1405        note = "Use `ingest_records_offset()` instead which returns the offset directly after queuing"
1406    )]
1407    pub async fn ingest_records<I, T>(
1408        &self,
1409        payload: I,
1410    ) -> ZerobusResult<impl Future<Output = ZerobusResult<Option<OffsetId>>>>
1411    where
1412        I: IntoIterator<Item = T>,
1413        T: Into<EncodedRecord>,
1414    {
1415        let encoded_batch = EncodedBatch::try_from_batch(payload, self.options.record_type)
1416            .ok_or_else(|| {
1417                ZerobusError::InvalidArgument(
1418                    "Record type does not match stream configuration".to_string(),
1419                )
1420            })?;
1421
1422        // For non-empty batches, get the future from ingest_internal
1423        let ingest_future = if encoded_batch.is_empty() {
1424            None
1425        } else {
1426            Some(self.ingest_internal(encoded_batch).await?)
1427        };
1428
1429        Ok(async move {
1430            match ingest_future {
1431                Some(fut) => fut.await.map(Option::Some),
1432                None => Ok(None),
1433            }
1434        })
1435    }
1436
1437    /// Ingests a batch of records and returns the logical offset directly.
1438    ///
1439    /// This is an alternative to `ingest_records()` that returns the logical offset directly
1440    /// (after queuing) instead of wrapping it in a Future. Use `wait_for_offset()` to explicitly
1441    /// wait for server acknowledgment when needed.
1442    ///
1443    /// # Arguments
1444    ///
1445    /// * `payload` - An iterator of records (each item should be convertible to `EncodedRecord`)
1446    ///
1447    /// # Returns
1448    ///
1449    /// `Some(offset_id)` for non-empty batches, or `None` if the batch is empty.
1450    ///
1451    /// # Errors
1452    ///
1453    /// * `InvalidArgument` - If record types don't match stream configuration
1454    /// * `StreamClosedError` - If the stream has been closed
1455    ///
1456    /// # Examples
1457    ///
1458    /// ```no_run
1459    /// # use databricks_zerobus_ingest_sdk::*;
1460    /// # use prost::Message;
1461    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
1462    /// let records = vec![vec![1, 2, 3], vec![4, 5, 6]]; // Example protobuf-encoded data
1463    ///
1464    /// // Ingest batch and get offset immediately
1465    /// if let Some(offset) = stream.ingest_records_offset(records).await? {
1466    ///     // Later, wait for batch acknowledgment
1467    ///     stream.wait_for_offset(offset).await?;
1468    ///     println!("Batch at offset {} has been acknowledged", offset);
1469    /// }
1470    /// # Ok(())
1471    /// # }
1472    /// ```
1473    pub async fn ingest_records_offset<I, T>(&self, payload: I) -> ZerobusResult<Option<OffsetId>>
1474    where
1475        I: IntoIterator<Item = T>,
1476        T: Into<EncodedRecord>,
1477    {
1478        let encoded_batch = EncodedBatch::try_from_batch(payload, self.options.record_type)
1479            .ok_or_else(|| {
1480                ZerobusError::InvalidArgument(
1481                    "Record type does not match stream configuration".to_string(),
1482                )
1483            })?;
1484
1485        if encoded_batch.is_empty() {
1486            Ok(None)
1487        } else {
1488            self.ingest_internal_v2(encoded_batch)
1489                .await
1490                .map(Option::Some)
1491        }
1492    }
1493    /// Internal unified method for ingesting records and batches
1494    async fn ingest_internal(
1495        &self,
1496        encoded_batch: EncodedBatch,
1497    ) -> ZerobusResult<impl Future<Output = ZerobusResult<OffsetId>>> {
1498        if self.is_closed.load(Ordering::Relaxed) {
1499            error!(table_name = %self.table_properties.table_name, "Stream closed");
1500            return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1501                "Stream closed",
1502            )));
1503        }
1504
1505        let _guard = self.sync_mutex.lock().await;
1506
1507        let offset_id = self.logical_offset_id_generator.next();
1508        debug!(
1509            offset_id = offset_id,
1510            record_count = encoded_batch.get_record_count(),
1511            "Ingesting record(s)"
1512        );
1513
1514        if let Some(stream_id) = self.stream_id.as_ref() {
1515            let (tx, rx) = tokio::sync::oneshot::channel();
1516            {
1517                let mut map = self.oneshot_map.lock().await;
1518                map.insert(offset_id, tx);
1519            }
1520            self.landing_zone
1521                .add(Box::new(IngestRequest {
1522                    payload: encoded_batch,
1523                    offset_id,
1524                }))
1525                .await;
1526            let stream_id = stream_id.to_string();
1527            Ok(async move {
1528                rx.await.map_err(|err| {
1529                    error!(stream_id = %stream_id, "Failed to receive ack: {}", err);
1530                    ZerobusError::StreamClosedError(tonic::Status::internal(
1531                        "Failed to receive ack",
1532                    ))
1533                })?
1534            })
1535        } else {
1536            error!("Stream ID is None");
1537            Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1538                "Stream ID is None",
1539            )))
1540        }
1541    }
1542
1543    /// Internal unified method for ingesting records and batches
1544    async fn ingest_internal_v2(&self, encoded_batch: EncodedBatch) -> ZerobusResult<OffsetId> {
1545        if self.is_closed.load(Ordering::Relaxed) {
1546            error!(table_name = %self.table_properties.table_name, "Stream closed");
1547            return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
1548                "Stream closed",
1549            )));
1550        }
1551
1552        let _guard = self.sync_mutex.lock().await;
1553
1554        let offset_id = self.logical_offset_id_generator.next();
1555        debug!(
1556            offset_id = offset_id,
1557            record_count = encoded_batch.get_record_count(),
1558            "Ingesting record(s)"
1559        );
1560        self.landing_zone
1561            .add(Box::new(IngestRequest {
1562                payload: encoded_batch,
1563                offset_id,
1564            }))
1565            .await;
1566        Ok(offset_id)
1567    }
1568
1569    /// Spawns a task that handles callback execution in a separate thread.
1570    /// This task receives callback messages via a channel and executes them
1571    /// without blocking the receiver task.
1572    #[instrument(level = "debug", skip_all)]
1573    fn spawn_callback_handler_task(
1574        mut callback_rx: tokio::sync::mpsc::UnboundedReceiver<CallbackMessage>,
1575        ack_callback: Option<Arc<dyn AckCallback>>,
1576        cancellation_token: CancellationToken,
1577    ) -> tokio::task::JoinHandle<()> {
1578        tokio::spawn(async move {
1579            let span = span!(Level::DEBUG, "callback_handler");
1580            let _guard = span.enter();
1581            loop {
1582                tokio::select! {
1583                    biased;
1584                    message = callback_rx.recv() => {
1585                        match message {
1586                            Some(message) => {
1587                                match message {
1588                                    CallbackMessage::Ack(logical_offset) => {
1589                                        if let Some(ref callback) = ack_callback {
1590                                            callback.on_ack(logical_offset);
1591                                        }
1592                                    }
1593                                    CallbackMessage::Error(logical_offset, error_message) => {
1594                                        if let Some(ref callback) = ack_callback {
1595                                            callback.on_error(logical_offset, &error_message);
1596                                        }
1597                                    }
1598                                }
1599                            }
1600                            None => { // This happens when all senders are dropped.
1601                                debug!("Callback handler task shutting down");
1602                                return;
1603                            }
1604                        }
1605                    }
1606                    _ = cancellation_token.cancelled() => {
1607                        debug!("Callback handler task cancelled");
1608                        return;
1609                    }
1610
1611                }
1612            }
1613        })
1614    }
1615
1616    /// Spawns a task that continuously reads from `response_grpc_stream`
1617    /// and propagates the received durability acknowledgements to the
1618    /// corresponding pending acks promises.
1619    #[instrument(level = "debug", skip_all)]
1620    #[allow(clippy::too_many_arguments)]
1621    fn spawn_receiver_task(
1622        mut response_grpc_stream: tonic::Streaming<EphemeralStreamResponse>,
1623        last_received_offset_id_tx: tokio::sync::watch::Sender<Option<OffsetId>>,
1624        landing_zone: RecordLandingZone,
1625        oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
1626        is_paused: Arc<AtomicBool>,
1627        options: StreamConfigurationOptions,
1628        server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
1629        cancellation_token: CancellationToken,
1630        callback_tx: Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
1631    ) -> tokio::task::JoinHandle<ZerobusResult<()>> {
1632        tokio::spawn(async move {
1633            let span = span!(Level::DEBUG, "inbound_stream_processor");
1634            let _guard = span.enter();
1635            let mut last_acked_offset = -1;
1636            let mut pause_deadline: Option<tokio::time::Instant> = None;
1637
1638            loop {
1639                if let Some(deadline) = pause_deadline {
1640                    let now = tokio::time::Instant::now();
1641                    let all_acked = landing_zone.is_observed_empty();
1642
1643                    if now >= deadline {
1644                        info!("Graceful close timeout reached. Triggering recovery.");
1645                        return Ok(());
1646                    } else if all_acked {
1647                        info!("All in-flight records acknowledged during graceful close. Triggering recovery.");
1648                        return Ok(());
1649                    }
1650                }
1651
1652                let message_result = if let Some(deadline) = pause_deadline {
1653                    tokio::select! {
1654                        biased;
1655                        _ = cancellation_token.cancelled() => return Ok(()),
1656                        _ = tokio::time::sleep_until(deadline) => {
1657                            continue;
1658                        }
1659                        res = tokio::time::timeout(
1660                            Duration::from_millis(options.server_lack_of_ack_timeout_ms),
1661                            response_grpc_stream.message(),
1662                        ) => res,
1663                    }
1664                } else {
1665                    tokio::select! {
1666                        biased;
1667                        _ = cancellation_token.cancelled() => return Ok(()),
1668                        res = tokio::time::timeout(
1669                            Duration::from_millis(options.server_lack_of_ack_timeout_ms),
1670                            response_grpc_stream.message(),
1671                        ) => res,
1672                    }
1673                };
1674
1675                match message_result {
1676                    Ok(Ok(Some(ingest_record_response))) => match ingest_record_response.payload {
1677                        Some(ResponsePayload::IngestRecordResponse(IngestRecordResponse {
1678                            durability_ack_up_to_offset,
1679                        })) => {
1680                            let durability_ack_up_to_offset = match durability_ack_up_to_offset {
1681                                Some(offset) => offset,
1682                                None => {
1683                                    error!("Missing ack offset in server response");
1684                                    let error =
1685                                        ZerobusError::StreamClosedError(tonic::Status::internal(
1686                                            "Missing ack offset in server response",
1687                                        ));
1688                                    let _ = server_error_tx.send(Some(error.clone()));
1689                                    return Err(error);
1690                                }
1691                            };
1692                            let mut last_logical_acked_offset = -2;
1693                            let mut map = oneshot_map.lock().await;
1694                            for _offset_to_ack in
1695                                (last_acked_offset + 1)..=durability_ack_up_to_offset
1696                            {
1697                                if let Ok(record) = landing_zone.remove_observed() {
1698                                    let logical_offset = record.offset_id;
1699                                    last_logical_acked_offset = logical_offset;
1700
1701                                    if let Some(sender) = map.remove(&logical_offset) {
1702                                        let _ = sender.send(Ok(logical_offset));
1703                                    }
1704
1705                                    if let Some(ref tx) = callback_tx {
1706                                        let _ = tx.send(CallbackMessage::Ack(logical_offset));
1707                                    }
1708                                }
1709                            }
1710                            drop(map);
1711                            last_acked_offset = durability_ack_up_to_offset;
1712                            if last_logical_acked_offset != -2 {
1713                                let _ignore_on_channel_break = last_received_offset_id_tx
1714                                    .send(Some(last_logical_acked_offset));
1715                            }
1716                        }
1717                        Some(ResponsePayload::CloseStreamSignal(CloseStreamSignal {
1718                            duration,
1719                        })) => {
1720                            if options.recovery {
1721                                let server_duration_ms = duration
1722                                    .as_ref()
1723                                    .map(|d| d.seconds as u64 * 1000 + d.nanos as u64 / 1_000_000)
1724                                    .unwrap_or(0);
1725
1726                                let wait_duration_ms = match options.stream_paused_max_wait_time_ms
1727                                {
1728                                    None => server_duration_ms,
1729                                    Some(0) => {
1730                                        // Immediate recovery
1731                                        info!("Server will close the stream in {}ms. Triggering stream recovery.", server_duration_ms);
1732                                        return Ok(());
1733                                    }
1734                                    Some(max_wait) => std::cmp::min(max_wait, server_duration_ms),
1735                                };
1736
1737                                if wait_duration_ms == 0 {
1738                                    info!("Server will close the stream. Triggering immediate recovery.");
1739                                    return Ok(());
1740                                }
1741
1742                                is_paused.store(true, Ordering::Relaxed);
1743                                pause_deadline = Some(
1744                                    tokio::time::Instant::now()
1745                                        + Duration::from_millis(wait_duration_ms),
1746                                );
1747                                info!(
1748                                    "Server will close the stream in {}ms. Entering graceful close period (waiting up to {}ms for in-flight acks).",
1749                                    server_duration_ms, wait_duration_ms
1750                                );
1751                            }
1752                        }
1753                        unexpected_message => {
1754                            error!("Unexpected response from server {unexpected_message:?}");
1755                            let error = ZerobusError::StreamClosedError(tonic::Status::internal(
1756                                "Unexpected response from server",
1757                            ));
1758                            let _ = server_error_tx.send(Some(error.clone()));
1759                            return Err(error);
1760                        }
1761                    },
1762                    Ok(Ok(None)) => {
1763                        info!("Server closed the stream without errors.");
1764                        let error = ZerobusError::StreamClosedError(tonic::Status::ok(
1765                            "Stream closed by server without errors.",
1766                        ));
1767                        let _ = server_error_tx.send(Some(error.clone()));
1768                        return Err(error);
1769                    }
1770                    Ok(Err(status)) => {
1771                        error!("Unexpected response from server {status:?}");
1772                        let error = ZerobusError::StreamClosedError(status);
1773                        let _ = server_error_tx.send(Some(error.clone()));
1774                        return Err(error);
1775                    }
1776                    Err(_timeout) => {
1777                        // No message received for server_lack_of_ack_timeout_ms.
1778                        if pause_deadline.is_none() && !landing_zone.is_observed_empty() {
1779                            error!(
1780                                "Server ack timeout: no response for {}ms",
1781                                options.server_lack_of_ack_timeout_ms
1782                            );
1783                            let error = ZerobusError::StreamClosedError(
1784                                tonic::Status::deadline_exceeded("Server ack timeout"),
1785                            );
1786                            let _ = server_error_tx.send(Some(error.clone()));
1787                            return Err(error);
1788                        }
1789                    }
1790                }
1791            }
1792        })
1793    }
1794
1795    /// Spawns a task that continuously sends records to the Zerobus API by observing the landing zone
1796    /// to get records and sending them through the outbound stream to the gRPC stream.
1797    fn spawn_sender_task(
1798        outbound_stream: tokio::sync::mpsc::Sender<EphemeralStreamRequest>,
1799        landing_zone: RecordLandingZone,
1800        is_paused: Arc<AtomicBool>,
1801        server_error_tx: tokio::sync::watch::Sender<Option<ZerobusError>>,
1802        cancellation_token: CancellationToken,
1803    ) -> tokio::task::JoinHandle<ZerobusResult<()>> {
1804        tokio::spawn(async move {
1805            let physical_offset_id_generator = OffsetIdGenerator::default();
1806            loop {
1807                let item = tokio::select! {
1808                    biased;
1809                    _ = cancellation_token.cancelled() => return Ok(()),
1810                    item = async {
1811                        if is_paused.load(Ordering::Relaxed) {
1812                            std::future::pending().await // Wait until supervisor task aborts this task.
1813                        } else {
1814                            landing_zone.observe().await
1815                        }
1816                    } => item.clone(),
1817                };
1818                let offset_id = physical_offset_id_generator.next();
1819                let request_payload = item.payload.into_request_payload(offset_id);
1820
1821                let send_result = outbound_stream
1822                    .send(EphemeralStreamRequest {
1823                        payload: Some(request_payload),
1824                    })
1825                    .await;
1826
1827                if let Err(err) = send_result {
1828                    error!("Failed to send record: {}", err);
1829                    let error = ZerobusError::StreamClosedError(tonic::Status::internal(
1830                        "Failed to send record",
1831                    ));
1832                    let _ = server_error_tx.send(Some(error.clone()));
1833                    return Err(error);
1834                }
1835            }
1836        })
1837    }
1838
1839    /// Fails all pending records by removing them from the landing zone and sending error to all pending acks promises.
1840    async fn fail_all_pending_records(
1841        landing_zone: RecordLandingZone,
1842        oneshot_map: Arc<tokio::sync::Mutex<OneshotMap>>,
1843        failed_records: Arc<RwLock<Vec<EncodedBatch>>>,
1844        error: &ZerobusError,
1845        callback_tx: &Option<tokio::sync::mpsc::UnboundedSender<CallbackMessage>>,
1846    ) {
1847        let mut failed_payloads = Vec::with_capacity(landing_zone.len());
1848        let records = landing_zone.remove_all();
1849        let mut map = oneshot_map.lock().await;
1850        let error_message = error.to_string();
1851        for record in records {
1852            failed_payloads.push(record.payload);
1853            if let Some(sender) = map.remove(&record.offset_id) {
1854                let _ = sender.send(Err(error.clone()));
1855            }
1856            if let Some(tx) = callback_tx {
1857                let _ = tx.send(CallbackMessage::Error(
1858                    record.offset_id,
1859                    error_message.clone(),
1860                ));
1861            }
1862        }
1863        *failed_records.write().await = failed_payloads;
1864    }
1865
    /// Internal method to wait for a specific offset to be acknowledged.
    /// Used by both `flush()` and `wait_for_offset()`.
    ///
    /// Races three signals in a loop: new acked offsets (watch channel),
    /// server errors (watch channel), and the `is_closed` flag — always
    /// re-checking the acked offset before failing, because an ack may land
    /// between observing an error/close and returning. The whole wait is
    /// bounded by `options.flush_timeout_ms`.
    ///
    /// `operation_name` is only used for log/error text (e.g. "Flush").
    async fn wait_for_offset_internal(
        &self,
        offset_to_wait: OffsetId,
        operation_name: &str,
    ) -> ZerobusResult<()> {
        let wait_operation = async {
            let mut offset_receiver = self.logical_last_received_offset_id_tx.subscribe();
            let mut error_rx = self.server_error_rx.clone();

            loop {
                // Latest logical offset acked by the server (None until the first ack).
                let offset = *offset_receiver.borrow_and_update();

                // Resolve a stream ID for logging; fall back to the literal "None".
                let stream_id = match self.stream_id.as_deref() {
                    Some(stream_id) => stream_id,
                    None => {
                        error!("Stream ID is None during {}", operation_name.to_lowercase());
                        "None"
                    }
                };
                if let Some(offset) = offset {
                    if offset >= offset_to_wait {
                        info!(stream_id = %stream_id, "Stream is caught up to the given offset. {} completed.", operation_name);
                        return Ok(());
                    } else {
                        info!(
                            stream_id = %stream_id,
                            "Stream is caught up to offset {}. Waiting for offset {}.",
                            offset, offset_to_wait
                        );
                    }
                } else {
                    info!(
                        stream_id = %stream_id,
                        "Stream is not caught up to any offset yet. Waiting for the first offset."
                    );
                }
                if self.is_closed.load(Ordering::Relaxed) {
                    // Re-check offset before failing, it might have been updated.
                    let offset = *offset_receiver.borrow_and_update();
                    if let Some(offset) = offset {
                        if offset >= offset_to_wait {
                            return Ok(());
                        }
                    }
                    // The supervisor always sends the real error to server_error_tx
                    // before setting is_closed=true, so check error_rx first to
                    // return the actual error instead of a generic one.
                    if let Some(server_error) = error_rx.borrow().clone() {
                        return Err(server_error);
                    }
                    return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
                        format!("Stream closed during {}", operation_name.to_lowercase()),
                    )));
                }
                // Race between offset updates and server errors.
                tokio::select! {
                    result = offset_receiver.changed() => {
                        // If offset_receiver channel is closed, break the loop.
                        if result.is_err() {
                            break;
                        }
                        // Loop continues to check new offset value.
                    }
                    _ = error_rx.changed() => {
                        // Server error occurred, return it immediately if stream is closed.
                        // A server error with is_closed still false means recovery may
                        // still succeed, so keep looping in that case.
                        if let Some(server_error) = error_rx.borrow().clone() {
                            if self.is_closed.load(Ordering::Relaxed) {
                                // Re-check offset before failing, it might have been updated.
                                let offset = *offset_receiver.borrow_and_update();
                                if let Some(offset) = offset {
                                    if offset >= offset_to_wait {
                                        return Ok(());
                                    }
                                }
                                return Err(server_error);
                            }
                        }
                    }
                }
            }

            // Reached only when the offset watch channel closed: prefer the
            // recorded server error (if the stream is closed) over a generic one.
            if let Some(server_error) = error_rx.borrow().clone() {
                if self.is_closed.load(Ordering::Relaxed) {
                    return Err(server_error);
                }
            }

            Err(ZerobusError::StreamClosedError(tonic::Status::internal(
                format!("Stream closed during {}", operation_name.to_lowercase()),
            )))
        };

        // Bound the whole wait by the configured flush timeout.
        match tokio::time::timeout(
            Duration::from_millis(self.options.flush_timeout_ms),
            wait_operation,
        )
        .await
        {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(e),
            Err(_) => {
                if let Some(stream_id) = self.stream_id.as_deref() {
                    error!(stream_id = %stream_id, table_name = %self.table_properties.table_name, "{} timed out", operation_name);
                } else {
                    error!(table_name = %self.table_properties.table_name, "{} timed out", operation_name);
                }
                Err(ZerobusError::StreamClosedError(
                    tonic::Status::deadline_exceeded(format!("{} timed out", operation_name)),
                ))
            }
        }
    }
1980
1981    /// Flushes all currently pending records and waits for their acknowledgments.
1982    ///
1983    /// This method captures the current highest offset and waits until all records up to
1984    /// that offset have been acknowledged by the server. Records ingested during the flush
1985    /// operation are not included in this flush.
1986    ///
1987    /// # Returns
1988    ///
1989    /// `Ok(())` when all pending records at the time of the call have been acknowledged.
1990    ///
1991    /// # Errors
1992    ///
1993    /// * `StreamClosedError` - If the stream is closed or times out
1994    ///
1995    /// # Examples
1996    ///
1997    /// ```no_run
1998    /// # use databricks_zerobus_ingest_sdk::*;
1999    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
2000    /// // Ingest many records
2001    /// for i in 0..1000 {
2002    ///     let _offset = stream.ingest_record_offset(vec![i as u8]).await?;
2003    /// }
2004    ///
2005    /// // Wait for all to be acknowledged
2006    /// stream.flush().await?;
2007    /// println!("All 1000 records have been acknowledged");
2008    /// # Ok(())
2009    /// # }
2010    /// ```
2011    #[instrument(level = "debug", skip_all, fields(table_name = %self.table_properties.table_name))]
2012    pub async fn flush(&self) -> ZerobusResult<()> {
2013        let offset_to_wait = match self.logical_offset_id_generator.last() {
2014            Some(offset) => offset,
2015            None => return Ok(()), // Nothing to flush.
2016        };
2017        self.wait_for_offset_internal(offset_to_wait, "Flush").await
2018    }
2019
2020    /// Waits for server acknowledgment of a specific logical offset.
2021    ///
2022    /// This method blocks until the server has acknowledged the record or batch at the
2023    /// specified offset. Use this with offsets returned from `ingest_record_offset()` or
2024    /// `ingest_records_offset()` to explicitly control when to wait for acknowledgments.
2025    ///
2026    /// # Arguments
2027    ///
2028    /// * `offset` - The logical offset ID to wait for (returned from `ingest_record_offset()` or `ingest_records_offset()`)
2029    ///
2030    /// # Returns
2031    ///
2032    /// `Ok(())` when the record/batch at the specified offset has been acknowledged.
2033    ///
2034    /// # Errors
2035    ///
2036    /// * `StreamClosedError` - If the stream is closed or times out while waiting
2037    ///
2038    /// # Examples
2039    ///
2040    /// ```no_run
2041    /// # use databricks_zerobus_ingest_sdk::*;
2042    /// # async fn example(stream: ZerobusStream) -> Result<(), ZerobusError> {
2043    /// # let my_record = vec![1, 2, 3];
2044    /// // Ingest multiple records and collect their offsets
2045    /// let mut offsets = Vec::new();
2046    /// for i in 0..100 {
2047    ///     let offset = stream.ingest_record_offset(vec![i as u8]).await?;
2048    ///     offsets.push(offset);
2049    /// }
2050    ///
2051    /// // Wait for specific offsets
2052    /// for offset in offsets {
2053    ///     stream.wait_for_offset(offset).await?;
2054    /// }
2055    /// println!("All records acknowledged");
2056    /// # Ok(())
2057    /// # }
2058    /// ```
2059    pub async fn wait_for_offset(&self, offset: OffsetId) -> ZerobusResult<()> {
2060        self.wait_for_offset_internal(offset, "Waiting for acknowledgement")
2061            .await
2062    }
2063
2064    /// Closes the stream gracefully after flushing all pending records.
2065    ///
2066    /// This method first calls `flush()` to ensure all pending records are acknowledged,
2067    /// then shuts down the stream and releases all resources. Always call this method
2068    /// when you're done with a stream to ensure data integrity.
2069    ///
2070    /// # Returns
2071    ///
2072    /// `Ok(())` if the stream was closed successfully after flushing all records.
2073    ///
2074    /// # Errors
2075    ///
2076    /// Returns any errors from the flush operation. If flush fails, some records
2077    /// may not have been acknowledged. Use `get_unacked_records()` to retrieve them.
2078    ///
2079    /// # Examples
2080    ///
2081    /// ```no_run
2082    /// # use databricks_zerobus_ingest_sdk::*;
2083    /// # async fn example(mut stream: ZerobusStream) -> Result<(), ZerobusError> {
2084    /// // After ingesting records...
2085    /// stream.close().await?;
2086    /// # Ok(())
2087    /// # }
2088    /// ```
2089    pub async fn close(&mut self) -> ZerobusResult<()> {
2090        if self.is_closed.load(Ordering::Relaxed) {
2091            return Ok(());
2092        }
2093        if let Some(stream_id) = self.stream_id.as_deref() {
2094            info!(stream_id = %stream_id, "Closing stream");
2095        } else {
2096            error!("Stream ID is None during closing");
2097        }
2098        let flush_result = self.flush().await;
2099        self.is_closed.store(true, Ordering::Relaxed);
2100        self.shutdown_all_tasks_gracefully().await;
2101        flush_result
2102    }
2103
2104    /// Gracefully shuts down the supervisor task.
2105    ///
2106    /// Signals cancellation and waits for the task to exit. If the timeout
2107    /// is provided and expires, forcefully aborts the task.
2108    async fn shutdown_all_tasks_gracefully(&mut self) {
2109        self.cancellation_token.cancel();
2110
2111        // Shutdown supervisor task.
2112        match tokio::time::timeout(
2113            Duration::from_secs(SHUTDOWN_TIMEOUT_SECS),
2114            &mut self.supervisor_task,
2115        )
2116        .await
2117        {
2118            Ok(_) => {
2119                debug!("Supervisor task exited gracefully");
2120            }
2121            Err(_) => {
2122                warn!("Supervisor task did not exit within timeout, aborting");
2123                self.supervisor_task.abort();
2124            }
2125        }
2126        // Shutdown callback handler task, if there are any callbacks.
2127        if let Some(mut task) = self.callback_handler_task.take() {
2128            if let Some(callback_max_wait_time_ms) = self.options.callback_max_wait_time_ms {
2129                match tokio::time::timeout(
2130                    Duration::from_millis(callback_max_wait_time_ms),
2131                    &mut task,
2132                )
2133                .await
2134                {
2135                    Ok(_) => {
2136                        debug!("Callback handler task exited gracefully");
2137                    }
2138                    Err(_) => {
2139                        debug!("Callback handler task did not exit within timeout, aborting");
2140                        task.abort();
2141                    }
2142                }
2143            } else {
2144                debug!("Callback max wait time is not set, waiting indefinitely");
2145                let _ = (&mut task).await;
2146            }
2147        }
2148    }
2149
2150    /// Returns all records that were ingested but not acknowledged by the server.
2151    ///
2152    /// This method should only be called after a stream has failed or been closed.
2153    /// It's useful for implementing custom retry logic or persisting failed records.
2154    ///
2155    /// **Note:** This method flattens all unacknowledged records into a single iterator,
2156    /// losing the original batch grouping.
2157    /// If you want to preserve the batch grouping, use `ZerobusStream::get_unacked_batches()` instead.
2158    /// If you want to re-ingest unacknowledged records while preserving their batch
2159    /// structure, use `ZerobusSdk::recreate_stream()` instead.
2160    ///
2161    ///
2162    /// # Returns
2163    ///
2164    /// An iterator over individual `EncodedRecord` items. All unacknowledged records are
2165    /// flattened into a single sequence, regardless of how they were originally ingested
2166    /// (via `ingest_record()` or `ingest_records()`).
2167    ///
2168    /// # Errors
2169    ///
2170    /// * `InvalidStateError` - If called on an active (not closed) stream
2171    ///
2172    /// # Examples
2173    ///
2174    /// ```no_run
2175    /// # use databricks_zerobus_ingest_sdk::*;
2176    /// # async fn example(sdk: ZerobusSdk, mut stream: ZerobusStream) -> Result<(), ZerobusError> {
2177    /// match stream.close().await {
2178    ///     Err(e) => {
2179    ///         // Stream failed, get unacked records
2180    ///         let unacked = stream.get_unacked_records().await?;
2181    ///         let total_records = unacked.into_iter().count();
2182    ///         println!("Failed to acknowledge {} records", total_records);
2183    ///         
2184    ///         // For re-ingestion with preserved batch structure, use recreate_stream
2185    ///         let new_stream = sdk.recreate_stream(&stream).await?;
2186    ///     }
2187    ///     Ok(_) => println!("All records acknowledged"),
2188    /// }
2189    /// # Ok(())
2190    /// # }
2191    /// ```
2192    pub async fn get_unacked_records(&self) -> ZerobusResult<impl Iterator<Item = EncodedRecord>> {
2193        Ok(self
2194            .get_unacked_batches()
2195            .await?
2196            .into_iter()
2197            .flat_map(|batch| batch.into_iter()))
2198    }
2199
2200    /// Returns all records that were ingested but not acknowledged by the server, grouped by batch.
2201    ///
2202    /// This method should only be called after a stream has failed or been closed.
2203    /// It's useful for implementing custom retry logic or persisting failed records.
2204    ///
2205    /// **Note:** This method returns the unacknowledged records as a vector of `EncodedBatch` items,
2206    /// where each batch corresponds to how records were ingested:
2207    /// - Each `ingest_record()` call creates a single batch containing one record
2208    /// - Each `ingest_records()` call creates a single batch containing multiple records
2209    ///
2210    /// For alternatives, see `ZerobusStream::get_unacked_records()` and `ZerobusSdk::recreate_stream()`.
2211    ///
2212    /// # Returns
2213    ///
2214    /// A vector of `EncodedBatch` items. Records are grouped by their original ingestion call.
2215    pub async fn get_unacked_batches(&self) -> ZerobusResult<Vec<EncodedBatch>> {
2216        if self.is_closed.load(Ordering::Relaxed) {
2217            let failed = self.failed_records.read().await.clone();
2218            return Ok(failed);
2219        }
2220        if let Some(stream_id) = self.stream_id.as_deref() {
2221            error!(stream_id = %stream_id, "Cannot get unacked records from an active stream. Stream must be closed first.");
2222        } else {
2223            error!(
2224                "Cannot get unacked records from an active stream. Stream must be closed first."
2225            );
2226        }
2227        Err(ZerobusError::InvalidStateError(
2228            "Cannot get unacked records from an active stream. Stream must be closed first."
2229                .to_string(),
2230        ))
2231    }
2232}
2233
2234impl Drop for ZerobusStream {
2235    fn drop(&mut self) {
2236        self.is_closed.store(true, Ordering::Relaxed);
2237        self.cancellation_token.cancel();
2238        self.supervisor_task.abort();
2239        if let Some(callback_handler_task) = self.callback_handler_task.take() {
2240            callback_handler_task.abort();
2241        }
2242    }
2243}