Skip to main content

laminar_core/subscription/
event.rs

1//! Change event types for the reactive subscription system.
2//!
3//! Provides three tiers of types matching the ring architecture:
4//! - Ring 0: [`EventType`] + [`NotificationRef`] (zero-allocation, cache-aligned)
5//! - Ring 1/2: [`ChangeEvent`] (data delivery via `Arc<RecordBatch>`)
6//! - Batching: [`ChangeEventBatch`] (coalesced delivery)
7
8use std::sync::Arc;
9
10use arrow_array::RecordBatch;
11
12use crate::operator::window::{CdcOperation, ChangelogRecord};
13
14// ---------------------------------------------------------------------------
15// EventType — Ring 0 discriminant
16// ---------------------------------------------------------------------------
17
18/// Discriminant for change event kinds.
19///
20/// Stored as `#[repr(u8)]` for compact, zero-cost embedding in
21/// [`NotificationRef`] and protocol headers.
22///
23/// Weight semantics match [`CdcOperation::weight()`]:
24/// - Insert: +1
25/// - Delete: -1
26/// - Update: 0 (decomposed into -1 / +1 pair in Z-set model)
27/// - Watermark / Snapshot: 0 (control events)
28#[repr(u8)]
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub enum EventType {
31    /// A new row was inserted (+1 weight).
32    Insert = 0,
33    /// A row was deleted (-1 weight).
34    Delete = 1,
35    /// A row was updated (weight 0 — decomposes to delete + insert).
36    Update = 2,
37    /// Watermark progress notification (no data).
38    Watermark = 3,
39    /// Snapshot delivery (initial state load, +1 weight per row).
40    Snapshot = 4,
41}
42
43impl EventType {
44    /// Returns the Z-set weight for this event type.
45    ///
46    /// Matches the convention in [`CdcOperation::weight()`]:
47    /// - Insert / Snapshot: +1
48    /// - Delete: -1
49    /// - Update / Watermark: 0
50    #[inline]
51    #[must_use]
52    pub fn weight(&self) -> i32 {
53        match self {
54            Self::Insert | Self::Snapshot => 1,
55            Self::Delete => -1,
56            Self::Update | Self::Watermark => 0,
57        }
58    }
59
60    /// Returns `true` if this event type carries data rows.
61    #[inline]
62    #[must_use]
63    pub fn has_data(&self) -> bool {
64        !matches!(self, Self::Watermark)
65    }
66}
67
68impl From<CdcOperation> for EventType {
69    /// Converts a [`CdcOperation`] into an [`EventType`].
70    ///
71    /// - `Insert` / `UpdateAfter` → `Insert`
72    /// - `Delete` / `UpdateBefore` → `Delete`
73    fn from(op: CdcOperation) -> Self {
74        match op {
75            CdcOperation::Insert | CdcOperation::UpdateAfter => Self::Insert,
76            CdcOperation::Delete | CdcOperation::UpdateBefore => Self::Delete,
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// NotificationRef — Ring 0 cache-line-aligned slot
83// ---------------------------------------------------------------------------
84
85/// Zero-allocation notification reference for Ring 0.
86///
87/// Exactly 64 bytes (`#[repr(C, align(64))]`) to occupy a single cache line,
88/// avoiding false sharing when used in lock-free notification slots.
89///
90/// Contains only metadata — actual data is fetched from Ring 1 using
91/// `batch_offset` as a reference into a shared buffer.
92#[repr(C, align(64))]
93#[derive(Debug, Clone, Copy)]
94pub struct NotificationRef {
95    /// Monotonically increasing sequence number.
96    pub sequence: u64,
97    /// Identifier of the source materialized view or query.
98    pub source_id: u32,
99    /// The type of change event.
100    pub event_type: EventType,
101    // 3 bytes padding (u8 enum + 3 to align next u32)
102    _pad_event: [u8; 3],
103    /// Number of rows affected.
104    pub row_count: u32,
105    /// Event timestamp (milliseconds since epoch).
106    pub timestamp: i64,
107    /// Offset into a shared batch buffer for data retrieval.
108    pub batch_offset: u64,
109    /// Padding to fill to 64 bytes.
110    _pad: [u8; 24],
111}
112
113impl NotificationRef {
114    /// Creates a new notification reference.
115    #[inline]
116    #[must_use]
117    pub const fn new(
118        sequence: u64,
119        source_id: u32,
120        event_type: EventType,
121        row_count: u32,
122        timestamp: i64,
123        batch_offset: u64,
124    ) -> Self {
125        Self {
126            sequence,
127            source_id,
128            event_type,
129            _pad_event: [0; 3],
130            row_count,
131            timestamp,
132            batch_offset,
133            _pad: [0; 24],
134        }
135    }
136}
137
138// ---------------------------------------------------------------------------
139// ChangeEvent — Ring 1/2 data delivery
140// ---------------------------------------------------------------------------
141
142/// A change event carrying Arrow data for subscriber delivery.
143///
144/// Each variant includes the minimal data needed for that event type.
145/// Data is shared via `Arc<RecordBatch>` for zero-copy fan-out to
146/// multiple subscribers.
147#[derive(Debug, Clone)]
148pub enum ChangeEvent {
149    /// A new row batch was inserted.
150    Insert {
151        /// The inserted rows.
152        data: Arc<RecordBatch>,
153        /// Event timestamp.
154        timestamp: i64,
155        /// Sequence number from the notification.
156        sequence: u64,
157    },
158    /// A row batch was deleted.
159    Delete {
160        /// The deleted rows.
161        data: Arc<RecordBatch>,
162        /// Event timestamp.
163        timestamp: i64,
164        /// Sequence number from the notification.
165        sequence: u64,
166    },
167    /// A row batch was updated (before + after).
168    Update {
169        /// The old row values.
170        old: Arc<RecordBatch>,
171        /// The new row values.
172        new: Arc<RecordBatch>,
173        /// Event timestamp.
174        timestamp: i64,
175        /// Sequence number from the notification.
176        sequence: u64,
177    },
178    /// Watermark progress (no data).
179    Watermark {
180        /// The new watermark timestamp.
181        timestamp: i64,
182    },
183    /// Initial snapshot delivery.
184    Snapshot {
185        /// The snapshot rows.
186        data: Arc<RecordBatch>,
187        /// Event timestamp.
188        timestamp: i64,
189        /// Sequence number from the notification.
190        sequence: u64,
191    },
192}
193
194impl ChangeEvent {
195    /// Creates an insert change event.
196    #[must_use]
197    pub fn insert(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
198        Self::Insert {
199            data,
200            timestamp,
201            sequence,
202        }
203    }
204
205    /// Creates a delete change event.
206    #[must_use]
207    pub fn delete(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
208        Self::Delete {
209            data,
210            timestamp,
211            sequence,
212        }
213    }
214
215    /// Creates an update change event with old and new values.
216    #[must_use]
217    pub fn update(
218        old: Arc<RecordBatch>,
219        new: Arc<RecordBatch>,
220        timestamp: i64,
221        sequence: u64,
222    ) -> Self {
223        Self::Update {
224            old,
225            new,
226            timestamp,
227            sequence,
228        }
229    }
230
231    /// Creates a watermark change event.
232    #[must_use]
233    pub fn watermark(timestamp: i64) -> Self {
234        Self::Watermark { timestamp }
235    }
236
237    /// Creates a snapshot change event.
238    #[must_use]
239    pub fn snapshot(data: Arc<RecordBatch>, timestamp: i64, sequence: u64) -> Self {
240        Self::Snapshot {
241            data,
242            timestamp,
243            sequence,
244        }
245    }
246
247    /// Returns the [`EventType`] for this change event.
248    #[must_use]
249    pub fn event_type(&self) -> EventType {
250        match self {
251            Self::Insert { .. } => EventType::Insert,
252            Self::Delete { .. } => EventType::Delete,
253            Self::Update { .. } => EventType::Update,
254            Self::Watermark { .. } => EventType::Watermark,
255            Self::Snapshot { .. } => EventType::Snapshot,
256        }
257    }
258
259    /// Returns the event timestamp.
260    #[must_use]
261    pub fn timestamp(&self) -> i64 {
262        match self {
263            Self::Insert { timestamp, .. }
264            | Self::Delete { timestamp, .. }
265            | Self::Update { timestamp, .. }
266            | Self::Watermark { timestamp }
267            | Self::Snapshot { timestamp, .. } => *timestamp,
268        }
269    }
270
271    /// Returns the sequence number, or `None` for watermark events.
272    #[must_use]
273    pub fn sequence(&self) -> Option<u64> {
274        match self {
275            Self::Insert { sequence, .. }
276            | Self::Delete { sequence, .. }
277            | Self::Update { sequence, .. }
278            | Self::Snapshot { sequence, .. } => Some(*sequence),
279            Self::Watermark { .. } => None,
280        }
281    }
282
283    /// Returns the total number of rows in this event.
284    ///
285    /// For `Update` events, returns the row count of the new batch.
286    /// For `Watermark`, returns 0.
287    #[must_use]
288    pub fn row_count(&self) -> usize {
289        match self {
290            Self::Insert { data, .. } | Self::Delete { data, .. } | Self::Snapshot { data, .. } => {
291                data.num_rows()
292            }
293            Self::Update { new, .. } => new.num_rows(),
294            Self::Watermark { .. } => 0,
295        }
296    }
297
298    /// Returns `true` if this event carries data rows.
299    #[must_use]
300    pub fn has_data(&self) -> bool {
301        self.event_type().has_data()
302    }
303
304    /// Creates a [`ChangeEvent`] from a [`ChangelogRecord`].
305    ///
306    /// Maps CDC operations to subscription event types:
307    /// - `Insert` / `UpdateAfter` → `ChangeEvent::Insert`
308    /// - `Delete` / `UpdateBefore` → `ChangeEvent::Delete`
309    #[must_use]
310    pub fn from_changelog_record(record: &ChangelogRecord, sequence: u64) -> Self {
311        let data = Arc::clone(&record.event.data);
312        let timestamp = record.emit_timestamp;
313        match record.operation {
314            CdcOperation::Insert | CdcOperation::UpdateAfter => Self::Insert {
315                data,
316                timestamp,
317                sequence,
318            },
319            CdcOperation::Delete | CdcOperation::UpdateBefore => Self::Delete {
320                data,
321                timestamp,
322                sequence,
323            },
324        }
325    }
326}
327
328// ---------------------------------------------------------------------------
329// ChangeEventBatch — coalesced delivery
330// ---------------------------------------------------------------------------
331
332/// A batch of change events for coalesced delivery to subscribers.
333///
334/// Groups multiple [`ChangeEvent`]s from the same source, with metadata
335/// about the sequence range for gap detection and resumption.
336#[derive(Debug, Clone)]
337pub struct ChangeEventBatch {
338    /// The source materialized view or query name.
339    pub source: String,
340    /// The events in this batch.
341    pub events: Vec<ChangeEvent>,
342    /// First sequence number in this batch.
343    pub first_sequence: u64,
344    /// Last sequence number in this batch (inclusive).
345    pub last_sequence: u64,
346}
347
348impl ChangeEventBatch {
349    /// Creates a new change event batch.
350    #[must_use]
351    pub fn new(
352        source: String,
353        events: Vec<ChangeEvent>,
354        first_sequence: u64,
355        last_sequence: u64,
356    ) -> Self {
357        Self {
358            source,
359            events,
360            first_sequence,
361            last_sequence,
362        }
363    }
364
365    /// Returns the total number of data rows across all events.
366    #[must_use]
367    pub fn total_rows(&self) -> usize {
368        self.events.iter().map(ChangeEvent::row_count).sum()
369    }
370
371    /// Returns `true` if this batch contains no events.
372    #[must_use]
373    pub fn is_empty(&self) -> bool {
374        self.events.is_empty()
375    }
376
377    /// Returns the number of events in this batch.
378    #[must_use]
379    pub fn len(&self) -> usize {
380        self.events.len()
381    }
382}
383
384// ===========================================================================
385// Tests
386// ===========================================================================
387
388#[cfg(test)]
389#[allow(clippy::cast_possible_wrap)]
390mod tests {
391    use super::*;
392    use arrow_array::Int64Array;
393    use arrow_schema::{DataType, Field, Schema};
394    use std::mem;
395
396    /// Helper: create a `RecordBatch` with `n` rows.
397    fn make_batch(n: usize) -> RecordBatch {
398        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
399        let values: Vec<i64> = (0..n as i64).collect();
400        let array = Int64Array::from(values);
401        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
402    }
403
404    // --- EventType tests ---
405
406    #[test]
407    fn event_type_weights() {
408        assert_eq!(EventType::Insert.weight(), 1);
409        assert_eq!(EventType::Delete.weight(), -1);
410        assert_eq!(EventType::Update.weight(), 0);
411        assert_eq!(EventType::Watermark.weight(), 0);
412        assert_eq!(EventType::Snapshot.weight(), 1);
413    }
414
415    #[test]
416    fn event_type_has_data() {
417        assert!(EventType::Insert.has_data());
418        assert!(EventType::Delete.has_data());
419        assert!(EventType::Update.has_data());
420        assert!(!EventType::Watermark.has_data());
421        assert!(EventType::Snapshot.has_data());
422    }
423
424    #[test]
425    fn event_type_from_cdc_operation() {
426        assert_eq!(EventType::from(CdcOperation::Insert), EventType::Insert);
427        assert_eq!(EventType::from(CdcOperation::Delete), EventType::Delete);
428        assert_eq!(
429            EventType::from(CdcOperation::UpdateAfter),
430            EventType::Insert
431        );
432        assert_eq!(
433            EventType::from(CdcOperation::UpdateBefore),
434            EventType::Delete
435        );
436    }
437
438    #[test]
439    fn event_type_repr_u8() {
440        assert_eq!(EventType::Insert as u8, 0);
441        assert_eq!(EventType::Delete as u8, 1);
442        assert_eq!(EventType::Update as u8, 2);
443        assert_eq!(EventType::Watermark as u8, 3);
444        assert_eq!(EventType::Snapshot as u8, 4);
445    }
446
447    // --- NotificationRef tests ---
448
449    #[test]
450    fn notification_ref_size_and_alignment() {
451        assert_eq!(mem::size_of::<NotificationRef>(), 64);
452        assert_eq!(mem::align_of::<NotificationRef>(), 64);
453    }
454
455    #[test]
456    fn notification_ref_fields() {
457        let nr = NotificationRef::new(42, 7, EventType::Insert, 100, 1_000_000, 0xFF);
458        assert_eq!(nr.sequence, 42);
459        assert_eq!(nr.source_id, 7);
460        assert_eq!(nr.event_type, EventType::Insert);
461        assert_eq!(nr.row_count, 100);
462        assert_eq!(nr.timestamp, 1_000_000);
463        assert_eq!(nr.batch_offset, 0xFF);
464    }
465
466    #[test]
467    fn notification_ref_copy() {
468        let a = NotificationRef::new(1, 2, EventType::Delete, 10, 500, 0);
469        let b = a; // Copy
470        assert_eq!(a.sequence, b.sequence);
471        assert_eq!(a.event_type, b.event_type);
472    }
473
474    // --- ChangeEvent tests ---
475
476    #[test]
477    fn change_event_insert() {
478        let batch = Arc::new(make_batch(5));
479        let ev = ChangeEvent::insert(Arc::clone(&batch), 1000, 1);
480        assert_eq!(ev.event_type(), EventType::Insert);
481        assert_eq!(ev.timestamp(), 1000);
482        assert_eq!(ev.sequence(), Some(1));
483        assert_eq!(ev.row_count(), 5);
484        assert!(ev.has_data());
485    }
486
487    #[test]
488    fn change_event_delete() {
489        let batch = Arc::new(make_batch(3));
490        let ev = ChangeEvent::delete(batch, 2000, 2);
491        assert_eq!(ev.event_type(), EventType::Delete);
492        assert_eq!(ev.timestamp(), 2000);
493        assert_eq!(ev.sequence(), Some(2));
494        assert_eq!(ev.row_count(), 3);
495    }
496
497    #[test]
498    fn change_event_update() {
499        let old = Arc::new(make_batch(2));
500        let new = Arc::new(make_batch(2));
501        let ev = ChangeEvent::update(old, new, 3000, 3);
502        assert_eq!(ev.event_type(), EventType::Update);
503        assert_eq!(ev.timestamp(), 3000);
504        assert_eq!(ev.sequence(), Some(3));
505        assert_eq!(ev.row_count(), 2);
506    }
507
508    #[test]
509    fn change_event_watermark() {
510        let ev = ChangeEvent::watermark(5000);
511        assert_eq!(ev.event_type(), EventType::Watermark);
512        assert_eq!(ev.timestamp(), 5000);
513        assert_eq!(ev.sequence(), None);
514        assert_eq!(ev.row_count(), 0);
515        assert!(!ev.has_data());
516    }
517
518    #[test]
519    fn change_event_snapshot() {
520        let batch = Arc::new(make_batch(10));
521        let ev = ChangeEvent::snapshot(batch, 100, 7);
522        assert_eq!(ev.event_type(), EventType::Snapshot);
523        assert_eq!(ev.row_count(), 10);
524    }
525
526    #[test]
527    fn change_event_clone_shares_arc() {
528        let batch = Arc::new(make_batch(4));
529        let ev = ChangeEvent::insert(Arc::clone(&batch), 0, 0);
530        let cloned = ev.clone();
531        // Both point to the same underlying allocation.
532        if let (ChangeEvent::Insert { data: d1, .. }, ChangeEvent::Insert { data: d2, .. }) =
533            (&ev, &cloned)
534        {
535            assert!(Arc::ptr_eq(d1, d2));
536        } else {
537            panic!("expected Insert variants");
538        }
539    }
540
541    #[test]
542    fn change_event_from_changelog_record() {
543        use crate::operator::Event;
544
545        let batch = make_batch(3);
546        let event = Event::new(1000, batch);
547
548        let insert_rec = ChangelogRecord::insert(event.clone(), 2000);
549        let ce = ChangeEvent::from_changelog_record(&insert_rec, 10);
550        assert_eq!(ce.event_type(), EventType::Insert);
551        assert_eq!(ce.timestamp(), 2000);
552        assert_eq!(ce.sequence(), Some(10));
553        assert_eq!(ce.row_count(), 3);
554
555        let delete_rec = ChangelogRecord::delete(event, 3000);
556        let ce = ChangeEvent::from_changelog_record(&delete_rec, 11);
557        assert_eq!(ce.event_type(), EventType::Delete);
558        assert_eq!(ce.timestamp(), 3000);
559        assert_eq!(ce.sequence(), Some(11));
560    }
561
562    // --- ChangeEventBatch tests ---
563
564    #[test]
565    fn change_event_batch_operations() {
566        let batch = Arc::new(make_batch(5));
567        let events = vec![
568            ChangeEvent::insert(Arc::clone(&batch), 100, 1),
569            ChangeEvent::insert(Arc::clone(&batch), 200, 2),
570            ChangeEvent::watermark(300),
571        ];
572        let ceb = ChangeEventBatch::new("test_mv".into(), events, 1, 2);
573        assert_eq!(ceb.len(), 3);
574        assert!(!ceb.is_empty());
575        assert_eq!(ceb.total_rows(), 10); // 5 + 5 + 0
576        assert_eq!(ceb.source, "test_mv");
577        assert_eq!(ceb.first_sequence, 1);
578        assert_eq!(ceb.last_sequence, 2);
579    }
580
581    #[test]
582    fn change_event_batch_empty() {
583        let ceb = ChangeEventBatch::new("empty".into(), vec![], 0, 0);
584        assert!(ceb.is_empty());
585        assert_eq!(ceb.len(), 0);
586        assert_eq!(ceb.total_rows(), 0);
587    }
588}