rhei-sidecar 1.5.0

Sidecar CDC consumer for Rhei — polls external databases by timestamp columns
Documentation
//! The [`SourceConnector`] trait and its built-in implementations.
//!
//! A `SourceConnector` wraps the synchronous `connector_arrow` query API
//! behind a single `query` method so that [`crate::TimestampCdcConsumer`] can
//! call it from a `tokio::task::spawn_blocking` closure without depending on
//! any particular backend.

use arrow::record_batch::RecordBatch;

use crate::error::SidecarError;

/// Abstraction over an external database connection used by `TimestampCdcConsumer<S>`.
///
/// Implementors wrap the synchronous `connector_arrow` query API and are
/// called from `tokio::task::spawn_blocking`. Because all implementations are
/// driven from a single background thread per consumer instance, methods may
/// use `&mut self` (no interior-mutability requirement).
///
/// # Contract
///
/// - **`Send + 'static`** — the connector is moved into a
///   `tokio::task::spawn_blocking` closure and must therefore be both `Send`
///   and free of non-`'static` borrows.
/// - **Read-only** — the poll queries issued by the consumer are `SELECT`
///   statements; implementations should not permit write operations.
/// - **Batch semantics** — the returned `Vec<RecordBatch>` may contain any
///   number of batches (including zero). The consumer flattens them into
///   individual [`rhei_core::CdcEvent`]s.
/// - **No ordering guarantee** — the connector returns rows in whatever order
///   the underlying database produces them; ordering is enforced by the
///   `ORDER BY` clause injected by the consumer.
///
/// # Built-in implementations
///
/// | Type | Feature |
/// |------|---------|
/// | `connector_arrow::rusqlite::SQLiteConnection` | `sqlite` (default) |
/// | `connector_arrow::postgres::PostgresConnection` | `postgres` |
pub trait SourceConnector: Send + 'static {
    /// Execute a read-only SQL query and return all result rows as Arrow
    /// [`RecordBatch`]es.
    ///
    /// The `sql` string is a complete `SELECT` statement generated by
    /// [`crate::TimestampCdcConsumer`]; implementations should forward it
    /// verbatim to the underlying driver.
    fn query(&mut self, sql: &str) -> Result<Vec<RecordBatch>, SidecarError>;
}

#[cfg(feature = "sqlite")]
impl SourceConnector for connector_arrow::rusqlite::SQLiteConnection {
    fn query(&mut self, sql: &str) -> Result<Vec<RecordBatch>, SidecarError> {
        connector_arrow::query(self, sql).map_err(SidecarError::from)
    }
}

#[cfg(feature = "postgres")]
impl SourceConnector for connector_arrow::postgres::PostgresConnection {
    fn query(&mut self, sql: &str) -> Result<Vec<RecordBatch>, SidecarError> {
        connector_arrow::query(self, sql).map_err(SidecarError::from)
    }
}