databricks-zerobus-ingest-sdk 2.0.0

A high-performance Rust client for streaming data ingestion into Databricks Delta tables using the Zerobus service.
Documentation
use thiserror::Error;

/// Represents all possible errors that can occur when using Zerobus.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute,
/// with `{0}` replaced by the variant's payload.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this crate
/// must include a wildcard arm.
///
/// Use [`ZerobusError::is_retryable`] to tell errors that may be recovered
/// automatically apart from those that may not.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ZerobusError {
    /// Returned when the client failed to open a gRPC channel to the Zerobus endpoint.
    ///
    /// The payload is interpolated into the error message. Reported as retryable.
    #[error("Failed to open a channel: {0}.")]
    ChannelCreationError(String),
    /// Returned when the client failed to create a stream.
    ///
    /// Carries the `tonic::Status` associated with the failure. Whether the error
    /// is retryable depends on the status code; see [`ZerobusError::is_retryable`].
    #[error("Failed to create stream: {0}.")]
    CreateStreamError(tonic::Status),
    /// Returned when the TLS handshake failed during connection setup.
    ///
    /// Carries no payload. Reported as retryable.
    #[error("Failed to establish TLS connection.")]
    FailedToEstablishTlsConnectionError,
    /// Returned when the specified Zerobus endpoint is in an invalid format.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("The specified Zerobus endpoint is in invalid format: {0}.")]
    InvalidZerobusEndpointError(String),
    /// Returned when the specified Unity Catalog (UC) table name is invalid.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("Specified UC table name is invalid: {0}.")]
    InvalidTableName(String),
    /// Returned when the specified Unity Catalog (UC) endpoint is in an invalid format.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("Specified UC endpoint is in invalid format: {0}.")]
    InvalidUCEndpointError(String),
    /// Returned when the specified Unity Catalog (UC) token is invalid.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("Specified UC token is in invalid format: {0}.")]
    InvalidUCTokenError(String),
    /// Returned when the stream is closed.
    ///
    /// Carries the `tonic::Status` associated with the closure. Whether the error
    /// is retryable depends on the status code; see [`ZerobusError::is_retryable`].
    #[error("Stream is closed: {0}")]
    StreamClosedError(tonic::Status),
    /// Returned when the client provided an invalid argument.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("Invalid argument: {0}.")]
    InvalidArgument(String),
    /// Returned when the server returned an unexpected response.
    ///
    /// The payload is interpolated into the error message. Reported as retryable.
    #[error("Unexpected response from server. Response: {0}")]
    UnexpectedStreamResponseError(String),
    /// Returned when the stream is in an invalid state for a requested operation.
    ///
    /// The payload is interpolated into the error message. Reported as non-retryable.
    #[error("Stream is in invalid state: {0}")]
    InvalidStateError(String),
    /// Returned when a connection or setup operation times out.
    ///
    /// The payload is interpolated into the error message. Reported as retryable.
    #[error("Connection timeout: {0}")]
    ConnectionTimeout(String),
    /// Returned when OAuth token fetching fails due to network or server errors.
    ///
    /// The payload is interpolated into the error message. Reported as retryable.
    #[error("Token fetch failed: {0}")]
    TokenFetchError(String),
}

/// List of gRPC status codes that indicate unretriable errors.
///
/// `ZerobusError::is_retryable` consults this list for the variants that carry
/// a `tonic::Status` (`StreamClosedError` and `CreateStreamError`): a status
/// whose code appears here is reported as non-retryable, and any other code is
/// reported as retryable.
const UNRETRIABLE_STATUS_CODES: &[tonic::Code] = &[
    tonic::Code::InvalidArgument,
    tonic::Code::Unauthenticated,
    tonic::Code::PermissionDenied,
    tonic::Code::OutOfRange,
    tonic::Code::Unimplemented,
    tonic::Code::NotFound,
];

impl ZerobusError {
    /// Reports whether stream recovery may be attempted automatically for this error.
    ///
    /// Errors fall into three groups:
    ///
    /// * Errors carrying a gRPC status (`StreamClosedError`, `CreateStreamError`)
    ///   are retryable unless the status code is listed in
    ///   `UNRETRIABLE_STATUS_CODES`.
    /// * Transient failures such as network problems or temporary server issues
    ///   are always retryable.
    /// * Permanent failures such as invalid configuration or arguments are never
    ///   retryable and require manual intervention.
    ///
    /// # Returns
    ///
    /// `true` if the SDK should attempt automatic recovery, `false` otherwise.
    pub fn is_retryable(&self) -> bool {
        match self {
            // The gRPC status code decides for errors reported by the server.
            Self::StreamClosedError(grpc_status) | Self::CreateStreamError(grpc_status) => {
                let code = grpc_status.code();
                !UNRETRIABLE_STATUS_CODES.contains(&code)
            }

            // Transient conditions: a later attempt may succeed.
            Self::ChannelCreationError(_)
            | Self::FailedToEstablishTlsConnectionError
            | Self::UnexpectedStreamResponseError(_)
            | Self::ConnectionTimeout(_)
            | Self::TokenFetchError(_) => true,

            // Permanent conditions: retrying with the same input cannot help.
            Self::InvalidArgument(_)
            | Self::InvalidZerobusEndpointError(_)
            | Self::InvalidTableName(_)
            | Self::InvalidUCEndpointError(_)
            | Self::InvalidUCTokenError(_)
            | Self::InvalidStateError(_) => false,
        }
    }
}