// pg2any_lib 0.9.0
//
// PostgreSQL to Any database library with Change Data Capture (CDC) and logical replication support.
// Documentation
/// Error handling tests for MySQL destination WHERE clause generation
/// These tests verify the specific error conditions and messages that should be
/// generated by the build_where_clause method in MySQL destination
use pg2any_lib::types::{ChangeEvent, EventType, ReplicaIdentity};
use pg_walstream::ColumnValue;
use pg_walstream::Lsn;
use pg_walstream::RowData;
use std::sync::Arc;

/// Tests for error conditions in WHERE clause generation
/// These mirror the error handling logic in the MySQL destination
#[cfg(test)]
mod where_clause_error_tests {
    use super::*;

    /// Builds an UPDATE event whose old row image lacks the declared key column and
    /// checks that the event exposes the state the missing-key error path depends on.
    ///
    /// Per the notes carried with this test, the MySQL destination's
    /// `build_where_clause` is expected to report:
    /// "Key column 'id' not found in data for Update on public.users"
    #[test]
    fn test_error_message_content_expectations() {
        // Old row image carries only "name"; the key column "id" is deliberately absent.
        let incomplete_data = RowData::from_pairs(vec![("name", ColumnValue::text("test"))]);

        let event_missing_key = ChangeEvent::update(
            "public",
            "users",
            16384,
            Some(incomplete_data),
            RowData::from_pairs(vec![]),
            ReplicaIdentity::Default,
            vec![Arc::from("id")],
            Lsn::from(300),
        );

        // The event must still advertise "id" as its single key column.
        let key_columns = event_missing_key.get_key_columns().unwrap();
        assert_eq!(key_columns.len(), 1);
        assert_eq!(key_columns[0].as_ref(), "id");

        // Use an exhaustive match rather than `if let` so the assertion below can never
        // be skipped silently if the constructed event is not an Update.
        match &event_missing_key.event_type {
            EventType::Update { old_data, .. } => {
                let data_source = old_data.as_ref().unwrap();
                assert!(data_source.get("id").is_none());
            }
            _ => panic!("ChangeEvent::update should produce an EventType::Update event"),
        }
    }

    /// Builds an UPDATE event for a table using `ReplicaIdentity::Nothing` and checks
    /// that the event reports that identity together with an empty key-column list.
    #[test]
    fn test_replica_identity_nothing_error_conditions() {
        let new_data = RowData::from_pairs(vec![("data", ColumnValue::text("some data"))]);

        // UPDATE with NOTHING replica identity: no old row image and no key columns.
        let update_event = ChangeEvent::update(
            "public",
            "no_replica_table",
            16384,
            None, // No old_data for NOTHING replica identity
            new_data, // moved: the row is not used again, so no clone is needed
            ReplicaIdentity::Nothing,
            vec![], // No key columns
            Lsn::from(300),
        );

        assert_eq!(
            update_event.get_replica_identity().unwrap(),
            &ReplicaIdentity::Nothing
        );
        assert!(update_event.get_key_columns().unwrap().is_empty());
    }

    /// Builds an UPDATE event without an old row image and checks that the schema and
    /// table names, which the error messages embed, are carried on the event unchanged.
    #[test]
    fn test_operation_name_in_error_messages() {
        // UPDATE operation with a declared key column but no old data.
        let update_event = ChangeEvent::update(
            "schema1",
            "table1",
            16384,
            None,
            RowData::from_pairs(vec![]),
            ReplicaIdentity::Default,
            vec![Arc::from("id")],
            Lsn::from(300),
        );

        // Use an exhaustive match rather than `if let` so the assertions below can never
        // be skipped silently if the constructed event is not an Update.
        match &update_event.event_type {
            EventType::Update { schema, table, .. } => {
                assert_eq!(schema.as_ref(), "schema1");
                assert_eq!(table.as_ref(), "table1");
            }
            _ => panic!("ChangeEvent::update should produce an EventType::Update event"),
        }
    }
}

/// Test validation helpers for error message formatting
#[cfg(test)]
mod error_message_validation {

    /// Renders the error text expected for a given failure category.
    ///
    /// This stands in for the messages the MySQL destination should produce.
    ///
    /// * `operation` - operation name embedded in the message (e.g. "Update").
    /// * `schema`, `table` - rendered as `schema.table`.
    /// * `error_type` - one of `"missing_key_column"`, `"no_data_available"`,
    ///   `"no_key_columns"`.
    /// * `key_column` - column name; only read for `"missing_key_column"`.
    ///
    /// # Panics
    ///
    /// Panics when `error_type` is not one of the three known categories, or when
    /// `error_type` is `"missing_key_column"` and `key_column` is `None`.
    fn validate_error_components(
        operation: &str,
        schema: &str,
        table: &str,
        error_type: &str,
        key_column: Option<&str>,
    ) -> String {
        if error_type == "missing_key_column" {
            // The column name is mandatory for this category only.
            let column = key_column.unwrap();
            return format!(
                "Key column '{}' not found in data for {} on {}.{}",
                column, operation, schema, table
            );
        }

        if error_type == "no_data_available" {
            return format!(
                "No data available to build WHERE clause for {} on {}.{}",
                operation, schema, table
            );
        }

        if error_type == "no_key_columns" {
            return format!(
                "No key columns available for {} operation on {}.{}. Check table's replica identity setting.",
                operation, schema, table
            );
        }

        panic!("Unknown error type: {}", error_type)
    }

    /// Pins the exact wording `validate_error_components` returns for each error
    /// category, using one table row per expectation.
    #[test]
    fn test_error_message_formatting() {
        // Columns: operation, schema, table, error_type, key_column, expected message.
        let cases = [
            (
                "Update",
                "public",
                "users",
                "missing_key_column",
                Some("id"),
                "Key column 'id' not found in data for Update on public.users",
            ),
            (
                "Delete",
                "app",
                "orders",
                "no_data_available",
                None,
                "No data available to build WHERE clause for Delete on app.orders",
            ),
            (
                "Update",
                "inventory",
                "products",
                "no_key_columns",
                None,
                "No key columns available for Update operation on inventory.products. Check table's replica identity setting.",
            ),
            // Composite key column name
            (
                "Update",
                "public",
                "user_sessions",
                "missing_key_column",
                Some("session_token"),
                "Key column 'session_token' not found in data for Update on public.user_sessions",
            ),
        ];

        for (operation, schema, table, error_type, key_column, expected) in cases {
            let message =
                validate_error_components(operation, schema, table, error_type, key_column);
            assert_eq!(message, expected);
        }
    }
}