use crate::error::{CdcError, Result};
use crate::storage::sql_parser::SqlStreamParser;
use crate::storage::traits::TransactionStorage;
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
use tracing::debug;
/// Storage backend that keeps transaction files as plain, uncompressed SQL text.
///
/// The type carries no state, so constructing, cloning, or defaulting it is free.
#[derive(Debug, Clone, Default)]
pub struct UncompressedStorage;

impl UncompressedStorage {
    /// Creates a new uncompressed storage backend.
    ///
    /// Equivalent to [`UncompressedStorage::default`]; usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
#[async_trait]
impl TransactionStorage for UncompressedStorage {
    /// Writes `data` to `file_path` as plain text, one statement per line.
    ///
    /// The file is created if it does not exist and truncated if it does. Each
    /// statement is trimmed, terminated with `;` (appended only when it is not
    /// already the last character) and followed by a newline.
    ///
    /// Returns the path that was written, which is `file_path` itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, or if any write or the
    /// final flush fails.
    async fn write_transaction(&self, file_path: &Path, data: &[String]) -> Result<PathBuf> {
        let file = fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(file_path)
            .await
            .map_err(|e| CdcError::generic(format!("Failed to create file {file_path:?}: {e}")))?;
        let mut writer = BufWriter::new(file);

        for statement in data {
            let stmt = statement.trim();
            writer.write_all(stmt.as_bytes()).await?;
            if !stmt.ends_with(';') {
                writer.write_all(b";").await?;
            }
            writer.write_all(b"\n").await?;
        }

        // Flush explicitly: dropping a `BufWriter` does not report write errors.
        writer.flush().await?;

        debug!(
            "Wrote {} statements to uncompressed file: {:?}",
            data.len(),
            file_path
        );
        Ok(file_path.to_path_buf())
    }

    /// Counts the SQL statements in the existing file at `file_path`.
    ///
    /// The file is only read, never modified: each line is fed through a
    /// [`SqlStreamParser`] and the statements it completes are tallied. A
    /// trailing statement reported by `finish_statement` is counted as well.
    ///
    /// Returns `file_path` unchanged together with the statement count.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, a line cannot be read,
    /// or the parser rejects a line.
    async fn write_transaction_from_file(&self, file_path: &Path) -> Result<(PathBuf, usize)> {
        let file = tokio::fs::File::open(file_path)
            .await
            .map_err(|e| CdcError::generic(format!("Failed to open file {file_path:?}: {e}")))?;
        let reader = tokio::io::BufReader::new(file);
        let mut lines = reader.lines();
        let mut parser = SqlStreamParser::new();
        let mut statement_count = 0usize;
        // Scratch buffer reused across lines; only its length is of interest.
        let mut statements: Vec<String> = Vec::new();

        while let Some(line) = lines
            .next_line()
            .await
            .map_err(|e| CdcError::generic(format!("Failed to read line: {e}")))?
        {
            statements.clear();
            parser.parse_line(&line, &mut statements)?;
            statement_count += statements.len();
        }

        // A final statement may still be buffered in the parser after the last line.
        if parser.finish_statement().is_some() {
            statement_count += 1;
        }

        Ok((file_path.to_path_buf(), statement_count))
    }

    /// Reads the statements stored in `file_path`, starting at `start_index`.
    ///
    /// Parsing is delegated to `SqlStreamParser::parse_file_from_index_collect`,
    /// which receives `start_index` unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the parser.
    async fn read_transaction(&self, file_path: &Path, start_index: usize) -> Result<Vec<String>> {
        let mut parser = SqlStreamParser::new();
        let statements = parser
            .parse_file_from_index_collect(file_path, start_index)
            .await?;
        debug!(
            "Read {} statements from uncompressed file {:?} (starting from index {})",
            statements.len(),
            file_path,
            start_index
        );
        Ok(statements)
    }

    /// Deletes the file at `file_path`; a file that does not exist is not an error.
    ///
    /// The removal is attempted directly instead of checking for existence
    /// first, so a file that disappears concurrently cannot turn a no-op into
    /// a failure, and problems other than "not found" are reported rather
    /// than silently treated as success.
    ///
    /// # Errors
    ///
    /// Returns an error if the removal fails for any reason other than the
    /// file being absent.
    async fn delete_transaction(&self, file_path: &Path) -> Result<()> {
        match fs::remove_file(file_path).await {
            Ok(()) => {
                debug!("Deleted uncompressed file: {:?}", file_path);
                Ok(())
            }
            // Already gone: deleting is idempotent.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(CdcError::generic(format!(
                "Failed to delete file {file_path:?}: {e}"
            ))),
        }
    }

    /// Returns `true` when metadata for `file_path` can be read.
    ///
    /// Any metadata failure, not just a missing file, yields `false`.
    async fn file_exists(&self, file_path: &Path) -> bool {
        tokio::fs::metadata(file_path).await.is_ok()
    }

    /// File extension (without the leading dot) used for uncompressed files.
    fn file_extension(&self) -> &str {
        "sql"
    }

    /// Returns `base_path` with its extension set to [`Self::file_extension`].
    ///
    /// An extension already present on `base_path` is replaced, not appended to.
    fn transform_path(&self, base_path: &Path) -> PathBuf {
        base_path.with_extension(self.file_extension())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Ensures a per-process scratch directory exists under the system temp
    /// dir and returns its path.
    async fn create_temp_dir() -> PathBuf {
        let dir_name = format!("pg2any_uncomp_test_{}", std::process::id());
        let scratch = std::env::temp_dir().join(dir_name);
        tokio::fs::create_dir_all(&scratch).await.unwrap();
        scratch
    }

    /// Round trip: write two statements, read them back, then delete the file.
    #[tokio::test]
    async fn test_write_and_read_uncompressed() {
        let storage = UncompressedStorage::new();
        let file_path = create_temp_dir().await.join("test.sql");
        let statements = vec![
            "INSERT INTO users VALUES (1, 'Alice');".to_string(),
            "INSERT INTO users VALUES (2, 'Bob');".to_string(),
        ];

        let written_path = storage
            .write_transaction(&file_path, &statements)
            .await
            .unwrap();
        assert_eq!(written_path, file_path);
        assert!(storage.file_exists(&file_path).await);

        // Statements come back without their trailing semicolon.
        let read_back = storage.read_transaction(&file_path, 0).await.unwrap();
        assert_eq!(read_back.len(), 2);
        assert_eq!(read_back[0], "INSERT INTO users VALUES (1, 'Alice')");
        assert_eq!(read_back[1], "INSERT INTO users VALUES (2, 'Bob')");

        storage.delete_transaction(&file_path).await.unwrap();
        assert!(!storage.file_exists(&file_path).await);
    }

    /// Reading with a start index of 1 skips the first stored statement.
    #[tokio::test]
    async fn test_read_with_start_index() {
        let storage = UncompressedStorage::new();
        let file_path = create_temp_dir().await.join("test_index.sql");
        let statements = vec![
            "INSERT INTO users VALUES (1, 'Alice');".to_string(),
            "INSERT INTO users VALUES (2, 'Bob');".to_string(),
            "INSERT INTO users VALUES (3, 'Charlie');".to_string(),
        ];
        storage
            .write_transaction(&file_path, &statements)
            .await
            .unwrap();

        let tail = storage.read_transaction(&file_path, 1).await.unwrap();
        assert_eq!(tail.len(), 2);
        assert_eq!(tail[0], "INSERT INTO users VALUES (2, 'Bob')");
        assert_eq!(tail[1], "INSERT INTO users VALUES (3, 'Charlie')");

        storage.delete_transaction(&file_path).await.unwrap();
    }

    /// The backend reports `sql` as its file extension.
    #[tokio::test]
    async fn test_file_extension() {
        assert_eq!(UncompressedStorage::new().file_extension(), "sql");
    }

    /// An extension-less base path gains the `.sql` extension.
    #[tokio::test]
    async fn test_transform_path() {
        let storage = UncompressedStorage::new();
        let expected = PathBuf::from("/tmp/transaction_123.sql");
        let actual = storage.transform_path(&PathBuf::from("/tmp/transaction_123"));
        assert_eq!(actual, expected);
    }
}