use crate::error::{CdcError, Result};
use crate::storage::sql_parser::SqlStreamParser;
use crate::storage::traits::TransactionStorage;
use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, SeekFrom};
use tracing::{debug, info};
/// Number of SQL statements packed into each independently-compressed gzip
/// member ("chunk"). Every chunk boundary is recorded as a sync point in the
/// sidecar index so reads can seek to the nearest chunk instead of
/// decompressing the whole file.
const SYNC_POINT_INTERVAL: usize = 1000;
/// One sync point: maps the index of a chunk's first statement to the byte
/// offset of that chunk's gzip member inside the compressed file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatementOffset {
    /// 0-based index of the first statement in the chunk.
    pub statement_index: usize,
    /// Byte offset of the chunk's gzip member within the compressed file.
    pub compressed_offset: u64,
}
/// Sidecar index written next to each compressed file (JSON on disk).
/// Allows `read_transaction` to seek to the sync point nearest a requested
/// statement index rather than decompressing from the start.
#[derive(Debug, Serialize, Deserialize)]
pub struct CompressionIndex {
    /// Total number of statements stored in the compressed file.
    pub total_statements: usize,
    /// Sync points in ascending `statement_index` order, one per chunk.
    pub sync_points: Vec<StatementOffset>,
}
impl CompressionIndex {
pub fn new() -> Self {
Self {
total_statements: 0,
sync_points: Vec::new(),
}
}
pub fn find_sync_point_for_index(&self, target_index: usize) -> Option<&StatementOffset> {
let partition_idx = self
.sync_points
.partition_point(|sp| sp.statement_index <= target_index);
self.sync_points.get(partition_idx.saturating_sub(1))
}
pub async fn save_to_file(&self, path: &Path) -> Result<()> {
let json = serde_json::to_string_pretty(self)
.map_err(|e| CdcError::generic(format!("Failed to serialize index: {e}")))?;
tokio::fs::write(path, json)
.await
.map_err(|e| CdcError::generic(format!("Failed to write index file: {e}")))?;
Ok(())
}
pub async fn load_from_file(path: &Path) -> Result<Self> {
let content = tokio::fs::read_to_string(path)
.await
.map_err(|e| CdcError::generic(format!("Failed to read index file: {e}")))?;
let index: Self = serde_json::from_str(&content)
.map_err(|e| CdcError::generic(format!("Failed to parse index: {e}")))?;
Ok(index)
}
}
impl Default for CompressionIndex {
fn default() -> Self {
Self::new()
}
}
/// Stateless storage backend that persists transactions as gzip-compressed
/// SQL files (`.sql.gz`) with a JSON sync-point index for seekable reads.
#[derive(Debug, Clone)]
pub struct CompressedStorage;
/// Joins a slice of SQL statements into one newline-separated text block:
/// each statement is trimmed, empty entries are dropped, a terminating
/// semicolon is added when missing, and no trailing newline is emitted.
fn build_chunk_text(chunk: &[String]) -> String {
    // Reserve room up front: each statement plus at most ";\n".
    let capacity: usize = chunk.iter().map(|s| s.len() + 2).sum();
    let mut text = String::with_capacity(capacity);
    for raw in chunk {
        let stmt = raw.trim();
        if stmt.is_empty() {
            continue;
        }
        text.push_str(stmt);
        if !stmt.ends_with(';') {
            text.push(';');
        }
        text.push('\n');
    }
    // Strip the final separator so the chunk text has no trailing newline.
    if text.ends_with('\n') {
        text.pop();
    }
    text
}
impl CompressedStorage {
pub fn new() -> Self {
Self
}
fn index_path(compressed_path: &Path) -> PathBuf {
compressed_path.with_extension("sql.gz.idx")
}
}
impl Default for CompressedStorage {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl TransactionStorage for CompressedStorage {
    /// Compresses an in-memory list of statements into `<file_path>.sql.gz`.
    ///
    /// The output is a sequence of independent gzip members, one per
    /// `SYNC_POINT_INTERVAL` statements; a sidecar index records the byte
    /// offset of every chunk so later reads can seek instead of decompressing
    /// the whole file.
    ///
    /// Returns the compressed file's path. Errors when `data` is empty or on
    /// any I/O / compression failure.
    async fn write_transaction(&self, file_path: &Path, data: &[String]) -> Result<PathBuf> {
        let compressed_path = file_path.with_extension("sql.gz");
        info!(
            "Compressing {:?} to {:?} with sync points (interval: {})",
            file_path, compressed_path, SYNC_POINT_INTERVAL
        );
        let total_statements = data.len();
        if total_statements == 0 {
            return Err(CdcError::generic("No statements to compress"));
        }
        let mut dest_file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&compressed_path)
            .await
            .map_err(|e| CdcError::generic(format!("Failed to create dest file: {e}")))?;
        let mut current_offset: u64 = 0;
        let mut index = CompressionIndex::new();
        index.total_statements = total_statements;
        for (chunk_idx, chunk) in data.chunks(SYNC_POINT_INTERVAL).enumerate() {
            let statement_index = chunk_idx * SYNC_POINT_INTERVAL;
            // Record where this chunk's gzip member starts so reads can seek
            // straight to it.
            index.sync_points.push(StatementOffset {
                statement_index,
                compressed_offset: current_offset,
            });
            // Delegate to the shared helper instead of duplicating the
            // encoder/spawn_blocking logic inline (keeps this path identical
            // to write_transaction_from_file).
            let buffer = Self::compress_chunk(chunk).await?;
            dest_file
                .write_all(&buffer)
                .await
                .map_err(|e| CdcError::generic(format!("Failed to write compressed data: {e}")))?;
            let compressed_size = buffer.len() as u64;
            current_offset += compressed_size;
            debug!(
                "Compressed chunk {} (statements {}-{}): {} bytes compressed",
                chunk_idx,
                statement_index,
                // `chunk` is never empty: `data` is non-empty and `chunks()`
                // only yields non-empty slices, so the subtraction is safe.
                statement_index + chunk.len() - 1,
                compressed_size
            );
        }
        dest_file
            .flush()
            .await
            .map_err(|e| CdcError::generic(format!("Failed to flush dest file: {e}")))?;
        let index_path = Self::index_path(&compressed_path);
        index.save_to_file(&index_path).await?;
        info!(
            "Created compression index: {:?} ({} sync points, {} statements)",
            index_path,
            index.sync_points.len(),
            total_statements
        );
        Ok(compressed_path)
    }

    /// Streams statements out of an uncompressed SQL file and compresses them
    /// chunk-by-chunk into `<file_path>.sql.gz`, without loading the whole
    /// file into memory.
    ///
    /// Returns the compressed path and the number of statements written.
    /// Errors when the source yields no statements (the partially-created
    /// output file is removed best-effort) or on any I/O failure.
    ///
    /// Bug fix: the original contained the mojibake token `¤t_chunk`
    /// (a corrupted `&current_chunk`), which did not compile.
    async fn write_transaction_from_file(&self, file_path: &Path) -> Result<(PathBuf, usize)> {
        let compressed_path = file_path.with_extension("sql.gz");
        info!(
            "Compressing {:?} to {:?} with sync points (interval: {})",
            file_path, compressed_path, SYNC_POINT_INTERVAL
        );
        let source_file = tokio::fs::File::open(file_path).await.map_err(|e| {
            CdcError::generic(format!("Failed to open source file {file_path:?}: {e}"))
        })?;
        let mut dest_file = tokio::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&compressed_path)
            .await
            .map_err(|e| CdcError::generic(format!("Failed to create dest file: {e}")))?;
        let mut parser = SqlStreamParser::new();
        let mut index = CompressionIndex::new();
        let mut total_statements: usize = 0;
        let mut current_offset: u64 = 0;
        let mut current_chunk: Vec<String> = Vec::with_capacity(SYNC_POINT_INTERVAL);
        let buf_reader = BufReader::with_capacity(65536, source_file);
        let mut lines = buf_reader.lines();
        // Reused across lines to avoid a fresh allocation per line.
        let mut statements: Vec<String> = Vec::new();
        while let Some(line) = lines
            .next_line()
            .await
            .map_err(|e| CdcError::generic(format!("Failed to read line: {e}")))?
        {
            statements.clear();
            parser.parse_line(&line, &mut statements)?;
            for stmt in statements.drain(..) {
                self.add_statement_to_chunk(
                    stmt,
                    &mut current_chunk,
                    &mut index,
                    &mut total_statements,
                    current_offset,
                );
                // A full chunk becomes its own gzip member on disk.
                if current_chunk.len() >= SYNC_POINT_INTERVAL {
                    let compressed = Self::compress_chunk(&current_chunk).await?;
                    dest_file.write_all(&compressed).await.map_err(|e| {
                        CdcError::generic(format!("Failed to write compressed data: {e}"))
                    })?;
                    current_offset += compressed.len() as u64;
                    current_chunk.clear();
                }
            }
        }
        // The parser may hold a trailing statement that never hit a newline.
        if let Some(stmt) = parser.finish_statement() {
            self.add_statement_to_chunk(
                stmt,
                &mut current_chunk,
                &mut index,
                &mut total_statements,
                current_offset,
            );
        }
        // Flush the final, partially-filled chunk.
        if !current_chunk.is_empty() {
            let compressed = Self::compress_chunk(&current_chunk).await?;
            dest_file
                .write_all(&compressed)
                .await
                .map_err(|e| CdcError::generic(format!("Failed to write compressed data: {e}")))?;
        }
        if total_statements == 0 {
            // Nothing was produced; drop the empty output file (best effort).
            let _ = fs::remove_file(&compressed_path).await;
            return Err(CdcError::generic("No statements to compress"));
        }
        dest_file
            .flush()
            .await
            .map_err(|e| CdcError::generic(format!("Failed to flush dest file: {e}")))?;
        index.total_statements = total_statements;
        let index_path = Self::index_path(&compressed_path);
        index.save_to_file(&index_path).await?;
        info!(
            "Created compression index: {:?} ({} sync points, {} statements)",
            index_path,
            index.sync_points.len(),
            total_statements
        );
        Ok((compressed_path, total_statements))
    }

    /// Reads all statements from `start_index` onward out of a compressed
    /// file, using the sidecar index (when present) to seek to the nearest
    /// preceding sync point rather than decompressing from the beginning.
    async fn read_transaction(&self, file_path: &Path, start_index: usize) -> Result<Vec<String>> {
        let index_path = Self::index_path(file_path);
        // Files written without an index fall back to full decompression.
        if tokio::fs::metadata(&index_path).await.is_err() {
            debug!(
                "No index file found for {:?}, falling back to full decompression",
                file_path
            );
            return self.read_full(file_path, start_index).await;
        }
        let index = CompressionIndex::load_from_file(&index_path).await?;
        // Requests past the end yield an empty result rather than an error.
        if start_index >= index.total_statements {
            return Ok(Vec::new());
        }
        match index.find_sync_point_for_index(start_index) {
            Some(sp) => {
                debug!(
                    "Using sync point at statement {} to read from index {}",
                    sp.statement_index, start_index
                );
                self.read_from_sync_point(file_path, sp, start_index).await
            }
            None => {
                debug!("No sync point found, reading from beginning");
                self.read_full(file_path, start_index).await
            }
        }
    }

    /// Deletes the compressed file and its sidecar index; each is removed
    /// only when it exists, so a missing file is not an error.
    async fn delete_transaction(&self, file_path: &Path) -> Result<()> {
        if tokio::fs::metadata(file_path).await.is_ok() {
            fs::remove_file(file_path).await.map_err(|e| {
                CdcError::generic(format!("Failed to delete file {file_path:?}: {e}"))
            })?;
            debug!("Deleted compressed file: {:?}", file_path);
        }
        let index_path = Self::index_path(file_path);
        if tokio::fs::metadata(&index_path).await.is_ok() {
            fs::remove_file(&index_path).await.map_err(|e| {
                CdcError::generic(format!("Failed to delete index {index_path:?}: {e}"))
            })?;
            debug!("Deleted index file: {:?}", index_path);
        }
        Ok(())
    }

    /// Returns true when `file_path` exists (metadata lookup succeeds).
    async fn file_exists(&self, file_path: &Path) -> bool {
        tokio::fs::metadata(file_path).await.is_ok()
    }

    /// Compound extension used for files produced by this backend.
    fn file_extension(&self) -> &str {
        "sql.gz"
    }

    /// Maps a base path to the path this backend would write to.
    fn transform_path(&self, base_path: &Path) -> PathBuf {
        base_path.with_extension("sql.gz")
    }
}
impl CompressedStorage {
    /// Appends `stmt` to the chunk under construction, registering a sync
    /// point when the statement opens a fresh chunk.
    fn add_statement_to_chunk(
        &self,
        stmt: String,
        current_chunk: &mut Vec<String>,
        index: &mut CompressionIndex,
        total_statements: &mut usize,
        current_offset: u64,
    ) {
        // A new chunk starts here: remember its first statement index and
        // the byte offset its gzip member will occupy once flushed.
        if current_chunk.is_empty() {
            index.sync_points.push(StatementOffset {
                statement_index: *total_statements,
                compressed_offset: current_offset,
            });
        }
        current_chunk.push(stmt);
        *total_statements += 1;
    }

    /// Gzip-compresses one chunk of statements on a blocking worker thread
    /// and returns the raw bytes of a complete, standalone gzip member.
    async fn compress_chunk(chunk: &[String]) -> Result<Vec<u8>> {
        let text = build_chunk_text(chunk);
        // flate2's encoder is synchronous; keep it off the async runtime.
        tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder
                .write_all(text.as_bytes())
                .map_err(|e| CdcError::generic(format!("Compression write failed: {e}")))?;
            encoder
                .finish()
                .map_err(|e| CdcError::generic(format!("Compression finish failed: {e}")))
        })
        .await
        .map_err(|e| CdcError::generic(format!("Compression task failed: {e}")))?
    }

    /// Seeks the compressed file to `sync_point`'s byte offset, then streams
    /// statements from there, skipping forward to `start_index`.
    async fn read_from_sync_point(
        &self,
        compressed_path: &Path,
        sync_point: &StatementOffset,
        start_index: usize,
    ) -> Result<Vec<String>> {
        let mut file = tokio::fs::File::open(compressed_path).await.map_err(|e| {
            CdcError::generic(format!(
                "Failed to open compressed file {compressed_path:?}: {e}"
            ))
        })?;
        let offset = sync_point.compressed_offset;
        file.seek(SeekFrom::Start(offset))
            .await
            .map_err(|e| CdcError::generic(format!("Failed to seek to offset {offset}: {e}")))?;
        debug!(
            "Seeking to compressed offset {} (sync point at statement {})",
            offset, sync_point.statement_index
        );
        // Every chunk is its own gzip member, so multi-member decoding is
        // required to read past the first chunk at the seek position.
        let mut decoder = GzipDecoder::new(BufReader::new(file));
        decoder.multiple_members(true);
        let to_skip = start_index.saturating_sub(sync_point.statement_index);
        let mut parser = SqlStreamParser::new();
        parser.parse_stream_collect(decoder, to_skip).await
    }

    /// Decompresses the whole file from byte zero, skipping the first
    /// `start_index` statements. Fallback path when no usable index exists.
    async fn read_full(&self, compressed_path: &Path, start_index: usize) -> Result<Vec<String>> {
        let file = tokio::fs::File::open(compressed_path).await.map_err(|e| {
            CdcError::generic(format!(
                "Failed to open compressed file {compressed_path:?}: {e}"
            ))
        })?;
        let mut decoder = GzipDecoder::new(BufReader::new(file));
        decoder.multiple_members(true);
        let mut parser = SqlStreamParser::new();
        let statements = parser.parse_stream_collect(decoder, start_index).await?;
        debug!(
            "Read {} statements from compressed file {:?} (starting from index {})",
            statements.len(),
            compressed_path,
            start_index
        );
        Ok(statements)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Per-process scratch directory shared by the tests in this module.
    async fn create_temp_dir() -> PathBuf {
        let dir = std::env::temp_dir().join(format!("pg2any_comp_test_{}", std::process::id()));
        tokio::fs::create_dir_all(&dir).await.unwrap();
        dir
    }

    /// Round-trips two statements through write/read, then verifies that
    /// delete removes both the data file and its index.
    #[tokio::test]
    async fn test_write_and_read_compressed() {
        let dir = create_temp_dir().await;
        let base = dir.join("test");
        let storage = CompressedStorage::new();
        let stmts = vec![
            "INSERT INTO users VALUES (1, 'Alice');".to_string(),
            "INSERT INTO users VALUES (2, 'Bob');".to_string(),
        ];
        let written = storage.write_transaction(&base, &stmts).await.unwrap();
        assert!(written.to_string_lossy().ends_with(".sql.gz"));
        assert!(storage.file_exists(&written).await);
        let idx_path = CompressedStorage::index_path(&written);
        assert!(idx_path.exists());
        // Statements come back trimmed of their trailing semicolon.
        let read_back = storage.read_transaction(&written, 0).await.unwrap();
        assert_eq!(read_back.len(), 2);
        assert_eq!(read_back[0], "INSERT INTO users VALUES (1, 'Alice')");
        assert_eq!(read_back[1], "INSERT INTO users VALUES (2, 'Bob')");
        storage.delete_transaction(&written).await.unwrap();
        assert!(!storage.file_exists(&written).await);
        assert!(!idx_path.exists());
    }

    /// 2500 statements at a 1000-statement interval must produce exactly
    /// three sync points, and a mid-file read must land on the right row.
    #[tokio::test]
    async fn test_compression_with_sync_points() {
        let dir = create_temp_dir().await;
        let base = dir.join("test_sync");
        let storage = CompressedStorage::new();
        let stmts: Vec<String> = (0..2500)
            .map(|i| format!("INSERT INTO test VALUES ({});", i))
            .collect();
        let written = storage.write_transaction(&base, &stmts).await.unwrap();
        let idx_path = CompressedStorage::index_path(&written);
        let index = CompressionIndex::load_from_file(&idx_path).await.unwrap();
        assert_eq!(index.total_statements, 2500);
        assert_eq!(index.sync_points.len(), 3);
        let read_back = storage.read_transaction(&written, 1100).await.unwrap();
        assert_eq!(read_back.len(), 1400);
        assert!(read_back[0].contains("1100"));
        storage.delete_transaction(&written).await.unwrap();
    }

    /// The backend reports its compound extension.
    #[tokio::test]
    async fn test_file_extension() {
        let storage = CompressedStorage::new();
        assert_eq!(storage.file_extension(), "sql.gz");
    }

    /// Base paths are rewritten to carry the compressed extension.
    #[tokio::test]
    async fn test_transform_path() {
        let storage = CompressedStorage::new();
        let base = PathBuf::from("/tmp/transaction_123");
        assert_eq!(
            storage.transform_path(&base),
            PathBuf::from("/tmp/transaction_123.sql.gz")
        );
    }
}