1use std::fmt::{Display, Formatter};
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::core::{Error, Result};
9
/// Envelope version this module produces and accepts.
///
/// [`Event::default`] stamps new events with this value, and
/// [`Event::validate`] reports an `envelope_version` error for any other value.
pub const EVENT_ENVELOPE_VERSION: u16 = 1;
12
/// Kind of operation an [`Event`] describes.
///
/// Serialized by serde using `snake_case` names (for example `schema_change`).
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
///
/// The per-variant notes below describe what [`Event::validate`] enforces for
/// the `before` / `after` fields of an event carrying that operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum Operation {
    /// Default variant. `after` is required; `before` must be absent.
    #[default]
    Insert,
    /// Both `before` and `after` are required.
    Update,
    /// `before` is required; `after` must be absent.
    Delete,
    /// `after` is required; `before` is not checked.
    // NOTE(review): presumably a snapshot/backfill read rather than a change — confirm.
    Read,
    /// `after` is required; `before` is not checked.
    SchemaChange,
    /// Neither `before` nor `after` may be present.
    Truncate,
}
31
32impl Display for Operation {
33 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34 f.write_str(self.to_str())
35 }
36}
37
38impl Operation {
39 pub fn to_str(self) -> &'static str {
43 match self {
44 Self::Insert => "insert",
45 Self::Update => "update",
46 Self::Delete => "delete",
47 Self::Read => "read",
48 Self::SchemaChange => "schema_change",
49 Self::Truncate => "truncate",
50 }
51 }
52
53 #[inline]
58 pub const fn is_data_change(self) -> bool {
59 matches!(self, Self::Insert | Self::Update | Self::Delete)
60 }
61
62 #[inline]
64 pub const fn is_insert(self) -> bool {
65 matches!(self, Self::Insert)
66 }
67
68 #[inline]
70 pub const fn is_update(self) -> bool {
71 matches!(self, Self::Update)
72 }
73
74 #[inline]
76 pub const fn is_delete(self) -> bool {
77 matches!(self, Self::Delete)
78 }
79
80 #[inline]
82 pub const fn is_read(self) -> bool {
83 matches!(self, Self::Read)
84 }
85
86 #[inline]
88 pub const fn is_schema_change(self) -> bool {
89 matches!(self, Self::SchemaChange)
90 }
91
92 #[inline]
94 pub const fn is_truncate(self) -> bool {
95 matches!(self, Self::Truncate)
96 }
97}
98
99impl std::str::FromStr for Operation {
100 type Err = Error;
101
102 fn from_str(s: &str) -> Result<Self> {
122 match s {
123 "insert" => Ok(Self::Insert),
124 "update" => Ok(Self::Update),
125 "delete" => Ok(Self::Delete),
126 "read" => Ok(Self::Read),
127 "schema_change" => Ok(Self::SchemaChange),
128 "truncate" => Ok(Self::Truncate),
129 other => Err(Error::ValidationError(vec![format!(
130 "unknown operation '{}': expected one of insert, update, delete, read, schema_change, truncate",
131 other
132 )])),
133 }
134 }
135}
136
/// Describes where an [`Event`] came from.
///
/// `Default` yields empty strings and a zero timestamp; such a value fails
/// [`Event::validate`] because `source_name` must not be blank.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct SourceMetadata {
    /// Name of the originating source. Must contain non-whitespace characters
    /// for [`Event::validate`] to pass.
    pub source_name: String,
    /// Position of the event within the source, kept as an opaque string.
    // NOTE(review): format is presumably source-specific (the tests use
    // "0/16B6A70") — confirm against the connectors.
    pub offset: String,
    /// Timestamp reported by the source.
    // NOTE(review): unit/epoch is not visible in this file — TODO confirm.
    pub timestamp: u64,
}
147
/// Snapshot bookkeeping optionally attached to an [`Event`].
///
/// None of these fields are inspected by [`Event::validate`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotMetadata {
    /// Identifier of the snapshot this event belongs to.
    pub snapshot_id: String,
    /// Index of the chunk within the snapshot.
    // NOTE(review): presumably zero-based (the tests use 0) — confirm.
    pub chunk_index: u32,
    /// Whether this chunk is the final one of the snapshot.
    pub is_last_chunk: bool,
}
158
/// Transaction bookkeeping optionally attached to an [`Event`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransactionMetadata {
    /// Identifier of the transaction.
    pub tx_id: u64,
    /// Number of events in the transaction, when known.
    ///
    /// When set, [`Event::validate`] requires it to be greater than zero and
    /// strictly greater than `event_index`.
    pub total_events: Option<u32>,
    /// Position of this event within the transaction.
    ///
    /// Only checked by [`Event::validate`] when `total_events` is set, in
    /// which case it must be lower than that total.
    pub event_index: u32,
}
172
/// A single validation failure, tied to the field that caused it.
///
/// Its `Display` output is `"<field>: <message>"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ValidationError {
    /// Name of the offending field, e.g. `"ts"` or `"source.source_name"`.
    pub field: String,
    /// Human-readable description of the problem.
    pub message: String,
}
181
182impl Display for ValidationError {
183 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184 write!(f, "{}: {}", self.field, self.message)
185 }
186}
187
// Marker impl: relies on the trait's default methods, so no `source` is reported.
impl std::error::Error for ValidationError {}
189
190impl ValidationError {
191 fn new(field: impl Into<String>, message: impl Into<String>) -> Self {
192 Self {
193 field: field.into(),
194 message: message.into(),
195 }
196 }
197}
198
/// Every [`ValidationError`] found in one validation pass.
///
/// Returned as the `Err` value of [`Event::validate`], which only constructs
/// it when at least one failure was recorded. The inner vector is private;
/// use the accessor and iterator methods to read it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationErrors(Vec<ValidationError>);
218
219impl ValidationErrors {
220 fn new(errors: Vec<ValidationError>) -> Self {
221 Self(errors)
222 }
223
224 pub fn errors(&self) -> &[ValidationError] {
226 &self.0
227 }
228
229 pub fn into_errors(self) -> Vec<ValidationError> {
231 self.0
232 }
233
234 pub fn len(&self) -> usize {
236 self.0.len()
237 }
238
239 pub fn is_empty(&self) -> bool {
244 self.0.is_empty()
245 }
246
247 pub fn iter(&self) -> std::slice::Iter<'_, ValidationError> {
249 self.0.iter()
250 }
251}
252
253impl<'a> IntoIterator for &'a ValidationErrors {
254 type Item = &'a ValidationError;
255 type IntoIter = std::slice::Iter<'a, ValidationError>;
256
257 fn into_iter(self) -> Self::IntoIter {
258 self.0.iter()
259 }
260}
261
262impl IntoIterator for ValidationErrors {
263 type Item = ValidationError;
264 type IntoIter = std::vec::IntoIter<ValidationError>;
265
266 fn into_iter(self) -> Self::IntoIter {
267 self.0.into_iter()
268 }
269}
270
271impl Display for ValidationErrors {
272 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
273 let messages: Vec<String> = self.0.iter().map(|e| e.to_string()).collect();
274 write!(f, "{}", messages.join("; "))
275 }
276}
277
// Marker impl: relies on the trait's default methods, so no `source` is reported.
impl std::error::Error for ValidationErrors {}
279
280impl From<ValidationErrors> for Error {
281 fn from(errs: ValidationErrors) -> Self {
282 Self::ValidationError(errs.0.iter().map(|e| e.to_string()).collect())
283 }
284}
285
/// A single change event together with its envelope metadata.
///
/// The struct is plain data; the invariants between fields (which of
/// `before` / `after` must be present for each [`Operation`], transaction
/// bounds, envelope version, …) are enforced by [`Event::validate`], not by
/// construction or deserialization.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Event {
    /// Before-image of the row, when present.
    ///
    /// Required for `Update` and `Delete`; must be absent for `Insert` and
    /// `Truncate`.
    pub before: Option<Value>,
    /// After-image of the row, when present.
    ///
    /// Required for `Insert`, `Update`, `Read` and `SchemaChange`; must be
    /// absent for `Delete` and `Truncate`.
    pub after: Option<Value>,
    /// Operation this event describes.
    pub op: Operation,
    /// Where the event came from; `source.source_name` must not be blank.
    pub source: SourceMetadata,
    /// Event timestamp; validation rejects `0`.
    // NOTE(review): unit/epoch is not visible in this file — TODO confirm.
    pub ts: u64,
    /// Optional schema name; when non-empty it prefixes the table in
    /// [`Event::qualified_table_name`].
    pub schema: Option<String>,
    /// Table name; must contain non-whitespace characters to validate.
    pub table: String,
    /// Names of the primary-key columns, used by
    /// [`Event::primary_key_values`] to pick values out of the row image.
    pub primary_key: Option<Vec<String>>,
    /// Snapshot bookkeeping, if any (not inspected by validation).
    pub snapshot: Option<SnapshotMetadata>,
    /// Transaction bookkeeping, if any.
    pub transaction: Option<TransactionMetadata>,
    /// Envelope version; must equal [`EVENT_ENVELOPE_VERSION`] to validate.
    pub envelope_version: u16,
    /// Marks `before` as a key-only image rather than a full row.
    ///
    /// Only valid on `Update` / `Delete` events that actually carry `before`.
    /// Thanks to `#[serde(default)]`, input that omits the field deserializes
    /// it as `false`.
    #[serde(default)]
    pub before_is_key_only: bool,
}
357
358impl Default for Event {
359 fn default() -> Self {
364 Self {
365 before: None,
366 after: None,
367 op: Operation::default(),
368 source: SourceMetadata::default(),
369 ts: 0,
370 schema: None,
371 table: String::new(),
372 primary_key: None,
373 snapshot: None,
374 transaction: None,
375 envelope_version: EVENT_ENVELOPE_VERSION,
376 before_is_key_only: false,
377 }
378 }
379}
380
381impl Event {
382 pub fn to_json(&self) -> Result<String> {
384 Ok(serde_json::to_string(self)?)
385 }
386
387 pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
392 Ok(serde_json::to_vec(self)?)
393 }
394
395 pub fn from_json(input: &str) -> Result<Self> {
397 Ok(serde_json::from_str(input)?)
398 }
399
400 pub fn from_json_bytes(input: &[u8]) -> Result<Self> {
402 Ok(serde_json::from_slice(input)?)
403 }
404
405 pub fn qualified_table_name(&self) -> String {
422 match &self.schema {
423 Some(schema) if !schema.is_empty() => format!("{}.{}", schema, self.table),
424 _ => self.table.clone(),
425 }
426 }
427
428 pub fn validate(&self) -> std::result::Result<(), ValidationErrors> {
435 let mut errors = Vec::new();
436
437 if self.table.trim().is_empty() {
438 errors.push(ValidationError::new(
439 "table",
440 "table name must not be empty",
441 ));
442 }
443
444 if self.ts == 0 {
445 errors.push(ValidationError::new("ts", "timestamp must be non-zero"));
446 }
447
448 if self.envelope_version != EVENT_ENVELOPE_VERSION {
449 errors.push(ValidationError::new(
450 "envelope_version",
451 format!(
452 "expected envelope version {EVENT_ENVELOPE_VERSION}, got {}",
453 self.envelope_version
454 ),
455 ));
456 }
457
458 if self.source.source_name.trim().is_empty() {
459 errors.push(ValidationError::new(
460 "source.source_name",
461 "source_name must not be empty",
462 ));
463 }
464
465 match self.op {
466 Operation::Insert => {
467 if self.after.is_none() {
468 errors.push(ValidationError::new(
469 "after",
470 "insert events must include after",
471 ));
472 }
473 if self.before.is_some() {
474 errors.push(ValidationError::new(
475 "before",
476 "insert events must not include before",
477 ));
478 }
479 }
480 Operation::Update => {
481 if self.after.is_none() {
482 errors.push(ValidationError::new(
483 "after",
484 "update events must include after",
485 ));
486 }
487 if self.before.is_none() {
488 errors.push(ValidationError::new(
489 "before",
490 "update events must include before",
491 ));
492 }
493 }
494 Operation::Delete => {
495 if self.before.is_none() {
496 errors.push(ValidationError::new(
497 "before",
498 "delete events must include before",
499 ));
500 }
501 if self.after.is_some() {
502 errors.push(ValidationError::new(
503 "after",
504 "delete events must not include after",
505 ));
506 }
507 }
508 Operation::Read => {
509 if self.after.is_none() {
510 errors.push(ValidationError::new(
511 "after",
512 "read events must include after",
513 ));
514 }
515 }
516 Operation::SchemaChange => {
517 if self.after.is_none() {
518 errors.push(ValidationError::new(
519 "after",
520 "schema_change events must include after",
521 ));
522 }
523 }
524 Operation::Truncate => {
525 if self.before.is_some() {
526 errors.push(ValidationError::new(
527 "before",
528 "truncate events must not include before",
529 ));
530 }
531 if self.after.is_some() {
532 errors.push(ValidationError::new(
533 "after",
534 "truncate events must not include after",
535 ));
536 }
537 }
538 }
539
540 if let Some(transaction) = &self.transaction {
541 if let Some(total) = transaction.total_events {
542 if total == 0 {
543 errors.push(ValidationError::new(
544 "transaction.total_events",
545 "total_events must be greater than zero when set",
546 ));
547 }
548 if transaction.event_index >= total {
549 errors.push(ValidationError::new(
550 "transaction.event_index",
551 "event_index must be lower than total_events",
552 ));
553 }
554 }
555 }
556
557 if self.before_is_key_only && self.op != Operation::Update && self.op != Operation::Delete {
558 errors.push(ValidationError::new(
559 "before_is_key_only",
560 "before_is_key_only can only be true for UPDATE or DELETE events",
561 ));
562 }
563
564 if self.before_is_key_only && self.before.is_none() {
565 errors.push(ValidationError::new(
566 "before_is_key_only",
567 "before_is_key_only is true but before is None; \
568 key-only before-images must carry at least the primary-key columns in before",
569 ));
570 }
571
572 if errors.is_empty() {
573 Ok(())
574 } else {
575 Err(ValidationErrors::new(errors))
576 }
577 }
578
579 pub fn validate_or_error(&self) -> Result<()> {
583 self.validate().map_err(Error::from)
584 }
585
586 #[inline]
620 pub fn has_full_before(&self) -> bool {
621 self.before.is_some() && !self.before_is_key_only
622 }
623
624 pub fn primary_key_values(&self) -> Option<serde_json::Value> {
655 let keys = self.primary_key.as_deref()?;
656 if keys.is_empty() {
657 return None;
658 }
659
660 let row = match self.op {
661 Operation::Delete => self.before.as_ref(),
662 _ => self.after.as_ref().or(self.before.as_ref()),
663 };
664
665 let obj = row?.as_object()?;
666 let mut result = serde_json::Map::with_capacity(keys.len());
667 for key in keys {
668 if let Some(value) = obj.get(key) {
669 result.insert(key.clone(), value.clone());
670 }
671 }
672
673 if result.is_empty() {
674 None
675 } else {
676 Some(serde_json::Value::Object(result))
677 }
678 }
679}
680
/// Unit tests for the event envelope: JSON round-trips, validation rules,
/// `Operation` string conversions and the helper accessors on `Event`.
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::core::Error;

    use super::{
        Event, Operation, SnapshotMetadata, SourceMetadata, TransactionMetadata,
        EVENT_ENVELOPE_VERSION,
    };

    /// Fully-populated insert event that satisfies every validation rule.
    fn valid_event() -> Event {
        Event {
            before: None,
            after: Some(json!({"id": 1, "name": "alice"})),
            op: Operation::Insert,
            source: SourceMetadata {
                source_name: "postgres".into(),
                offset: "0/16B6A70".into(),
                timestamp: 1,
            },
            ts: 1,
            schema: Some("public".into()),
            table: "users".into(),
            primary_key: Some(vec!["id".into()]),
            snapshot: Some(SnapshotMetadata {
                snapshot_id: "snap-1".into(),
                chunk_index: 0,
                is_last_chunk: false,
            }),
            transaction: Some(TransactionMetadata {
                tx_id: 42,
                total_events: Some(2),
                event_index: 0,
            }),
            envelope_version: EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    #[test]
    fn round_trip_json_preserves_event() {
        let event = valid_event();
        let encoded = event.to_json().unwrap();
        let decoded = Event::from_json(&encoded).unwrap();
        assert_eq!(event, decoded);
    }

    #[test]
    fn valid_event_passes_validation() {
        assert!(valid_event().validate().is_ok());
    }

    #[test]
    fn invalid_insert_reports_multiple_errors() {
        let mut event = valid_event();
        event.before = Some(json!({"id": 1}));
        event.after = None;
        event.table.clear();
        event.ts = 0;
        event.envelope_version = 99;

        let errors = event.validate().unwrap_err();
        assert!(errors.iter().any(|error| error.field == "before"));
        assert!(errors.iter().any(|error| error.field == "after"));
        assert!(errors.iter().any(|error| error.field == "table"));
        assert!(errors.iter().any(|error| error.field == "ts"));
        assert!(errors.iter().any(|error| error.field == "envelope_version"));
    }

    #[test]
    fn invalid_json_returns_error_not_panic() {
        let error = Event::from_json("{").unwrap_err();
        assert!(matches!(error, crate::core::Error::SerializationError(_)));
    }

    #[test]
    fn large_payload_round_trip_is_supported() {
        let mut event = valid_event();
        event.after = Some(json!({"blob": "x".repeat(1024 * 1024)}));
        let encoded = event.to_json().unwrap();
        let decoded = Event::from_json(&encoded).unwrap();
        assert_eq!(event, decoded);
    }

    #[test]
    fn operation_display_uses_stable_lowercase_labels() {
        assert_eq!(Operation::Insert.to_string(), "insert");
        assert_eq!(Operation::Update.to_string(), "update");
        assert_eq!(Operation::Delete.to_string(), "delete");
        assert_eq!(Operation::Read.to_string(), "read");
        assert_eq!(Operation::SchemaChange.to_string(), "schema_change");
        // Cover the sixth variant too, matching the `from_str` / `to_str` tests.
        assert_eq!(Operation::Truncate.to_string(), "truncate");
    }

    #[test]
    fn update_delete_read_validation_paths_enforce_contract() {
        let mut update = valid_event();
        update.op = Operation::Update;
        update.before = None;
        let update_errors = update.validate().unwrap_err();
        assert!(update_errors.iter().any(|error| error.field == "before"));

        let mut delete = valid_event();
        delete.op = Operation::Delete;
        delete.before = None;
        delete.after = Some(json!({"id": 1}));
        let delete_errors = delete.validate().unwrap_err();
        assert!(delete_errors.iter().any(|error| error.field == "before"));
        assert!(delete_errors.iter().any(|error| error.field == "after"));

        let mut read = valid_event();
        read.op = Operation::Read;
        read.after = None;
        let read_errors = read.validate().unwrap_err();
        assert!(read_errors.iter().any(|error| error.field == "after"));

        let mut schema_change = valid_event();
        schema_change.op = Operation::SchemaChange;
        schema_change.after = None;
        let schema_change_errors = schema_change.validate().unwrap_err();
        assert!(schema_change_errors
            .iter()
            .any(|error| error.field == "after"));
    }

    #[test]
    fn transaction_validation_rejects_invalid_bounds() {
        let mut event = valid_event();
        event.transaction = Some(TransactionMetadata {
            tx_id: 9,
            total_events: Some(0),
            event_index: 0,
        });
        let errors = event.validate().unwrap_err();
        assert!(errors
            .iter()
            .any(|error| error.field == "transaction.total_events"));

        event.transaction = Some(TransactionMetadata {
            tx_id: 9,
            total_events: Some(2),
            event_index: 2,
        });
        let errors = event.validate().unwrap_err();
        assert!(errors
            .iter()
            .any(|error| error.field == "transaction.event_index"));
    }

    #[test]
    fn before_is_key_only_rejected_on_non_update_delete_events() {
        for op in [
            Operation::Insert,
            Operation::Read,
            Operation::SchemaChange,
            Operation::Truncate,
        ] {
            let mut event = valid_event();
            event.op = op;
            event.before_is_key_only = true;
            // Shape before/after so only the key-only rules can fail.
            match op {
                Operation::Insert | Operation::Read | Operation::SchemaChange => {
                    event.before = None;
                    event.after = Some(json!({"id": 1}));
                }
                Operation::Truncate => {
                    event.before = None;
                    event.after = None;
                }
                _ => {}
            }
            let errors = event.validate().unwrap_err();
            assert!(
                errors.iter().any(|e| e.field == "before_is_key_only"),
                "expected before_is_key_only error for op={op:?}"
            );
        }
    }

    #[test]
    fn before_is_key_only_accepted_on_update_and_delete_events() {
        let mut update = valid_event();
        update.op = Operation::Update;
        update.before = Some(json!({"id": 1}));
        update.after = Some(json!({"id": 1, "name": "bob"}));
        update.before_is_key_only = true;
        assert!(
            update.validate().is_ok(),
            "UPDATE should allow before_is_key_only=true"
        );

        let mut delete = valid_event();
        delete.op = Operation::Delete;
        delete.before = Some(json!({"id": 1}));
        delete.after = None;
        delete.before_is_key_only = true;
        assert!(
            delete.validate().is_ok(),
            "DELETE should allow before_is_key_only=true"
        );
    }

    #[test]
    fn before_is_key_only_true_requires_before_to_be_some() {
        for op in [Operation::Update, Operation::Delete] {
            let mut event = valid_event();
            event.op = op;
            event.before = None;
            event.before_is_key_only = true;
            if op == Operation::Update {
                event.after = Some(json!({"id": 1}));
            }
            let errors = event.validate().unwrap_err();
            assert!(
                errors.iter().any(|e| e.field == "before_is_key_only"),
                "expected before_is_key_only error when before=None for op={op:?}; got: {errors}"
            );
        }
    }

    #[test]
    fn event_default_has_correct_envelope_version() {
        let event = Event::default();
        assert_eq!(event.envelope_version, EVENT_ENVELOPE_VERSION);
        assert!(!event.before_is_key_only);
        assert_eq!(event.op, Operation::Insert);
    }

    #[test]
    fn validate_or_error_maps_to_validation_error_type() {
        let mut event = valid_event();
        event.source.source_name = String::new();
        let error = event.validate_or_error().unwrap_err();
        match error {
            Error::ValidationError(messages) => {
                assert!(messages
                    .iter()
                    .any(|message| message.contains("source.source_name")));
            }
            other => panic!("expected ValidationError, got {other}"),
        }
    }

    #[test]
    fn has_full_before_distinguishes_key_only_from_full() {
        let base = Event {
            before: Some(json!({"id": 1, "name": "alice"})),
            after: Some(json!({"id": 1, "name": "bob"})),
            op: Operation::Update,
            before_is_key_only: false,
            ..Event::default()
        };
        assert!(base.has_full_before(), "full before should return true");

        let key_only = Event {
            before_is_key_only: true,
            ..base.clone()
        };
        assert!(
            !key_only.has_full_before(),
            "key-only before should return false"
        );

        let no_before = Event {
            before: None,
            before_is_key_only: false,
            ..base
        };
        assert!(
            !no_before.has_full_before(),
            "absent before should return false"
        );
    }

    #[test]
    fn primary_key_values_extracts_from_after_on_insert() {
        let event = Event {
            after: Some(json!({"id": 42, "name": "alice", "age": 30})),
            op: Operation::Insert,
            primary_key: Some(vec!["id".into()]),
            ..Event::default()
        };
        let kv = event.primary_key_values().unwrap();
        assert_eq!(kv["id"], json!(42));
        assert!(kv.get("name").is_none());
    }

    #[test]
    fn primary_key_values_extracts_from_before_on_delete() {
        let event = Event {
            before: Some(json!({"id": 7, "name": "bob"})),
            after: None,
            op: Operation::Delete,
            primary_key: Some(vec!["id".into()]),
            ..Event::default()
        };
        let kv = event.primary_key_values().unwrap();
        assert_eq!(kv["id"], json!(7));
    }

    #[test]
    fn primary_key_values_returns_none_when_no_pk_defined() {
        let event = Event {
            after: Some(json!({"id": 1})),
            op: Operation::Insert,
            primary_key: None,
            ..Event::default()
        };
        assert!(event.primary_key_values().is_none());
    }

    #[test]
    fn primary_key_values_returns_none_when_pk_fields_absent_from_row() {
        let event = Event {
            after: Some(json!({"name": "only_name"})),
            op: Operation::Insert,
            primary_key: Some(vec!["id".into()]),
            ..Event::default()
        };
        assert!(event.primary_key_values().is_none());
    }

    #[test]
    fn primary_key_values_handles_composite_keys() {
        let event = Event {
            after: Some(json!({"tenant_id": 1, "user_id": 99, "name": "charlie"})),
            op: Operation::Insert,
            primary_key: Some(vec!["tenant_id".into(), "user_id".into()]),
            ..Event::default()
        };
        let kv = event.primary_key_values().unwrap();
        assert_eq!(kv["tenant_id"], json!(1));
        assert_eq!(kv["user_id"], json!(99));
        assert!(kv.get("name").is_none());
    }

    #[test]
    fn operation_from_str_parses_all_variants() {
        use std::str::FromStr;
        assert_eq!(Operation::from_str("insert").unwrap(), Operation::Insert);
        assert_eq!(Operation::from_str("update").unwrap(), Operation::Update);
        assert_eq!(Operation::from_str("delete").unwrap(), Operation::Delete);
        assert_eq!(Operation::from_str("read").unwrap(), Operation::Read);
        assert_eq!(
            Operation::from_str("schema_change").unwrap(),
            Operation::SchemaChange
        );
        assert_eq!(
            Operation::from_str("truncate").unwrap(),
            Operation::Truncate
        );
    }

    #[test]
    fn operation_from_str_rejects_unknown_and_wrong_case() {
        use std::str::FromStr;
        assert!(Operation::from_str("INSERT").is_err());
        assert!(Operation::from_str("unknown").is_err());
        assert!(Operation::from_str("").is_err());
    }

    #[test]
    fn operation_round_trips_through_str() {
        use std::str::FromStr;
        for op in [
            Operation::Insert,
            Operation::Update,
            Operation::Delete,
            Operation::Read,
            Operation::SchemaChange,
            Operation::Truncate,
        ] {
            assert_eq!(
                Operation::from_str(op.to_str()).unwrap(),
                op,
                "round-trip failed for {op}"
            );
        }
    }

    #[test]
    fn validation_errors_display_joins_all_failures() {
        // A default event violates several rules at once.
        let event = Event::default();
        let errs = event.validate().unwrap_err();
        assert!(errs.len() >= 3);
        let display = errs.to_string();
        assert!(display.contains("ts"));
        assert!(display.contains("table"));
    }

    #[test]
    fn validation_errors_iterates_individually() {
        let event = Event::default();
        let errs = event.validate().unwrap_err();
        let fields: Vec<&str> = errs.iter().map(|e| e.field.as_str()).collect();
        assert!(fields.contains(&"ts"));
        assert!(fields.contains(&"table"));
    }

    #[test]
    fn validation_errors_into_iter_consuming_works() {
        let event = Event::default();
        let errs = event.validate().unwrap_err();
        let count = errs.len();
        let collected: Vec<_> = errs.into_iter().collect();
        assert_eq!(collected.len(), count);
    }

    #[test]
    fn validation_error_implements_std_error() {
        use std::error::Error as StdError;
        let ve = super::ValidationError {
            field: "ts".into(),
            message: "must be non-zero".into(),
        };
        // Compile-time check: the coercion only type-checks if the trait is implemented.
        let _: &dyn StdError = &ve;
    }

    #[test]
    fn qualified_table_name_includes_schema_when_present() {
        let mut event = Event {
            table: "orders".into(),
            ..Event::default()
        };
        assert_eq!(event.qualified_table_name(), "orders");
        event.schema = Some("public".into());
        assert_eq!(event.qualified_table_name(), "public.orders");
    }

    #[test]
    fn qualified_table_name_ignores_empty_schema() {
        let event = Event {
            table: "users".into(),
            schema: Some(String::new()),
            ..Event::default()
        };
        assert_eq!(event.qualified_table_name(), "users");
    }
}