pub mod error;
use std::sync::Arc;
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
pub use error::HtapError;
pub use rhei_core::types::*;
pub use rhei_core::{
CdcConsumer, OlapEngine, OltpEngine, QueryRouter, SchemaRegistry, SyncEngine, TableSchema,
};
#[cfg(feature = "datafusion-backend")]
pub use rhei_olap::{DataFusionEngine, SharedDataFusionEngine, StorageMode};
#[cfg(feature = "duckdb-backend")]
pub use rhei_olap::{DuckDbEngine, SharedDuckDbEngine};
pub use rhei_olap::{OlapBackend, OlapError, OltpBackend, OltpError};
pub use rhei_oltp_rusqlite::{RusqliteCdcProducer, RusqliteEngine, RusqliteOltpError};
pub use rhei_sync::{
spawn_sync_loop, temporalize_schema, CdcSyncEngine, HeuristicRouter, SqlParserRouter,
};
#[cfg(feature = "sidecar")]
pub use rhei_sidecar::{
DeleteDetection, NullWatermarkStore, RocksDbWatermarkStore, SidecarError, SourceConnector,
TimestampCdcConfig, TimestampCdcConsumer, TimestampTableConfig, WatermarkStore,
};
#[cfg(feature = "sidecar")]
/// Timestamp-based CDC consumer reading from a SQLite database through
/// `connector_arrow`'s rusqlite connection type.
pub type SqliteTimestampConsumer =
rhei_sidecar::TimestampCdcConsumer<connector_arrow::rusqlite::SQLiteConnection>;
#[cfg(feature = "sidecar")]
/// Timestamp-based CDC consumer reading from PostgreSQL through
/// `connector_arrow`'s postgres connection type.
pub type PostgresTimestampConsumer =
rhei_sidecar::TimestampCdcConsumer<connector_arrow::postgres::PostgresConnection>;
#[cfg(feature = "sidecar")]
/// External database that a sidecar-mode engine replicates from.
pub enum SidecarSource {
/// Path handed to `rusqlite::Connection::open`.
Sqlite(String),
/// Connection string handed to the synchronous `postgres::Client::connect`
/// (connected with `NoTls`).
Postgres(String),
}
#[cfg(feature = "rocksdb-cdc")]
pub use rhei_cdc_rocksdb::{RocksDbCdcConfig, RocksDbCdcError, RocksDbCdcLog};
#[cfg(not(feature = "sidecar"))]
/// CDC event source that feeds the sync engine (build without the `sidecar` feature).
///
/// Each variant wraps a concrete producer, and the [`CdcConsumer`] impl forwards to it.
pub enum CdcSource {
/// SQLite trigger-based CDC log, read directly.
Rusqlite(RusqliteCdcProducer),
#[cfg(feature = "rocksdb-cdc")]
/// RocksDB CDC log, read directly.
RocksDb(Arc<RocksDbCdcLog>),
#[cfg(feature = "rocksdb-cdc")]
/// SQLite trigger events are copied into the RocksDB log during `poll`.
/// Consumers then read from RocksDB (see `bridge_poll` / `bridge_latest_seq`).
RocksDbBridge {
sqlite: RusqliteCdcProducer,
rocksdb: Arc<RocksDbCdcLog>,
},
}
#[cfg(not(feature = "sidecar"))]
impl CdcConsumer for CdcSource {
    type Error = HtapError;

    /// Fetches up to `limit` events following `after_seq` from the wrapped backend.
    ///
    /// Backend-specific errors are converted into [`HtapError`] through `From`.
    async fn poll(
        &self,
        after_seq: Option<i64>,
        limit: u32,
    ) -> Result<Vec<rhei_core::types::CdcEvent>, Self::Error> {
        let events = match self {
            Self::Rusqlite(producer) => producer.poll(after_seq, limit).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.poll(after_seq, limit).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { sqlite, rocksdb } => {
                bridge_poll(sqlite, rocksdb, after_seq, limit).await?
            }
        };
        Ok(events)
    }

    /// Reports the latest sequence number visible through the wrapped backend.
    async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
        let seq = match self {
            Self::Rusqlite(producer) => producer.latest_seq().await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.latest_seq().await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { sqlite, rocksdb } => bridge_latest_seq(sqlite, rocksdb).await?,
        };
        Ok(seq)
    }

    /// Forwards pruning through `up_to_seq` to the wrapped backend.
    ///
    /// For the bridge, only the RocksDB log is pruned here. SQLite rows are
    /// pruned by `bridge_poll` once they have been copied.
    async fn prune(&self, up_to_seq: i64) -> Result<u64, Self::Error> {
        let pruned = match self {
            Self::Rusqlite(producer) => producer.prune(up_to_seq).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.prune(up_to_seq).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { rocksdb, .. } => rocksdb.prune(up_to_seq).await?,
        };
        Ok(pruned)
    }
}
#[cfg(feature = "sidecar")]
/// CDC event source that feeds the sync engine (build with the `sidecar` feature).
///
/// Adds the timestamp-based sidecar consumers to the local producers. The
/// [`CdcConsumer`] impl forwards each call to the wrapped producer.
pub enum CdcSource {
/// SQLite trigger-based CDC log, read directly.
Rusqlite(RusqliteCdcProducer),
/// Timestamp-based CDC against an external SQLite database.
SidecarSqlite(SqliteTimestampConsumer),
/// Timestamp-based CDC against an external PostgreSQL database.
SidecarPostgres(PostgresTimestampConsumer),
#[cfg(feature = "rocksdb-cdc")]
/// RocksDB CDC log, read directly.
RocksDb(Arc<RocksDbCdcLog>),
#[cfg(feature = "rocksdb-cdc")]
/// SQLite trigger events are copied into the RocksDB log during `poll`.
/// Consumers then read from RocksDB (see `bridge_poll` / `bridge_latest_seq`).
RocksDbBridge {
sqlite: RusqliteCdcProducer,
rocksdb: Arc<RocksDbCdcLog>,
},
}
#[cfg(feature = "sidecar")]
impl CdcConsumer for CdcSource {
    type Error = HtapError;

    /// Fetches up to `limit` events following `after_seq` from the wrapped backend.
    ///
    /// Backend-specific errors are converted into [`HtapError`] through `From`.
    async fn poll(
        &self,
        after_seq: Option<i64>,
        limit: u32,
    ) -> Result<Vec<rhei_core::types::CdcEvent>, Self::Error> {
        let events = match self {
            Self::Rusqlite(producer) => producer.poll(after_seq, limit).await?,
            Self::SidecarSqlite(consumer) => consumer.poll(after_seq, limit).await?,
            Self::SidecarPostgres(consumer) => consumer.poll(after_seq, limit).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.poll(after_seq, limit).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { sqlite, rocksdb } => {
                bridge_poll(sqlite, rocksdb, after_seq, limit).await?
            }
        };
        Ok(events)
    }

    /// Reports the latest sequence number visible through the wrapped backend.
    async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
        let seq = match self {
            Self::Rusqlite(producer) => producer.latest_seq().await?,
            Self::SidecarSqlite(consumer) => consumer.latest_seq().await?,
            Self::SidecarPostgres(consumer) => consumer.latest_seq().await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.latest_seq().await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { sqlite, rocksdb } => bridge_latest_seq(sqlite, rocksdb).await?,
        };
        Ok(seq)
    }

    /// Forwards pruning through `up_to_seq` to the wrapped backend.
    ///
    /// For the bridge, only the RocksDB log is pruned here. SQLite rows are
    /// pruned by `bridge_poll` once they have been copied.
    async fn prune(&self, up_to_seq: i64) -> Result<u64, Self::Error> {
        let pruned = match self {
            Self::Rusqlite(producer) => producer.prune(up_to_seq).await?,
            Self::SidecarSqlite(consumer) => consumer.prune(up_to_seq).await?,
            Self::SidecarPostgres(consumer) => consumer.prune(up_to_seq).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDb(log) => log.prune(up_to_seq).await?,
            #[cfg(feature = "rocksdb-cdc")]
            Self::RocksDbBridge { rocksdb, .. } => rocksdb.prune(up_to_seq).await?,
        };
        Ok(pruned)
    }
}
#[cfg(feature = "rocksdb-cdc")]
/// Latest sequence number for a SQLite→RocksDB bridged source.
///
/// Starts from RocksDB's latest sequence. If SQLite holds events past the
/// bridge watermark (treated as 0 when no watermark has been stored), their
/// count is added on top: RocksDB's latest sequence (or 0 when empty) plus
/// the number of not-yet-bridged events. Otherwise RocksDB's value is
/// returned unchanged.
async fn bridge_latest_seq(
    sqlite: &RusqliteCdcProducer,
    rocksdb: &Arc<RocksDbCdcLog>,
) -> Result<Option<i64>, HtapError> {
    use rhei_core::CdcConsumer as _;

    let committed = rocksdb.latest_seq().await?;
    let Some(sqlite_head) = sqlite.latest_seq().await? else {
        // Nothing in SQLite at all, so nothing is waiting to be bridged.
        return Ok(committed);
    };

    let watermark = rocksdb.get_bridge_watermark()?.unwrap_or(0);
    let pending = (sqlite_head - watermark).max(0);

    if pending == 0 {
        Ok(committed)
    } else {
        Ok(Some(committed.unwrap_or(0) + pending))
    }
}
#[cfg(feature = "rocksdb-cdc")]
/// Moves pending SQLite CDC events into the RocksDB log, then reads from RocksDB.
///
/// 1. Poll SQLite for up to `limit` events after the stored bridge watermark.
/// 2. If any arrive, append them to RocksDB and set the watermark to the
///    highest SQLite sequence seen, in one `append_and_set_bridge_watermark` call.
/// 3. Prune the bridged rows from SQLite on a best-effort basis: a failure is
///    only logged.
/// 4. Return up to `limit` RocksDB events after `after_seq`.
async fn bridge_poll(
    sqlite: &RusqliteCdcProducer,
    rocksdb: &Arc<RocksDbCdcLog>,
    after_seq: Option<i64>,
    limit: u32,
) -> Result<Vec<rhei_core::types::CdcEvent>, HtapError> {
    use rhei_core::CdcConsumer as _;

    let watermark = rocksdb.get_bridge_watermark()?;
    let mut batch = sqlite.poll(watermark, limit).await?;

    // `max()` is `None` exactly when the batch is empty, so there is nothing to bridge.
    if let Some(high_water) = batch.iter().map(|event| event.seq).max() {
        rocksdb.append_and_set_bridge_watermark(&mut batch, high_water)?;
        // The watermark has already advanced past these events, so a failed
        // prune only leaves stale rows behind in SQLite.
        if let Err(err) = sqlite.prune(high_water).await {
            warn!(
                max_sqlite_seq = high_water,
                error = %err,
                "bridge_poll: sqlite.prune failed (best-effort); \
                 bridge watermark is committed, events will not be re-bridged"
            );
        }
    }

    Ok(rocksdb.poll(after_seq, limit).await?)
}
/// Concrete sync engine used by [`HtapEngine`]: a [`CdcSyncEngine`] reading from a
/// [`CdcSource`] and writing to an [`OlapBackend`].
type SyncEngineType = CdcSyncEngine<CdcSource, OlapBackend>;
/// Selects which analytical (OLAP) engine `HtapEngine::new` creates.
///
/// A variant exists only when its cargo feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OlapBackendType {
#[cfg(feature = "duckdb-backend")]
/// DuckDB: in-memory or file-backed, per `HtapConfig::olap_in_memory` / `olap_path`.
DuckDb,
#[cfg(feature = "datafusion-backend")]
/// DataFusion, using `HtapConfig::datafusion_storage`.
DataFusion,
}
/// Construction options for [`HtapEngine::new`].
///
/// Several fields only matter for a particular mode or backend, as noted per field.
pub struct HtapConfig {
/// Path of the local rusqlite OLTP database. Opened in standard mode, and in
/// sidecar mode only when `SidecarConfig::enable_local_oltp` is true.
pub oltp_path: String,
/// DuckDB backend only: open an in-memory database instead of `olap_path`.
pub olap_in_memory: bool,
/// DuckDB backend only: database file used when `olap_in_memory` is false.
/// Falls back to `"rhei_olap.duckdb"` when `None`.
pub olap_path: Option<String>,
/// Batch size passed to `CdcSyncEngine::new`.
pub sync_batch_size: u32,
/// Forwarded to `CdcSyncEngine::with_prune_after_sync` in standard mode.
/// Sidecar mode always passes `false` instead.
pub prune_after_sync: bool,
/// When `Some`, a background sync loop with this interval is started at construction.
pub sync_interval: Option<Duration>,
/// Which OLAP engine to create.
pub olap_backend: OlapBackendType,
/// Passed to `RusqliteEngine::new_local` as its pool size. Presumably this is
/// the number of read connections; confirm in rhei_oltp_rusqlite.
pub read_pool_size: usize,
/// Sync mode forwarded to the sync engine. Also shapes OLAP tables in
/// `register_table`: `Temporal` uses the temporalized schema and no primary key.
pub sync_mode: SyncMode,
#[cfg(feature = "datafusion-backend")]
/// DataFusion backend only: storage mode for the engine.
pub datafusion_storage: rhei_olap::StorageMode,
#[cfg(feature = "sidecar")]
/// When `Some`, the engine runs in sidecar mode against an external database.
pub sidecar: Option<SidecarConfig>,
#[cfg(feature = "rocksdb-cdc")]
/// Standard mode only: when `Some`, SQLite trigger events are bridged into a
/// RocksDB CDC log opened at this path (created if missing).
pub rocksdb_cdc_path: Option<String>,
/// Optional schema registry file. Loaded in `HtapEngine::new` when set, and
/// written back after `register_table`.
pub schema_registry_path: Option<String>,
}
#[cfg(feature = "sidecar")]
/// Settings for sidecar mode (replicating from an external database).
pub struct SidecarConfig {
/// External database to read changes from.
pub source: SidecarSource,
/// Cloned into `TimestampCdcConsumer::try_with_watermark_store`.
pub timestamp_config: rhei_sidecar::TimestampCdcConfig,
/// When true, also open a local rusqlite OLTP engine at `HtapConfig::oltp_path`.
pub enable_local_oltp: bool,
/// When `Some`, watermarks are kept in a RocksDB store at this path.
/// Otherwise a `NullWatermarkStore` is used (presumably nothing survives a
/// restart; confirm).
pub watermark_path: Option<String>,
}
impl Default for HtapConfig {
fn default() -> Self {
Self {
oltp_path: "rhei.db".to_string(),
olap_in_memory: true,
olap_path: None,
sync_batch_size: 1000,
prune_after_sync: true,
sync_interval: None,
#[cfg(feature = "datafusion-backend")]
olap_backend: OlapBackendType::DataFusion,
#[cfg(all(feature = "duckdb-backend", not(feature = "datafusion-backend")))]
olap_backend: OlapBackendType::DuckDb,
read_pool_size: 4,
sync_mode: SyncMode::default(),
#[cfg(feature = "datafusion-backend")]
datafusion_storage: rhei_olap::StorageMode::InMemory,
#[cfg(feature = "sidecar")]
sidecar: None,
#[cfg(feature = "rocksdb-cdc")]
rocksdb_cdc_path: None,
schema_registry_path: None,
}
}
}
/// Lifecycle of the background sync task owned by [`HtapEngine`].
enum SyncLoopState {
/// No background task is tracked.
Stopped,
/// Task spawned by `spawn_sync_loop`, plus the token used to cancel it.
Running {
handle: tokio::task::JoinHandle<()>,
cancel: CancellationToken,
},
}
/// Hybrid transactional/analytical engine: writes go to an OLTP store, and CDC
/// events are replicated into an OLAP store that serves analytical reads.
pub struct HtapEngine {
/// Local OLTP engine. `None` in sidecar mode unless `enable_local_oltp` is set.
oltp: Option<OltpBackend>,
/// Analytical engine that receives replicated data.
olap: OlapBackend,
/// Decides whether `query` goes to OLTP or OLAP.
router: SqlParserRouter,
/// Sync engine, shared with the background loop via `Arc::clone`.
sync_engine: Arc<SyncEngineType>,
/// Registered table schemas. A clone was handed to the sync engine at
/// construction. NOTE(review): presumably the clone shares state, so later
/// registrations are visible to sync; confirm.
schema_registry: SchemaRegistry,
/// Background sync task state.
sync_loop: SyncLoopState,
/// Sync mode from the config; controls OLAP table layout in `register_table`.
sync_mode: SyncMode,
/// True in standard mode: CDC triggers are installed/rebuilt on the local OLTP
/// tables. False in sidecar mode.
uses_local_cdc: bool,
/// Write-locked by `register_table`/`add_column`/`drop_column`, read-locked by
/// `sync_now`, and passed to `spawn_sync_loop`.
ddl_lock: Arc<tokio::sync::RwLock<()>>,
/// Where the registry is saved after registration, if configured.
schema_registry_path: Option<String>,
}
impl HtapEngine {
/// Builds an engine from `config`.
///
/// Creates the OLAP backend and loads the schema registry from
/// `schema_registry_path` (or starts an empty one). It then builds a sidecar
/// engine when the `sidecar` feature is enabled and `config.sidecar` is `Some`,
/// and a standard local-OLTP engine otherwise.
///
/// # Errors
/// OLAP creation failures, registry load failures (as `HtapError::Other`), and
/// any error from the chosen constructor.
pub async fn new(config: HtapConfig) -> Result<Self, HtapError> {
    let olap = Self::create_olap_backend(&config)?;
    let schema_registry = match &config.schema_registry_path {
        None => SchemaRegistry::new(),
        Some(path) => SchemaRegistry::load_from_disk(path).map_err(|e| {
            HtapError::Other(format!("failed to load schema registry from '{path}': {e}"))
        })?,
    };

    #[cfg(feature = "sidecar")]
    {
        if config.sidecar.is_some() {
            return Self::new_sidecar(config, olap, schema_registry).await;
        }
    }

    Self::new_standard(config, olap, schema_registry).await
}
/// Builds an engine over a local rusqlite OLTP database with trigger-based CDC.
/// This is the non-sidecar path of `HtapEngine::new`.
///
/// Steps:
/// - Open the rusqlite engine at `config.oltp_path`.
/// - Open a dedicated connection for the CDC producer and ensure the CDC log
///   table exists.
/// - Choose the CDC source: the RocksDB bridge when the `rocksdb-cdc` feature is
///   enabled and `rocksdb_cdc_path` is set, otherwise the SQLite log directly.
/// - Wrap the source in a `CdcSyncEngine`.
/// - Start the background sync loop if `config.sync_interval` is set.
///
/// # Errors
/// Propagates failures from opening the OLTP engine or its CDC connection,
/// creating the CDC log table, or opening the RocksDB log.
async fn new_standard(
config: HtapConfig,
olap: OlapBackend,
schema_registry: SchemaRegistry,
) -> Result<Self, HtapError> {
let rusqlite_engine =
RusqliteEngine::new_local(&config.oltp_path, config.read_pool_size).await?;
// The CDC producer gets its own connection, separate from the engine's.
let cdc_conn = rusqlite_engine.new_connection().await?;
rhei_oltp_rusqlite::cdc_setup::ensure_cdc_log_table(&cdc_conn).await?;
info!("trigger-based CDC enabled (rusqlite)");
let cdc_producer = RusqliteCdcProducer::new(cdc_conn);
let oltp = OltpBackend::Rusqlite(rusqlite_engine);
// With `rocksdb-cdc`, a configured path switches to the SQLite -> RocksDB bridge.
#[cfg(feature = "rocksdb-cdc")]
let cdc_source = if let Some(ref path) = config.rocksdb_cdc_path {
let rocksdb_config = rhei_cdc_rocksdb::RocksDbCdcConfig {
path: path.clone(),
create_if_missing: true,
};
let rocksdb_log = rhei_cdc_rocksdb::RocksDbCdcLog::open(&rocksdb_config)?;
info!(
path = path.as_str(),
"RocksDB CDC bridge enabled (SQLite triggers -> RocksDB durable log)"
);
CdcSource::RocksDbBridge {
sqlite: cdc_producer,
rocksdb: Arc::new(rocksdb_log),
}
} else {
CdcSource::Rusqlite(cdc_producer)
};
#[cfg(not(feature = "rocksdb-cdc"))]
let cdc_source = CdcSource::Rusqlite(cdc_producer);
// The engine keeps its own `olap` / `schema_registry`; the sync engine gets clones.
let sync_engine = Arc::new(
CdcSyncEngine::new(
cdc_source,
olap.clone(),
schema_registry.clone(),
config.sync_batch_size,
)
.with_prune_after_sync(config.prune_after_sync)
.with_sync_mode(config.sync_mode),
);
let backend_name = olap.backend_name();
info!(
oltp_path = config.oltp_path.as_str(),
olap_in_memory = config.olap_in_memory,
olap_backend = backend_name,
oltp_backend = "rusqlite",
prune_after_sync = config.prune_after_sync,
"HTAP engine initialized"
);
let mut engine = Self {
oltp: Some(oltp),
olap,
router: SqlParserRouter::new(),
sync_engine,
schema_registry,
sync_loop: SyncLoopState::Stopped,
sync_mode: config.sync_mode,
// Standard mode: CDC comes from triggers on the local OLTP tables.
uses_local_cdc: true,
ddl_lock: Arc::new(tokio::sync::RwLock::new(())),
schema_registry_path: config.schema_registry_path,
};
if let Some(interval) = config.sync_interval {
engine.start_sync(interval);
}
Ok(engine)
}
#[cfg(feature = "sidecar")]
/// Builds an engine that replicates from an external SQLite or PostgreSQL
/// database using timestamp-based CDC instead of local triggers.
///
/// Behaviour:
/// - The watermark store is RocksDB-backed when `watermark_path` is set, and a
///   `NullWatermarkStore` otherwise.
/// - A local rusqlite OLTP engine is opened only when `enable_local_oltp` is true.
/// - Pruning after sync is always disabled here, whatever `config.prune_after_sync` says.
/// - `uses_local_cdc` is false, so no CDC triggers are installed locally.
///
/// # Errors
/// Returns `HtapError::Other` when the sidecar config is missing, or when the
/// watermark store, source connection or watermark state cannot be opened or
/// loaded. Errors from opening the local OLTP engine are propagated.
async fn new_sidecar(
config: HtapConfig,
olap: OlapBackend,
schema_registry: SchemaRegistry,
) -> Result<Self, HtapError> {
// `new` only dispatches here when `config.sidecar` is `Some`; this guards direct misuse.
let sidecar_config = config
.sidecar
.as_ref()
.ok_or_else(|| HtapError::Other("sidecar config required for new_sidecar".into()))?;
let watermark_store: Box<dyn rhei_sidecar::WatermarkStore> =
if let Some(ref wm_path) = sidecar_config.watermark_path {
info!(path = wm_path.as_str(), "opening RocksDB watermark store");
Box::new(
rhei_sidecar::RocksDbWatermarkStore::open(wm_path).map_err(|e| {
HtapError::Other(format!("failed to open watermark store: {e}"))
})?,
)
} else {
Box::new(rhei_sidecar::NullWatermarkStore)
};
let (cdc_source, source_description) = match &sidecar_config.source {
SidecarSource::Sqlite(path) => {
use connector_arrow::rusqlite::{rusqlite, SQLiteConnection};
// NOTE(review): synchronous open on the async task, unlike the Postgres branch below.
let raw_conn = rusqlite::Connection::open(path)
.map_err(|e| HtapError::Other(format!("failed to open SQLite source: {e}")))?;
let sqlite_conn = SQLiteConnection::new(raw_conn);
let consumer = rhei_sidecar::TimestampCdcConsumer::try_with_watermark_store(
sqlite_conn,
sidecar_config.timestamp_config.clone(),
watermark_store,
)
.map_err(|e| HtapError::Other(format!("failed to load watermark state: {e}")))?;
(CdcSource::SidecarSqlite(consumer), format!("sqlite:{path}"))
}
SidecarSource::Postgres(conn_str) => {
use connector_arrow::postgres::{postgres::NoTls, PostgresConnection};
// The synchronous postgres client connects on a blocking thread, presumably
// so it does not block (or nest inside) the async runtime.
let conn_str_owned = conn_str.clone();
let pg_client = tokio::task::spawn_blocking(move || {
connector_arrow::postgres::postgres::Client::connect(&conn_str_owned, NoTls)
})
.await
.map_err(|e| HtapError::Other(format!("spawn_blocking panicked: {e}")))?
.map_err(|e| HtapError::Other(format!("failed to open PostgreSQL source: {e}")))?;
let pg_conn = PostgresConnection::new(pg_client);
let consumer = rhei_sidecar::TimestampCdcConsumer::try_with_watermark_store(
pg_conn,
sidecar_config.timestamp_config.clone(),
watermark_store,
)
.map_err(|e| HtapError::Other(format!("failed to load watermark state: {e}")))?;
// The connection string is deliberately not logged (it may carry credentials).
(
CdcSource::SidecarPostgres(consumer),
"postgres:<connection_string>".to_string(),
)
}
};
let oltp: Option<OltpBackend> = if sidecar_config.enable_local_oltp {
Some(OltpBackend::Rusqlite(
RusqliteEngine::new_local(&config.oltp_path, config.read_pool_size).await?,
))
} else {
None
};
// Pruning is forced off here regardless of `config.prune_after_sync`.
let sync_engine = Arc::new(
CdcSyncEngine::new(
cdc_source,
olap.clone(),
schema_registry.clone(),
config.sync_batch_size,
)
.with_prune_after_sync(false) .with_sync_mode(config.sync_mode),
);
info!(
source = source_description.as_str(),
enable_local_oltp = sidecar_config.enable_local_oltp,
sync_mode = ?config.sync_mode,
"sidecar engine initialized"
);
// Sidecar mode: no local CDC triggers (`uses_local_cdc: false`).
let mut engine = Self {
oltp,
olap,
router: SqlParserRouter::new(),
sync_engine,
schema_registry,
sync_loop: SyncLoopState::Stopped,
sync_mode: config.sync_mode,
uses_local_cdc: false, ddl_lock: Arc::new(tokio::sync::RwLock::new(())),
schema_registry_path: config.schema_registry_path,
};
if let Some(interval) = config.sync_interval {
engine.start_sync(interval);
}
Ok(engine)
}
/// Instantiates the OLAP engine selected by `config.olap_backend`.
///
/// DuckDB opens in memory when `olap_in_memory` is set. Otherwise it opens the
/// file at `olap_path`, defaulting to `"rhei_olap.duckdb"`. DataFusion uses
/// `datafusion_storage`. Engine errors are converted to `OlapError` and then
/// to [`HtapError`].
fn create_olap_backend(config: &HtapConfig) -> Result<OlapBackend, HtapError> {
    match config.olap_backend {
        #[cfg(feature = "duckdb-backend")]
        OlapBackendType::DuckDb => {
            let duck = if !config.olap_in_memory {
                let db_file = config.olap_path.as_deref().unwrap_or("rhei_olap.duckdb");
                DuckDbEngine::persistent(db_file).map_err(rhei_olap::OlapError::from)?
            } else {
                DuckDbEngine::in_memory().map_err(rhei_olap::OlapError::from)?
            };
            Ok(OlapBackend::DuckDb(SharedDuckDbEngine::new(duck)))
        }
        #[cfg(feature = "datafusion-backend")]
        OlapBackendType::DataFusion => {
            let storage = config.datafusion_storage.clone();
            let fusion =
                DataFusionEngine::with_storage(storage).map_err(rhei_olap::OlapError::from)?;
            Ok(OlapBackend::DataFusion(SharedDataFusionEngine::new(fusion)))
        }
    }
}
/// Registers `schema` for HTAP replication.
///
/// Under the DDL write lock, this:
/// - creates the OLAP table,
/// - installs CDC triggers on the local OLTP table (local-CDC mode only),
/// - records the schema in the registry,
/// - persists the registry when `schema_registry_path` is set.
///
/// Re-registering an identical schema (same Arrow schema and primary key) only
/// logs at debug level and returns `Ok(())`.
///
/// In `SyncMode::Temporal`, the OLAP table gets the `temporalize_schema` layout
/// and no primary key, and source schemas that already contain a reserved
/// `_rhei_*` temporal column are rejected.
///
/// # Errors
/// - schema validation failures,
/// - the table is already registered with a different schema,
/// - a reserved temporal column is present (temporal mode),
/// - OLAP table creation, CDC trigger setup or registry insertion failures.
///
/// Failing to persist the registry is logged, not returned.
///
/// NOTE(review): the OLAP table is created before triggers and the registry
/// entry. If a later step fails, that table is left behind and a retry calls
/// `create_table` again; confirm the OLAP backend tolerates this.
pub async fn register_table(&self, schema: TableSchema) -> Result<(), HtapError> {
    let _ddl_guard = self.ddl_lock.write().await;
    schema.validate()?;

    if let Ok(existing) = self.schema_registry.get(&schema.name) {
        if existing.arrow_schema == schema.arrow_schema
            && existing.primary_key == schema.primary_key
        {
            debug!(
                table = schema.name.as_str(),
                "register_table: table already registered with matching schema, skipping"
            );
            return Ok(());
        }
        return Err(HtapError::Other(format!(
            "table '{}' is already registered with a different schema; \
             use add_column/drop_column for schema evolution",
            schema.name
        )));
    }

    let table_name = schema.name.clone();
    let arrow_schema = schema.arrow_schema.clone();
    let olap_schema = if self.sync_mode == SyncMode::Temporal {
        // These names are added by `temporalize_schema`, so a source column
        // with one of them would collide.
        for reserved in &["_rhei_valid_from", "_rhei_valid_to", "_rhei_operation"] {
            if arrow_schema.field_with_name(reserved).is_ok() {
                return Err(HtapError::Other(format!(
                    "table '{}' already contains reserved temporal column '{}'",
                    table_name, reserved
                )));
            }
        }
        temporalize_schema(&arrow_schema)
    } else {
        arrow_schema
    };

    // Temporal tables keep history rows, so they are created without a primary key.
    let create_pk: &[String] = match self.sync_mode {
        SyncMode::Destructive => &schema.primary_key,
        SyncMode::Temporal => &[],
    };
    self.olap
        .create_table(&table_name, &olap_schema, create_pk)
        .await?;

    // Install capture triggers through the shared helper (same path as
    // add_column/drop_column) rather than repeating the backend destructuring.
    if self.uses_local_cdc {
        if let Some(oltp) = self.oltp.as_ref() {
            self.setup_cdc_triggers(oltp, &table_name).await?;
        }
    }

    self.schema_registry.register(schema)?;
    if let Some(ref path) = self.schema_registry_path {
        if let Err(e) = self.schema_registry.save_to_disk(path) {
            warn!(
                path = path.as_str(),
                error = %e,
                "failed to persist schema registry after register_table"
            );
        }
    }
    info!(
        table = table_name.as_str(),
        "table registered for HTAP replication"
    );
    Ok(())
}
pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, HtapError> {
let target = self.router.route(sql);
#[cfg(feature = "metrics")]
match target {
QueryTarget::Oltp => metrics::counter!("rhei.query.routed_oltp").increment(1),
QueryTarget::Olap => metrics::counter!("rhei.query.routed_olap").increment(1),
};
match target {
QueryTarget::Oltp => {
if let Some(ref oltp) = self.oltp {
let batches = oltp.query(sql, &[]).await?;
Ok(batches)
} else {
let upper = sql.trim_start().to_uppercase();
if upper.starts_with("SELECT")
|| upper.starts_with("WITH")
|| upper.starts_with("EXPLAIN")
{
let batches = self.olap.query(sql).await?;
Ok(batches)
} else {
Err(HtapError::OltpNotAvailable)
}
}
}
QueryTarget::Olap => {
let batches = self.olap.query(sql).await?;
Ok(batches)
}
}
}
/// Runs `sql` on an explicitly chosen engine, or defers to [`Self::query`].
///
/// - `ForceOltp` requires a local OLTP engine ([`HtapError::OltpNotAvailable`] otherwise).
/// - `ForceOlap` always uses the OLAP engine.
/// - `Auto` uses the router.
pub async fn query_with_hint(
    &self,
    sql: &str,
    hint: QueryHint,
) -> Result<Vec<RecordBatch>, HtapError> {
    let batches = match hint {
        QueryHint::Auto => return self.query(sql).await,
        QueryHint::ForceOlap => self.olap.query(sql).await?,
        QueryHint::ForceOltp => {
            let Some(oltp) = self.oltp.as_ref() else {
                return Err(HtapError::OltpNotAvailable);
            };
            oltp.query(sql, &[]).await?
        }
    };
    Ok(batches)
}
/// Executes `sql` with JSON-encoded bind parameters on the OLTP engine and
/// returns the `u64` it reports (presumably the affected-row count).
///
/// # Errors
/// [`HtapError::OltpNotAvailable`] when no OLTP engine is configured; otherwise
/// any OLTP execution error.
///
/// NOTE(review): this does not take `ddl_lock`, while `add_column`/`drop_column`
/// drop and recreate CDC triggers under the write lock. A write in that window
/// may not be captured by CDC; confirm callers serialize writes with schema
/// changes. `drop_column` calls this while holding the write lock, so simply
/// adding a read lock here would deadlock that path.
pub async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64, HtapError> {
    let Some(oltp) = self.oltp.as_ref() else {
        return Err(HtapError::OltpNotAvailable);
    };
    Ok(oltp.execute(sql, params).await?)
}
/// Runs `(sql, params)` pairs on the OLTP engine through its `execute_batch`.
///
/// # Errors
/// [`HtapError::OltpNotAvailable`] when no OLTP engine is configured; otherwise
/// any OLTP error.
pub async fn execute_batch(
    &self,
    statements: &[(String, Vec<serde_json::Value>)],
) -> Result<(), HtapError> {
    match self.oltp.as_ref() {
        Some(oltp) => {
            oltp.execute_batch(statements).await?;
            Ok(())
        }
        None => Err(HtapError::OltpNotAvailable),
    }
}
/// Runs one sync pass immediately and returns its result.
///
/// Holds the shared side of `ddl_lock`, so it cannot overlap with
/// `register_table`/`add_column`/`drop_column`, which take the write side.
pub async fn sync_now(&self) -> Result<SyncResult, HtapError> {
    let _shared_ddl = self.ddl_lock.read().await;
    Ok(self.sync_engine.sync_once().await?)
}
/// Starts the background sync loop with `interval`, replacing any running loop.
///
/// A previous loop is cancelled through its token and its task aborted, without
/// waiting for it. The new loop receives the shared sync engine and `ddl_lock`.
pub fn start_sync(&mut self, interval: Duration) {
    let previous = std::mem::replace(&mut self.sync_loop, SyncLoopState::Stopped);
    if let SyncLoopState::Running { handle, cancel } = previous {
        cancel.cancel();
        handle.abort();
    }

    let token = CancellationToken::new();
    let join = spawn_sync_loop(
        Arc::clone(&self.sync_engine),
        interval,
        token.clone(),
        Arc::clone(&self.ddl_lock),
    );
    self.sync_loop = SyncLoopState::Running {
        handle: join,
        cancel: token,
    };
}
pub async fn stop_sync(&mut self) {
if let SyncLoopState::Running { cancel, handle } =
std::mem::replace(&mut self.sync_loop, SyncLoopState::Stopped)
{
cancel.cancel();
let _ = handle.await;
}
}
/// True while a background sync task is tracked and has not yet finished.
pub fn is_sync_running(&self) -> bool {
    match &self.sync_loop {
        SyncLoopState::Running { handle, .. } => !handle.is_finished(),
        SyncLoopState::Stopped => false,
    }
}
/// Returns the sync engine's status, with `running` filled in from the background loop.
///
/// With the `metrics` feature, the reported lag is also published as the
/// `rhei.sync.cdc_lag` gauge.
pub async fn sync_status(&self) -> Result<SyncStatus, HtapError> {
    let mut snapshot = self.sync_engine.status().await?;
    snapshot.running = self.is_sync_running();
    #[cfg(feature = "metrics")]
    metrics::gauge!("rhei.sync.cdc_lag").set(snapshot.lag as f64);
    Ok(snapshot)
}
pub fn schema_registry(&self) -> &SchemaRegistry {
&self.schema_registry
}
pub fn oltp(&self) -> Option<&OltpBackend> {
self.oltp.as_ref()
}
pub fn oltp_rusqlite(&self) -> Option<&RusqliteEngine> {
self.oltp.as_ref().and_then(OltpBackend::as_rusqlite)
}
pub fn olap(&self) -> &OlapBackend {
&self.olap
}
/// Full-loads a registered table from OLTP into OLAP and returns the rows loaded.
///
/// Steps:
/// - Read every row with `SELECT * FROM <table>`.
/// - Reorder each batch's columns by name to match the registered Arrow schema.
/// - Hand the aligned batches to the OLAP `load_arrow`.
///
/// An empty table returns `Ok(0)` without touching OLAP. This method does not
/// take `ddl_lock`.
///
/// NOTE(review): in `SyncMode::Temporal` the OLAP table has extra temporal
/// columns, but these batches use the registered (non-temporal) schema; confirm
/// `load_arrow` handles that.
///
/// # Errors
/// - invalid identifier or unregistered table,
/// - no OLTP engine,
/// - a registered column missing from the OLTP result,
/// - batch construction failures (as `HtapError::Other`),
/// - OLTP/OLAP errors.
pub async fn initial_sync(&self, table_name: &str) -> Result<u64, HtapError> {
    rhei_core::validate_identifier(table_name)?;
    let schema = self.schema_registry.get(table_name)?;
    info!(table = table_name, "starting initial sync (full load)");
    let Some(oltp) = self.oltp.as_ref() else {
        return Err(HtapError::OltpNotAvailable);
    };

    let select_all = format!("SELECT * FROM {table_name}");
    let batches = oltp.query(&select_all, &[]).await?;

    // A zero row count also covers the "no batches at all" case.
    let row_count = batches.iter().fold(0usize, |acc, b| acc + b.num_rows());
    if row_count == 0 {
        info!(
            table = table_name,
            rows = 0,
            "initial sync complete (empty table)"
        );
        return Ok(0);
    }

    debug_assert!(
        batches.iter().all(|b| b.num_columns() == batches[0].num_columns()),
        "OltpEngine::query returned batches with inconsistent column counts for table '{table_name}'"
    );

    // Position of each registered column within the OLTP result, by name.
    let source_schema = batches[0].schema();
    let projection = schema
        .arrow_schema
        .fields()
        .iter()
        .map(|field| {
            source_schema.index_of(field.name()).map_err(|_| {
                HtapError::Other(format!(
                    "column '{}' not found in OLTP result for table '{}'",
                    field.name(),
                    table_name
                ))
            })
        })
        .collect::<Result<Vec<usize>, HtapError>>()?;

    let aligned = batches
        .iter()
        .map(|batch| {
            let columns = projection
                .iter()
                .map(|&idx| batch.column(idx).clone())
                .collect::<Vec<_>>();
            RecordBatch::try_new(schema.arrow_schema.clone(), columns)
                .map_err(|e| HtapError::Other(e.to_string()))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let rows = self.olap.load_arrow(table_name, &aligned).await?;
    info!(table = table_name, rows, "initial sync complete");
    Ok(rows)
}
/// Runs [`Self::initial_sync`] for every registered table, in the order
/// `table_names()` returns them. Stops at the first error; otherwise returns
/// the total number of rows loaded.
pub async fn initial_sync_all(&self) -> Result<u64, HtapError> {
    let registered = self.schema_registry.table_names();
    let mut loaded = 0u64;
    for table in &registered {
        loaded += self.initial_sync(table).await?;
    }
    Ok(loaded)
}
/// Adds `column_name` with `data_type` to a registered table.
///
/// Under the DDL write lock, this:
/// - validates the column name and rejects a column already in the registry,
/// - adds the column to the OLAP table,
/// - rebuilds the CDC triggers (local-CDC mode with an OLTP engine only),
/// - updates the registry and persists it when `schema_registry_path` is set.
///
/// Persist failures are logged, matching `register_table`. Without persisting,
/// a registry reloaded from disk would not contain the new column.
///
/// If trigger recreation fails, it is retried once. A failed retry is logged
/// and the original error is returned. Earlier steps are not rolled back.
///
/// NOTE(review): unlike `drop_column`, this does not alter the OLTP table
/// itself. Presumably callers add the OLTP column first so the rebuilt
/// triggers include it; confirm.
///
/// # Errors
/// Invalid identifier, unknown table, duplicate column, or any OLAP, CDC or
/// registry failure.
pub async fn add_column(
    &self,
    table_name: &str,
    column_name: &str,
    data_type: arrow::datatypes::DataType,
) -> Result<(), HtapError> {
    let _ddl_guard = self.ddl_lock.write().await;
    info!(table = table_name, column = column_name, "adding column");
    rhei_core::validate_identifier(column_name)?;
    let existing = self.schema_registry.get(table_name)?;
    if existing.arrow_schema.field_with_name(column_name).is_ok() {
        return Err(rhei_core::CoreError::SchemaValidation(format!(
            "column '{}' already exists in table '{}'",
            column_name, table_name
        ))
        .into());
    }
    // Release the registry entry before the awaits below.
    drop(existing);

    self.olap
        .add_column(table_name, column_name, &data_type)
        .await?;

    if self.uses_local_cdc {
        if let Some(ref oltp) = self.oltp {
            self.teardown_cdc_triggers(oltp, table_name).await?;
            if let Err(e) = self.setup_cdc_triggers(oltp, table_name).await {
                warn!(
                    table = table_name,
                    error = %e,
                    "add_column: setup_cdc failed after teardown; restoring triggers"
                );
                // Previously discarded silently; a failed restore leaves the table
                // without CDC capture, so surface it.
                if let Err(restore_err) = self.setup_cdc_triggers(oltp, table_name).await {
                    warn!(
                        table = table_name,
                        error = %restore_err,
                        "add_column: failed to restore CDC triggers; \
                         table may need manual re-registration"
                    );
                }
                return Err(e);
            }
        }
    }

    self.schema_registry
        .add_column(table_name, column_name, data_type)?;
    if let Some(ref path) = self.schema_registry_path {
        if let Err(e) = self.schema_registry.save_to_disk(path) {
            warn!(
                path = path.as_str(),
                error = %e,
                "failed to persist schema registry after add_column"
            );
        }
    }
    info!(
        table = table_name,
        column = column_name,
        "column added and CDC updated"
    );
    Ok(())
}
/// Drops `column_name` from a registered table across OLTP, OLAP and the registry.
///
/// Runs under the DDL write lock, so `sync_now` (read lock) cannot overlap.
/// The background loop receives the same lock through `spawn_sync_loop`.
///
/// With a local OLTP engine the steps are:
/// 1. tear down CDC triggers (local-CDC mode),
/// 2. `ALTER TABLE ... DROP COLUMN` on OLTP,
/// 3. drop the column in OLAP,
/// 4. recreate the triggers (local-CDC mode),
/// 5. remove the column from the registry.
///
/// Without OLTP, only OLAP and the registry change. On success the registry is
/// persisted when `schema_registry_path` is set; persist failures are logged,
/// matching `register_table`.
///
/// If steps 2–5 fail, trigger recreation is attempted once more and the
/// original error is returned. Earlier steps are not rolled back.
///
/// # Errors
/// - invalid identifiers or unknown table,
/// - primary-key or missing column,
/// - any OLTP, OLAP, CDC or registry failure.
pub async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), HtapError> {
    let _ddl_guard = self.ddl_lock.write().await;
    info!(table = table_name, column = column_name, "dropping column");
    // `table_name` is interpolated into ALTER TABLE below; validate it the same
    // way `initial_sync` does before building SQL from it.
    rhei_core::validate_identifier(table_name)?;
    rhei_core::validate_identifier(column_name)?;
    let schema = self.schema_registry.get(table_name)?;
    if schema.primary_key.contains(&column_name.to_string()) {
        return Err(rhei_core::CoreError::SchemaValidation(format!(
            "cannot drop primary key column '{}' from table '{}'",
            column_name, table_name
        ))
        .into());
    }
    if schema.arrow_schema.field_with_name(column_name).is_err() {
        return Err(rhei_core::CoreError::SchemaValidation(format!(
            "column '{}' not found in table '{}'",
            column_name, table_name
        ))
        .into());
    }
    // Release the registry entry before the awaits below.
    drop(schema);

    // Best-effort persistence; mirrors register_table so a reload from disk
    // does not resurrect the dropped column.
    let persist_registry = || {
        if let Some(ref path) = self.schema_registry_path {
            if let Err(e) = self.schema_registry.save_to_disk(path) {
                warn!(
                    path = path.as_str(),
                    error = %e,
                    "failed to persist schema registry after drop_column"
                );
            }
        }
    };

    let Some(oltp) = self.oltp.as_ref() else {
        // No OLTP engine (sidecar without local OLTP): only OLAP and the registry change.
        self.olap.drop_column(table_name, column_name).await?;
        self.schema_registry.drop_column(table_name, column_name)?;
        persist_registry();
        info!(table = table_name, column = column_name, "column dropped");
        return Ok(());
    };

    if self.uses_local_cdc {
        self.teardown_cdc_triggers(oltp, table_name).await?;
    }
    let result = async {
        self.execute(
            &format!("ALTER TABLE {table_name} DROP COLUMN {column_name}"),
            &[],
        )
        .await?;
        self.olap.drop_column(table_name, column_name).await?;
        if self.uses_local_cdc {
            self.setup_cdc_triggers(oltp, table_name).await?;
        }
        self.schema_registry.drop_column(table_name, column_name)?;
        Ok::<(), HtapError>(())
    }
    .await;

    match &result {
        Ok(()) => {
            persist_registry();
            info!(
                table = table_name,
                column = column_name,
                "column dropped and CDC triggers updated"
            );
        }
        Err(e) => {
            warn!(
                table = table_name,
                error = %e,
                "drop_column failed after CDC teardown; attempting to restore triggers"
            );
            if self.uses_local_cdc {
                if let Err(rebuild_err) = self.setup_cdc_triggers(oltp, table_name).await {
                    warn!(
                        table = table_name,
                        error = %rebuild_err,
                        "failed to restore CDC triggers after drop_column error; \
                         table may need manual re-registration"
                    );
                }
            }
        }
    }
    result
}
/// Installs the rusqlite CDC triggers for `table_name` on the given OLTP engine.
async fn setup_cdc_triggers(
    &self,
    oltp: &OltpBackend,
    table_name: &str,
) -> Result<(), HtapError> {
    let OltpBackend::Rusqlite(rusqlite_engine) = oltp;
    let conn = rusqlite_engine.connection();
    rhei_oltp_rusqlite::cdc_setup::setup_cdc(&conn, table_name).await?;
    Ok(())
}
/// Removes the rusqlite CDC triggers for `table_name` from the given OLTP engine.
async fn teardown_cdc_triggers(
    &self,
    oltp: &OltpBackend,
    table_name: &str,
) -> Result<(), HtapError> {
    let OltpBackend::Rusqlite(rusqlite_engine) = oltp;
    let conn = rusqlite_engine.connection();
    rhei_oltp_rusqlite::cdc_setup::teardown_cdc(&conn, table_name).await?;
    Ok(())
}
/// Gracefully stops the background sync loop (see [`Self::stop_sync`]) and logs shutdown.
pub async fn shutdown(&mut self) {
    self.stop_sync().await;
    info!("HTAP engine shut down");
}
}
impl Drop for HtapEngine {
    /// Cancels and aborts a still-running sync task.
    ///
    /// `Drop` cannot await, so unlike `stop_sync` the task is not waited on.
    fn drop(&mut self) {
        match std::mem::replace(&mut self.sync_loop, SyncLoopState::Stopped) {
            SyncLoopState::Running { cancel, handle } => {
                cancel.cancel();
                handle.abort();
            }
            SyncLoopState::Stopped => {}
        }
    }
}