rhei_sync/sync_engine.rs
1//! Core CDC-to-OLAP sync engine.
2//!
3//! [`CdcSyncEngine`] polls the CDC consumer, groups consecutive INSERT events
4//! into `SyncOp::BatchInsert` groups, and applies them to the OLAP engine
5//! using either the Arrow-native path or a SQL fallback.
6//!
7//! ## Transaction semantics
8//!
9//! When [`rhei_core::OlapEngine::supports_transactions`] returns `true` (e.g.
10//! DuckDB), each sync cycle is wrapped in `BEGIN TRANSACTION … COMMIT`. A
11//! failure triggers `ROLLBACK` and the watermark is *not* advanced, so the next
12//! cycle retries from the same position.
13//!
14//! When the backend does not support transactions (e.g. DataFusion), statements
15//! are committed one at a time. On failure the watermark is advanced to the
16//! last successfully applied event so that the next cycle does not re-apply
17//! already-committed changes — **partial-failure recovery**.
18//!
19//! ## Stale-schema handling
20//!
21//! If the OLAP engine rejects a statement with a message that matches known
22//! column-not-found patterns, the event (or batch) is skipped with a warning
23//! and the watermark advances past it. This prevents a single stale-schema
24//! event from blocking the entire sync pipeline.
25
26use std::sync::atomic::{AtomicI64, Ordering};
27
28use rhei_core::types::{CdcEvent, CdcOperation, SyncMode, SyncResult, SyncStatus};
29use tracing::{debug, warn};
30
/// Sentinel stored in `last_synced_seq` while the watermark has never been set.
///
/// `sync_once` and `status` translate this value to `None` before using or
/// reporting the watermark.
const NEVER_SYNCED: i64 = -1;
33
34use crate::converter::{build_batch_insert, cdc_event_to_dml, cdc_events_to_batch};
35use crate::error::SyncError;
36use crate::temporal_converter::{
37 build_temporal_batch_insert, cdc_event_to_temporal_dml, cdc_events_to_temporal_batch,
38};
39
/// Substrings that, when seen together with `"not found"`, classify an OLAP
/// engine error message as a stale-schema error (a column was dropped or added
/// after the CDC event was logged).
///
/// Pairing `"column"` with `"not found"` keeps `"table not found"` — a different
/// error class — from being treated as stale schema.
const STALE_SCHEMA_PATTERNS: &[&str] = &["column", "No field named", "schema mismatch"];

/// Returns `true` when `msg` looks like a stale-schema (column mismatch) error.
///
/// Matching is a plain, case-sensitive substring search against the engine's
/// error text.
fn is_stale_schema_error(msg: &str) -> bool {
    // Markers that identify a stale schema on their own.
    let conclusive = [
        "No field named",
        "schema mismatch",
        "not in INSERT column list",
    ];
    if conclusive.iter().any(|marker| msg.contains(*marker)) {
        return true;
    }

    // "not found" only counts when one of the schema patterns accompanies it,
    // so that "table not found" is not swallowed as a stale-schema error.
    msg.contains("not found")
        && STALE_SCHEMA_PATTERNS
            .iter()
            .any(|pattern| msg.contains(*pattern))
}
52
/// CDC-based sync engine that replicates changes from OLTP to OLAP.
///
/// Polls the CDC consumer for new events, converts them to DML (or Arrow
/// batches), and applies them to the OLAP engine.
///
/// Features:
/// - **Batch INSERT**: Consecutive INSERT events for the same table are grouped
///   and applied together for better OLAP performance.
/// - **CDC pruning**: Optionally prunes processed CDC events after a successful
///   sync to prevent unbounded growth of the `_rhei_cdc_log` table.
/// - **Transaction safety**: When the OLAP engine reports
///   `supports_transactions() == true`, each sync cycle is wrapped in
///   `BEGIN TRANSACTION` / `COMMIT`, with `ROLLBACK` on failure and no watermark
///   advance. Otherwise no transaction statements are issued; a mid-cycle
///   failure leaves the already-applied statements in place and the watermark is
///   advanced to the last applied event so the next cycle resumes after it.
pub struct CdcSyncEngine<C, O> {
    /// Source of change events; also asked to prune events that were synced.
    cdc: C,
    /// Target engine that receives Arrow batches and SQL statements.
    olap: O,
    /// Table schemas, looked up by table name for every operation.
    schema_registry: rhei_core::SchemaRegistry,
    /// Last synced CDC sequence number, or `NEVER_SYNCED` if no sync has occurred.
    last_synced_seq: AtomicI64,
    /// Maximum number of CDC events requested from the consumer per cycle.
    batch_size: u32,
    /// Whether to prune processed CDC events after a successful sync.
    prune_after_sync: bool,
    /// Sync mode: Destructive (default) or Temporal (SCD Type 2).
    sync_mode: SyncMode,
}
80
81impl<C, O> CdcSyncEngine<C, O>
82where
83 C: rhei_core::CdcConsumer,
84 O: rhei_core::OlapEngine,
85{
86 /// Create a new [`CdcSyncEngine`].
87 ///
88 /// - `cdc` — CDC consumer that provides change events (e.g. SQLite trigger log).
89 /// - `olap` — OLAP engine that receives the converted DML.
90 /// - `schema_registry` — registry of table schemas used to build WHERE clauses
91 /// and Arrow batches.
92 /// - `batch_size` — maximum number of CDC events to fetch per [`sync_once`](rhei_core::SyncEngine::sync_once) call.
93 ///
94 /// CDC pruning is enabled by default (`prune_after_sync = true`).
95 /// Sync mode defaults to [`rhei_core::types::SyncMode::Destructive`].
96 pub fn new(
97 cdc: C,
98 olap: O,
99 schema_registry: rhei_core::SchemaRegistry,
100 batch_size: u32,
101 ) -> Self {
102 Self {
103 cdc,
104 olap,
105 schema_registry,
106 last_synced_seq: AtomicI64::new(NEVER_SYNCED),
107 batch_size,
108 prune_after_sync: true,
109 sync_mode: SyncMode::default(),
110 }
111 }
112
113 /// Set whether to prune processed CDC events after sync.
114 pub fn with_prune_after_sync(mut self, prune: bool) -> Self {
115 self.prune_after_sync = prune;
116 self
117 }
118
119 /// Set the sync mode: Destructive (default) or Temporal (SCD Type 2).
120 pub fn with_sync_mode(mut self, mode: SyncMode) -> Self {
121 self.sync_mode = mode;
122 self
123 }
124}
125
/// One unit of work produced by [`group_events`] and applied in order by the
/// sync loop.
///
/// Each operation is either:
/// - A batch of consecutive INSERT events for the same table (to be combined
///   into one INSERT)
/// - A single UPDATE or DELETE event
enum SyncOp<'a> {
    /// A batch of consecutive INSERT events for the same table.
    BatchInsert {
        /// Table name shared by every event in `events`.
        table: &'a str,
        /// The INSERT events of this run, in their original order.
        events: Vec<&'a CdcEvent>,
    },
    /// A single UPDATE or DELETE event.
    Single(&'a CdcEvent),
}
140
141/// Group events into SyncOps, batching consecutive same-table INSERTs.
142fn group_events(events: &[CdcEvent]) -> Vec<SyncOp<'_>> {
143 let mut ops: Vec<SyncOp<'_>> = Vec::new();
144 let mut i = 0;
145
146 while i < events.len() {
147 let event = &events[i];
148
149 if event.operation == CdcOperation::Insert {
150 // Collect consecutive INSERTs for the same table
151 let table = &event.table;
152 let mut batch: Vec<&CdcEvent> = vec![event];
153 let mut j = i + 1;
154 while j < events.len()
155 && events[j].operation == CdcOperation::Insert
156 && events[j].table == *table
157 {
158 batch.push(&events[j]);
159 j += 1;
160 }
161 ops.push(SyncOp::BatchInsert {
162 table,
163 events: batch,
164 });
165 i = j;
166 } else {
167 ops.push(SyncOp::Single(event));
168 i += 1;
169 }
170 }
171
172 ops
173}
174
175impl<C, O> rhei_core::SyncEngine for CdcSyncEngine<C, O>
176where
177 C: rhei_core::CdcConsumer,
178 O: rhei_core::OlapEngine,
179{
180 type Error = SyncError;
181
182 async fn sync_once(&self) -> Result<SyncResult, Self::Error> {
183 let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
184 let after_seq = if raw_seq == NEVER_SYNCED {
185 None
186 } else {
187 Some(raw_seq)
188 };
189
190 // Poll CDC events
191 let events = self
192 .cdc
193 .poll(after_seq, self.batch_size)
194 .await
195 .map_err(|e| SyncError::Cdc(e.to_string()))?;
196
197 if events.is_empty() {
198 return Ok(SyncResult {
199 events_processed: 0,
200 rows_inserted: 0,
201 rows_updated: 0,
202 rows_deleted: 0,
203 last_seq: after_seq,
204 pruned_count: None,
205 });
206 }
207
208 debug!(count = events.len(), "processing CDC events");
209
210 let mut rows_inserted: u64 = 0;
211 let mut rows_updated: u64 = 0;
212 let mut rows_deleted: u64 = 0;
213 let mut last_seq = after_seq;
214
215 // Group consecutive same-table INSERTs into batches
216 let ops = group_events(&events);
217
218 // Wrap the entire batch in an OLAP-side transaction if supported.
219 // Backends like DataFusion treat BEGIN/COMMIT/ROLLBACK as no-ops, so we
220 // skip them and rely on per-statement idempotency + seq-based recovery.
221 let use_transaction = self.olap.supports_transactions();
222 if use_transaction {
223 self.olap
224 .execute("BEGIN TRANSACTION")
225 .await
226 .map_err(|e| SyncError::Olap(e.to_string()))?;
227 }
228
229 let result = async {
230 for op in &ops {
231 match op {
232 SyncOp::BatchInsert {
233 table,
234 events: batch_events,
235 } => {
236 // Look up schema
237 let schema = match self.schema_registry.get(table) {
238 Ok(s) => s,
239 Err(_) => {
240 warn!(table, "skipping CDC events for unregistered table");
241 if let Some(last) = batch_events.last() {
242 last_seq = Some(last.seq);
243 }
244 continue;
245 }
246 };
247
248 // Try the typed Arrow path first. If the schema contains
249 // an unsupported Arrow type (Timestamp, Date32, Decimal,
250 // List, Struct, …) fall back to the SQL path which handles
251 // all scalar types via json_value_to_sql. If both fail,
252 // propagate the SQL error.
253 let arrow_result = match self.sync_mode {
254 SyncMode::Destructive => cdc_events_to_batch(batch_events, &schema),
255 SyncMode::Temporal => {
256 cdc_events_to_temporal_batch(batch_events, &schema)
257 }
258 };
259
260 let used_arrow = match arrow_result {
261 Ok(batch) => {
262 // Typed Arrow batch succeeded — use load_arrow.
263 if let Err(e) = self
264 .olap
265 .load_arrow(table, std::slice::from_ref(&batch))
266 .await
267 {
268 let msg = e.to_string();
269 if is_stale_schema_error(&msg) {
270 warn!(
271 table,
272 error = %msg,
273 "skipping batch INSERT due to stale schema \
274 (column mismatch)"
275 );
276 if let Some(last) = batch_events.last() {
277 last_seq = Some(last.seq);
278 }
279 continue;
280 }
281 return Err(SyncError::Olap(msg));
282 }
283 true
284 }
285 Err(SyncError::UnsupportedType(ref reason)) => {
286 // Schema has a type not handled by the Arrow path —
287 // fall back to SQL-based batch insert.
288 warn!(
289 table,
290 reason,
291 "falling back to SQL batch INSERT (unsupported Arrow type)"
292 );
293 false
294 }
295 Err(e) => return Err(e),
296 };
297
298 if !used_arrow {
299 // SQL fallback path
300 let sql = match self.sync_mode {
301 SyncMode::Destructive => build_batch_insert(batch_events, &schema)?,
302 SyncMode::Temporal => {
303 build_temporal_batch_insert(batch_events, &schema)?
304 }
305 };
306 if let Err(e) = self.olap.execute(&sql).await {
307 let msg = e.to_string();
308 if is_stale_schema_error(&msg) {
309 warn!(
310 table,
311 error = %msg,
312 "skipping batch INSERT due to stale schema \
313 (column mismatch, SQL path)"
314 );
315 if let Some(last) = batch_events.last() {
316 last_seq = Some(last.seq);
317 }
318 continue;
319 }
320 return Err(SyncError::Olap(msg));
321 }
322 }
323
324 rows_inserted += batch_events.len() as u64;
325 if let Some(last) = batch_events.last() {
326 last_seq = Some(last.seq);
327 }
328 }
329 SyncOp::Single(event) => {
330 // Look up schema
331 let schema = match self.schema_registry.get(&event.table) {
332 Ok(s) => s,
333 Err(_) => {
334 warn!(
335 table = event.table.as_str(),
336 "skipping CDC event for unregistered table"
337 );
338 last_seq = Some(event.seq);
339 continue;
340 }
341 };
342
343 let skip = match self.sync_mode {
344 SyncMode::Destructive => {
345 let dml = cdc_event_to_dml(event, &schema)?;
346 if dml.is_empty() {
347 last_seq = Some(event.seq);
348 continue;
349 }
350 match self.olap.execute(&dml).await {
351 Ok(_) => false,
352 Err(e) => {
353 let msg = e.to_string();
354 if is_stale_schema_error(&msg) {
355 warn!(
356 table = event.table.as_str(),
357 error = %msg,
358 "skipping event due to stale schema"
359 );
360 true
361 } else {
362 return Err(SyncError::Olap(msg));
363 }
364 }
365 }
366 }
367 SyncMode::Temporal => {
368 let stmts = cdc_event_to_temporal_dml(event, &schema)?;
369 // For multi-stmt temporal ops (close + insert), detect
370 // stale schema before executing any stmts to avoid
371 // partial state (closing a row without inserting its
372 // replacement). Check the last stmt (INSERT with all
373 // columns) against OLAP first as a dry-run query parse.
374 if stmts.len() > 1 {
375 if let Some(insert_stmt) = stmts.last() {
376 // Validate by attempting a EXPLAIN on the insert
377 // to catch column mismatches without side effects.
378 let explain = format!("EXPLAIN {insert_stmt}");
379 if let Err(e) = self.olap.query(&explain).await {
380 let msg = e.to_string();
381 if is_stale_schema_error(&msg) {
382 warn!(
383 table = event.table.as_str(),
384 error = %msg,
385 "skipping temporal event due to stale schema \
386 (detected before execution)"
387 );
388 last_seq = Some(event.seq);
389 continue;
390 }
391 return Err(SyncError::Olap(msg));
392 }
393 }
394 }
395 // All stmts validated — execute them
396 let mut skipped = false;
397 for stmt in &stmts {
398 if let Err(e) = self.olap.execute(stmt).await {
399 let msg = e.to_string();
400 if is_stale_schema_error(&msg) {
401 warn!(
402 table = event.table.as_str(),
403 error = %msg,
404 "skipping temporal event due to stale schema"
405 );
406 skipped = true;
407 break;
408 }
409 return Err(SyncError::Olap(msg));
410 }
411 }
412 skipped
413 }
414 };
415
416 if skip {
417 last_seq = Some(event.seq);
418 continue;
419 }
420
421 match event.operation {
422 CdcOperation::Insert => rows_inserted += 1,
423 CdcOperation::Update => rows_updated += 1,
424 CdcOperation::Delete => rows_deleted += 1,
425 }
426
427 last_seq = Some(event.seq);
428 }
429 }
430 }
431 Ok::<(), SyncError>(())
432 }
433 .await;
434
435 // Commit or rollback (only if we used a transaction)
436 match result {
437 Ok(()) => {
438 if use_transaction {
439 self.olap
440 .execute("COMMIT")
441 .await
442 .map_err(|e| SyncError::Olap(e.to_string()))?;
443 }
444 }
445 Err(e) => {
446 if use_transaction {
447 let _ = self.olap.execute("ROLLBACK").await;
448 return Err(e);
449 }
450 // Non-transactional backend: partial writes are already committed.
451 // Advance the watermark for what succeeded, then surface the error
452 // so the next cycle resumes from the correct position.
453 if let Some(seq) = last_seq {
454 self.last_synced_seq.store(seq, Ordering::Relaxed);
455 }
456 return Err(e);
457 }
458 }
459
460 // Update watermark (after commit for transactional, or final success for non-transactional)
461 if let Some(seq) = last_seq {
462 self.last_synced_seq.store(seq, Ordering::Relaxed);
463 }
464
465 // Prune processed CDC events if enabled
466 let pruned_count = if self.prune_after_sync {
467 if let Some(seq) = last_seq {
468 match self.cdc.prune(seq).await {
469 Ok(count) => {
470 debug!(pruned = count, up_to_seq = seq, "pruned CDC events");
471 Some(count)
472 }
473 Err(e) => {
474 // Pruning failure is non-fatal — data is already committed to OLAP
475 warn!(error = %e, "failed to prune CDC events");
476 None
477 }
478 }
479 } else {
480 None
481 }
482 } else {
483 None
484 };
485
486 let events_processed = events.len() as u64;
487 debug!(
488 events_processed,
489 rows_inserted, rows_updated, rows_deleted, "sync cycle complete"
490 );
491
492 #[cfg(feature = "metrics")]
493 {
494 metrics::counter!("rhei.sync.events_processed").increment(events_processed);
495 metrics::counter!("rhei.sync.rows_inserted").increment(rows_inserted);
496 metrics::counter!("rhei.sync.rows_updated").increment(rows_updated);
497 metrics::counter!("rhei.sync.rows_deleted").increment(rows_deleted);
498 if let Some(p) = pruned_count {
499 metrics::counter!("rhei.sync.rows_pruned").increment(p);
500 }
501 }
502
503 Ok(SyncResult {
504 events_processed,
505 rows_inserted,
506 rows_updated,
507 rows_deleted,
508 last_seq,
509 pruned_count,
510 })
511 }
512
513 async fn status(&self) -> Result<SyncStatus, Self::Error> {
514 let raw_seq = self.last_synced_seq.load(Ordering::Relaxed);
515 let last_synced = if raw_seq == NEVER_SYNCED {
516 None
517 } else {
518 Some(raw_seq)
519 };
520
521 let latest_available = self
522 .cdc
523 .latest_seq()
524 .await
525 .map_err(|e| SyncError::Cdc(e.to_string()))?;
526
527 let lag = match (last_synced, latest_available) {
528 (Some(synced), Some(available)) => (available - synced).max(0) as u64,
529 (None, Some(available)) => available.max(0) as u64,
530 _ => 0,
531 };
532
533 Ok(SyncStatus {
534 running: true,
535 last_synced_seq: last_synced,
536 latest_available_seq: latest_available,
537 lag,
538 })
539 }
540}
541
542// ---------------------------------------------------------------------------
543// Unit tests for sync_engine fallback logic
544// ---------------------------------------------------------------------------
545
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{Arc, Mutex};

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use arrow::record_batch::RecordBatch;
    use rhei_core::types::{CdcEvent, CdcOperation};
    use rhei_core::{SchemaRegistry, TableSchema};
    use serde_json::json;

    // ------------------------------------------------------------------
    // Minimal CdcConsumer that returns a pre-baked list of events once.
    // ------------------------------------------------------------------

    struct MockCdc {
        /// Events handed out by the first `poll`; `None` once they were taken.
        events: Mutex<Option<Vec<CdcEvent>>>,
    }

    impl MockCdc {
        fn new(events: Vec<CdcEvent>) -> Self {
            Self {
                events: Mutex::new(Some(events)),
            }
        }
    }

    impl rhei_core::CdcConsumer for MockCdc {
        type Error = crate::SyncError;

        async fn poll(
            &self,
            _after_seq: Option<i64>,
            _limit: u32,
        ) -> Result<Vec<CdcEvent>, Self::Error> {
            // The first call drains the events; later calls see an empty log.
            Ok(self.events.lock().unwrap().take().unwrap_or_default())
        }

        async fn latest_seq(&self) -> Result<Option<i64>, Self::Error> {
            Ok(None)
        }

        async fn prune(&self, _up_to_seq: i64) -> Result<u64, Self::Error> {
            Ok(0)
        }
    }

    // ------------------------------------------------------------------
    // Minimal OlapEngine that records SQL statements and Arrow row counts.
    //
    // One mock covers both scenarios: `new()` rejects every `load_arrow`
    // call, `with_working_arrow()` accepts them and counts the rows.
    // ------------------------------------------------------------------

    struct MockOlap {
        /// Whether `load_arrow` succeeds.
        arrow_supported: bool,
        /// Total number of rows received through `load_arrow`.
        loaded: Mutex<u64>,
        /// Every statement passed to `execute`, in call order.
        executed: Mutex<Vec<String>>,
    }

    impl MockOlap {
        /// Mock whose `load_arrow` always fails.
        fn new() -> Self {
            Self::build(false)
        }

        /// Mock whose `load_arrow` succeeds (for the happy path).
        fn with_working_arrow() -> Self {
            Self::build(true)
        }

        fn build(arrow_supported: bool) -> Self {
            Self {
                arrow_supported,
                loaded: Mutex::new(0),
                executed: Mutex::new(Vec::new()),
            }
        }

        fn statements(&self) -> Vec<String> {
            self.executed.lock().unwrap().clone()
        }

        fn loaded_rows(&self) -> u64 {
            *self.loaded.lock().unwrap()
        }
    }

    impl rhei_core::OlapEngine for MockOlap {
        type Error = crate::SyncError;

        async fn query(&self, _sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
            Ok(vec![])
        }

        async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
            self.executed.lock().unwrap().push(sql.to_string());
            Ok(0)
        }

        async fn load_arrow(
            &self,
            _table: &str,
            batches: &[RecordBatch],
        ) -> Result<u64, Self::Error> {
            if !self.arrow_supported {
                return Err(crate::SyncError::Olap(
                    "load_arrow not supported in mock".into(),
                ));
            }
            let n: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
            *self.loaded.lock().unwrap() += n;
            Ok(n)
        }

        async fn create_table(
            &self,
            _table_name: &str,
            _schema: &SchemaRef,
            _primary_key: &[String],
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn table_exists(&self, _table_name: &str) -> Result<bool, Self::Error> {
            Ok(true)
        }

        async fn add_column(
            &self,
            _table_name: &str,
            _column_name: &str,
            _data_type: &DataType,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn drop_column(
            &self,
            _table_name: &str,
            _column_name: &str,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // ------------------------------------------------------------------
    // Helper: build a schema with an unsupported Arrow type (Timestamp).
    // ------------------------------------------------------------------

    fn timestamp_schema() -> TableSchema {
        use arrow::datatypes::TimeUnit;
        TableSchema::new(
            "events",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
                Field::new(
                    "created_at",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
            ])),
            vec!["id".to_string()],
        )
    }

    // Helper: registry containing exactly one table schema.
    fn registry_with(schema: TableSchema) -> SchemaRegistry {
        let registry = SchemaRegistry::default();
        registry.register(schema).expect("register schema");
        registry
    }

    fn make_insert_event(seq: i64, id: i64, name: &str) -> CdcEvent {
        CdcEvent {
            seq,
            timestamp: 1000 + seq,
            operation: CdcOperation::Insert,
            table: "events".into(),
            row_id: Some(id),
            old_data: None,
            new_data: Some(json!({"id": id, "name": name, "created_at": 1234567890})),
        }
    }

    // ------------------------------------------------------------------
    // Test: Timestamp schema falls back to SQL (no error), rows succeed.
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_timestamp_schema_falls_back_to_sql() {
        use rhei_core::SyncEngine;

        let events = vec![
            make_insert_event(1, 1, "Alice"),
            make_insert_event(2, 2, "Bob"),
        ];
        let registry = registry_with(timestamp_schema());

        // MockOlap::new(): load_arrow always fails, but execute() accepts anything.
        // The fallback SQL path should be taken because the schema has Timestamp.
        let cdc = MockCdc::new(events);
        let olap = MockOlap::new();

        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
        let result = engine.sync_once().await;

        // Should succeed (fallback to SQL path)
        let result = result.expect("sync_once should not error for Timestamp schema");
        assert_eq!(result.events_processed, 2, "both events should be processed");
        assert_eq!(result.rows_inserted, 2, "both rows should be counted");
        assert_eq!(
            result.last_seq,
            Some(2),
            "watermark should reach the last event"
        );

        // The SQL fallback should have issued a batch INSERT (not two singles)
        let stmts = engine.olap.statements();
        // Filter out any transaction-control statements
        let insert_stmts: Vec<_> = stmts
            .iter()
            .filter(|s| s.to_uppercase().contains("INSERT"))
            .collect();
        assert_eq!(
            insert_stmts.len(),
            1,
            "expected one batch INSERT via SQL fallback"
        );
        assert!(
            insert_stmts[0].contains("Alice") && insert_stmts[0].contains("Bob"),
            "batch INSERT should contain both rows"
        );
        assert_eq!(
            engine.olap.loaded_rows(),
            0,
            "load_arrow should not have received any rows"
        );
    }

    // ------------------------------------------------------------------
    // Test: Supported schema uses Arrow path (load_arrow called).
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_supported_schema_uses_arrow_path() {
        use rhei_core::SyncEngine;

        let schema = TableSchema::new(
            "users",
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
            ])),
            vec!["id".to_string()],
        );

        let events = vec![
            CdcEvent {
                seq: 1,
                timestamp: 1000,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(1),
                old_data: None,
                new_data: Some(json!({"id": 1, "name": "Alice"})),
            },
            CdcEvent {
                seq: 2,
                timestamp: 1001,
                operation: CdcOperation::Insert,
                table: "users".into(),
                row_id: Some(2),
                old_data: None,
                new_data: Some(json!({"id": 2, "name": "Bob"})),
            },
        ];

        let registry = registry_with(schema);

        let cdc = MockCdc::new(events);
        let olap = MockOlap::with_working_arrow();

        let engine = CdcSyncEngine::new(cdc, olap, registry, 100);
        let result = engine.sync_once().await.expect("sync_once should succeed");

        assert_eq!(result.events_processed, 2);
        assert_eq!(result.rows_inserted, 2);
        assert_eq!(result.last_seq, Some(2));
        // Arrow path: load_arrow was called, not execute()
        assert_eq!(
            engine.olap.loaded_rows(),
            2,
            "load_arrow should have received 2 rows"
        );
        let has_insert = engine
            .olap
            .statements()
            .iter()
            .any(|s| s.to_uppercase().contains("INSERT"));
        assert!(
            !has_insert,
            "SQL INSERT should not be called when Arrow path succeeds"
        );
    }
}
881}