Skip to main content

rhei_sync/
sync_engine.rs

1//! Core CDC-to-OLAP sync engine.
2//!
3//! [`CdcSyncEngine`] polls the CDC consumer, groups consecutive INSERT events
4//! into `SyncOp::BatchInsert` groups, and applies them to the OLAP engine
5//! using either the Arrow-native path or a SQL fallback.
6//!
7//! ## Transaction semantics
8//!
9//! When [`rhei_core::OlapEngine::supports_transactions`] returns `true` (e.g.
10//! DuckDB), each sync cycle is wrapped in `BEGIN TRANSACTION … COMMIT`.  A
11//! failure triggers `ROLLBACK` and the watermark is *not* advanced, so the next
12//! cycle retries from the same position.
13//!
14//! When the backend does not support transactions (e.g. DataFusion), statements
15//! are committed one at a time.  On failure the watermark is advanced to the
16//! last successfully applied event so that the next cycle does not re-apply
17//! already-committed changes — **partial-failure recovery**.
18//!
19//! ## Stale-schema handling
20//!
21//! If the OLAP engine rejects a statement with a message that matches known
22//! column-not-found patterns, the event (or batch) is skipped with a warning
23//! and the watermark advances past it.  This prevents a single stale-schema
24//! event from blocking the entire sync pipeline.
25
26use std::sync::atomic::{AtomicI64, Ordering};
27
28use rhei_core::types::{CdcEvent, CdcOperation, SyncMode, SyncResult, SyncStatus};
29use tracing::{debug, warn};
30
/// Watermark sentinel meaning "no sync cycle has completed yet".
///
/// Stored in the engine's atomic watermark because an atomic integer cannot hold `None`.
const NEVER_SYNCED: i64 = -1;
33
34use crate::converter::{build_batch_insert, cdc_event_to_dml, cdc_events_to_batch};
35use crate::error::SyncError;
36use crate::temporal_converter::{
37    build_temporal_batch_insert, cdc_event_to_temporal_dml, cdc_events_to_temporal_batch,
38};
39
/// Substrings that, on their own, identify a stale-schema error reported by the OLAP
/// engine (a column was dropped/added after the CDC event was logged).
///
/// The generic "column … not found" case is handled separately in
/// [`is_stale_schema_error`], because "not found" alone would also match
/// "table not found", which is a different error class.
const STALE_SCHEMA_PATTERNS: &[&str] = &[
    "No field named",
    "schema mismatch",
    "not in INSERT column list",
    // DuckDB binder error for an INSERT that names a column the table no longer has,
    // e.g. `Table "events" does not have a column with name "foo"`.
    "does not have a column",
];

/// Return `true` if `msg` (an OLAP engine error message) indicates a stale schema.
///
/// Matching is by case-sensitive substring. A message matches when it mentions both
/// "column" and "not found" (covering "Referenced column … not found" style errors
/// without matching "table not found"), or when it contains any of
/// [`STALE_SCHEMA_PATTERNS`].
fn is_stale_schema_error(msg: &str) -> bool {
    let column_not_found = msg.contains("column") && msg.contains("not found");
    column_not_found || STALE_SCHEMA_PATTERNS.iter().any(|p| msg.contains(p))
}
52
53/// CDC-based sync engine that replicates changes from OLTP to OLAP.
54///
55/// Polls the CDC consumer for new events, converts them to DML,
56/// and applies them to the OLAP engine.
57///
58/// Features:
59/// - **Batch INSERT**: Consecutive INSERT events for the same table are grouped
60///   into a single multi-row INSERT for better OLAP performance.
61/// - **CDC pruning**: Optionally prunes processed CDC events after successful sync
62///   to prevent unbounded growth of the `_rhei_cdc_log` table.
63/// - **Transaction safety**: Each sync cycle is wrapped in a BEGIN/COMMIT on the
64///   OLAP side with ROLLBACK on failure. Note: the actual transactional guarantee
65///   is backend-dependent — DuckDB supports real transactions, while DataFusion
66///   treats BEGIN/COMMIT/ROLLBACK as no-ops, so a mid-cycle failure may leave
67///   OLAP partially updated.
68pub struct CdcSyncEngine<C, O> {
69    cdc: C,
70    olap: O,
71    schema_registry: rhei_core::SchemaRegistry,
72    /// Last synced CDC sequence number, or `NEVER_SYNCED` if no sync has occurred.
73    last_synced_seq: AtomicI64,
74    batch_size: u32,
75    /// Whether to prune processed CDC events after a successful sync.
76    prune_after_sync: bool,
77    /// Sync mode: Destructive (default) or Temporal (SCD Type 2).
78    sync_mode: SyncMode,
79}
80
81impl<C, O> CdcSyncEngine<C, O>
82where
83    C: rhei_core::CdcConsumer,
84    O: rhei_core::OlapEngine,
85{
86    /// Create a new [`CdcSyncEngine`].
87    ///
88    /// - `cdc` — CDC consumer that provides change events (e.g. SQLite trigger log).
89    /// - `olap` — OLAP engine that receives the converted DML.
90    /// - `schema_registry` — registry of table schemas used to build WHERE clauses
91    ///   and Arrow batches.
92    /// - `batch_size` — maximum number of CDC events to fetch per [`sync_once`](rhei_core::SyncEngine::sync_once) call.
93    ///
94    /// CDC pruning is enabled by default (`prune_after_sync = true`).
95    /// Sync mode defaults to [`rhei_core::types::SyncMode::Destructive`].
96    pub fn new(
97        cdc: C,
98        olap: O,
99        schema_registry: rhei_core::SchemaRegistry,
100        batch_size: u32,
101    ) -> Self {
102        Self {
103            cdc,
104            olap,
105            schema_registry,
106            last_synced_seq: AtomicI64::new(NEVER_SYNCED),
107            batch_size,
108            prune_after_sync: true,
109            sync_mode: SyncMode::default(),
110        }
111    }
112
113    /// Set whether to prune processed CDC events after sync.
114    pub fn with_prune_after_sync(mut self, prune: bool) -> Self {
115        self.prune_after_sync = prune;
116        self
117    }
118
119    /// Set the sync mode: Destructive (default) or Temporal (SCD Type 2).
120    pub fn with_sync_mode(mut self, mode: SyncMode) -> Self {
121        self.sync_mode = mode;
122        self
123    }
124}
125
126/// Group consecutive INSERT events by table name for batch processing.
127///
128/// Returns a list of operations to execute in order. Each operation is either:
129/// - A batch of INSERT events for the same table (to be combined into one INSERT)
130/// - A single UPDATE or DELETE event
131enum SyncOp<'a> {
132    /// A batch of consecutive INSERT events for the same table.
133    BatchInsert {
134        table: &'a str,
135        events: Vec<&'a CdcEvent>,
136    },
137    /// A single UPDATE or DELETE event.
138    Single(&'a CdcEvent),
139}
140
141/// Group events into SyncOps, batching consecutive same-table INSERTs.
142fn group_events(events: &[CdcEvent]) -> Vec<SyncOp<'_>> {
143    let mut ops: Vec<SyncOp<'_>> = Vec::new();
144    let mut i = 0;
145
146    while i < events.len() {
147        let event = &events[i];
148
149        if event.operation == CdcOperation::Insert {
150            // Collect consecutive INSERTs for the same table
151            let table = &event.table;
152            let mut batch: Vec<&CdcEvent> = vec![event];
153            let mut j = i + 1;
154            while j < events.len()
155                && events[j].operation == CdcOperation::Insert
156                && events[j].table == *table
157            {
158                batch.push(&events[j]);
159                j += 1;
160            }
161            ops.push(SyncOp::BatchInsert {
162                table,
163                events: batch,
164            });
165            i = j;
166        } else {
167            ops.push(SyncOp::Single(event));
168            i += 1;
169        }
170    }
171
172    ops
173}
174
175impl<C, O> rhei_core::SyncEngine for CdcSyncEngine<C, O>
176where
177    C: rhei_core::CdcConsumer,
178    O: rhei_core::OlapEngine,
179{
180    type Error = SyncError;
181
182    async fn sync_once(&self) -> Result<SyncResult, Self::Error> {
183        let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
184        let after_seq = if raw_seq == NEVER_SYNCED {
185            None
186        } else {
187            Some(raw_seq)
188        };
189
190        // Poll CDC events
191        let events = self
192            .cdc
193            .poll(after_seq, self.batch_size)
194            .await
195            .map_err(|e| SyncError::Cdc(e.to_string()))?;
196
197        if events.is_empty() {
198            return Ok(SyncResult {
199                events_processed: 0,
200                rows_inserted: 0,
201                rows_updated: 0,
202                rows_deleted: 0,
203                last_seq: after_seq,
204                pruned_count: None,
205            });
206        }
207
208        debug!(count = events.len(), "processing CDC events");
209
210        let mut rows_inserted: u64 = 0;
211        let mut rows_updated: u64 = 0;
212        let mut rows_deleted: u64 = 0;
213        let mut last_seq = after_seq;
214
215        // Group consecutive same-table INSERTs into batches
216        let ops = group_events(&events);
217
218        // Wrap the entire batch in an OLAP-side transaction if supported.
219        // Backends like DataFusion treat BEGIN/COMMIT/ROLLBACK as no-ops, so we
220        // skip them and rely on per-statement idempotency + seq-based recovery.
221        let use_transaction = self.olap.supports_transactions();
222        if use_transaction {
223            self.olap
224                .execute("BEGIN TRANSACTION")
225                .await
226                .map_err(|e| SyncError::Olap(e.to_string()))?;
227        }
228
229        let result = async {
230            for op in &ops {
231                match op {
232                    SyncOp::BatchInsert {
233                        table,
234                        events: batch_events,
235                    } => {
236                        // Look up schema
237                        let schema = match self.schema_registry.get(table) {
238                            Ok(s) => s,
239                            Err(_) => {
240                                warn!(table, "skipping CDC events for unregistered table");
241                                if let Some(last) = batch_events.last() {
242                                    last_seq = Some(last.seq);
243                                }
244                                continue;
245                            }
246                        };
247
248                        // Try the typed Arrow path first. If the schema contains
249                        // an unsupported Arrow type (Timestamp, Date32, Decimal,
250                        // List, Struct, …) fall back to the SQL path which handles
251                        // all scalar types via json_value_to_sql. If both fail,
252                        // propagate the SQL error.
253                        let arrow_result = match self.sync_mode {
254                            SyncMode::Destructive => cdc_events_to_batch(batch_events, &schema),
255                            SyncMode::Temporal => {
256                                cdc_events_to_temporal_batch(batch_events, &schema)
257                            }
258                        };
259
260                        let used_arrow = match arrow_result {
261                            Ok(batch) => {
262                                // Typed Arrow batch succeeded — use load_arrow.
263                                if let Err(e) = self
264                                    .olap
265                                    .load_arrow(table, std::slice::from_ref(&batch))
266                                    .await
267                                {
268                                    let msg = e.to_string();
269                                    if is_stale_schema_error(&msg) {
270                                        warn!(
271                                            table,
272                                            error = %msg,
273                                            "skipping batch INSERT due to stale schema \
274                                             (column mismatch)"
275                                        );
276                                        if let Some(last) = batch_events.last() {
277                                            last_seq = Some(last.seq);
278                                        }
279                                        continue;
280                                    }
281                                    return Err(SyncError::Olap(msg));
282                                }
283                                true
284                            }
285                            Err(SyncError::UnsupportedType(ref reason)) => {
286                                // Schema has a type not handled by the Arrow path —
287                                // fall back to SQL-based batch insert.
288                                warn!(
289                                    table,
290                                    reason,
291                                    "falling back to SQL batch INSERT (unsupported Arrow type)"
292                                );
293                                false
294                            }
295                            Err(e) => return Err(e),
296                        };
297
298                        if !used_arrow {
299                            // SQL fallback path
300                            let sql = match self.sync_mode {
301                                SyncMode::Destructive => build_batch_insert(batch_events, &schema)?,
302                                SyncMode::Temporal => {
303                                    build_temporal_batch_insert(batch_events, &schema)?
304                                }
305                            };
306                            if let Err(e) = self.olap.execute(&sql).await {
307                                let msg = e.to_string();
308                                if is_stale_schema_error(&msg) {
309                                    warn!(
310                                        table,
311                                        error = %msg,
312                                        "skipping batch INSERT due to stale schema \
313                                         (column mismatch, SQL path)"
314                                    );
315                                    if let Some(last) = batch_events.last() {
316                                        last_seq = Some(last.seq);
317                                    }
318                                    continue;
319                                }
320                                return Err(SyncError::Olap(msg));
321                            }
322                        }
323
324                        rows_inserted += batch_events.len() as u64;
325                        if let Some(last) = batch_events.last() {
326                            last_seq = Some(last.seq);
327                        }
328                    }
329                    SyncOp::Single(event) => {
330                        // Look up schema
331                        let schema = match self.schema_registry.get(&event.table) {
332                            Ok(s) => s,
333                            Err(_) => {
334                                warn!(
335                                    table = event.table.as_str(),
336                                    "skipping CDC event for unregistered table"
337                                );
338                                last_seq = Some(event.seq);
339                                continue;
340                            }
341                        };
342
343                        let skip = match self.sync_mode {
344                            SyncMode::Destructive => {
345                                let dml = cdc_event_to_dml(event, &schema)?;
346                                if dml.is_empty() {
347                                    last_seq = Some(event.seq);
348                                    continue;
349                                }
350                                match self.olap.execute(&dml).await {
351                                    Ok(_) => false,
352                                    Err(e) => {
353                                        let msg = e.to_string();
354                                        if is_stale_schema_error(&msg) {
355                                            warn!(
356                                                table = event.table.as_str(),
357                                                error = %msg,
358                                                "skipping event due to stale schema"
359                                            );
360                                            true
361                                        } else {
362                                            return Err(SyncError::Olap(msg));
363                                        }
364                                    }
365                                }
366                            }
367                            SyncMode::Temporal => {
368                                let stmts = cdc_event_to_temporal_dml(event, &schema)?;
369                                // For multi-stmt temporal ops (close + insert), detect
370                                // stale schema before executing any stmts to avoid
371                                // partial state (closing a row without inserting its
372                                // replacement). Check the last stmt (INSERT with all
373                                // columns) against OLAP first as a dry-run query parse.
374                                if stmts.len() > 1 {
375                                    if let Some(insert_stmt) = stmts.last() {
376                                        // Validate by attempting a EXPLAIN on the insert
377                                        // to catch column mismatches without side effects.
378                                        let explain = format!("EXPLAIN {insert_stmt}");
379                                        if let Err(e) = self.olap.query(&explain).await {
380                                            let msg = e.to_string();
381                                            if is_stale_schema_error(&msg) {
382                                                warn!(
383                                                    table = event.table.as_str(),
384                                                    error = %msg,
385                                                    "skipping temporal event due to stale schema \
386                                                     (detected before execution)"
387                                                );
388                                                last_seq = Some(event.seq);
389                                                continue;
390                                            }
391                                            return Err(SyncError::Olap(msg));
392                                        }
393                                    }
394                                }
395                                // All stmts validated — execute them
396                                let mut skipped = false;
397                                for stmt in &stmts {
398                                    if let Err(e) = self.olap.execute(stmt).await {
399                                        let msg = e.to_string();
400                                        if is_stale_schema_error(&msg) {
401                                            warn!(
402                                                table = event.table.as_str(),
403                                                error = %msg,
404                                                "skipping temporal event due to stale schema"
405                                            );
406                                            skipped = true;
407                                            break;
408                                        }
409                                        return Err(SyncError::Olap(msg));
410                                    }
411                                }
412                                skipped
413                            }
414                        };
415
416                        if skip {
417                            last_seq = Some(event.seq);
418                            continue;
419                        }
420
421                        match event.operation {
422                            CdcOperation::Insert => rows_inserted += 1,
423                            CdcOperation::Update => rows_updated += 1,
424                            CdcOperation::Delete => rows_deleted += 1,
425                        }
426
427                        last_seq = Some(event.seq);
428                    }
429                }
430            }
431            Ok::<(), SyncError>(())
432        }
433        .await;
434
435        // Commit or rollback (only if we used a transaction)
436        match result {
437            Ok(()) => {
438                if use_transaction {
439                    self.olap
440                        .execute("COMMIT")
441                        .await
442                        .map_err(|e| SyncError::Olap(e.to_string()))?;
443                }
444            }
445            Err(e) => {
446                if use_transaction {
447                    let _ = self.olap.execute("ROLLBACK").await;
448                    return Err(e);
449                }
450                // Non-transactional backend: partial writes are already committed.
451                // Advance the watermark for what succeeded, then surface the error
452                // so the next cycle resumes from the correct position.
453                if let Some(seq) = last_seq {
454                    self.last_synced_seq.store(seq, Ordering::Relaxed);
455                }
456                return Err(e);
457            }
458        }
459
460        // Update watermark (after commit for transactional, or final success for non-transactional)
461        if let Some(seq) = last_seq {
462            self.last_synced_seq.store(seq, Ordering::Relaxed);
463        }
464
465        // Prune processed CDC events if enabled
466        let pruned_count = if self.prune_after_sync {
467            if let Some(seq) = last_seq {
468                match self.cdc.prune(seq).await {
469                    Ok(count) => {
470                        debug!(pruned = count, up_to_seq = seq, "pruned CDC events");
471                        Some(count)
472                    }
473                    Err(e) => {
474                        // Pruning failure is non-fatal — data is already committed to OLAP
475                        warn!(error = %e, "failed to prune CDC events");
476                        None
477                    }
478                }
479            } else {
480                None
481            }
482        } else {
483            None
484        };
485
486        let events_processed = events.len() as u64;
487        debug!(
488            events_processed,
489            rows_inserted, rows_updated, rows_deleted, "sync cycle complete"
490        );
491
492        #[cfg(feature = "metrics")]
493        {
494            metrics::counter!("rhei.sync.events_processed").increment(events_processed);
495            metrics::counter!("rhei.sync.rows_inserted").increment(rows_inserted);
496            metrics::counter!("rhei.sync.rows_updated").increment(rows_updated);
497            metrics::counter!("rhei.sync.rows_deleted").increment(rows_deleted);
498            if let Some(p) = pruned_count {
499                metrics::counter!("rhei.sync.rows_pruned").increment(p);
500            }
501        }
502
503        Ok(SyncResult {
504            events_processed,
505            rows_inserted,
506            rows_updated,
507            rows_deleted,
508            last_seq,
509            pruned_count,
510        })
511    }
512
513    async fn status(&self) -> Result<SyncStatus, Self::Error> {
514        let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
515        let last_synced = if raw_seq == NEVER_SYNCED {
516            None
517        } else {
518            Some(raw_seq)
519        };
520
521        let latest_available = self
522            .cdc
523            .latest_seq()
524            .await
525            .map_err(|e| SyncError::Cdc(e.to_string()))?;
526
527        let lag = match (last_synced, latest_available) {
528            (Some(synced), Some(available)) => (available - synced).max(0) as u64,
529            (None, Some(available)) => available.max(0) as u64,
530            _ => 0,
531        };
532
533        Ok(SyncStatus {
534            running: true,
535            last_synced_seq: last_synced,
536            latest_available_seq: latest_available,
537            lag,
538        })
539    }
540}
541
542// ---------------------------------------------------------------------------
543// Unit tests for sync_engine fallback logic
544// ---------------------------------------------------------------------------
545
546#[cfg(test)]
547mod tests {
548    use super::*;
549
550    use std::sync::{Arc, Mutex};
551
552    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
553    use arrow::record_batch::RecordBatch;
554    use rhei_core::types::{CdcEvent, CdcOperation};
555    use rhei_core::{SchemaRegistry, TableSchema};
556    use serde_json::json;
557
558    // ------------------------------------------------------------------
559    // Minimal CdcConsumer that returns a pre-baked list of events once.
560    // ------------------------------------------------------------------
561
562    struct MockCdc {
563        events: Mutex<Option<Vec<CdcEvent>>>,
564    }
565
566    impl MockCdc {
567        fn new(events: Vec<CdcEvent>) -> Self {
568            Self {
569                events: Mutex::new(Some(events)),
570            }
571        }
572    }
573
574    impl rhei_core::CdcConsumer for MockCdc {
575        type Error = crate::SyncError;
576
577        async fn poll(
578            &self,
579            _after_seq: Option<i64>,
580            _limit: u32,
581        ) -> Result<Vec<CdcEvent>, Self::Error> {
582            Ok(self.events.lock().unwrap().take().unwrap_or_default())
583        }
584
585        async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
586            Ok(None)
587        }
588
589        async fn prune(&self, _up_to_seq: i64) -> Result<u64, Self::Error> {
590            Ok(0)
591        }
592    }
593
594    // ------------------------------------------------------------------
595    // Minimal OlapEngine that records SQL statements executed.
596    // ------------------------------------------------------------------
597
598    struct MockOlap {
599        executed: Mutex<Vec<String>>,
600    }
601
602    impl MockOlap {
603        fn new() -> Self {
604            Self {
605                executed: Mutex::new(Vec::new()),
606            }
607        }
608
609        fn statements(&self) -> Vec<String> {
610            self.executed.lock().unwrap().clone()
611        }
612    }
613
614    impl rhei_core::OlapEngine for MockOlap {
615        type Error = crate::SyncError;
616
617        async fn query(&self, _sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
618            Ok(vec![])
619        }
620
621        async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
622            self.executed.lock().unwrap().push(sql.to_string());
623            Ok(0)
624        }
625
626        async fn load_arrow(
627            &self,
628            _table: &str,
629            _batches: &[RecordBatch],
630        ) -> Result<u64, Self::Error> {
631            // Unconditionally fail — forces fallback path to be tested
632            Err(crate::SyncError::Olap(
633                "load_arrow not supported in mock".into(),
634            ))
635        }
636
637        async fn create_table(
638            &self,
639            _table_name: &str,
640            _schema: &SchemaRef,
641            _primary_key: &[String],
642        ) -> Result<(), Self::Error> {
643            Ok(())
644        }
645
646        async fn table_exists(&self, _table_name: &str) -> Result<bool, Self::Error> {
647            Ok(true)
648        }
649
650        async fn add_column(
651            &self,
652            _table_name: &str,
653            _column_name: &str,
654            _data_type: &DataType,
655        ) -> Result<(), Self::Error> {
656            Ok(())
657        }
658
659        async fn drop_column(
660            &self,
661            _table_name: &str,
662            _column_name: &str,
663        ) -> Result<(), Self::Error> {
664            Ok(())
665        }
666    }
667
668    // ------------------------------------------------------------------
669    // OlapEngine variant whose load_arrow succeeds (for the happy path).
670    // ------------------------------------------------------------------
671
672    struct MockOlapArrow {
673        loaded: Mutex<u64>,
674        executed: Mutex<Vec<String>>,
675    }
676
677    impl MockOlapArrow {
678        fn new() -> Self {
679            Self {
680                loaded: Mutex::new(0),
681                executed: Mutex::new(Vec::new()),
682            }
683        }
684    }
685
686    impl rhei_core::OlapEngine for MockOlapArrow {
687        type Error = crate::SyncError;
688
689        async fn query(&self, _sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
690            Ok(vec![])
691        }
692
693        async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
694            self.executed.lock().unwrap().push(sql.to_string());
695            Ok(0)
696        }
697
698        async fn load_arrow(
699            &self,
700            _table: &str,
701            batches: &[RecordBatch],
702        ) -> Result<u64, Self::Error> {
703            let n: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
704            *self.loaded.lock().unwrap() += n;
705            Ok(n)
706        }
707
708        async fn create_table(
709            &self,
710            _table_name: &str,
711            _schema: &SchemaRef,
712            _primary_key: &[String],
713        ) -> Result<(), Self::Error> {
714            Ok(())
715        }
716
717        async fn table_exists(&self, _table_name: &str) -> Result<bool, Self::Error> {
718            Ok(true)
719        }
720
721        async fn add_column(
722            &self,
723            _table_name: &str,
724            _column_name: &str,
725            _data_type: &DataType,
726        ) -> Result<(), Self::Error> {
727            Ok(())
728        }
729
730        async fn drop_column(
731            &self,
732            _table_name: &str,
733            _column_name: &str,
734        ) -> Result<(), Self::Error> {
735            Ok(())
736        }
737    }
738
739    // ------------------------------------------------------------------
740    // Helper: build a schema with an unsupported Arrow type (Timestamp).
741    // ------------------------------------------------------------------
742
743    fn timestamp_schema() -> Arc<TableSchema> {
744        use arrow::datatypes::TimeUnit;
745        Arc::new(TableSchema::new(
746            "events",
747            Arc::new(Schema::new(vec![
748                Field::new("id", DataType::Int64, false),
749                Field::new("name", DataType::Utf8, true),
750                Field::new(
751                    "created_at",
752                    DataType::Timestamp(TimeUnit::Microsecond, None),
753                    true,
754                ),
755            ])),
756            vec!["id".to_string()],
757        ))
758    }
759
760    fn make_insert_event(seq: i64, id: i64, name: &str) -> CdcEvent {
761        CdcEvent {
762            seq,
763            timestamp: 1000 + seq,
764            operation: CdcOperation::Insert,
765            table: "events".into(),
766            row_id: Some(id),
767            old_data: None,
768            new_data: Some(json!({"id": id, "name": name, "created_at": 1234567890})),
769        }
770    }
771
772    // ------------------------------------------------------------------
773    // Test: Timestamp schema falls back to SQL (no error), rows succeed.
774    // ------------------------------------------------------------------
775
776    #[tokio::test]
777    async fn test_timestamp_schema_falls_back_to_sql() {
778        use rhei_core::SyncEngine;
779
780        let schema = timestamp_schema();
781        let events = vec![
782            make_insert_event(1, 1, "Alice"),
783            make_insert_event(2, 2, "Bob"),
784        ];
785
786        let registry = SchemaRegistry::default();
787        registry
788            .register((*schema).clone())
789            .expect("register schema");
790
791        // MockOlap: load_arrow always fails, but execute() accepts anything.
792        // The fallback SQL path should be taken because the schema has Timestamp.
793        let cdc = MockCdc::new(events);
794        let olap = MockOlap::new();
795
796        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
797        let result = engine.sync_once().await;
798
799        // Should succeed (fallback to SQL path)
800        let result = result.expect("sync_once should not error for Timestamp schema");
801        assert_eq!(result.rows_inserted, 2, "both rows should be counted");
802
803        // The SQL fallback should have issued a batch INSERT (not two singles)
804        let stmts = engine.olap.statements();
805        // Filter out any transaction-control statements
806        let insert_stmts: Vec<_> = stmts
807            .iter()
808            .filter(|s| s.to_uppercase().contains("INSERT"))
809            .collect();
810        assert_eq!(
811            insert_stmts.len(),
812            1,
813            "expected one batch INSERT via SQL fallback"
814        );
815        assert!(
816            insert_stmts[0].contains("Alice") && insert_stmts[0].contains("Bob"),
817            "batch INSERT should contain both rows"
818        );
819    }
820
821    // ------------------------------------------------------------------
822    // Test: Supported schema uses Arrow path (load_arrow called).
823    // ------------------------------------------------------------------
824
825    #[tokio::test]
826    async fn test_supported_schema_uses_arrow_path() {
827        use rhei_core::SyncEngine;
828
829        let schema = Arc::new(TableSchema::new(
830            "users",
831            Arc::new(Schema::new(vec![
832                Field::new("id", DataType::Int64, false),
833                Field::new("name", DataType::Utf8, true),
834            ])),
835            vec!["id".to_string()],
836        ));
837
838        let events = vec![
839            CdcEvent {
840                seq: 1,
841                timestamp: 1000,
842                operation: CdcOperation::Insert,
843                table: "users".into(),
844                row_id: Some(1),
845                old_data: None,
846                new_data: Some(json!({"id": 1, "name": "Alice"})),
847            },
848            CdcEvent {
849                seq: 2,
850                timestamp: 1001,
851                operation: CdcOperation::Insert,
852                table: "users".into(),
853                row_id: Some(2),
854                old_data: None,
855                new_data: Some(json!({"id": 2, "name": "Bob"})),
856            },
857        ];
858
859        let registry = SchemaRegistry::default();
860        registry
861            .register((*schema).clone())
862            .expect("register schema");
863
864        let cdc = MockCdc::new(events);
865        let olap = MockOlapArrow::new();
866
867        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
868        let result = engine.sync_once().await.expect("sync_once should succeed");
869
870        assert_eq!(result.rows_inserted, 2);
871        // Arrow path: load_arrow was called, not execute()
872        let loaded = *engine.olap.loaded.lock().unwrap();
873        assert_eq!(loaded, 2, "load_arrow should have received 2 rows");
874        let executed = engine.olap.executed.lock().unwrap().clone();
875        let has_insert = executed.iter().any(|s| s.to_uppercase().contains("INSERT"));
876        assert!(
877            !has_insert,
878            "SQL INSERT should not be called when Arrow path succeeds"
879        );
880    }
881}