1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/// Error-handling tests for WHERE clause generation in the MySQL destination.
/// The tests in this file build the change events that are expected to trigger each
/// error condition in the destination's `build_where_clause` method and pin the
/// expected wording of the resulting error messages.
use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity};
use pg_walstream::ColumnValue;
use pg_walstream::Lsn;
use pg_walstream::RowData;
use std::sync::Arc;
/// Tests for error conditions in WHERE clause generation
/// These mirror the error handling logic in the MySQL destination
#[cfg(test)]
mod where_clause_error_tests {
use super::*;
/// Builds an UPDATE event whose old-row data lacks the key column `id`.
///
/// This is the event shape for which the MySQL destination's
/// `build_where_clause` is expected to report:
/// "Key column 'id' not found in data for Update on public.users".
#[test]
fn test_error_message_content_expectations() {
    // Old-row data carries only "name"; the key column "id" is deliberately absent.
    let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]);

    let event_missing_key = ChangeEvent::update(
        "public",
        "users",
        16384,
        Some(incomplete_data),
        RowData::from_pairs(vec![]),
        ReplicaIdentity::Default,
        vec![Arc::from("id")],
        Lsn::from(300),
    );

    // The event must advertise exactly one key column, "id".
    let key_columns = event_missing_key.get_key_columns().unwrap();
    assert_eq!(key_columns.len(), 1);
    assert_eq!(key_columns[0].as_ref(), "id");

    // Fail loudly rather than silently skipping the check below if the
    // constructor ever yields a different variant.
    let EventType::Update { old_data, .. } = &event_missing_key.event_type else {
        panic!("ChangeEvent::update should produce an EventType::Update event");
    };

    // The key column is missing from the only data available for the WHERE clause.
    let data_source = old_data
        .as_ref()
        .expect("old_data was passed as Some(..) to ChangeEvent::update");
    assert!(data_source.get("id").is_none());
}
/// Builds an UPDATE event with `ReplicaIdentity::Nothing`: no old-row data
/// and no key columns, and checks that the event reports exactly that.
#[test]
fn test_replica_identity_nothing_error_conditions() {
    let new_data = RowData::from_pairs(vec![("data", ColumnValue::text("some data"))]);

    let update_event = ChangeEvent::update(
        "public",
        "no_replica_table",
        16384,
        None,     // No old_data for NOTHING replica identity
        new_data, // Moved: the row is not used again, so no clone is needed
        ReplicaIdentity::Nothing,
        vec![], // No key columns
        Lsn::from(300),
    );

    assert_eq!(
        update_event.get_replica_identity().unwrap(),
        &ReplicaIdentity::Nothing
    );
    assert!(update_event.get_key_columns().unwrap().is_empty());
}
/// Checks that an UPDATE event exposes the schema and table names that an
/// error message would use to identify the failing operation.
#[test]
fn test_operation_name_in_error_messages() {
    // UPDATE with no old-row data but a declared key column.
    let update_event = ChangeEvent::update(
        "schema1",
        "table1",
        16384,
        None,
        RowData::from_pairs(vec![]),
        ReplicaIdentity::Default,
        vec![Arc::from("id")],
        Lsn::from(300),
    );

    // Fail loudly rather than silently skipping the assertions if the
    // constructor ever yields a different variant.
    let EventType::Update { schema, table, .. } = &update_event.event_type else {
        panic!("ChangeEvent::update should produce an EventType::Update event");
    };

    // Verify the event is structured correctly for error generation.
    assert_eq!(schema.as_ref(), "schema1");
    assert_eq!(table.as_ref(), "table1");
}
}
/// Test validation helpers for error message formatting
#[cfg(test)]
mod error_message_validation {
/// Renders the error message expected for a given error condition.
///
/// This simulates what the MySQL destination should produce; it does not
/// call the destination itself.
///
/// # Arguments
///
/// * `operation` - operation label interpolated into the message (e.g. `"Update"`).
/// * `schema` / `table` - rendered as `schema.table` in the message.
/// * `error_type` - one of `"missing_key_column"`, `"no_data_available"`,
///   or `"no_key_columns"`.
/// * `key_column` - column name used by `"missing_key_column"`; ignored by
///   the other error types.
///
/// # Panics
///
/// Panics if `error_type` is not one of the names listed above, or if it is
/// `"missing_key_column"` and `key_column` is `None`.
fn validate_error_components(
    operation: &str,
    schema: &str,
    table: &str,
    error_type: &str,
    key_column: Option<&str>,
) -> String {
    match error_type {
        "missing_key_column" => {
            // State the violated precondition instead of a bare `unwrap` panic.
            let column =
                key_column.expect("error type 'missing_key_column' requires a key column name");
            format!(
                "Key column '{}' not found in data for {} on {}.{}",
                column, operation, schema, table
            )
        }
        "no_data_available" => format!(
            "No data available to build WHERE clause for {} on {}.{}",
            operation, schema, table
        ),
        "no_key_columns" => format!(
            "No key columns available for {} operation on {}.{}. Check table's replica identity setting.",
            operation, schema, table
        ),
        _ => panic!("Unknown error type: {}", error_type),
    }
}
/// Pins the exact wording of each expected error message.
#[test]
fn test_error_message_formatting() {
    // Each row: (operation, schema, table, error_type, key_column, expected message).
    let cases = [
        (
            "Update",
            "public",
            "users",
            "missing_key_column",
            Some("id"),
            "Key column 'id' not found in data for Update on public.users",
        ),
        (
            "Delete",
            "app",
            "orders",
            "no_data_available",
            None,
            "No data available to build WHERE clause for Delete on app.orders",
        ),
        (
            "Update",
            "inventory",
            "products",
            "no_key_columns",
            None,
            "No key columns available for Update operation on inventory.products. Check table's replica identity setting.",
        ),
        // Same error type again, with a different key column and table.
        (
            "Update",
            "public",
            "user_sessions",
            "missing_key_column",
            Some("session_token"),
            "Key column 'session_token' not found in data for Update on public.user_sessions",
        ),
    ];

    for (operation, schema, table, error_type, key_column, expected) in cases {
        let rendered = validate_error_components(operation, schema, table, error_type, key_column);
        assert_eq!(rendered, expected);
    }
}
}