use crate::destinations::{DestinationHandler, PreCommitHook};
use crate::error::{CdcError, Result};
use crate::lsn_tracker::{LsnTracker, SharedLsnFeedback};
use crate::monitoring::{MetricsCollector, MetricsCollectorTrait};
use crate::storage::{CompressionIndex, SqlStreamParser, StorageFactory, TransactionStorage};
use crate::types::{ChangeEvent, DestinationType, EventType, Lsn, ReplicaIdentity, RowData};
use async_compression::tokio::bufread::GzipDecoder;
use chrono::{DateTime, Utc};
use pg_walstream::ColumnValue;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::fs::{self, File};
use tokio::io::{
AsyncBufReadExt, AsyncRead, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
const MB: usize = 1024 * 1024;
const RECEIVED_TX_DIR: &str = "sql_received_tx";
const PENDING_TX_DIR: &str = "sql_pending_tx";
const DATA_TX_DIR: &str = "sql_data_tx";
const DEFAULT_BUFFER_SIZE: usize = 8 * MB;
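/// Accumulates SQL statements for one segment in memory and flushes them to
/// the segment file in large writes. The file handle is opened lazily on the
/// first flush and reused for subsequent flushes.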
struct BufferedEventWriter {
file_path: PathBuf,
buffer: String,
max_buffer_size: usize,
writer: Option<BufWriter<File>>,
}
impl BufferedEventWriter {
fn new(file_path: PathBuf, max_buffer_size: usize) -> Self {
Self {
file_path,
buffer: String::with_capacity(max_buffer_size),
max_buffer_size,
writer: None,
}
}
fn append(&mut self, sql: &str) -> bool {
self.buffer.reserve(sql.len() + 1);
self.buffer.push_str(sql);
self.buffer.push('\n');
self.buffer.len() >= self.max_buffer_size
}
async fn flush(&mut self) -> Result<()> {
if self.buffer.is_empty() {
return Ok(());
}
if self.writer.is_none() {
let file = fs::OpenOptions::new()
.append(true)
.open(&self.file_path)
.await?;
self.writer = Some(BufWriter::with_capacity(64 * 1024, file));
}
let writer = self.writer.as_mut().unwrap();
writer.write_all(self.buffer.as_bytes()).await?;
writer.flush().await?;
debug!(
"Flushed {} bytes to {:?}",
self.buffer.len(),
self.file_path
);
self.buffer.clear();
Ok(())
}
fn buffer_size(&self) -> usize {
self.buffer.len()
}
}
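/// One on-disk segment of a transaction's SQL, plus the number of statements
/// it contains (used to skip fully executed segments when resuming).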
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionSegment {
pub path: PathBuf,
#[serde(default)]
pub statement_count: usize,
}
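/// Sidecar `.meta` JSON for a transaction: identity, commit position, its SQL
/// segments, and resume bookkeeping (`last_executed_command_index`).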
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionFileMetadata {
pub transaction_id: u32,
pub commit_timestamp: DateTime<Utc>,
pub commit_lsn: Option<Lsn>,
pub destination_type: DestinationType,
#[serde(default)]
pub segments: Vec<TransactionSegment>,
#[serde(default)]
pub current_segment_index: usize,
#[serde(default)]
pub last_executed_command_index: Option<usize>,
#[serde(default)]
pub last_update_timestamp: Option<DateTime<Utc>>,
#[serde(default = "default_transaction_type")]
pub transaction_type: String,
}
fn default_transaction_type() -> String {
"normal".to_string()
}
#[derive(Debug, Clone)]
pub struct PendingTransactionFile {
pub file_path: PathBuf,
pub metadata: TransactionFileMetadata,
}
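// Pending files are compared by (commit_lsn, transaction_id); files without a
// commit LSN sort after those that have one.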
impl Eq for PendingTransactionFile {}
impl PartialEq for PendingTransactionFile {
fn eq(&self, other: &Self) -> bool {
self.metadata.commit_lsn == other.metadata.commit_lsn
&& self.metadata.transaction_id == other.metadata.transaction_id
}
}
impl Ord for PendingTransactionFile {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self.metadata.commit_lsn, other.metadata.commit_lsn) {
(Some(a), Some(b)) => a.cmp(&b).then_with(|| {
self.metadata
.transaction_id
.cmp(&other.metadata.transaction_id)
}),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            (None, None) => self
                .metadata
                .transaction_id
                .cmp(&other.metadata.transaction_id),
}
}
}
impl PartialOrd for PendingTransactionFile {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
struct ActiveTransactionState {
segments: Vec<PathBuf>,
segment_statement_counts: Vec<usize>,
current_segment_index: usize,
current_segment_size_bytes: usize,
writer: BufferedEventWriter,
}
struct StatementProcessingState<'a> {
batch: &'a mut Vec<String>,
current_command_index: &'a mut usize,
processed_count: &'a mut usize,
batch_count: &'a mut usize,
}
#[derive(Debug, Clone)]
struct PendingProgress {
last_executed_command_index: usize,
last_update_timestamp: DateTime<Utc>,
}
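/// Manages the on-disk lifecycle of replicated transactions under `base_path`:
///
/// - `sql_received_tx/`: metadata for transactions still being received
/// - `sql_pending_tx/`: metadata for committed transactions awaiting apply
/// - `sql_data_tx/`: the SQL segment files themselves
///
/// A minimal usage sketch; the path, transaction id, and values below are
/// illustrative only:
///
/// ```ignore
/// let manager = TransactionManager::new("/var/lib/cdc", DestinationType::MySQL, 64 * MB).await?;
/// manager.begin_transaction(42, Utc::now(), "normal").await?;
/// manager.append_event(42, &change_event).await?;
/// let pending_meta = manager.commit_transaction(42, Some(commit_lsn)).await?;
/// ```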
pub struct TransactionManager {
base_path: PathBuf,
destination_type: DestinationType,
schema_mappings: HashMap<String, String>,
active_transactions: Arc<Mutex<HashMap<u32, ActiveTransactionState>>>,
staged_pending_progress: Arc<Mutex<HashMap<PathBuf, PendingProgress>>>,
buffer_size: usize,
segment_size_bytes: usize,
storage: Arc<dyn TransactionStorage>,
}
impl TransactionManager {
pub async fn new(
base_path: impl AsRef<Path>,
destination_type: DestinationType,
segment_size_bytes: usize,
) -> Result<Self> {
let base_path = base_path.as_ref().to_path_buf();
let received_tx_dir = base_path.join(RECEIVED_TX_DIR);
let pending_tx_dir = base_path.join(PENDING_TX_DIR);
let data_tx_dir = base_path.join(DATA_TX_DIR);
fs::create_dir_all(&received_tx_dir).await?;
fs::create_dir_all(&pending_tx_dir).await?;
fs::create_dir_all(&data_tx_dir).await?;
let storage = StorageFactory::from_env();
info!(
"Transaction file manager initialized at {:?} for {:?}, segment_size_bytes={:?}",
base_path, destination_type, segment_size_bytes
);
Ok(Self {
base_path,
destination_type,
schema_mappings: HashMap::new(),
active_transactions: Arc::new(Mutex::new(HashMap::new())),
staged_pending_progress: Arc::new(Mutex::new(HashMap::new())),
buffer_size: DEFAULT_BUFFER_SIZE,
segment_size_bytes,
storage,
})
}
pub fn set_schema_mappings(&mut self, mappings: HashMap<String, String>) {
self.schema_mappings = mappings;
}
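    /// Flushes every active transaction's in-memory buffer to disk; called
    /// during shutdown so buffered statements are not lost.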
pub async fn flush_all_buffers(&self) -> Result<()> {
let mut transactions = self.active_transactions.lock().await;
let mut flush_count = 0;
let mut total_bytes = 0;
        for tx_state in transactions.values_mut() {
let buffer_size = tx_state.writer.buffer_size();
if buffer_size > 0 {
tx_state.writer.flush().await?;
tx_state.current_segment_size_bytes += buffer_size;
flush_count += 1;
total_bytes += buffer_size;
}
}
if flush_count > 0 {
info!(
"Flushed {} buffer(s) totaling {} bytes during shutdown",
flush_count, total_bytes
);
}
Ok(())
}
fn get_received_tx_path(&self, tx_id: u32) -> PathBuf {
let filename = format!("{}.meta", tx_id);
self.base_path.join(RECEIVED_TX_DIR).join(filename)
}
fn get_pending_tx_path(&self, tx_id: u32) -> PathBuf {
let filename = format!("{}.meta", tx_id);
self.base_path.join(PENDING_TX_DIR).join(filename)
}
fn get_segment_data_file_path(&self, tx_id: u32, segment_index: usize) -> PathBuf {
let filename = format!("{}_{:06}.sql", tx_id, segment_index + 1);
self.base_path.join(DATA_TX_DIR).join(filename)
}
fn get_final_segment_info(
&self,
tx_id: u32,
metadata: &TransactionFileMetadata,
tx_state: Option<ActiveTransactionState>,
) -> (Vec<PathBuf>, Vec<usize>) {
        if let Some(state) = tx_state.filter(|state| !state.segments.is_empty()) {
            (state.segments, state.segment_statement_counts)
} else if !metadata.segments.is_empty() {
let paths = metadata
.segments
.iter()
.map(|seg| seg.path.clone())
.collect::<Vec<_>>();
let counts = metadata
.segments
.iter()
.map(|seg| seg.statement_count)
.collect::<Vec<_>>();
(paths, counts)
} else {
(vec![self.get_segment_data_file_path(tx_id, 0)], vec![0])
}
}
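    /// Creates the first data segment and the received-metadata file for
    /// `tx_id`, and registers the transaction as active in memory.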
pub async fn begin_transaction(
&self,
tx_id: u32,
timestamp: DateTime<Utc>,
transaction_type: &str,
) -> Result<PathBuf> {
let data_file_path = self.get_segment_data_file_path(tx_id, 0);
let metadata_path = self.get_received_tx_path(tx_id);
File::create(&data_file_path).await?;
debug!("Created data file: {:?}", data_file_path);
let metadata = TransactionFileMetadata {
transaction_id: tx_id,
commit_timestamp: timestamp,
commit_lsn: None,
destination_type: self.destination_type.clone(),
segments: vec![TransactionSegment {
path: data_file_path.clone(),
statement_count: 0,
}],
current_segment_index: 0,
last_executed_command_index: None,
last_update_timestamp: None,
transaction_type: transaction_type.to_string(),
};
let metadata_json = serde_json::to_string_pretty(&metadata)?;
let mut metadata_file = File::create(&metadata_path).await?;
metadata_file.write_all(metadata_json.as_bytes()).await?;
metadata_file.flush().await?;
let mut transactions = self.active_transactions.lock().await;
transactions.insert(
tx_id,
ActiveTransactionState {
segments: vec![data_file_path.clone()],
segment_statement_counts: vec![0],
current_segment_index: 0,
current_segment_size_bytes: 0,
writer: BufferedEventWriter::new(data_file_path.clone(), self.buffer_size),
},
);
info!(
"Started transaction {}: data={:?}, metadata={:?}",
tx_id, data_file_path, metadata_path
);
Ok(data_file_path)
}
async fn update_received_metadata_segments(
&self,
tx_id: u32,
segments: &[PathBuf],
current_segment_index: usize,
) -> Result<()> {
let received_metadata_path = self.get_received_tx_path(tx_id);
let metadata_content = fs::read_to_string(&received_metadata_path)
.await
.map_err(|e| {
CdcError::generic(format!(
"Failed to read metadata from {received_metadata_path:?}: {e}"
))
})?;
let mut metadata: TransactionFileMetadata = serde_json::from_str(&metadata_content)
.map_err(|e| CdcError::generic(format!("Failed to parse metadata: {e}")))?;
metadata.segments = segments
.iter()
.map(|path| TransactionSegment {
path: path.clone(),
statement_count: 0,
})
.collect();
metadata.current_segment_index = current_segment_index;
let metadata_json = serde_json::to_string_pretty(&metadata)
.map_err(|e| CdcError::generic(format!("Failed to serialize metadata: {e}")))?;
let mut metadata_file = File::create(&received_metadata_path).await.map_err(|e| {
CdcError::generic(format!(
"Failed to create received metadata {received_metadata_path:?}: {e}"
))
})?;
metadata_file.write_all(metadata_json.as_bytes()).await?;
metadata_file.flush().await?;
Ok(())
}
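    /// Generates SQL for `event` and appends it to the transaction's current
    /// segment. If the estimated segment size (bytes on disk, plus buffered
    /// bytes, plus this statement) would exceed `segment_size_bytes`, the
    /// segment is rotated first and the received metadata is updated to list
    /// the new segment.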
pub async fn append_event(&self, tx_id: u32, event: &ChangeEvent) -> Result<()> {
let sql = self.generate_sql_for_event(event)?;
if sql.is_empty() {
return Ok(());
}
let mut transactions = self.active_transactions.lock().await;
let tx_state = transactions.get_mut(&tx_id).ok_or_else(|| {
CdcError::generic(format!(
"Active transaction {} not found for append_event",
tx_id
))
})?;
        let sql_bytes = sql.len() + 1; // statement plus trailing newline
        let estimated_size =
            tx_state.current_segment_size_bytes + tx_state.writer.buffer_size() + sql_bytes;
let should_rotate = estimated_size > self.segment_size_bytes
&& (tx_state.current_segment_size_bytes > 0 || tx_state.writer.buffer_size() > 0);
if should_rotate {
let buffered_bytes = tx_state.writer.buffer_size();
tx_state.writer.flush().await?;
tx_state.current_segment_size_bytes += buffered_bytes;
let next_segment_index = tx_state.current_segment_index + 1;
let next_segment_path = self.get_segment_data_file_path(tx_id, next_segment_index);
File::create(&next_segment_path).await?;
tx_state.segments.push(next_segment_path.clone());
tx_state.segment_statement_counts.push(0);
tx_state.current_segment_index = next_segment_index;
tx_state.current_segment_size_bytes = 0;
tx_state.writer = BufferedEventWriter::new(next_segment_path.clone(), self.buffer_size);
self.update_received_metadata_segments(
tx_id,
&tx_state.segments,
tx_state.current_segment_index,
)
.await?;
info!(
"Rotated transaction {} to new segment {:?} ({} segments total)",
tx_id,
next_segment_path,
tx_state.segments.len()
);
}
let should_flush = tx_state.writer.append(&sql);
if let Some(count) = tx_state
.segment_statement_counts
.get_mut(tx_state.current_segment_index)
{
*count += 1;
}
if should_flush {
let buffered_bytes = tx_state.writer.buffer_size();
tx_state.writer.flush().await?;
tx_state.current_segment_size_bytes += buffered_bytes;
}
Ok(())
}
async fn read_received_metadata(
&self,
received_metadata_path: &Path,
) -> Result<TransactionFileMetadata> {
let metadata_content = fs::read_to_string(received_metadata_path)
.await
.map_err(|e| {
CdcError::generic(format!(
"Failed to read metadata from {received_metadata_path:?}: {e}"
))
})?;
serde_json::from_str(&metadata_content)
.map_err(|e| CdcError::generic(format!("Failed to parse metadata: {e}")))
}
async fn take_and_flush_active_transaction(
&self,
tx_id: u32,
) -> Result<Option<ActiveTransactionState>> {
let mut tx_state = {
let mut transactions = self.active_transactions.lock().await;
transactions.remove(&tx_id)
};
if let Some(state) = tx_state.as_mut() {
let buffered_bytes = state.writer.buffer_size();
state.writer.flush().await?;
if buffered_bytes > 0 {
debug!(
"Flushed final buffer for transaction {} ({} bytes)",
tx_id, buffered_bytes
);
}
}
Ok(tx_state)
}
async fn build_final_segments(
&self,
segment_paths: &[PathBuf],
segment_counts: &[usize],
) -> Result<Vec<TransactionSegment>> {
let mut final_segments = Vec::new();
for (idx, segment_path) in segment_paths.iter().enumerate() {
let (final_data_path, statement_count) = self
.storage
.write_transaction_from_file(segment_path)
.await?;
let fallback_count = segment_counts.get(idx).copied().unwrap_or(0);
let final_count = if statement_count == 0 {
fallback_count
} else {
statement_count
};
final_segments.push(TransactionSegment {
path: final_data_path,
statement_count: final_count,
});
}
Ok(final_segments)
}
fn apply_commit_metadata(
&self,
metadata: &mut TransactionFileMetadata,
final_segments: Vec<TransactionSegment>,
commit_lsn: Option<Lsn>,
) {
metadata.segments = final_segments;
metadata.current_segment_index = 0;
metadata.last_executed_command_index = None;
metadata.last_update_timestamp = None;
metadata.commit_lsn = commit_lsn;
}
async fn write_pending_metadata_file(
&self,
pending_metadata_path: &Path,
metadata: &TransactionFileMetadata,
) -> Result<()> {
let updated_json = serde_json::to_string_pretty(metadata)
.map_err(|e| CdcError::generic(format!("Failed to serialize metadata: {e}")))?;
let mut pending_file = File::create(pending_metadata_path).await.map_err(|e| {
CdcError::generic(format!(
"Failed to create pending metadata {pending_metadata_path:?}: {e}"
))
})?;
pending_file
.write_all(updated_json.as_bytes())
.await
.map_err(|e| CdcError::generic(format!("Failed to write metadata: {e}")))?;
pending_file
.flush()
.await
.map_err(|e| CdcError::generic(format!("Failed to flush metadata: {e}")))?;
Ok(())
}
async fn remove_received_metadata(&self, received_metadata_path: &Path) -> Result<()> {
fs::remove_file(received_metadata_path).await.map_err(|e| {
CdcError::generic(format!(
"Failed to remove received metadata {received_metadata_path:?}: {e}"
))
})
}
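    /// Finalizes all segments through the storage backend, writes the pending
    /// metadata file (now carrying `commit_lsn`), and removes the received
    /// metadata. Data files stay in `sql_data_tx/` until the transaction has
    /// been applied to the destination.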
pub async fn commit_transaction(&self, tx_id: u32, commit_lsn: Option<Lsn>) -> Result<PathBuf> {
let received_metadata_path = self.get_received_tx_path(tx_id);
let pending_metadata_path = self.get_pending_tx_path(tx_id);
let mut metadata = self.read_received_metadata(&received_metadata_path).await?;
let tx_state = self.take_and_flush_active_transaction(tx_id).await?;
let (segment_paths, segment_counts) =
self.get_final_segment_info(tx_id, &metadata, tx_state);
if segment_paths.is_empty() {
return Err(CdcError::generic(format!(
"No transaction segments found for tx {}",
tx_id
)));
}
let final_segments = self
.build_final_segments(&segment_paths, &segment_counts)
.await?;
self.apply_commit_metadata(&mut metadata, final_segments, commit_lsn);
self.write_pending_metadata_file(&pending_metadata_path, &metadata)
.await?;
self.remove_received_metadata(&received_metadata_path)
.await?;
info!(
"Committed transaction {}: moved metadata to sql_pending_tx/ (LSN: {:?}), data stays in sql_data_tx/",
tx_id, commit_lsn
);
Ok(pending_metadata_path)
}
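    /// Drops an in-flight transaction: removes its received metadata and asks
    /// the storage backend to delete every data segment it can still identify.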
pub async fn abort_transaction(&self, tx_id: u32, _timestamp: DateTime<Utc>) -> Result<()> {
let received_metadata_path = self.get_received_tx_path(tx_id);
let first_segment_path = self.get_segment_data_file_path(tx_id, 0);
let segment_paths = 'paths: {
if tokio::fs::metadata(&received_metadata_path).await.is_err() {
break 'paths vec![first_segment_path.clone()];
}
let Ok(metadata_content) = fs::read_to_string(&received_metadata_path).await else {
break 'paths vec![first_segment_path.clone()];
};
let Ok(metadata) = serde_json::from_str::<TransactionFileMetadata>(&metadata_content)
else {
break 'paths vec![first_segment_path.clone()];
};
if metadata.segments.is_empty() {
break 'paths vec![first_segment_path.clone()];
}
metadata
.segments
.into_iter()
.map(|seg| seg.path)
.collect::<Vec<_>>()
};
{
let mut transactions = self.active_transactions.lock().await;
transactions.remove(&tx_id);
}
if tokio::fs::metadata(&received_metadata_path).await.is_ok() {
fs::remove_file(&received_metadata_path).await?;
debug!("Deleted metadata file: {:?}", received_metadata_path);
}
for path in segment_paths {
self.storage.delete_transaction(&path).await?;
}
info!(
"Aborted transaction {}, deleted metadata and data files",
tx_id
);
Ok(())
}
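    /// Scans `sql_pending_tx/` for `.meta` files and returns them sorted by
    /// commit timestamp.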
pub async fn list_pending_transactions(&self) -> Result<Vec<PendingTransactionFile>> {
let pending_dir = self.base_path.join(PENDING_TX_DIR);
let mut entries = fs::read_dir(&pending_dir).await?;
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("meta") {
if let Ok(metadata) = self.read_metadata(&path).await {
files.push(PendingTransactionFile {
file_path: path,
metadata,
});
}
}
}
files.sort_by_key(|f| f.metadata.commit_timestamp);
Ok(files)
}
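    /// Re-registers every transaction left in `sql_received_tx/` as active,
    /// ordered by metadata file creation (or modification) time.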
pub async fn restore_received_transactions(&self) -> Result<Vec<TransactionFileMetadata>> {
let received_dir = self.base_path.join(RECEIVED_TX_DIR);
let mut entries = fs::read_dir(&received_dir).await?;
let mut metas = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("meta") {
let meta = fs::metadata(&path).await?;
let time = meta.created().or_else(|_| meta.modified())?;
metas.push((path, DateTime::<Utc>::from(time)));
}
}
metas.sort_by_key(|(_, t)| *t);
let mut active_txs = Vec::new();
for (path, _file_time) in metas {
if let Ok(metadata) = self.read_metadata(&path).await {
self.restore_active_transaction(&metadata).await?;
active_txs.push(metadata);
}
}
Ok(active_txs)
}
pub(crate) async fn read_metadata(&self, file_path: &Path) -> Result<TransactionFileMetadata> {
let metadata_content = fs::read_to_string(file_path).await?;
let metadata: TransactionFileMetadata = serde_json::from_str(&metadata_content)?;
Ok(metadata)
}
async fn write_pending_metadata(
&self,
metadata_file_path: &Path,
metadata: &TransactionFileMetadata,
) -> Result<()> {
let metadata_json = serde_json::to_string_pretty(metadata)
.map_err(|e| CdcError::generic(format!("Failed to serialize metadata: {e}")))?;
let temp_path = metadata_file_path.with_extension("meta.tmp");
let mut metadata_file = File::create(&temp_path).await.map_err(|e| {
CdcError::generic(format!(
"Failed to create pending metadata {temp_path:?}: {e}"
))
})?;
metadata_file.write_all(metadata_json.as_bytes()).await?;
metadata_file.flush().await?;
fs::rename(&temp_path, metadata_file_path)
.await
.map_err(|e| {
CdcError::generic(format!(
"Failed to replace pending metadata {metadata_file_path:?}: {e}"
))
})?;
Ok(())
}
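    /// Records execution progress in memory only; `flush_staged_pending_progress`
    /// later persists all staged entries to their pending metadata files.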
pub async fn stage_pending_metadata_progress(
&self,
metadata_file_path: &Path,
last_executed_command_index: usize,
) -> Result<()> {
let mut staged = self.staged_pending_progress.lock().await;
staged.insert(
metadata_file_path.to_path_buf(),
PendingProgress {
last_executed_command_index,
last_update_timestamp: Utc::now(),
},
);
Ok(())
}
pub async fn flush_staged_pending_progress(&self) -> Result<()> {
let staged_entries = {
let mut staged = self.staged_pending_progress.lock().await;
if staged.is_empty() {
return Ok(());
}
staged.drain().collect::<Vec<_>>()
};
let mut last_error: Option<CdcError> = None;
for (metadata_path, progress) in staged_entries {
if fs::metadata(&metadata_path).await.is_err() {
continue;
}
match self.read_metadata(&metadata_path).await {
Ok(mut metadata) => {
metadata.last_executed_command_index =
Some(progress.last_executed_command_index);
metadata.last_update_timestamp = Some(progress.last_update_timestamp);
if let Err(e) = self.write_pending_metadata(&metadata_path, &metadata).await {
last_error = Some(e);
}
}
Err(e) => {
last_error = Some(e);
}
}
}
if let Some(err) = last_error {
return Err(err);
}
Ok(())
}
pub async fn clear_staged_pending_progress(&self, metadata_file_path: &Path) {
let mut staged = self.staged_pending_progress.lock().await;
staged.remove(metadata_file_path);
}
async fn restore_active_transaction(&self, metadata: &TransactionFileMetadata) -> Result<()> {
let segments = if !metadata.segments.is_empty() {
metadata
.segments
.iter()
.map(|seg| seg.path.clone())
.collect::<Vec<_>>()
} else {
vec![self.get_segment_data_file_path(metadata.transaction_id, 0)]
};
let segment_statement_counts = if !metadata.segments.is_empty() {
metadata
.segments
.iter()
.map(|seg| seg.statement_count)
.collect::<Vec<_>>()
} else {
vec![0]
};
let mut current_segment_index = metadata.current_segment_index;
if current_segment_index >= segments.len() {
current_segment_index = segments.len() - 1;
}
let current_segment_path = segments[current_segment_index].clone();
tokio::fs::OpenOptions::new()
.create(true)
.append(true)
            .open(&current_segment_path)
.await?;
        let current_size = match tokio::fs::metadata(&current_segment_path).await {
Ok(meta) => meta.len() as usize,
Err(_) => 0,
};
let mut transactions = self.active_transactions.lock().await;
transactions.insert(
metadata.transaction_id,
ActiveTransactionState {
segments,
segment_statement_counts,
current_segment_index,
current_segment_size_bytes: current_size,
writer: BufferedEventWriter::new(current_segment_path, self.buffer_size),
},
);
Ok(())
}
pub async fn delete_pending_transaction(&self, metadata_file_path: &Path) -> Result<()> {
let metadata = self.read_metadata(metadata_file_path).await?;
let data_file_paths = if !metadata.segments.is_empty() {
metadata
.segments
.iter()
.map(|seg| seg.path.clone())
.collect::<Vec<_>>()
} else {
vec![self.get_segment_data_file_path(metadata.transaction_id, 0)]
};
if tokio::fs::metadata(metadata_file_path).await.is_ok() {
fs::remove_file(metadata_file_path).await?;
}
for path in data_file_paths.iter() {
self.storage.delete_transaction(path).await?;
}
info!(
"Deleted executed transaction files: metadata={:?}, data_files={:?}",
metadata_file_path, data_file_paths
);
Ok(())
}
fn generate_sql_for_event(&self, event: &ChangeEvent) -> Result<String> {
match &event.event_type {
EventType::Insert {
schema,
table,
data,
..
} => self.generate_insert_sql(schema, table, data),
EventType::Update {
schema,
table,
old_data,
new_data,
replica_identity,
key_columns,
..
} => self.generate_update_sql(
schema,
table,
new_data,
old_data.as_ref(),
replica_identity,
key_columns,
),
EventType::Delete {
schema,
table,
old_data,
replica_identity,
key_columns,
..
} => self.generate_delete_sql(schema, table, old_data, replica_identity, key_columns),
EventType::Truncate(tables) => self.generate_truncate_sql(tables),
            // Remaining event types carry no row-level change, so no SQL is emitted.
            _ => Ok(String::new()),
}
}
fn generate_insert_sql(&self, schema: &str, table: &str, new_data: &RowData) -> Result<String> {
let schema = self.map_schema(Some(schema));
let mut sql = String::with_capacity(64 + new_data.len() * 48);
sql.push_str("INSERT INTO ");
self.append_qualified_table(&mut sql, &schema, table);
sql.push_str(" (");
for (i, (k, _)) in new_data.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
self.append_quoted_identifier(&mut sql, k);
}
sql.push_str(") VALUES (");
for (i, (_, v)) in new_data.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
self.append_value(&mut sql, v);
}
sql.push_str(");");
Ok(sql)
}
fn generate_update_sql(
&self,
schema: &str,
table: &str,
new_data: &RowData,
old_data: Option<&RowData>,
replica_identity: &ReplicaIdentity,
key_columns: &[Arc<str>],
) -> Result<String> {
let schema = self.map_schema(Some(schema));
let mut sql = String::with_capacity(64 + new_data.len() * 64);
sql.push_str("UPDATE ");
self.append_qualified_table(&mut sql, &schema, table);
sql.push_str(" SET ");
for (i, (col, val)) in new_data.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
self.append_quoted_identifier(&mut sql, col);
sql.push_str(" = ");
self.append_value(&mut sql, val);
}
sql.push_str(" WHERE ");
self.append_where_clause(&mut sql, replica_identity, key_columns, old_data, new_data)?;
sql.push(';');
Ok(sql)
}
fn generate_delete_sql(
&self,
schema: &str,
table: &str,
old_data: &RowData,
replica_identity: &ReplicaIdentity,
key_columns: &[Arc<str>],
) -> Result<String> {
let schema = self.map_schema(Some(schema));
let mut sql = String::with_capacity(64 + key_columns.len() * 32);
sql.push_str("DELETE FROM ");
self.append_qualified_table(&mut sql, &schema, table);
sql.push_str(" WHERE ");
self.append_where_clause(
&mut sql,
replica_identity,
key_columns,
Some(old_data),
old_data,
)?;
sql.push(';');
Ok(sql)
}
fn generate_truncate_sql(&self, tables: &[Arc<str>]) -> Result<String> {
let mut sql = String::with_capacity(tables.len() * 48);
for (i, table_spec) in tables.iter().enumerate() {
let table_spec: &str = table_spec;
let (schema, table) = match table_spec.split_once('.') {
Some((s, t)) if !t.contains('.') => (self.map_schema(Some(s)), t),
_ => (self.map_schema(Some("public")), table_spec),
};
if i > 0 {
sql.push('\n');
}
match self.destination_type {
DestinationType::MySQL | DestinationType::SqlServer => {
sql.push_str("TRUNCATE TABLE ");
self.append_quoted_identifier(&mut sql, &schema);
sql.push('.');
self.append_quoted_identifier(&mut sql, table);
sql.push(';');
}
DestinationType::SQLite => {
sql.push_str("DELETE FROM ");
self.append_quoted_identifier(&mut sql, table);
sql.push(';');
}
}
}
Ok(sql)
}
fn append_where_clause(
&self,
out: &mut String,
replica_identity: &ReplicaIdentity,
key_columns: &[Arc<str>],
old_data: Option<&RowData>,
new_data: &RowData,
) -> Result<()> {
match replica_identity {
ReplicaIdentity::Default | ReplicaIdentity::Index => {
let data = old_data.unwrap_or(new_data);
for (i, col) in key_columns.iter().enumerate() {
if i > 0 {
out.push_str(" AND ");
}
let col_str: &str = col;
let val = data.get(col_str).ok_or_else(|| {
CdcError::Generic(format!("Key column {col_str} not found"))
})?;
self.append_quoted_identifier(out, col_str);
out.push_str(" = ");
self.append_value(out, val);
}
}
ReplicaIdentity::Full => {
let data = old_data.ok_or_else(|| {
CdcError::Generic("FULL replica identity requires old data".to_string())
})?;
for (i, (col, val)) in data.iter().enumerate() {
if i > 0 {
out.push_str(" AND ");
}
self.append_quoted_identifier(out, col);
if val.is_null() {
out.push_str(" IS NULL");
} else {
out.push_str(" = ");
self.append_value(out, val);
}
}
}
ReplicaIdentity::Nothing => {
return Err(CdcError::Generic(
"Cannot generate WHERE clause with NOTHING replica identity".to_string(),
));
}
}
Ok(())
}
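    /// Writes `schema.table` for MySQL and SQL Server; SQLite has no schema
    /// qualifier, so only the table name is written.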
#[inline]
fn append_qualified_table(&self, out: &mut String, schema: &str, table: &str) {
match self.destination_type {
DestinationType::MySQL | DestinationType::SqlServer => {
self.append_quoted_identifier(out, schema);
out.push('.');
self.append_quoted_identifier(out, table);
}
DestinationType::SQLite => {
self.append_quoted_identifier(out, table);
}
}
}
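    /// Quotes `name` for the target dialect (backticks for MySQL, square
    /// brackets for SQL Server, double quotes for SQLite), escaping the quote
    /// character by doubling it.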
#[inline]
fn append_quoted_identifier(&self, out: &mut String, name: &str) {
let (open, close, escape_ch) = match self.destination_type {
DestinationType::MySQL => ('`', '`', '`'),
DestinationType::SqlServer => ('[', ']', ']'),
DestinationType::SQLite => ('"', '"', '"'),
};
out.reserve(name.len() + 2);
out.push(open);
if name.as_bytes().contains(&(escape_ch as u8)) {
for ch in name.chars() {
if ch == escape_ch {
out.push(escape_ch);
}
out.push(ch);
}
} else {
out.push_str(name);
}
out.push(close);
}
#[cfg(test)]
fn quote_identifier(&self, name: &str) -> String {
let mut out = String::with_capacity(name.len() + 2);
self.append_quoted_identifier(&mut out, name);
out
}
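    /// Renders `bytes` as a hex literal: `0x...` for SQL Server, `X'...'` for
    /// MySQL and SQLite.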
fn append_hex_literal(&self, out: &mut String, bytes: &[u8]) {
static HEX: &[u8; 16] = b"0123456789abcdef";
let (prefix, suffix) = match self.destination_type {
DestinationType::SqlServer => ("0x", ""),
DestinationType::MySQL | DestinationType::SQLite => ("X'", "'"),
};
out.reserve(prefix.len() + bytes.len() * 2 + suffix.len());
out.push_str(prefix);
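        // SAFETY: only ASCII hex digits are pushed below, so the String
        // remains valid UTF-8.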
let buf = unsafe { out.as_mut_vec() };
for &b in bytes {
buf.push(HEX[(b >> 4) as usize]);
buf.push(HEX[(b & 0x0f) as usize]);
}
out.push_str(suffix);
}
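    /// Renders a column value as a SQL literal: `NULL` stays `NULL`, the
    /// pgoutput booleans `t`/`f` become `1`/`0`, text is single-quoted with
    /// `'` doubled (and `\` doubled for MySQL, which treats backslashes as
    /// escapes), and non-UTF-8 text or binary data falls back to a hex literal.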
fn append_value(&self, out: &mut String, value: &ColumnValue) {
match value {
ColumnValue::Null => out.push_str("NULL"),
ColumnValue::Text(_) => match value.as_str() {
Some(s) => {
if s == "t" {
out.push('1');
return;
}
if s == "f" {
out.push('0');
return;
}
out.reserve(s.len() + 2);
out.push('\'');
let escape_backslash = matches!(self.destination_type, DestinationType::MySQL);
let needs_escape = if escape_backslash {
s.as_bytes().iter().any(|&b| b == b'\'' || b == b'\\')
} else {
s.as_bytes().contains(&b'\'')
};
if needs_escape {
for ch in s.chars() {
match ch {
'\'' => out.push_str("''"),
'\\' if escape_backslash => out.push_str("\\\\"),
_ => out.push(ch),
}
}
} else {
out.push_str(s);
}
out.push('\'');
}
None => {
self.append_hex_literal(out, value.as_bytes());
}
},
ColumnValue::Binary(_) => self.append_hex_literal(out, value.as_bytes()),
}
}
#[cfg(test)]
fn format_value(&self, value: &ColumnValue) -> String {
let mut out = String::new();
self.append_value(&mut out, value);
out
}
fn map_schema<'a>(&'a self, source_schema: Option<&'a str>) -> &'a str {
match source_schema {
Some(schema) => self
.schema_mappings
.get(schema)
.map(String::as_str)
.unwrap_or(schema),
None => "public",
}
}
}
impl TransactionManager {
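    /// Executes one SQL batch through the destination handler. A pre-commit
    /// hook stages the batch's last command index so that progress can be
    /// persisted by `flush_staged_pending_progress`; on failure the batch and
    /// its checkpoint roll back together and execution resumes from the last
    /// committed position.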
async fn execute_sql_batch(
self: &Arc<Self>,
destination_handler: &mut Box<dyn DestinationHandler>,
metadata_path: &Path,
commands: &[String],
last_executed_index: usize,
batch_idx: usize,
metrics_collector: &Arc<MetricsCollector>,
) -> Result<()> {
let batch_start_time = Instant::now();
let metadata_path = metadata_path.to_path_buf();
let metadata_path_for_log = metadata_path.clone();
let file_manager_for_hook = self.clone();
let staged_index = last_executed_index;
let pre_commit_hook: Option<PreCommitHook> = Some(Box::new(move || {
let metadata_path = metadata_path.clone();
let file_manager_for_hook = file_manager_for_hook.clone();
Box::pin(async move {
file_manager_for_hook
.stage_pending_metadata_progress(&metadata_path, staged_index)
.await?;
Ok(())
})
}));
if let Err(e) = destination_handler
.execute_sql_batch_with_hook(commands, pre_commit_hook)
.await
{
error!(
"Failed to execute SQL batch {} from file {}: {}",
batch_idx,
metadata_path_for_log.display(),
e
);
metrics_collector.record_error("transaction_file_execution_failed", "consumer");
info!(
"Batch and checkpoint rolled back together, will retry from last committed position on restart"
);
return Err(e);
}
let batch_duration = batch_start_time.elapsed();
debug!(
"Successfully executed batch {} with {} commands in {:?}",
batch_idx,
commands.len(),
batch_duration
);
Ok(())
}
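    /// Streams statements line by line from `reader`, skipping those before
    /// `start_index` and executing full batches as they fill. Any partial
    /// statement left at EOF is pushed into the caller's final batch.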
async fn process_reader_statements<R>(
self: &Arc<Self>,
reader: R,
initial_statement_index: usize,
start_index: usize,
pending_tx: &PendingTransactionFile,
destination_handler: &mut Box<dyn DestinationHandler>,
cancellation_token: &CancellationToken,
metrics_collector: &Arc<MetricsCollector>,
batch_size: usize,
state: &mut StatementProcessingState<'_>,
) -> Result<()>
where
R: AsyncRead + Unpin,
{
let mut parser = SqlStreamParser::new();
let mut statement_index = initial_statement_index;
let buf_reader = BufReader::new(reader);
let mut lines = buf_reader.lines();
let mut statements: Vec<String> = Vec::new();
while let Some(line) = lines
.next_line()
.await
.map_err(|e| CdcError::generic(format!("Failed to read line: {e}")))?
{
statements.clear();
parser.parse_line(&line, &mut statements)?;
for stmt in statements.drain(..) {
if statement_index >= start_index {
state.batch.push(stmt);
if state.batch.len() >= batch_size {
if cancellation_token.is_cancelled() {
return Err(CdcError::cancelled(
"Transaction file processing cancelled by shutdown signal",
));
}
let batch_len = state.batch.len();
let next_command_index = *state.current_command_index + batch_len;
let last_executed_index = next_command_index - 1;
*state.batch_count += 1;
self.execute_sql_batch(
destination_handler,
&pending_tx.file_path,
state.batch,
last_executed_index,
*state.batch_count,
metrics_collector,
)
.await?;
*state.current_command_index = next_command_index;
*state.processed_count += batch_len;
state.batch.clear();
}
}
statement_index += 1;
}
}
if let Some(stmt) = parser.finish_statement() {
if statement_index >= start_index {
state.batch.push(stmt);
}
}
Ok(())
}
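    /// Processes one segment. Plain `.sql` files are read directly; `.gz`
    /// segments are decompressed, using a sidecar `.sql.gz.idx` compression
    /// index (when present) to seek near `start_index` instead of
    /// decompressing from the start.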
async fn process_segment_statements(
self: &Arc<Self>,
segment_path: &Path,
start_index: usize,
pending_tx: &PendingTransactionFile,
destination_handler: &mut Box<dyn DestinationHandler>,
cancellation_token: &CancellationToken,
metrics_collector: &Arc<MetricsCollector>,
batch_size: usize,
state: &mut StatementProcessingState<'_>,
) -> Result<()> {
let is_compressed = segment_path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext.eq_ignore_ascii_case("gz"))
.unwrap_or(false);
if !is_compressed {
let file = tokio::fs::File::open(segment_path).await.map_err(|e| {
CdcError::generic(format!("Failed to open SQL file {segment_path:?}: {e}"))
})?;
return self
.process_reader_statements(
file,
0,
start_index,
pending_tx,
destination_handler,
cancellation_token,
metrics_collector,
batch_size,
state,
)
.await;
}
let index_path = segment_path.with_extension("sql.gz.idx");
let mut initial_statement_index = 0usize;
let mut start_offset = 0u64;
if tokio::fs::metadata(&index_path).await.is_ok() {
if let Ok(index) = CompressionIndex::load_from_file(&index_path).await {
if let Some(sync_point) = index.find_sync_point_for_index(start_index) {
initial_statement_index = sync_point.statement_index;
start_offset = sync_point.compressed_offset;
}
}
}
let mut file = tokio::fs::File::open(segment_path).await.map_err(|e| {
CdcError::generic(format!(
"Failed to open compressed file {segment_path:?}: {e}"
))
})?;
if start_offset > 0 {
file.seek(SeekFrom::Start(start_offset))
.await
.map_err(|e| {
CdcError::generic(format!(
"Failed to seek compressed file {segment_path:?}: {e}"
))
})?;
}
let buf_reader = BufReader::new(file);
let mut decoder = GzipDecoder::new(buf_reader);
decoder.multiple_members(true);
self.process_reader_statements(
decoder,
initial_statement_index,
start_index,
pending_tx,
destination_handler,
cancellation_token,
metrics_collector,
batch_size,
state,
)
.await
}
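    /// Applies one pending transaction file to the destination, resuming from
    /// `last_executed_command_index + 1` if a previous run was interrupted.
    /// Segments that end before the resume point are skipped without being
    /// read.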
pub(crate) async fn process_transaction_file(
self: Arc<Self>,
pending_tx: &PendingTransactionFile,
destination_handler: &mut Box<dyn DestinationHandler>,
cancellation_token: &CancellationToken,
lsn_tracker: &Arc<LsnTracker>,
metrics_collector: &Arc<MetricsCollector>,
batch_size: usize,
shared_lsn_feedback: &Arc<SharedLsnFeedback>,
) -> Result<()> {
let start_time = Instant::now();
let tx_id = pending_tx.metadata.transaction_id;
let latest_metadata = self.read_metadata(&pending_tx.file_path).await?;
let start_index = latest_metadata
.last_executed_command_index
.map(|idx| idx + 1)
.unwrap_or(0);
info!(
"Processing transaction file: {} (tx_id: {}, lsn: {:?}, start_index: {})",
pending_tx.file_path.display(),
tx_id,
pending_tx.metadata.commit_lsn,
start_index
);
let mut segments = if !latest_metadata.segments.is_empty() {
latest_metadata.segments
} else {
pending_tx.metadata.segments.clone()
};
if segments.is_empty() {
return Err(CdcError::generic(format!(
"No transaction segments found for tx {}",
tx_id
)));
}
let mut batch: Vec<String> = Vec::with_capacity(batch_size);
let mut batch_count = 0usize;
let mut processed_count = 0usize;
let mut current_command_index = start_index;
let mut remaining_start_index = start_index;
let mut state = StatementProcessingState {
batch: &mut batch,
current_command_index: &mut current_command_index,
processed_count: &mut processed_count,
batch_count: &mut batch_count,
};
for segment in segments.drain(..) {
if remaining_start_index > 0
&& segment.statement_count > 0
&& remaining_start_index >= segment.statement_count
{
remaining_start_index -= segment.statement_count;
continue;
}
let segment_start_index = remaining_start_index;
remaining_start_index = 0;
let stream_result = self
.process_segment_statements(
&segment.path,
segment_start_index,
pending_tx,
destination_handler,
cancellation_token,
metrics_collector,
batch_size,
&mut state,
)
.await;
if let Err(e) = stream_result {
if e.is_cancelled() {
warn!(
"Transaction file processing cancelled by shutdown signal (tx_id: {})",
tx_id
);
return Ok(());
}
return Err(e);
}
}
if !batch.is_empty() {
if cancellation_token.is_cancelled() {
warn!(
"Transaction file processing cancelled by shutdown signal (tx_id: {})",
tx_id
);
return Ok(());
}
let batch_len = batch.len();
let next_command_index = current_command_index + batch_len;
let last_executed_index = next_command_index - 1;
batch_count += 1;
self.execute_sql_batch(
destination_handler,
&pending_tx.file_path,
&batch,
last_executed_index,
batch_count,
metrics_collector,
)
.await?;
current_command_index = next_command_index;
processed_count += batch_len;
batch.clear();
}
let total_commands = current_command_index;
if processed_count == 0 {
info!(
"All commands already executed for transaction file: {} (tx_id: {})",
pending_tx.file_path.display(),
tx_id
);
self.finalize_transaction_file(
pending_tx,
lsn_tracker,
metrics_collector,
total_commands,
shared_lsn_feedback,
)
.await?;
return Ok(());
}
let duration = start_time.elapsed();
info!(
"Successfully executed {} remaining commands ({} total) in {} batches in {:?} (tx_id: {}, avg: {:?}/batch)",
processed_count,
total_commands,
batch_count,
duration,
tx_id,
duration / batch_count.max(1) as u32
);
self.finalize_transaction_file(
pending_tx,
lsn_tracker,
metrics_collector,
total_commands,
shared_lsn_feedback,
)
.await?;
Ok(())
}
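    /// After a transaction is fully applied: acknowledges its commit LSN to
    /// the tracker and feedback channel, records metrics, deletes the pending
    /// metadata and data files, and clears any staged progress.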
async fn finalize_transaction_file(
self: &Arc<Self>,
pending_tx: &PendingTransactionFile,
lsn_tracker: &Arc<LsnTracker>,
metrics_collector: &Arc<MetricsCollector>,
total_commands: usize,
shared_lsn_feedback: &Arc<SharedLsnFeedback>,
) -> Result<()> {
let tx_id = pending_tx.metadata.transaction_id;
if let Some(commit_lsn) = pending_tx.metadata.commit_lsn {
info!(
"Transaction {} successfully applied to destination, commit_lsn: {}",
tx_id, commit_lsn
);
lsn_tracker.commit_lsn(commit_lsn.0);
shared_lsn_feedback.update_applied_lsn(commit_lsn.0);
info!(
"Updated apply_lsn to {} (transaction {} applied to destination)",
commit_lsn, tx_id
);
} else {
warn!(
"Transaction {} has no commit_lsn, cannot send ACK (this should not happen for committed transactions)",
tx_id
);
}
let destination_type_str = pending_tx.metadata.destination_type.to_string();
let mut transaction = crate::types::Transaction::new(
pending_tx.metadata.transaction_id,
pending_tx.metadata.commit_timestamp,
);
transaction.commit_lsn = pending_tx.metadata.commit_lsn;
metrics_collector.record_transaction_processed(&transaction, &destination_type_str);
metrics_collector.record_full_transaction_processed(&transaction, &destination_type_str);
debug!(
"Successfully processed transaction file with {} commands and recorded metrics",
total_commands
);
if let Err(e) = self.delete_pending_transaction(&pending_tx.file_path).await {
error!(
"Failed to delete processed transaction file {}: {}",
pending_tx.file_path.display(),
e
);
}
self.clear_staged_pending_progress(&pending_tx.file_path)
.await;
if pending_tx.metadata.commit_lsn.is_some() {
let pending_count = self.list_pending_transactions().await?.len();
lsn_tracker.update_consumer_state(
tx_id,
pending_tx.metadata.commit_timestamp,
pending_count,
);
debug!(
"Updated LSN tracker consumer state: tx_id={}, pending_count={} (after deletion)",
tx_id, pending_count
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use pg_walstream::ColumnValue;
async fn test_manager(dest: DestinationType) -> TransactionManager {
let dir = std::env::temp_dir().join(format!(
"pg2any_format_value_test_{}_{}",
dest,
std::process::id()
));
let _ = tokio::fs::create_dir_all(&dir).await;
        TransactionManager::new(&dir, dest, 10 * MB)
.await
.expect("test manager creation should succeed")
}
#[tokio::test]
async fn test_mysql_backslash_injection_is_escaped() {
let mgr = test_manager(DestinationType::MySQL).await;
let val = ColumnValue::text(r"foo\'; DROP TABLE users; --");
let formatted = mgr.format_value(&val);
assert_eq!(formatted, r"'foo\\''; DROP TABLE users; --'");
}
#[tokio::test]
async fn test_mysql_backslash_at_end_of_string() {
let mgr = test_manager(DestinationType::MySQL).await;
let val = ColumnValue::text(r"trailing\");
let formatted = mgr.format_value(&val);
assert_eq!(formatted, r"'trailing\\'");
}
#[tokio::test]
async fn test_sqlite_does_not_double_escape_backslashes() {
let mgr = test_manager(DestinationType::SQLite).await;
let val = ColumnValue::text(r"path\to\file");
let formatted = mgr.format_value(&val);
assert_eq!(formatted, r"'path\to\file'");
}
#[tokio::test]
async fn test_sqlserver_does_not_double_escape_backslashes() {
let mgr = test_manager(DestinationType::SqlServer).await;
let val = ColumnValue::text(r"path\to\file");
let formatted = mgr.format_value(&val);
assert_eq!(formatted, r"'path\to\file'");
}
#[tokio::test]
async fn test_numeric_text_is_always_quoted() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.format_value(&ColumnValue::text("42")), "'42'");
assert_eq!(mgr.format_value(&ColumnValue::text("-1")), "'-1'");
assert_eq!(mgr.format_value(&ColumnValue::text("0")), "'0'");
assert_eq!(mgr.format_value(&ColumnValue::text("3.14")), "'3.14'");
assert_eq!(mgr.format_value(&ColumnValue::text("01234")), "'01234'");
}
#[tokio::test]
async fn test_pgoutput_boolean_true() {
for dest in [
DestinationType::MySQL,
DestinationType::SQLite,
DestinationType::SqlServer,
] {
let mgr = test_manager(dest.clone()).await;
assert_eq!(mgr.format_value(&ColumnValue::text("t")), "1");
}
}
#[tokio::test]
async fn test_pgoutput_boolean_false() {
for dest in [
DestinationType::MySQL,
DestinationType::SQLite,
DestinationType::SqlServer,
] {
let mgr = test_manager(dest.clone()).await;
assert_eq!(mgr.format_value(&ColumnValue::text("f")), "0");
}
}
#[tokio::test]
async fn test_full_word_true_false_is_quoted() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.format_value(&ColumnValue::text("true")), "'true'");
assert_eq!(mgr.format_value(&ColumnValue::text("false")), "'false'");
assert_eq!(mgr.format_value(&ColumnValue::text("TRUE")), "'TRUE'");
}
#[tokio::test]
async fn test_regular_string_is_quoted() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.format_value(&ColumnValue::text("hello")), "'hello'");
}
#[tokio::test]
async fn test_string_with_single_quote_is_escaped() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(
mgr.format_value(&ColumnValue::text("it's here")),
"'it''s here'"
);
}
#[tokio::test]
async fn test_null_value() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.format_value(&ColumnValue::Null), "NULL");
}
#[tokio::test]
async fn test_binary_value_hex_encoded_mysql_sqlite() {
for dest in [DestinationType::MySQL, DestinationType::SQLite] {
let mgr = test_manager(dest).await;
let val = ColumnValue::Binary(Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]));
assert_eq!(mgr.format_value(&val), "X'deadbeef'");
}
}
#[tokio::test]
async fn test_binary_value_hex_encoded_sqlserver() {
let mgr = test_manager(DestinationType::SqlServer).await;
let val = ColumnValue::Binary(Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]));
assert_eq!(mgr.format_value(&val), "0xdeadbeef");
}
#[tokio::test]
async fn test_binary_empty_bytes() {
let mgr_mysql = test_manager(DestinationType::MySQL).await;
let mgr_sqlserver = test_manager(DestinationType::SqlServer).await;
let val = ColumnValue::Binary(Bytes::from_static(&[]));
assert_eq!(mgr_mysql.format_value(&val), "X''");
assert_eq!(mgr_sqlserver.format_value(&val), "0x");
}
#[tokio::test]
async fn test_non_utf8_text_falls_back_to_hex_literal() {
let mgr_mysql = test_manager(DestinationType::MySQL).await;
let mgr_sqlserver = test_manager(DestinationType::SqlServer).await;
let val = ColumnValue::Text(Bytes::from_static(&[0x80, 0xFF, 0x01]));
assert_eq!(mgr_mysql.format_value(&val), "X'80ff01'");
assert_eq!(mgr_sqlserver.format_value(&val), "0x80ff01");
}
#[tokio::test]
async fn test_mysql_identifier_escapes_backticks() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.quote_identifier("normal"), "`normal`");
assert_eq!(mgr.quote_identifier("ta`ble"), "`ta``ble`");
assert_eq!(mgr.quote_identifier("`inject`"), "```inject```");
}
#[tokio::test]
async fn test_sqlserver_identifier_escapes_brackets() {
let mgr = test_manager(DestinationType::SqlServer).await;
assert_eq!(mgr.quote_identifier("normal"), "[normal]");
assert_eq!(mgr.quote_identifier("ta]ble"), "[ta]]ble]");
}
#[tokio::test]
async fn test_sqlite_identifier_escapes_double_quotes() {
let mgr = test_manager(DestinationType::SQLite).await;
assert_eq!(mgr.quote_identifier("normal"), "\"normal\"");
assert_eq!(mgr.quote_identifier("ta\"ble"), "\"ta\"\"ble\"");
}
#[tokio::test]
async fn test_empty_string_is_quoted() {
let mgr = test_manager(DestinationType::MySQL).await;
assert_eq!(mgr.format_value(&ColumnValue::text("")), "''");
}
}