use pg2any_lib::{format_lsn, parse_lsn, ColumnData, ColumnInfo, TupleData};
use pg_walstream::buffer::{BufferReader, BufferWriter};
#[test]
fn test_buffer_operations() {
let mut writer = BufferWriter::new();
writer.write_u8(0x42).unwrap();
writer.write_u16(0x1234).unwrap();
writer.write_u32(0x12345678).unwrap();
writer.write_u64(0x123456789ABCDEF0).unwrap();
let buffer = writer.freeze();
let mut reader = BufferReader::new(&buffer);
assert_eq!(reader.read_u8().unwrap(), 0x42);
assert_eq!(reader.read_u16().unwrap(), 0x1234);
assert_eq!(reader.read_u32().unwrap(), 0x12345678);
assert_eq!(reader.read_u64().unwrap(), 0x123456789ABCDEF0);
}
#[test]
fn test_lsn_operations() {
let lsn_str = "1/23456789";
let lsn = parse_lsn(lsn_str).unwrap();
assert_eq!(lsn, 0x100000000 + 0x23456789);
let formatted = format_lsn(lsn);
assert_eq!(formatted, "1/23456789");
}
#[test]
fn test_replication_data_structures() {
let null_col = ColumnData::null();
assert!(null_col.is_null());
assert_eq!(null_col.as_string(), None);
let text_col = ColumnData::text(b"Hello World".to_vec());
assert!(!text_col.is_null());
assert_eq!(text_col.as_string(), Some("Hello World".to_string()));
let unchanged_col = ColumnData::unchanged();
assert!(unchanged_col.is_unchanged());
let col_info = ColumnInfo::new(1, "id".to_string(), 23, -1);
assert!(col_info.is_key());
assert_eq!(col_info.name.as_ref(), "id");
let tuple = TupleData::new(vec![
ColumnData::text(b"1".to_vec()),
ColumnData::text(b"John Doe".to_vec()),
ColumnData::null(),
]);
assert_eq!(tuple.column_count(), 3);
assert_eq!(
tuple.get_column(0).unwrap().as_string(),
Some("1".to_string())
);
assert!(tuple.get_column(2).unwrap().is_null());
}
#[test]
fn test_message_parsing() {
use pg2any_lib::LogicalReplicationParser;
use pg_walstream::protocol::message_types;
let mut parser = LogicalReplicationParser::with_protocol_version(1);
let mut begin_msg = vec![message_types::BEGIN];
begin_msg.extend_from_slice(&0x123456789ABCDEF0u64.to_be_bytes()); begin_msg.extend_from_slice(&0x1234567890ABCDEFi64.to_be_bytes()); begin_msg.extend_from_slice(&0x12345678u32.to_be_bytes());
let result = parser.parse_wal_message(&begin_msg);
assert!(result.is_ok());
let streaming_msg = result.unwrap();
match streaming_msg.message {
pg2any_lib::LogicalReplicationMessage::Begin {
final_lsn,
timestamp,
xid,
} => {
assert_eq!(final_lsn, 0x123456789ABCDEF0);
assert_eq!(timestamp, 0x1234567890ABCDEF);
assert_eq!(xid, 0x12345678);
}
_ => panic!("Expected Begin message"),
}
}
#[test]
fn test_config_integration() {
use pg2any_lib::{Config, DestinationType};
use std::time::Duration;
let config = Config::builder()
.source_connection_string("postgresql://test:test@localhost:5432/test".to_string())
.destination_type(DestinationType::MySQL)
.destination_connection_string("mysql://test:test@localhost:3306/test".to_string())
.replication_slot_name("test_slot".to_string())
.publication_name("test_pub".to_string())
.protocol_version(2)
.streaming(true)
.connection_timeout(Duration::from_secs(30))
.build()
.unwrap();
assert_eq!(config.protocol_version, 2);
assert!(config.streaming);
assert_eq!(config.replication_slot_name, "test_slot");
}