use async_compression::tokio::bufread::GzipDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use pg2any_lib::storage::SqlStreamParser;
use std::io::Write;
use std::path::PathBuf;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
async fn create_large_sql_file(path: &PathBuf, statement_count: usize) -> std::io::Result<()> {
let mut file = fs::File::create(path).await?;
for i in 0..statement_count {
let stmt = format!(
"INSERT INTO large_table (id, data) VALUES ({}, '{}');\n",
i,
"x".repeat(100) );
file.write_all(stmt.as_bytes()).await?;
}
file.flush().await?;
Ok(())
}
fn compress_file(input_path: &PathBuf, output_path: &PathBuf) -> std::io::Result<()> {
let input_data = std::fs::read(input_path)?;
let output_file = std::fs::File::create(output_path)?;
let mut encoder = GzEncoder::new(output_file, Compression::default());
encoder.write_all(&input_data)?;
encoder.finish()?;
Ok(())
}
#[tokio::test]
async fn test_large_compressed_file_streaming_decompression() {
let temp_dir =
std::env::temp_dir().join(format!("pg2any_large_file_test_{}", std::process::id()));
fs::create_dir_all(&temp_dir).await.unwrap();
let uncompressed_path = temp_dir.join("large_transaction.sql");
create_large_sql_file(&uncompressed_path, 10_000)
.await
.unwrap();
let compressed_path = temp_dir.join("large_transaction.sql.gz");
compress_file(&uncompressed_path, &compressed_path).unwrap();
let uncompressed_size = fs::metadata(&uncompressed_path).await.unwrap().len();
let compressed_size = fs::metadata(&compressed_path).await.unwrap().len();
println!("Uncompressed: {} bytes", uncompressed_size);
println!("Compressed: {} bytes", compressed_size);
println!(
"Compression ratio: {:.1}%",
(compressed_size as f64 / uncompressed_size as f64) * 100.0
);
assert!(
compressed_size < uncompressed_size,
"Compression should reduce file size"
);
let file = fs::File::open(&compressed_path).await.unwrap();
let buf_reader = BufReader::with_capacity(65536, file);
let decoder = GzipDecoder::new(buf_reader);
let mut parser = SqlStreamParser::new();
let statements = parser.parse_stream_collect(decoder, 0).await.unwrap();
assert_eq!(
statements.len(),
10_000,
"Should read all 10,000 statements"
);
assert!(statements[0].contains("INSERT INTO large_table"));
assert!(statements[0].contains("VALUES (0,"));
assert!(statements[9_999].contains("VALUES (9999,"));
fs::remove_dir_all(&temp_dir).await.unwrap();
println!(" Large compressed file test passed - streaming decompression works!");
}
#[tokio::test]
async fn test_compressed_file_memory_efficiency() {
let temp_dir = std::env::temp_dir().join(format!("pg2any_memory_test_{}", std::process::id()));
fs::create_dir_all(&temp_dir).await.unwrap();
let uncompressed_path = temp_dir.join("medium_transaction.sql");
create_large_sql_file(&uncompressed_path, 50_000)
.await
.unwrap();
let compressed_path = temp_dir.join("medium_transaction.sql.gz");
compress_file(&uncompressed_path, &compressed_path).unwrap();
let file = fs::File::open(&compressed_path).await.unwrap();
let buf_reader = BufReader::with_capacity(65536, file);
let decoder = GzipDecoder::new(buf_reader);
let mut parser = SqlStreamParser::new();
let statements = parser.parse_stream_collect(decoder, 0).await.unwrap();
assert_eq!(statements.len(), 50_000);
fs::remove_dir_all(&temp_dir).await.unwrap();
println!(" Memory efficiency test passed - no OOM with 50K statements!");
}
#[tokio::test]
async fn test_graceful_cancellation_during_decompression() {
let temp_dir = std::env::temp_dir().join(format!("pg2any_cancel_test_{}", std::process::id()));
fs::create_dir_all(&temp_dir).await.unwrap();
let uncompressed_path = temp_dir.join("cancel_test.sql");
create_large_sql_file(&uncompressed_path, 1000)
.await
.unwrap();
let compressed_path = temp_dir.join("cancel_test.sql.gz");
compress_file(&uncompressed_path, &compressed_path).unwrap();
let file = fs::File::open(&compressed_path).await.unwrap();
let buf_reader = BufReader::with_capacity(65536, file);
let decoder = GzipDecoder::new(buf_reader);
let mut parser = SqlStreamParser::new();
let statements = parser.parse_stream_collect(decoder, 0).await.unwrap();
assert_eq!(statements.len(), 1000);
fs::remove_dir_all(&temp_dir).await.unwrap();
println!("Cancellation test passed - graceful shutdown supported!");
}