Skip to main content

rustcdc/core/
event.rs

1//! Canonical event envelope definitions and validation helpers.
2
3use std::fmt::{Display, Formatter};
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::core::{Error, Result};
9
10/// Current version of the canonical event envelope.
11pub const EVENT_ENVELOPE_VERSION: u16 = 1;
12
/// CRUD-style operations emitted by a source.
///
/// Serialized by serde in `snake_case` (for example `schema_change`), which is the same
/// spelling returned by [`Operation::to_str`]. The enum is `#[non_exhaustive]`, so code
/// outside this crate must keep a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Operation {
    /// Row insert. This is the `Default` variant.
    ///
    /// [`Event::validate`] requires `after` to be present and `before` to be absent.
    #[default]
    Insert,
    /// Row update.
    ///
    /// [`Event::validate`] requires both `before` and `after` to be present.
    Update,
    /// Row delete.
    ///
    /// [`Event::validate`] requires `before` to be present and `after` to be absent.
    Delete,
    /// Row read, emitted during snapshot.
    ///
    /// [`Event::validate`] requires `after` to be present.
    Read,
    /// Schema-change notification.
    ///
    /// [`Event::validate`] requires `after` to be present.
    SchemaChange,
    /// All rows were removed from the table by a `TRUNCATE` statement.
    ///
    /// `before` and `after` are always `None` for truncate events.
    /// Only connectors that advertise [`crate::source::ConnectorCapabilities::truncate`]
    /// emit this variant.
    Truncate,
}
31
32impl Display for Operation {
33    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34        f.write_str(self.to_str())
35    }
36}
37
38impl Operation {
39    /// Return a `&'static str` representation without heap allocation.
40    ///
41    /// Prefer this over `to_string()` on hot paths.
42    pub fn to_str(self) -> &'static str {
43        match self {
44            Self::Insert => "insert",
45            Self::Update => "update",
46            Self::Delete => "delete",
47            Self::Read => "read",
48            Self::SchemaChange => "schema_change",
49            Self::Truncate => "truncate",
50        }
51    }
52
53    /// Returns `true` for INSERT, UPDATE, and DELETE operations.
54    ///
55    /// Use this to filter out READ, SCHEMA_CHANGE, and TRUNCATE events when
56    /// you only care about row-level data mutations.
57    #[inline]
58    pub const fn is_data_change(self) -> bool {
59        matches!(self, Self::Insert | Self::Update | Self::Delete)
60    }
61
62    /// Returns `true` for INSERT events.
63    #[inline]
64    pub const fn is_insert(self) -> bool {
65        matches!(self, Self::Insert)
66    }
67
68    /// Returns `true` for UPDATE events.
69    #[inline]
70    pub const fn is_update(self) -> bool {
71        matches!(self, Self::Update)
72    }
73
74    /// Returns `true` for DELETE events.
75    #[inline]
76    pub const fn is_delete(self) -> bool {
77        matches!(self, Self::Delete)
78    }
79
80    /// Returns `true` for READ events (emitted during snapshot).
81    #[inline]
82    pub const fn is_read(self) -> bool {
83        matches!(self, Self::Read)
84    }
85
86    /// Returns `true` for SCHEMA_CHANGE events.
87    #[inline]
88    pub const fn is_schema_change(self) -> bool {
89        matches!(self, Self::SchemaChange)
90    }
91
92    /// Returns `true` for TRUNCATE events.
93    #[inline]
94    pub const fn is_truncate(self) -> bool {
95        matches!(self, Self::Truncate)
96    }
97}
98
99impl std::str::FromStr for Operation {
100    type Err = Error;
101
102    /// Parse an `Operation` from its canonical string form.
103    ///
104    /// Accepts the same lowercase snake_case strings produced by [`Operation::to_str`] and
105    /// [`Display`](std::fmt::Display). Parsing is case-sensitive.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`Error::ValidationError`] when the string does not match any known variant.
110    ///
111    /// # Example
112    ///
113    /// ```
114    /// use std::str::FromStr;
115    /// use rustcdc::Operation;
116    ///
117    /// assert_eq!(Operation::from_str("insert").unwrap(), Operation::Insert);
118    /// assert_eq!(Operation::from_str("schema_change").unwrap(), Operation::SchemaChange);
119    /// assert!(Operation::from_str("INSERT").is_err()); // case-sensitive
120    /// ```
121    fn from_str(s: &str) -> Result<Self> {
122        match s {
123            "insert" => Ok(Self::Insert),
124            "update" => Ok(Self::Update),
125            "delete" => Ok(Self::Delete),
126            "read" => Ok(Self::Read),
127            "schema_change" => Ok(Self::SchemaChange),
128            "truncate" => Ok(Self::Truncate),
129            other => Err(Error::ValidationError(vec![format!(
130                "unknown operation '{}': expected one of insert, update, delete, read, schema_change, truncate",
131                other
132            )])),
133        }
134    }
135}
136
/// Source identity and position metadata.
///
/// Carried by every [`Event`] in its `source` field. The derived `Default` produces
/// empty strings and a zero timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct SourceMetadata {
    /// Logical name of the source connector.
    ///
    /// [`Event::validate`] rejects events whose `source_name` is empty or
    /// whitespace-only.
    pub source_name: String,
    /// Source-specific durable position encoded as a string.
    ///
    /// Nothing in this file parses the value; the `Event` example uses one shaped
    /// like `0/16B6A70`.
    pub offset: String,
    /// Source timestamp associated with the position.
    ///
    /// NOTE(review): the unit is not stated here — presumably milliseconds since
    /// epoch like `Event::ts`; confirm against the connectors.
    pub timestamp: u64,
}
147
/// Snapshot progress information when an event is emitted during snapshotting.
///
/// Stored in `Event::snapshot`, which is `None` for events outside a snapshot phase.
/// [`Event::validate`] does not inspect any of these fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotMetadata {
    /// Identifier for the snapshot session.
    pub snapshot_id: String,
    /// Zero-based snapshot chunk index.
    pub chunk_index: u32,
    /// Whether this chunk is the final one in the snapshot.
    pub is_last_chunk: bool,
}
158
/// Transaction metadata when an event belongs to a multi-event transaction.
///
/// Stored in `Event::transaction`. When `total_events` is set, [`Event::validate`]
/// requires it to be greater than zero and requires `event_index < total_events`;
/// when it is `None`, no bounds are checked.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransactionMetadata {
    /// Transaction identifier assigned by the source.
    pub tx_id: u64,
    /// Total number of events expected in the transaction, if reported by the source.
    ///
    /// `None` when the connector does not provide a total-event count for the transaction
    /// (most CDC protocols do not). Connectors that do know the count should set this.
    pub total_events: Option<u32>,
    /// Zero-based position of this event within the transaction.
    pub event_index: u32,
}
172
/// Validation error describing a broken contract in an event.
///
/// One instance describes one violated constraint. Its `Display` form is
/// `"<field>: <message>"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ValidationError {
    /// Name of the field that failed validation.
    ///
    /// Nested fields use a dotted path, e.g. `source.source_name` or
    /// `transaction.total_events`.
    pub field: String,
    /// Human-readable explanation of the validation failure.
    pub message: String,
}
181
182impl Display for ValidationError {
183    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184        write!(f, "{}: {}", self.field, self.message)
185    }
186}
187
// Marker impl with no overridden methods: it lets a `ValidationError` be used wherever
// a `std::error::Error` is expected, relying on the derived `Debug` and the `Display`
// impl for its output.
impl std::error::Error for ValidationError {}
189
190impl ValidationError {
191    fn new(field: impl Into<String>, message: impl Into<String>) -> Self {
192        Self {
193            field: field.into(),
194            message: message.into(),
195        }
196    }
197}
198
/// All validation failures from a single [`Event::validate`] call.
///
/// Returned as the `Err` variant of [`Event::validate`] so callers can access
/// each field-level error individually or format the whole list as a single string.
///
/// The wrapped list is private; read it through [`ValidationErrors::errors`],
/// [`ValidationErrors::iter`], or by iterating the value (by reference or by value).
/// Failures are kept in the order in which `validate` recorded them.
///
/// ```
/// use rustcdc::Event;
///
/// let event = Event::default(); // ts == 0 → invalid
/// let err = event.validate().unwrap_err();
/// // Display: semicolon-joined list of all violations
/// println!("{err}");
/// // Iterate individual errors
/// for e in err.errors() {
///     println!("  field={} msg={}", e.field, e.message);
/// }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationErrors(Vec<ValidationError>);
218
219impl ValidationErrors {
220    fn new(errors: Vec<ValidationError>) -> Self {
221        Self(errors)
222    }
223
224    /// Returns a slice of the individual field-level validation failures.
225    pub fn errors(&self) -> &[ValidationError] {
226        &self.0
227    }
228
229    /// Consume this wrapper and return the owned error list.
230    pub fn into_errors(self) -> Vec<ValidationError> {
231        self.0
232    }
233
234    /// Number of distinct validation failures.
235    pub fn len(&self) -> usize {
236        self.0.len()
237    }
238
239    /// Returns `true` when there are no validation failures.
240    ///
241    /// This is always `false` in practice — `ValidationErrors` is only
242    /// constructed when at least one error exists.
243    pub fn is_empty(&self) -> bool {
244        self.0.is_empty()
245    }
246
247    /// Iterate over individual field-level validation failures.
248    pub fn iter(&self) -> std::slice::Iter<'_, ValidationError> {
249        self.0.iter()
250    }
251}
252
253impl<'a> IntoIterator for &'a ValidationErrors {
254    type Item = &'a ValidationError;
255    type IntoIter = std::slice::Iter<'a, ValidationError>;
256
257    fn into_iter(self) -> Self::IntoIter {
258        self.0.iter()
259    }
260}
261
262impl IntoIterator for ValidationErrors {
263    type Item = ValidationError;
264    type IntoIter = std::vec::IntoIter<ValidationError>;
265
266    fn into_iter(self) -> Self::IntoIter {
267        self.0.into_iter()
268    }
269}
270
271impl Display for ValidationErrors {
272    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
273        let messages: Vec<String> = self.0.iter().map(|e| e.to_string()).collect();
274        write!(f, "{}", messages.join("; "))
275    }
276}
277
// Marker impl with no overridden methods: it lets the aggregated failures be used
// wherever a `std::error::Error` is expected, relying on the derived `Debug` and the
// `Display` impl for its output.
impl std::error::Error for ValidationErrors {}
279
280impl From<ValidationErrors> for Error {
281    fn from(errs: ValidationErrors) -> Self {
282        Self::ValidationError(errs.0.iter().map(|e| e.to_string()).collect())
283    }
284}
285
/// Canonical event envelope used across all sources.
///
/// The struct itself enforces nothing; the constraints mentioned on the fields below
/// are checked by [`Event::validate`].
///
/// # Examples
///
/// ```
/// use rustcdc::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
/// use serde_json::json;
///
/// let event = Event {
///     before: None,
///     after: Some(json!({"id": 1, "name": "alice"})),
///     op: Operation::Insert,
///     source: SourceMetadata {
///         source_name: "postgres".into(),
///         offset: "0/16B6A70".into(),
///         timestamp: 1,
///     },
///     ts: 1,
///     schema: Some("public".into()),
///     table: "users".into(),
///     primary_key: Some(vec!["id".into()]),
///     snapshot: None,
///     transaction: None,
///     envelope_version: EVENT_ENVELOPE_VERSION,
///     before_is_key_only: false,
/// };
///
/// let encoded = event.to_json().unwrap();
/// let decoded = Event::from_json(&encoded).unwrap();
/// assert_eq!(decoded.table, "users");
/// assert!(decoded.validate().is_ok());
/// ```
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Event {
    /// Row state before the operation, when available.
    ///
    /// Which operations require or forbid this image is enforced per `op` by
    /// [`Event::validate`].
    pub before: Option<Value>,
    /// Row state after the operation, when available.
    ///
    /// Which operations require or forbid this image is enforced per `op` by
    /// [`Event::validate`].
    pub after: Option<Value>,
    /// CRUD operation represented by this event.
    pub op: Operation,
    /// Source identity and durable position metadata.
    pub source: SourceMetadata,
    /// Event timestamp in milliseconds since epoch.
    ///
    /// Must be non-zero to pass [`Event::validate`].
    pub ts: u64,
    /// Schema name when the source provides one.
    pub schema: Option<String>,
    /// Table name that produced the event.
    ///
    /// Must not be empty or whitespace-only to pass [`Event::validate`].
    pub table: String,
    /// Primary key column names, if available.
    pub primary_key: Option<Vec<String>>,
    /// Snapshot metadata when the event belongs to a snapshot phase.
    pub snapshot: Option<SnapshotMetadata>,
    /// Transaction metadata when the event belongs to a transaction.
    pub transaction: Option<TransactionMetadata>,
    /// Canonical envelope version for compatibility checks.
    ///
    /// Must equal [`EVENT_ENVELOPE_VERSION`] to pass [`Event::validate`].
    pub envelope_version: u16,
    /// Advisory flag — set to `true` when the `before` field contains only
    /// primary-key columns rather than the full pre-image row.
    ///
    /// This occurs on PostgreSQL UPDATE and DELETE events when the table's
    /// `REPLICA IDENTITY` is `DEFAULT` (the factory default). In that mode,
    /// PostgreSQL only includes the old primary key values in the WAL record
    /// rather than the complete before-image. Applications that compute row diffs
    /// or need the full prior state must check this flag; when it is `true`,
    /// `before` cannot be used as a complete row snapshot.
    ///
    /// Always `false` for INSERT, READ, SCHEMA_CHANGE, and TRUNCATE events, and
    /// for all MySQL / MariaDB / SQL Server events.
    ///
    /// [`Event::validate`] rejects `true` on any operation other than UPDATE or
    /// DELETE, and also when `before` is `None`.
    // `serde(default)`: input that omits this field deserializes with `false`.
    #[serde(default)]
    pub before_is_key_only: bool,
}
357
358impl Default for Event {
359    /// Returns a minimal INSERT event skeleton with correct `envelope_version`.
360    ///
361    /// All fields default to empty/zero; callers must set `table`, `ts`, and
362    /// other required fields before passing the event to validation or encoding.
363    fn default() -> Self {
364        Self {
365            before: None,
366            after: None,
367            op: Operation::default(),
368            source: SourceMetadata::default(),
369            ts: 0,
370            schema: None,
371            table: String::new(),
372            primary_key: None,
373            snapshot: None,
374            transaction: None,
375            envelope_version: EVENT_ENVELOPE_VERSION,
376            before_is_key_only: false,
377        }
378    }
379}
380
381impl Event {
382    /// Serialize the event to compact JSON.
383    pub fn to_json(&self) -> Result<String> {
384        Ok(serde_json::to_string(self)?)
385    }
386
387    /// Serialize the event to compact JSON bytes.
388    ///
389    /// Prefer this over `to_json()` when you need a `Vec<u8>` directly (e.g. for
390    /// Kafka message values, HTTP request bodies). Avoids a UTF-8 round-trip.
391    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
392        Ok(serde_json::to_vec(self)?)
393    }
394
395    /// Deserialize an event from JSON.
396    pub fn from_json(input: &str) -> Result<Self> {
397        Ok(serde_json::from_str(input)?)
398    }
399
400    /// Deserialize an event from JSON bytes.
401    pub fn from_json_bytes(input: &[u8]) -> Result<Self> {
402        Ok(serde_json::from_slice(input)?)
403    }
404
405    /// Return the fully-qualified table name as `"schema.table"` when a schema
406    /// is present, or just `"table"` when no schema was provided by the source.
407    ///
408    /// Useful for routing, logging, and constructing Kafka topic names.
409    ///
410    /// # Example
411    ///
412    /// ```
413    /// use rustcdc::{Event, Operation, EVENT_ENVELOPE_VERSION};
414    ///
415    /// let mut event = Event { table: "orders".into(), ..Event::default() };
416    /// assert_eq!(event.qualified_table_name(), "orders");
417    ///
418    /// event.schema = Some("public".into());
419    /// assert_eq!(event.qualified_table_name(), "public.orders");
420    /// ```
421    pub fn qualified_table_name(&self) -> String {
422        match &self.schema {
423            Some(schema) if !schema.is_empty() => format!("{}.{}", schema, self.table),
424            _ => self.table.clone(),
425        }
426    }
427
428    /// Validate the event against the canonical envelope contract.
429    ///
430    /// Returns `Ok(())` when the event satisfies all envelope constraints.
431    /// Returns `Err(ValidationErrors)` with every violated constraint when one
432    /// or more fields fail. Use [`ValidationErrors::errors()`] to iterate the
433    /// individual failures or `Display` to format them as a joined string.
434    pub fn validate(&self) -> std::result::Result<(), ValidationErrors> {
435        let mut errors = Vec::new();
436
437        if self.table.trim().is_empty() {
438            errors.push(ValidationError::new(
439                "table",
440                "table name must not be empty",
441            ));
442        }
443
444        if self.ts == 0 {
445            errors.push(ValidationError::new("ts", "timestamp must be non-zero"));
446        }
447
448        if self.envelope_version != EVENT_ENVELOPE_VERSION {
449            errors.push(ValidationError::new(
450                "envelope_version",
451                format!(
452                    "expected envelope version {EVENT_ENVELOPE_VERSION}, got {}",
453                    self.envelope_version
454                ),
455            ));
456        }
457
458        if self.source.source_name.trim().is_empty() {
459            errors.push(ValidationError::new(
460                "source.source_name",
461                "source_name must not be empty",
462            ));
463        }
464
465        match self.op {
466            Operation::Insert => {
467                if self.after.is_none() {
468                    errors.push(ValidationError::new(
469                        "after",
470                        "insert events must include after",
471                    ));
472                }
473                if self.before.is_some() {
474                    errors.push(ValidationError::new(
475                        "before",
476                        "insert events must not include before",
477                    ));
478                }
479            }
480            Operation::Update => {
481                if self.after.is_none() {
482                    errors.push(ValidationError::new(
483                        "after",
484                        "update events must include after",
485                    ));
486                }
487                if self.before.is_none() {
488                    errors.push(ValidationError::new(
489                        "before",
490                        "update events must include before",
491                    ));
492                }
493            }
494            Operation::Delete => {
495                if self.before.is_none() {
496                    errors.push(ValidationError::new(
497                        "before",
498                        "delete events must include before",
499                    ));
500                }
501                if self.after.is_some() {
502                    errors.push(ValidationError::new(
503                        "after",
504                        "delete events must not include after",
505                    ));
506                }
507            }
508            Operation::Read => {
509                if self.after.is_none() {
510                    errors.push(ValidationError::new(
511                        "after",
512                        "read events must include after",
513                    ));
514                }
515            }
516            Operation::SchemaChange => {
517                if self.after.is_none() {
518                    errors.push(ValidationError::new(
519                        "after",
520                        "schema_change events must include after",
521                    ));
522                }
523            }
524            Operation::Truncate => {
525                if self.before.is_some() {
526                    errors.push(ValidationError::new(
527                        "before",
528                        "truncate events must not include before",
529                    ));
530                }
531                if self.after.is_some() {
532                    errors.push(ValidationError::new(
533                        "after",
534                        "truncate events must not include after",
535                    ));
536                }
537            }
538        }
539
540        if let Some(transaction) = &self.transaction {
541            if let Some(total) = transaction.total_events {
542                if total == 0 {
543                    errors.push(ValidationError::new(
544                        "transaction.total_events",
545                        "total_events must be greater than zero when set",
546                    ));
547                }
548                if transaction.event_index >= total {
549                    errors.push(ValidationError::new(
550                        "transaction.event_index",
551                        "event_index must be lower than total_events",
552                    ));
553                }
554            }
555        }
556
557        if self.before_is_key_only && self.op != Operation::Update && self.op != Operation::Delete {
558            errors.push(ValidationError::new(
559                "before_is_key_only",
560                "before_is_key_only can only be true for UPDATE or DELETE events",
561            ));
562        }
563
564        if self.before_is_key_only && self.before.is_none() {
565            errors.push(ValidationError::new(
566                "before_is_key_only",
567                "before_is_key_only is true but before is None; \
568                 key-only before-images must carry at least the primary-key columns in before",
569            ));
570        }
571
572        if errors.is_empty() {
573            Ok(())
574        } else {
575            Err(ValidationErrors::new(errors))
576        }
577    }
578
579    /// Convert validation failures into the crate's shared error type.
580    ///
581    /// Equivalent to `event.validate().map_err(Error::from)`.
582    pub fn validate_or_error(&self) -> Result<()> {
583        self.validate().map_err(Error::from)
584    }
585
586    /// Returns `true` when a full pre-image row is available in `before`.
587    ///
588    /// This is `true` iff `before` is `Some` **and** `before_is_key_only` is `false`.
589    ///
590    /// Use this instead of checking `before.is_some()` alone when you need the
591    /// complete prior row state — for example, when computing row diffs or emitting
592    /// before-images to a downstream store. A `before` field that is `Some` but
593    /// `before_is_key_only == true` contains only primary-key columns and cannot
594    /// be used as a complete row snapshot.
595    ///
596    /// # Example
597    ///
598    /// ```
599    /// use rustcdc::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
600    /// use serde_json::json;
601    ///
602    /// let mut event = Event {
603    ///     before: Some(json!({"id": 1})),
604    ///     after: Some(json!({"id": 1, "name": "bob"})),
605    ///     op: Operation::Update,
606    ///     before_is_key_only: true,
607    ///     ..Event::default()
608    /// };
609    /// event.ts = 1;
610    /// event.table = "users".into();
611    /// event.source.source_name = "pg".into();
612    ///
613    /// // Key-only before: `before` is present but partial.
614    /// assert!(!event.has_full_before());
615    ///
616    /// event.before_is_key_only = false;
617    /// assert!(event.has_full_before());
618    /// ```
619    #[inline]
620    pub fn has_full_before(&self) -> bool {
621        self.before.is_some() && !self.before_is_key_only
622    }
623
624    /// Extracts the primary-key column values from the most appropriate row image.
625    ///
626    /// Returns a JSON object containing only the columns listed in `primary_key`,
627    /// taken from `after` for INSERT / UPDATE / READ / SCHEMA_CHANGE, and from
628    /// `before` for DELETE. Returns `None` when:
629    ///
630    /// - `primary_key` is `None` or empty.
631    /// - The relevant row image (`after` or `before`) is absent or not a JSON object.
632    ///
633    /// This is the canonical source for Kafka message keys and idempotency
634    /// fingerprints derived from primary-key values alone.
635    ///
636    /// # Example
637    ///
638    /// ```
639    /// use rustcdc::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
640    /// use serde_json::json;
641    ///
642    /// let event = Event {
643    ///     before: None,
644    ///     after: Some(json!({"id": 42, "name": "alice", "age": 30})),
645    ///     op: Operation::Insert,
646    ///     primary_key: Some(vec!["id".into()]),
647    ///     ..Event::default()
648    /// };
649    ///
650    /// let key = event.primary_key_values().unwrap();
651    /// assert_eq!(key["id"], json!(42));
652    /// assert!(key.get("name").is_none());
653    /// ```
654    pub fn primary_key_values(&self) -> Option<serde_json::Value> {
655        let keys = self.primary_key.as_deref()?;
656        if keys.is_empty() {
657            return None;
658        }
659
660        let row = match self.op {
661            Operation::Delete => self.before.as_ref(),
662            _ => self.after.as_ref().or(self.before.as_ref()),
663        };
664
665        let obj = row?.as_object()?;
666        let mut result = serde_json::Map::with_capacity(keys.len());
667        for key in keys {
668            if let Some(value) = obj.get(key) {
669                result.insert(key.clone(), value.clone());
670            }
671        }
672
673        if result.is_empty() {
674            None
675        } else {
676            Some(serde_json::Value::Object(result))
677        }
678    }
679}
680
681#[cfg(test)]
682mod tests {
683    use serde_json::json;
684
685    use crate::core::Error;
686
687    use super::{
688        Event, Operation, SnapshotMetadata, SourceMetadata, TransactionMetadata,
689        EVENT_ENVELOPE_VERSION,
690    };
691
692    fn valid_event() -> Event {
693        Event {
694            before: None,
695            after: Some(json!({"id": 1, "name": "alice"})),
696            op: Operation::Insert,
697            source: SourceMetadata {
698                source_name: "postgres".into(),
699                offset: "0/16B6A70".into(),
700                timestamp: 1,
701            },
702            ts: 1,
703            schema: Some("public".into()),
704            table: "users".into(),
705            primary_key: Some(vec!["id".into()]),
706            snapshot: Some(SnapshotMetadata {
707                snapshot_id: "snap-1".into(),
708                chunk_index: 0,
709                is_last_chunk: false,
710            }),
711            transaction: Some(TransactionMetadata {
712                tx_id: 42,
713                total_events: Some(2),
714                event_index: 0,
715            }),
716            envelope_version: EVENT_ENVELOPE_VERSION,
717            before_is_key_only: false,
718        }
719    }
720
721    #[test]
722    fn round_trip_json_preserves_event() {
723        let event = valid_event();
724        let encoded = event.to_json().unwrap();
725        let decoded = Event::from_json(&encoded).unwrap();
726        assert_eq!(event, decoded);
727    }
728
729    #[test]
730    fn valid_event_passes_validation() {
731        assert!(valid_event().validate().is_ok());
732    }
733
734    #[test]
735    fn invalid_insert_reports_multiple_errors() {
736        let mut event = valid_event();
737        event.before = Some(json!({"id": 1}));
738        event.after = None;
739        event.table.clear();
740        event.ts = 0;
741        event.envelope_version = 99;
742
743        let errors = event.validate().unwrap_err();
744        assert!(errors.iter().any(|error| error.field == "before"));
745        assert!(errors.iter().any(|error| error.field == "after"));
746        assert!(errors.iter().any(|error| error.field == "table"));
747        assert!(errors.iter().any(|error| error.field == "ts"));
748        assert!(errors.iter().any(|error| error.field == "envelope_version"));
749    }
750
751    #[test]
752    fn invalid_json_returns_error_not_panic() {
753        let error = Event::from_json("{").unwrap_err();
754        assert!(matches!(error, crate::core::Error::SerializationError(_)));
755    }
756
757    #[test]
758    fn large_payload_round_trip_is_supported() {
759        let mut event = valid_event();
760        event.after = Some(json!({"blob": "x".repeat(1024 * 1024)}));
761        let encoded = event.to_json().unwrap();
762        let decoded = Event::from_json(&encoded).unwrap();
763        assert_eq!(event, decoded);
764    }
765
766    #[test]
767    fn operation_display_uses_stable_lowercase_labels() {
768        assert_eq!(Operation::Insert.to_string(), "insert");
769        assert_eq!(Operation::Update.to_string(), "update");
770        assert_eq!(Operation::Delete.to_string(), "delete");
771        assert_eq!(Operation::Read.to_string(), "read");
772        assert_eq!(Operation::SchemaChange.to_string(), "schema_change");
773    }
774
775    #[test]
776    fn update_delete_read_validation_paths_enforce_contract() {
777        let mut update = valid_event();
778        update.op = Operation::Update;
779        update.before = None;
780        let update_errors = update.validate().unwrap_err();
781        assert!(update_errors.iter().any(|error| error.field == "before"));
782
783        let mut delete = valid_event();
784        delete.op = Operation::Delete;
785        delete.before = None;
786        delete.after = Some(json!({"id": 1}));
787        let delete_errors = delete.validate().unwrap_err();
788        assert!(delete_errors.iter().any(|error| error.field == "before"));
789        assert!(delete_errors.iter().any(|error| error.field == "after"));
790
791        let mut read = valid_event();
792        read.op = Operation::Read;
793        read.after = None;
794        let read_errors = read.validate().unwrap_err();
795        assert!(read_errors.iter().any(|error| error.field == "after"));
796
797        let mut schema_change = valid_event();
798        schema_change.op = Operation::SchemaChange;
799        schema_change.after = None;
800        let schema_change_errors = schema_change.validate().unwrap_err();
801        assert!(schema_change_errors
802            .iter()
803            .any(|error| error.field == "after"));
804    }
805
806    #[test]
807    fn transaction_validation_rejects_invalid_bounds() {
808        let mut event = valid_event();
809        event.transaction = Some(TransactionMetadata {
810            tx_id: 9,
811            total_events: Some(0),
812            event_index: 0,
813        });
814        let errors = event.validate().unwrap_err();
815        assert!(errors
816            .iter()
817            .any(|error| error.field == "transaction.total_events"));
818
819        event.transaction = Some(TransactionMetadata {
820            tx_id: 9,
821            total_events: Some(2),
822            event_index: 2,
823        });
824        let errors = event.validate().unwrap_err();
825        assert!(errors
826            .iter()
827            .any(|error| error.field == "transaction.event_index"));
828    }
829
830    #[test]
831    fn before_is_key_only_rejected_on_non_update_delete_events() {
832        for op in [
833            Operation::Insert,
834            Operation::Read,
835            Operation::SchemaChange,
836            Operation::Truncate,
837        ] {
838            let mut event = valid_event();
839            event.op = op;
840            event.before_is_key_only = true;
841            // Adjust before/after to satisfy per-op contract so only the flag fires.
842            match op {
843                Operation::Insert | Operation::Read | Operation::SchemaChange => {
844                    event.before = None;
845                    event.after = Some(json!({"id": 1}));
846                }
847                Operation::Truncate => {
848                    event.before = None;
849                    event.after = None;
850                }
851                _ => {}
852            }
853            let errors = event.validate().unwrap_err();
854            assert!(
855                errors.iter().any(|e| e.field == "before_is_key_only"),
856                "expected before_is_key_only error for op={op:?}"
857            );
858        }
859    }
860
861    #[test]
862    fn before_is_key_only_accepted_on_update_and_delete_events() {
863        // UPDATE with key-only before
864        let mut update = valid_event();
865        update.op = Operation::Update;
866        update.before = Some(json!({"id": 1}));
867        update.after = Some(json!({"id": 1, "name": "bob"}));
868        update.before_is_key_only = true;
869        assert!(
870            update.validate().is_ok(),
871            "UPDATE should allow before_is_key_only=true"
872        );
873
874        // DELETE with key-only before
875        let mut delete = valid_event();
876        delete.op = Operation::Delete;
877        delete.before = Some(json!({"id": 1}));
878        delete.after = None;
879        delete.before_is_key_only = true;
880        assert!(
881            delete.validate().is_ok(),
882            "DELETE should allow before_is_key_only=true"
883        );
884    }
885
886    #[test]
887    fn before_is_key_only_true_requires_before_to_be_some() {
888        // before_is_key_only = true with before = None is always invalid, regardless of op.
889        for op in [Operation::Update, Operation::Delete] {
890            let mut event = valid_event();
891            event.op = op;
892            event.before = None; // ← no before image at all
893            event.before_is_key_only = true;
894            if op == Operation::Update {
895                event.after = Some(json!({"id": 1}));
896            }
897            let errors = event.validate().unwrap_err();
898            assert!(
899                errors.iter().any(|e| e.field == "before_is_key_only"),
900                "expected before_is_key_only error when before=None for op={op:?}; got: {errors}"
901            );
902        }
903    }
904
905    #[test]
906    fn event_default_has_correct_envelope_version() {
907        let event = Event::default();
908        assert_eq!(event.envelope_version, EVENT_ENVELOPE_VERSION);
909        assert!(!event.before_is_key_only);
910        assert_eq!(event.op, Operation::Insert);
911    }
912
913    #[test]
914    fn validate_or_error_maps_to_validation_error_type() {
915        let mut event = valid_event();
916        event.source.source_name = String::new();
917        let error = event.validate_or_error().unwrap_err();
918        match error {
919            Error::ValidationError(messages) => {
920                assert!(messages
921                    .iter()
922                    .any(|message| message.contains("source.source_name")));
923            }
924            other => panic!("expected ValidationError, got {other}"),
925        }
926    }
927
928    #[test]
929    fn has_full_before_distinguishes_key_only_from_full() {
930        let base = Event {
931            before: Some(json!({"id": 1, "name": "alice"})),
932            after: Some(json!({"id": 1, "name": "bob"})),
933            op: Operation::Update,
934            before_is_key_only: false,
935            ..Event::default()
936        };
937        assert!(base.has_full_before(), "full before should return true");
938
939        let key_only = Event {
940            before_is_key_only: true,
941            ..base.clone()
942        };
943        assert!(
944            !key_only.has_full_before(),
945            "key-only before should return false"
946        );
947
948        let no_before = Event {
949            before: None,
950            before_is_key_only: false,
951            ..base
952        };
953        assert!(
954            !no_before.has_full_before(),
955            "absent before should return false"
956        );
957    }
958
959    #[test]
960    fn primary_key_values_extracts_from_after_on_insert() {
961        let event = Event {
962            after: Some(json!({"id": 42, "name": "alice", "age": 30})),
963            op: Operation::Insert,
964            primary_key: Some(vec!["id".into()]),
965            ..Event::default()
966        };
967        let kv = event.primary_key_values().unwrap();
968        assert_eq!(kv["id"], json!(42));
969        assert!(kv.get("name").is_none());
970    }
971
972    #[test]
973    fn primary_key_values_extracts_from_before_on_delete() {
974        let event = Event {
975            before: Some(json!({"id": 7, "name": "bob"})),
976            after: None,
977            op: Operation::Delete,
978            primary_key: Some(vec!["id".into()]),
979            ..Event::default()
980        };
981        let kv = event.primary_key_values().unwrap();
982        assert_eq!(kv["id"], json!(7));
983    }
984
985    #[test]
986    fn primary_key_values_returns_none_when_no_pk_defined() {
987        let event = Event {
988            after: Some(json!({"id": 1})),
989            op: Operation::Insert,
990            primary_key: None,
991            ..Event::default()
992        };
993        assert!(event.primary_key_values().is_none());
994    }
995
996    #[test]
997    fn primary_key_values_returns_none_when_pk_fields_absent_from_row() {
998        let event = Event {
999            after: Some(json!({"name": "only_name"})),
1000            op: Operation::Insert,
1001            primary_key: Some(vec!["id".into()]),
1002            ..Event::default()
1003        };
1004        // "id" not present in `after`, so result should be None
1005        assert!(event.primary_key_values().is_none());
1006    }
1007
1008    #[test]
1009    fn primary_key_values_handles_composite_keys() {
1010        let event = Event {
1011            after: Some(json!({"tenant_id": 1, "user_id": 99, "name": "charlie"})),
1012            op: Operation::Insert,
1013            primary_key: Some(vec!["tenant_id".into(), "user_id".into()]),
1014            ..Event::default()
1015        };
1016        let kv = event.primary_key_values().unwrap();
1017        assert_eq!(kv["tenant_id"], json!(1));
1018        assert_eq!(kv["user_id"], json!(99));
1019        assert!(kv.get("name").is_none());
1020    }
1021
1022    #[test]
1023    fn operation_from_str_parses_all_variants() {
1024        use std::str::FromStr;
1025        assert_eq!(Operation::from_str("insert").unwrap(), Operation::Insert);
1026        assert_eq!(Operation::from_str("update").unwrap(), Operation::Update);
1027        assert_eq!(Operation::from_str("delete").unwrap(), Operation::Delete);
1028        assert_eq!(Operation::from_str("read").unwrap(), Operation::Read);
1029        assert_eq!(
1030            Operation::from_str("schema_change").unwrap(),
1031            Operation::SchemaChange
1032        );
1033        assert_eq!(
1034            Operation::from_str("truncate").unwrap(),
1035            Operation::Truncate
1036        );
1037    }
1038
1039    #[test]
1040    fn operation_from_str_rejects_unknown_and_wrong_case() {
1041        use std::str::FromStr;
1042        assert!(Operation::from_str("INSERT").is_err()); // case-sensitive
1043        assert!(Operation::from_str("unknown").is_err());
1044        assert!(Operation::from_str("").is_err());
1045    }
1046
1047    #[test]
1048    fn operation_round_trips_through_str() {
1049        use std::str::FromStr;
1050        for op in [
1051            Operation::Insert,
1052            Operation::Update,
1053            Operation::Delete,
1054            Operation::Read,
1055            Operation::SchemaChange,
1056            Operation::Truncate,
1057        ] {
1058            assert_eq!(
1059                Operation::from_str(op.to_str()).unwrap(),
1060                op,
1061                "round-trip failed for {op}"
1062            );
1063        }
1064    }
1065
1066    #[test]
1067    fn validation_errors_display_joins_all_failures() {
1068        let event = Event::default(); // ts == 0, table empty, source_name empty
1069        let errs = event.validate().unwrap_err();
1070        assert!(errs.len() >= 3); // at least ts, table, source_name
1071        let display = errs.to_string();
1072        assert!(display.contains("ts"));
1073        assert!(display.contains("table"));
1074    }
1075
1076    #[test]
1077    fn validation_errors_iterates_individually() {
1078        let event = Event::default();
1079        let errs = event.validate().unwrap_err();
1080        let fields: Vec<&str> = errs.iter().map(|e| e.field.as_str()).collect();
1081        assert!(fields.contains(&"ts"));
1082        assert!(fields.contains(&"table"));
1083    }
1084
1085    #[test]
1086    fn validation_errors_into_iter_consuming_works() {
1087        let event = Event::default();
1088        let errs = event.validate().unwrap_err();
1089        let count = errs.len();
1090        let collected: Vec<_> = errs.into_iter().collect();
1091        assert_eq!(collected.len(), count);
1092    }
1093
1094    #[test]
1095    fn validation_error_implements_std_error() {
1096        use std::error::Error as StdError;
1097        let ve = super::ValidationError {
1098            field: "ts".into(),
1099            message: "must be non-zero".into(),
1100        };
1101        // std::error::Error is object-safe; can be used as dyn Error
1102        let _: &dyn StdError = &ve;
1103    }
1104
1105    #[test]
1106    fn qualified_table_name_includes_schema_when_present() {
1107        let mut event = Event {
1108            table: "orders".into(),
1109            ..Event::default()
1110        };
1111        assert_eq!(event.qualified_table_name(), "orders");
1112        event.schema = Some("public".into());
1113        assert_eq!(event.qualified_table_name(), "public.orders");
1114    }
1115
1116    #[test]
1117    fn qualified_table_name_ignores_empty_schema() {
1118        let event = Event {
1119            table: "users".into(),
1120            schema: Some(String::new()),
1121            ..Event::default()
1122        };
1123        assert_eq!(event.qualified_table_name(), "users");
1124    }
1125}