use crate::error::ZerobusError;
use crate::utils::file_rotation::rotate_file_if_needed;
use arrow::record_batch::RecordBatch;
use prost::Message;
use prost_types::DescriptorProto;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, info};
/// Record-count threshold used by `DebugWriter` to decide when to rotate a debug output file.
const ROTATION_BATCH_SIZE: usize = 1_000;
pub struct DebugWriter {
#[allow(dead_code)]
output_dir: PathBuf,
arrow_writer:
Arc<tokio::sync::Mutex<Option<arrow::ipc::writer::StreamWriter<BufWriter<std::fs::File>>>>>,
protobuf_writer: Arc<tokio::sync::Mutex<Option<BufWriter<std::fs::File>>>>,
arrow_file_path: Arc<tokio::sync::Mutex<PathBuf>>,
protobuf_file_path: Arc<tokio::sync::Mutex<PathBuf>>,
flush_interval: Duration,
max_file_size: Option<u64>,
last_flush: Arc<Mutex<Instant>>,
arrow_record_count: Arc<Mutex<usize>>,
protobuf_record_count: Arc<Mutex<usize>>,
}
impl DebugWriter {
pub fn new(
output_dir: PathBuf,
table_name: String,
flush_interval: Duration,
max_file_size: Option<u64>,
) -> Result<Self, ZerobusError> {
let arrow_dir = output_dir.join("zerobus/arrow");
let proto_dir = output_dir.join("zerobus/proto");
std::fs::create_dir_all(&arrow_dir).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create arrow output directory: {}",
e
))
})?;
std::fs::create_dir_all(&proto_dir).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create proto output directory: {}",
e
))
})?;
let sanitized_table_name = table_name.replace(['.', '/'], "_");
let arrow_file_path = arrow_dir.join(format!("{}.arrows", sanitized_table_name));
let protobuf_file_path = proto_dir.join(format!("{}.proto", sanitized_table_name));
Ok(Self {
output_dir,
arrow_writer: Arc::new(tokio::sync::Mutex::new(None)),
protobuf_writer: Arc::new(tokio::sync::Mutex::new(None)),
arrow_file_path: Arc::new(tokio::sync::Mutex::new(arrow_file_path)),
protobuf_file_path: Arc::new(tokio::sync::Mutex::new(protobuf_file_path)),
flush_interval,
max_file_size,
last_flush: Arc::new(Mutex::new(Instant::now())),
arrow_record_count: Arc::new(Mutex::new(0)),
protobuf_record_count: Arc::new(Mutex::new(0)),
})
}
fn generate_rotated_path(base_path: &std::path::Path) -> PathBuf {
let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
let parent = base_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."));
let stem = base_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("file");
let extension = base_path.extension().and_then(|s| s.to_str()).unwrap_or("");
parent.join(format!("{}_{}.{}", stem, timestamp, extension))
}
async fn ensure_arrow_writer(
&self,
schema: &arrow::datatypes::Schema,
) -> Result<(), ZerobusError> {
let mut writer_guard = self.arrow_writer.lock().await;
if writer_guard.is_none() {
let file_path_guard = self.arrow_file_path.lock().await;
let file_path = file_path_guard.clone();
drop(file_path_guard);
let file = std::fs::File::create(&file_path).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create Arrow debug file: {}",
e
))
})?;
let buf_writer = BufWriter::new(file);
let writer =
arrow::ipc::writer::StreamWriter::try_new(buf_writer, schema).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create Arrow IPC stream writer: {}",
e
))
})?;
*writer_guard = Some(writer);
info!("✅ Created Arrow IPC stream file: {}", file_path.display());
}
Ok(())
}
async fn ensure_protobuf_writer(&self) -> Result<(), ZerobusError> {
let mut writer_guard = self.protobuf_writer.lock().await;
if writer_guard.is_none() {
let file_path_guard = self.protobuf_file_path.lock().await;
let file_path = file_path_guard.clone();
drop(file_path_guard);
let file = std::fs::File::create(&file_path).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create Protobuf debug file: {}",
e
))
})?;
*writer_guard = Some(BufWriter::new(file));
info!("✅ Created Protobuf file: {}", file_path.display());
}
Ok(())
}
async fn rotate_arrow_file_if_needed(&self, batch_rows: usize) -> Result<bool, ZerobusError> {
let mut record_count_guard = self.arrow_record_count.lock().await;
let current_count = *record_count_guard;
let new_count = current_count + batch_rows;
let needs_rotation = new_count >= ROTATION_BATCH_SIZE;
if needs_rotation {
let mut writer_guard = self.arrow_writer.lock().await;
if let Some(writer) = writer_guard.take() {
drop(writer);
}
drop(writer_guard);
let mut file_path_guard = self.arrow_file_path.lock().await;
let old_path = file_path_guard.clone();
let new_path = Self::generate_rotated_path(&old_path);
*file_path_guard = new_path.clone();
drop(file_path_guard);
*record_count_guard = 0;
info!(
"🔄 Rotated Arrow file: {} -> {} (wrote {} records)",
old_path.display(),
new_path.display(),
current_count
);
Ok(true)
} else {
if let Some(max_size) = self.max_file_size {
let file_path_guard = self.arrow_file_path.lock().await;
let file_path = file_path_guard.clone();
drop(file_path_guard);
if let Some(new_path) =
rotate_file_if_needed(&file_path, max_size).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to check Arrow file size: {}",
e
))
})?
{
let mut writer_guard = self.arrow_writer.lock().await;
if let Some(writer) = writer_guard.take() {
drop(writer);
}
drop(writer_guard);
let mut file_path_guard = self.arrow_file_path.lock().await;
*file_path_guard = new_path.clone();
drop(file_path_guard);
*record_count_guard = 0;
info!(
"🔄 Rotated Arrow file due to size limit: {}",
new_path.display()
);
return Ok(true);
}
}
Ok(false)
}
}
async fn rotate_protobuf_file_if_needed(
&self,
record_count: usize,
) -> Result<bool, ZerobusError> {
let mut record_count_guard = self.protobuf_record_count.lock().await;
let current_count = *record_count_guard;
let new_count = current_count + record_count;
let needs_rotation = new_count >= ROTATION_BATCH_SIZE;
if needs_rotation {
let mut writer_guard = self.protobuf_writer.lock().await;
if let Some(mut writer) = writer_guard.take() {
writer.flush().map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to flush Protobuf file before rotation: {}",
e
))
})?;
drop(writer);
}
drop(writer_guard);
let mut file_path_guard = self.protobuf_file_path.lock().await;
let old_path = file_path_guard.clone();
let new_path = Self::generate_rotated_path(&old_path);
*file_path_guard = new_path.clone();
drop(file_path_guard);
*record_count_guard = 0;
info!(
"🔄 Rotated Protobuf file: {} -> {} (wrote {} records)",
old_path.display(),
new_path.display(),
current_count
);
Ok(true)
} else {
if let Some(max_size) = self.max_file_size {
let file_path_guard = self.protobuf_file_path.lock().await;
let file_path = file_path_guard.clone();
drop(file_path_guard);
if let Some(new_path) =
rotate_file_if_needed(&file_path, max_size).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to check Protobuf file size: {}",
e
))
})?
{
let mut writer_guard = self.protobuf_writer.lock().await;
if let Some(mut writer) = writer_guard.take() {
writer.flush().map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to flush Protobuf file before rotation: {}",
e
))
})?;
drop(writer);
}
drop(writer_guard);
let mut file_path_guard = self.protobuf_file_path.lock().await;
*file_path_guard = new_path.clone();
drop(file_path_guard);
*record_count_guard = 0;
info!(
"🔄 Rotated Protobuf file due to size limit: {}",
new_path.display()
);
return Ok(true);
}
}
Ok(false)
}
}
pub async fn write_arrow(&self, batch: &RecordBatch) -> Result<(), ZerobusError> {
let batch_rows = batch.num_rows();
let _rotated = self.rotate_arrow_file_if_needed(batch_rows).await?;
self.ensure_arrow_writer(batch.schema().as_ref()).await?;
let mut writer_guard = self.arrow_writer.lock().await;
if let Some(ref mut writer) = *writer_guard {
writer.write(batch).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to write Arrow RecordBatch: {}",
e
))
})?;
}
drop(writer_guard);
let mut record_count_guard = self.arrow_record_count.lock().await;
*record_count_guard += batch_rows;
drop(record_count_guard);
debug!(
"Wrote Arrow RecordBatch ({} rows) to debug file",
batch_rows
);
Ok(())
}
pub async fn write_protobuf(
&self,
protobuf_bytes: &[u8],
flush_immediately: bool,
) -> Result<(), ZerobusError> {
let _rotated = self.rotate_protobuf_file_if_needed(1).await?;
self.ensure_protobuf_writer().await?;
let mut writer_guard = self.protobuf_writer.lock().await;
if let Some(ref mut writer) = *writer_guard {
writer.write_all(protobuf_bytes).map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to write Protobuf bytes: {}", e))
})?;
writer.write_all(b"\n").map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to write Protobuf separator: {}",
e
))
})?;
if flush_immediately {
writer.flush().map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to flush Protobuf file: {}",
e
))
})?;
}
}
drop(writer_guard);
let mut record_count_guard = self.protobuf_record_count.lock().await;
*record_count_guard += 1;
drop(record_count_guard);
debug!(
"Wrote {} bytes to Protobuf debug file{}",
protobuf_bytes.len(),
if flush_immediately { " (flushed)" } else { "" }
);
Ok(())
}
pub async fn write_descriptor(
&self,
table_name: &str,
descriptor: &DescriptorProto,
) -> Result<(), ZerobusError> {
let descriptors_dir = self.output_dir.join("zerobus/descriptors");
std::fs::create_dir_all(&descriptors_dir).map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to create descriptors directory: {}",
e
))
})?;
let sanitized_table_name = table_name.replace(['.', '/'], "_");
let descriptor_file_path = descriptors_dir.join(format!("{}.pb", sanitized_table_name));
if descriptor_file_path.exists() {
debug!(
"Descriptor file already exists for table {}: {}",
table_name,
descriptor_file_path.display()
);
return Ok(());
}
let mut descriptor_bytes = Vec::new();
descriptor.encode(&mut descriptor_bytes).map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to encode Protobuf descriptor: {}", e))
})?;
let mut file = std::fs::File::create(&descriptor_file_path).map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to create descriptor file: {}", e))
})?;
file.write_all(&descriptor_bytes).map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to write descriptor bytes: {}", e))
})?;
file.sync_all().map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to sync descriptor file: {}", e))
})?;
let descriptor_name = descriptor.name.as_deref().unwrap_or("unknown");
info!("✅ Wrote Protobuf descriptor for table '{}' to: {} (descriptor name: '{}', {} fields, {} nested types)",
table_name, descriptor_file_path.display(), descriptor_name,
descriptor.field.len(), descriptor.nested_type.len());
Ok(())
}
pub async fn flush(&self) -> Result<(), ZerobusError> {
let _arrow_guard = self.arrow_writer.lock().await;
let mut proto_guard = self.protobuf_writer.lock().await;
if let Some(ref mut writer) = *proto_guard {
writer.flush().map_err(|e| {
ZerobusError::ConfigurationError(format!("Failed to flush Protobuf file: {}", e))
})?;
}
drop(proto_guard);
let mut last_flush = self.last_flush.lock().await;
*last_flush = Instant::now();
debug!("Flushed debug files to disk");
Ok(())
}
pub async fn should_flush(&self) -> bool {
let last_flush = self.last_flush.lock().await;
last_flush.elapsed() >= self.flush_interval
}
}