1use std::sync::Arc;
9
10use arrow_array::RecordBatch;
11
12use crate::operator::window::{CdcOperation, ChangelogRecord};
13
/// Kind of change described by a notification.
///
/// The `u8` discriminants are explicit so the numeric encoding of each kind
/// is fixed regardless of declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum EventType {
    /// Rows were added (weight `+1`).
    Insert = 0,
    /// Rows were removed (weight `-1`).
    Delete = 1,
    /// Rows were replaced; neutral weight (`0`).
    Update = 2,
    /// Timestamp marker that carries no row data (weight `0`).
    Watermark = 3,
    /// Rows delivered as a snapshot; weighted like an insert (`+1`).
    Snapshot = 4,
}

impl EventType {
    /// Signed weight contributed by an event of this kind.
    ///
    /// Returns `-1` for deletes, `0` for updates and watermarks, and `1` for
    /// inserts and snapshots.
    #[inline]
    #[must_use]
    pub fn weight(&self) -> i32 {
        match *self {
            Self::Delete => -1,
            Self::Update | Self::Watermark => 0,
            Self::Insert | Self::Snapshot => 1,
        }
    }

    /// Returns `true` when events of this kind carry row data.
    ///
    /// Every kind except [`EventType::Watermark`] carries data.
    #[inline]
    #[must_use]
    pub fn has_data(&self) -> bool {
        match self {
            Self::Watermark => false,
            Self::Insert | Self::Delete | Self::Update | Self::Snapshot => true,
        }
    }
}
67
68impl From<CdcOperation> for EventType {
69 fn from(op: CdcOperation) -> Self {
74 match op {
75 CdcOperation::Insert | CdcOperation::UpdateAfter => Self::Insert,
76 CdcOperation::Delete | CdcOperation::UpdateBefore => Self::Delete,
77 }
78 }
79}
80
81#[repr(C, align(64))]
93#[derive(Debug, Clone, Copy)]
94pub struct NotificationRef {
95 pub sequence: u64,
97 pub source_id: u32,
99 pub event_type: EventType,
101 _pad_event: [u8; 3],
103 pub row_count: u32,
105 pub timestamp: i64,
107 pub batch_offset: u64,
109 _pad: [u8; 24],
111}
112
113impl NotificationRef {
114 #[inline]
116 #[must_use]
117 pub const fn new(
118 sequence: u64,
119 source_id: u32,
120 event_type: EventType,
121 row_count: u32,
122 timestamp: i64,
123 batch_offset: u64,
124 ) -> Self {
125 Self {
126 sequence,
127 source_id,
128 event_type,
129 _pad_event: [0; 3],
130 row_count,
131 timestamp,
132 batch_offset,
133 _pad: [0; 24],
134 }
135 }
136}
137
/// A change carrying its row data as shared Arrow [`RecordBatch`]es.
///
/// Batches are held behind [`Arc`], so cloning an event shares the
/// underlying batch rather than copying it.
#[derive(Debug, Clone)]
pub enum ChangeEvent {
    /// Rows were inserted.
    Insert {
        /// The inserted rows.
        data: Arc<RecordBatch>,
        /// Timestamp associated with the event.
        timestamp: i64,
        /// Sequence number of the event.
        sequence: u64,
    },
    /// Rows were deleted.
    Delete {
        /// The deleted rows.
        data: Arc<RecordBatch>,
        /// Timestamp associated with the event.
        timestamp: i64,
        /// Sequence number of the event.
        sequence: u64,
    },
    /// Rows were replaced.
    Update {
        /// Rows before the update.
        old: Arc<RecordBatch>,
        /// Rows after the update.
        new: Arc<RecordBatch>,
        /// Timestamp associated with the event.
        timestamp: i64,
        /// Sequence number of the event.
        sequence: u64,
    },
    /// Timestamp marker with no row data and no sequence number.
    Watermark {
        /// The watermark timestamp.
        timestamp: i64,
    },
    /// Rows delivered as a snapshot.
    Snapshot {
        /// The snapshot rows.
        data: Arc<RecordBatch>,
        /// Timestamp associated with the event.
        timestamp: i64,
        /// Sequence number of the event.
        sequence: u64,
    },
}
193
194impl ChangeEvent {
195 #[must_use]
197 pub fn insert(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
198 Self::Insert {
199 data,
200 timestamp,
201 sequence,
202 }
203 }
204
205 #[must_use]
207 pub fn delete(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
208 Self::Delete {
209 data,
210 timestamp,
211 sequence,
212 }
213 }
214
215 #[must_use]
217 pub fn update(
218 old: Arc<RecordBatch>,
219 new: Arc<RecordBatch>,
220 timestamp: i64,
221 sequence: u64,
222 ) -> Self {
223 Self::Update {
224 old,
225 new,
226 timestamp,
227 sequence,
228 }
229 }
230
231 #[must_use]
233 pub fn watermark(timestamp: i64) -> Self {
234 Self::Watermark { timestamp }
235 }
236
237 #[must_use]
239 pub fn snapshot(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
240 Self::Snapshot {
241 data,
242 timestamp,
243 sequence,
244 }
245 }
246
247 #[must_use]
249 pub fn event_type(&self) -> EventType {
250 match self {
251 Self::Insert { .. } => EventType::Insert,
252 Self::Delete { .. } => EventType::Delete,
253 Self::Update { .. } => EventType::Update,
254 Self::Watermark { .. } => EventType::Watermark,
255 Self::Snapshot { .. } => EventType::Snapshot,
256 }
257 }
258
259 #[must_use]
261 pub fn timestamp(&self) -> i64 {
262 match self {
263 Self::Insert { timestamp, .. }
264 | Self::Delete { timestamp, .. }
265 | Self::Update { timestamp, .. }
266 | Self::Watermark { timestamp }
267 | Self::Snapshot { timestamp, .. } => *timestamp,
268 }
269 }
270
271 #[must_use]
273 pub fn sequence(&self) -> Option<u64> {
274 match self {
275 Self::Insert { sequence, .. }
276 | Self::Delete { sequence, .. }
277 | Self::Update { sequence, .. }
278 | Self::Snapshot { sequence, .. } => Some(*sequence),
279 Self::Watermark { .. } => None,
280 }
281 }
282
283 #[must_use]
288 pub fn row_count(&self) -> usize {
289 match self {
290 Self::Insert { data, .. } | Self::Delete { data, .. } | Self::Snapshot { data, .. } => {
291 data.num_rows()
292 }
293 Self::Update { new, .. } => new.num_rows(),
294 Self::Watermark { .. } => 0,
295 }
296 }
297
298 #[must_use]
300 pub fn has_data(&self) -> bool {
301 self.event_type().has_data()
302 }
303
304 #[must_use]
310 pub fn from_changelog_record(record: &ChangelogRecord, sequence: u64) -> Self {
311 let data = Arc::clone(&record.event.data);
312 let timestamp = record.emit_timestamp;
313 match record.operation {
314 CdcOperation::Insert | CdcOperation::UpdateAfter => Self::Insert {
315 data,
316 timestamp,
317 sequence,
318 },
319 CdcOperation::Delete | CdcOperation::UpdateBefore => Self::Delete {
320 data,
321 timestamp,
322 sequence,
323 },
324 }
325 }
326}
327
/// A group of [`ChangeEvent`]s from one source, with a caller-supplied
/// sequence range.
#[derive(Debug, Clone)]
pub struct ChangeEventBatch {
    /// Name of the source the events came from. Presumably an MV or stream
    /// name; confirm with producers.
    pub source: String,
    /// The events, in the order supplied by the caller.
    pub events: Vec<ChangeEvent>,
    /// First sequence number of the batch. Set by the caller and not
    /// validated against `events`.
    pub first_sequence: u64,
    /// Last sequence number of the batch. Set by the caller and not
    /// validated against `events`.
    pub last_sequence: u64,
}
347
348impl ChangeEventBatch {
349 #[must_use]
351 pub fn new(
352 source: String,
353 events: Vec<ChangeEvent>,
354 first_sequence: u64,
355 last_sequence: u64,
356 ) -> Self {
357 Self {
358 source,
359 events,
360 first_sequence,
361 last_sequence,
362 }
363 }
364
365 #[must_use]
367 pub fn total_rows(&self) -> usize {
368 self.events.iter().map(ChangeEvent::row_count).sum()
369 }
370
371 #[must_use]
373 pub fn is_empty(&self) -> bool {
374 self.events.is_empty()
375 }
376
377 #[must_use]
379 pub fn len(&self) -> usize {
380 self.events.len()
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::mem;

    /// Builds a single-column batch (`v: Int64`, non-nullable) holding `0..n`.
    fn make_batch(n: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        // Checked conversion instead of `as`, so no clippy cast allowance is needed.
        let upper = i64::try_from(n).expect("test batch size fits in i64");
        let values: Vec<i64> = (0..upper).collect();
        let array = Int64Array::from(values);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    #[test]
    fn event_type_weights() {
        assert_eq!(EventType::Insert.weight(), 1);
        assert_eq!(EventType::Delete.weight(), -1);
        assert_eq!(EventType::Update.weight(), 0);
        assert_eq!(EventType::Watermark.weight(), 0);
        assert_eq!(EventType::Snapshot.weight(), 1);
    }

    #[test]
    fn event_type_has_data() {
        assert!(EventType::Insert.has_data());
        assert!(EventType::Delete.has_data());
        assert!(EventType::Update.has_data());
        assert!(!EventType::Watermark.has_data());
        assert!(EventType::Snapshot.has_data());
    }

    #[test]
    fn event_type_from_cdc_operation() {
        assert_eq!(EventType::from(CdcOperation::Insert), EventType::Insert);
        assert_eq!(EventType::from(CdcOperation::Delete), EventType::Delete);
        assert_eq!(
            EventType::from(CdcOperation::UpdateAfter),
            EventType::Insert
        );
        assert_eq!(
            EventType::from(CdcOperation::UpdateBefore),
            EventType::Delete
        );
    }

    #[test]
    fn event_type_update_pair_nets_to_zero() {
        let before = EventType::from(CdcOperation::UpdateBefore).weight();
        let after = EventType::from(CdcOperation::UpdateAfter).weight();
        assert_eq!(before + after, 0);
    }

    #[test]
    fn event_type_repr_u8() {
        assert_eq!(EventType::Insert as u8, 0);
        assert_eq!(EventType::Delete as u8, 1);
        assert_eq!(EventType::Update as u8, 2);
        assert_eq!(EventType::Watermark as u8, 3);
        assert_eq!(EventType::Snapshot as u8, 4);
    }

    #[test]
    fn notification_ref_size_and_alignment() {
        assert_eq!(mem::size_of::<NotificationRef>(), 64);
        assert_eq!(mem::align_of::<NotificationRef>(), 64);
    }

    #[test]
    fn notification_ref_fields() {
        let nr = NotificationRef::new(42, 7, EventType::Insert, 100, 1_000_000, 0xFF);
        assert_eq!(nr.sequence, 42);
        assert_eq!(nr.source_id, 7);
        assert_eq!(nr.event_type, EventType::Insert);
        assert_eq!(nr.row_count, 100);
        assert_eq!(nr.timestamp, 1_000_000);
        assert_eq!(nr.batch_offset, 0xFF);
    }

    #[test]
    fn notification_ref_copy() {
        let a = NotificationRef::new(1, 2, EventType::Delete, 10, 500, 0);
        let b = a;
        // `a` is still usable after the assignment because the type is `Copy`.
        assert_eq!(a.sequence, b.sequence);
        assert_eq!(a.event_type, b.event_type);
        assert_eq!(a.timestamp, b.timestamp);
    }

    #[test]
    fn change_event_insert() {
        let batch = Arc::new(make_batch(5));
        let ev = ChangeEvent::insert(Arc::clone(&batch), 1000, 1);
        assert_eq!(ev.event_type(), EventType::Insert);
        assert_eq!(ev.timestamp(), 1000);
        assert_eq!(ev.sequence(), Some(1));
        assert_eq!(ev.row_count(), 5);
        assert!(ev.has_data());
    }

    #[test]
    fn change_event_delete() {
        let batch = Arc::new(make_batch(3));
        let ev = ChangeEvent::delete(batch, 2000, 2);
        assert_eq!(ev.event_type(), EventType::Delete);
        assert_eq!(ev.timestamp(), 2000);
        assert_eq!(ev.sequence(), Some(2));
        assert_eq!(ev.row_count(), 3);
        assert!(ev.has_data());
    }

    #[test]
    fn change_event_update() {
        // Different sizes so the test pins which batch `row_count` reports.
        let old = Arc::new(make_batch(2));
        let new = Arc::new(make_batch(3));
        let ev = ChangeEvent::update(old, new, 3000, 3);
        assert_eq!(ev.event_type(), EventType::Update);
        assert_eq!(ev.timestamp(), 3000);
        assert_eq!(ev.sequence(), Some(3));
        assert_eq!(ev.row_count(), 3);
        assert!(ev.has_data());
    }

    #[test]
    fn change_event_watermark() {
        let ev = ChangeEvent::watermark(5000);
        assert_eq!(ev.event_type(), EventType::Watermark);
        assert_eq!(ev.timestamp(), 5000);
        assert_eq!(ev.sequence(), None);
        assert_eq!(ev.row_count(), 0);
        assert!(!ev.has_data());
    }

    #[test]
    fn change_event_snapshot() {
        let batch = Arc::new(make_batch(10));
        let ev = ChangeEvent::snapshot(batch, 100, 7);
        assert_eq!(ev.event_type(), EventType::Snapshot);
        assert_eq!(ev.timestamp(), 100);
        assert_eq!(ev.sequence(), Some(7));
        assert_eq!(ev.row_count(), 10);
        assert!(ev.has_data());
    }

    #[test]
    fn change_event_clone_shares_arc() {
        let batch = Arc::new(make_batch(4));
        let ev = ChangeEvent::insert(Arc::clone(&batch), 0, 0);
        let cloned = ev.clone();
        if let (ChangeEvent::Insert { data: d1, .. }, ChangeEvent::Insert { data: d2, .. }) =
            (&ev, &cloned)
        {
            assert!(Arc::ptr_eq(d1, d2));
            assert!(Arc::ptr_eq(d1, &batch));
        } else {
            panic!("expected Insert variants");
        }
    }

    #[test]
    fn change_event_from_changelog_record() {
        use crate::operator::Event;

        let batch = make_batch(3);
        let event = Event::new(1000, batch);

        let insert_rec = ChangelogRecord::insert(event.clone(), 2000);
        let ce = ChangeEvent::from_changelog_record(&insert_rec, 10);
        assert_eq!(ce.event_type(), EventType::Insert);
        // The record's emit timestamp wins over the event's own timestamp.
        assert_eq!(ce.timestamp(), 2000);
        assert_eq!(ce.sequence(), Some(10));
        assert_eq!(ce.row_count(), 3);

        let delete_rec = ChangelogRecord::delete(event, 3000);
        let ce = ChangeEvent::from_changelog_record(&delete_rec, 11);
        assert_eq!(ce.event_type(), EventType::Delete);
        assert_eq!(ce.timestamp(), 3000);
        assert_eq!(ce.sequence(), Some(11));
        assert_eq!(ce.row_count(), 3);
    }

    #[test]
    fn change_event_batch_operations() {
        let batch = Arc::new(make_batch(5));
        let events = vec![
            ChangeEvent::insert(Arc::clone(&batch), 100, 1),
            ChangeEvent::insert(Arc::clone(&batch), 200, 2),
            ChangeEvent::watermark(300),
        ];
        let ceb = ChangeEventBatch::new("test_mv".into(), events, 1, 2);
        assert_eq!(ceb.len(), 3);
        assert!(!ceb.is_empty());
        // Two 5-row inserts; the watermark contributes no rows.
        assert_eq!(ceb.total_rows(), 10);
        assert_eq!(ceb.source, "test_mv");
        assert_eq!(ceb.first_sequence, 1);
        assert_eq!(ceb.last_sequence, 2);
    }

    #[test]
    fn change_event_batch_empty() {
        let ceb = ChangeEventBatch::new("empty".into(), vec![], 0, 0);
        assert!(ceb.is_empty());
        assert_eq!(ceb.len(), 0);
        assert_eq!(ceb.total_rows(), 0);
    }
}
588}