pub mod auth;
pub mod conversion;
pub mod debug;
pub mod protobuf_serialization;
pub mod retry;
pub mod zerobus;
use crate::config::WrapperConfiguration;
use crate::error::ZerobusError;
use crate::observability::ObservabilityManager;
use crate::wrapper::retry::RetryConfig;
use arrow::record_batch::RecordBatch;
use secrecy::ExposeSecret;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
/// Per-row outcome of a single transmission attempt, as produced by
/// `send_batch_internal` (before retry bookkeeping is layered on top).
struct BatchTransmissionResult {
/// Original batch row indices that were acknowledged by the stream.
successful_rows: Vec<usize>,
/// `(original row index, error)` pairs for rows that failed conversion
/// or transmission.
failed_rows: Vec<(usize, ZerobusError)>,
}
/// Caller-facing summary of one `send_batch` call, including per-row
/// success/failure detail when available.
#[derive(Debug, Clone)]
pub struct TransmissionResult {
/// True when at least one row was accepted (may still be a partial success).
pub success: bool,
/// Batch-level error when the whole transmission failed after retries.
pub error: Option<ZerobusError>,
/// Number of attempts made by the retry layer.
pub attempts: u32,
/// End-to-end latency of the call, in milliseconds, when measured.
pub latency_ms: Option<u64>,
/// In-memory size of the Arrow batch that was sent.
pub batch_size_bytes: usize,
/// `(row index, error)` pairs, sorted by index; `None` when no rows failed.
pub failed_rows: Option<Vec<(usize, ZerobusError)>>,
/// Indices of rows that were acknowledged; `None` when no rows succeeded.
pub successful_rows: Option<Vec<usize>>,
/// Total number of rows in the original batch.
pub total_rows: usize,
/// Count of acknowledged rows.
pub successful_count: usize,
/// Count of failed rows.
pub failed_count: usize,
}
impl TransmissionResult {
pub fn is_partial_success(&self) -> bool {
self.success && self.successful_count > 0 && self.failed_count > 0
}
pub fn has_failed_rows(&self) -> bool {
self.failed_rows
.as_ref()
.map(|rows| !rows.is_empty())
.unwrap_or(false)
}
pub fn has_successful_rows(&self) -> bool {
self.successful_rows
.as_ref()
.map(|rows| !rows.is_empty())
.unwrap_or(false)
}
pub fn get_failed_row_indices(&self) -> Vec<usize> {
self.failed_rows
.as_ref()
.map(|rows| rows.iter().map(|(idx, _)| *idx).collect())
.unwrap_or_default()
}
pub fn get_successful_row_indices(&self) -> Vec<usize> {
self.successful_rows.clone().unwrap_or_default()
}
pub fn extract_failed_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
let failed_indices = self.get_failed_row_indices();
if failed_indices.is_empty() {
return None;
}
let mut rows_to_extract = failed_indices;
rows_to_extract.sort();
let mut arrays = Vec::new();
for array in original_batch.columns() {
let taken = arrow::compute::take(
array,
&arrow::array::UInt32Array::from(
rows_to_extract
.iter()
.map(|&idx| idx as u32)
.collect::<Vec<_>>(),
),
None,
)
.ok()?;
arrays.push(taken);
}
RecordBatch::try_new(original_batch.schema(), arrays).ok()
}
pub fn extract_successful_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
let successful_indices = self.get_successful_row_indices();
if successful_indices.is_empty() {
return None;
}
let mut rows_to_extract = successful_indices;
rows_to_extract.sort();
let mut arrays = Vec::new();
for array in original_batch.columns() {
let taken = arrow::compute::take(
array,
&arrow::array::UInt32Array::from(
rows_to_extract
.iter()
.map(|&idx| idx as u32)
.collect::<Vec<_>>(),
),
None,
)
.ok()?;
arrays.push(taken);
}
RecordBatch::try_new(original_batch.schema(), arrays).ok()
}
pub fn get_failed_row_indices_by_error_type<F>(&self, predicate: F) -> Vec<usize>
where
F: Fn(&ZerobusError) -> bool,
{
self.failed_rows
.as_ref()
.map(|rows| {
rows.iter()
.filter_map(
|(idx, error)| {
if predicate(error) {
Some(*idx)
} else {
None
}
},
)
.collect()
})
.unwrap_or_default()
}
pub fn group_errors_by_type(&self) -> std::collections::HashMap<String, Vec<usize>> {
let mut grouped: std::collections::HashMap<String, Vec<usize>> =
std::collections::HashMap::new();
if let Some(failed_rows) = &self.failed_rows {
for (row_idx, error) in failed_rows {
let error_type = match error {
ZerobusError::ConfigurationError(_) => "ConfigurationError",
ZerobusError::AuthenticationError(_) => "AuthenticationError",
ZerobusError::ConnectionError(_) => "ConnectionError",
ZerobusError::ConversionError(_) => "ConversionError",
ZerobusError::TransmissionError(_) => "TransmissionError",
ZerobusError::RetryExhausted(_) => "RetryExhausted",
ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
};
grouped
.entry(error_type.to_string())
.or_default()
.push(*row_idx);
}
}
grouped
}
pub fn get_error_statistics(&self) -> ErrorStatistics {
let success_rate = if self.total_rows > 0 {
self.successful_count as f64 / self.total_rows as f64
} else {
0.0
};
let failure_rate = if self.total_rows > 0 {
self.failed_count as f64 / self.total_rows as f64
} else {
0.0
};
let mut error_type_counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
if let Some(failed_rows) = &self.failed_rows {
for (_, error) in failed_rows {
let error_type = match error {
ZerobusError::ConfigurationError(_) => "ConfigurationError",
ZerobusError::AuthenticationError(_) => "AuthenticationError",
ZerobusError::ConnectionError(_) => "ConnectionError",
ZerobusError::ConversionError(_) => "ConversionError",
ZerobusError::TransmissionError(_) => "TransmissionError",
ZerobusError::RetryExhausted(_) => "RetryExhausted",
ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
};
*error_type_counts.entry(error_type.to_string()).or_insert(0) += 1;
}
}
ErrorStatistics {
total_rows: self.total_rows,
successful_count: self.successful_count,
failed_count: self.failed_count,
success_rate,
failure_rate,
error_type_counts,
}
}
pub fn get_error_messages(&self) -> Vec<String> {
self.failed_rows
.as_ref()
.map(|rows| rows.iter().map(|(_, error)| error.to_string()).collect())
.unwrap_or_default()
}
}
/// Aggregate statistics derived from a `TransmissionResult` by
/// `get_error_statistics`.
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
/// Total rows in the batch.
pub total_rows: usize,
/// Rows that were acknowledged.
pub successful_count: usize,
/// Rows that failed.
pub failed_count: usize,
/// successful_count / total_rows (0.0 for an empty batch).
pub success_rate: f64,
/// failed_count / total_rows (0.0 for an empty batch).
pub failure_rate: f64,
/// Failure count keyed by `ZerobusError` variant name.
pub error_type_counts: std::collections::HashMap<String, usize>,
}
/// Async wrapper around the Zerobus ingest SDK: converts Arrow batches to
/// Protobuf, manages a lazily-created SDK and stream, and applies retry,
/// observability, and optional debug-file output. Cheap to clone — all
/// shared state sits behind `Arc`s.
pub struct ZerobusWrapper {
/// Immutable wrapper configuration.
config: Arc<WrapperConfiguration>,
/// SDK handle, created lazily on first send (None until then).
sdk: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusSdk>>>,
/// Active ingest stream; set to None to force recreation.
stream: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusStream>>>,
/// Retry policy for whole-batch transmission attempts.
retry_config: RetryConfig,
/// Metrics/tracing sink; None when observability is disabled.
observability: Option<ObservabilityManager>,
/// Optional debug file writer for Arrow/Protobuf dumps.
debug_writer: Option<Arc<crate::wrapper::debug::DebugWriter>>,
/// Guards one-time writing of the Protobuf descriptor to the debug sink.
descriptor_written: Arc<tokio::sync::Mutex<bool>>,
}
impl ZerobusWrapper {
/// Trims surrounding whitespace from `endpoint` and verifies it is
/// non-empty and carries an explicit `https://` or `http://` scheme.
///
/// Returns the trimmed endpoint on success.
///
/// # Errors
/// `ZerobusError::ConfigurationError` when the endpoint is empty or the
/// scheme is missing.
fn validate_and_normalize_endpoint(endpoint: &str) -> Result<String, ZerobusError> {
    let trimmed = endpoint.trim();
    if trimmed.is_empty() {
        return Err(ZerobusError::ConfigurationError(
            "zerobus_endpoint cannot be empty".to_string(),
        ));
    }
    let has_scheme = trimmed.starts_with("https://") || trimmed.starts_with("http://");
    if !has_scheme {
        return Err(ZerobusError::ConfigurationError(format!(
            "zerobus_endpoint must start with 'https://' or 'http://'. Got: '{}'",
            trimmed
        )));
    }
    Ok(trimmed.to_string())
}
/// Creates a new `ZerobusWrapper` from `config`.
///
/// Validates the configuration and endpoint; unless
/// `zerobus_writer_disabled` is set, also requires `unity_catalog_url`,
/// `client_id`, and `client_secret` to be present (fail fast, even though
/// the SDK itself is created lazily on first send). Builds the retry
/// policy, optionally initializes observability, and — when any debug flag
/// plus `debug_output_dir` are set — a debug file writer.
///
/// # Errors
/// `ZerobusError::ConfigurationError` when validation fails or a required
/// field is missing.
pub async fn new(config: WrapperConfiguration) -> Result<Self, ZerobusError> {
info!("Initializing ZerobusWrapper");
config.validate()?;
let normalized_endpoint = Self::validate_and_normalize_endpoint(&config.zerobus_endpoint)?;
// Fail fast on missing SDK prerequisites when the writer is enabled.
if !config.zerobus_writer_disabled {
let unity_catalog_url = config
.unity_catalog_url
.as_ref()
.ok_or_else(|| {
ZerobusError::ConfigurationError(
"unity_catalog_url is required for SDK".to_string(),
)
})?
.clone();
// Presence checks only; the secrets are read again at send time.
let _client_id = config.client_id.as_ref().ok_or_else(|| {
ZerobusError::ConfigurationError("client_id is required for SDK".to_string())
})?;
let _client_secret = config.client_secret.as_ref().ok_or_else(|| {
ZerobusError::ConfigurationError("client_secret is required for SDK".to_string())
})?;
info!("Zerobus endpoint: {}", normalized_endpoint);
info!("Unity Catalog URL: {}", unity_catalog_url);
} else {
info!(
"Zerobus endpoint: {} (writer disabled mode)",
normalized_endpoint
);
}
// SDK is created lazily by send_batch_internal; start with None.
let sdk = Arc::new(Mutex::new(None));
let retry_config = RetryConfig::new(
config.retry_max_attempts,
config.retry_base_delay_ms,
config.retry_max_delay_ms,
);
let observability = if config.observability_enabled {
ObservabilityManager::new_async(config.observability_config.clone()).await
} else {
None
};
if observability.is_some() {
info!("Observability enabled");
}
// Debug output requires both a debug flag AND an output directory;
// the mismatch cases below are logged loudly instead of failing.
let any_debug_enabled =
config.debug_arrow_enabled || config.debug_protobuf_enabled || config.debug_enabled;
info!(
"ZerobusWrapper::new: debug_arrow_enabled={}, debug_protobuf_enabled={}, debug_enabled={}, debug_output_dir={:?}",
config.debug_arrow_enabled, config.debug_protobuf_enabled, config.debug_enabled, config.debug_output_dir
);
let debug_writer = if any_debug_enabled {
if let Some(output_dir) = &config.debug_output_dir {
use crate::wrapper::debug::DebugWriter;
use std::time::Duration;
info!(
"Initializing debug writer with output_dir: {}, table_name: {}, arrow_enabled: {}, protobuf_enabled: {}",
output_dir.display(),
config.table_name,
config.debug_arrow_enabled,
config.debug_protobuf_enabled
);
match DebugWriter::new(
output_dir.clone(),
config.table_name.clone(),
Duration::from_secs(config.debug_flush_interval_secs),
config.debug_max_file_size,
config.debug_max_files_retained,
) {
Ok(writer) => {
info!(
"Debug file output enabled: {} (Arrow: {}, Protobuf: {})",
output_dir.display(),
config.debug_arrow_enabled,
config.debug_protobuf_enabled
);
Some(Arc::new(writer))
}
// Debug output is best-effort: a failed writer init degrades
// to "no debug files", not a constructor error.
Err(e) => {
warn!("Failed to initialize debug writer: {}", e);
None
}
}
} else {
// Flags set but no output dir: warn with the exact flags involved.
let mut enabled_flags = Vec::new();
if config.debug_arrow_enabled {
enabled_flags.push("debug_arrow_enabled");
}
if config.debug_protobuf_enabled {
enabled_flags.push("debug_protobuf_enabled");
}
if config.debug_enabled {
enabled_flags.push("debug_enabled");
}
warn!(
"Debug flag(s) enabled ({}) but debug_output_dir is None - debug files will not be written",
enabled_flags.join(", ")
);
None
}
} else {
info!("All debug flags are false - debug files will not be written");
None
};
Ok(Self {
config: Arc::new(config),
sdk,
stream: Arc::new(Mutex::new(None)),
retry_config,
observability,
debug_writer,
descriptor_written: Arc::new(tokio::sync::Mutex::new(false)),
})
}
/// Sends `batch` to Zerobus with an auto-generated Protobuf descriptor
/// derived from the batch's Arrow schema. Convenience delegate to
/// `send_batch_with_descriptor` with `descriptor = None`.
pub async fn send_batch(&self, batch: RecordBatch) -> Result<TransmissionResult, ZerobusError> {
self.send_batch_with_descriptor(batch, None).await
}
/// Sends `batch` to Zerobus, optionally using a caller-supplied Protobuf
/// `descriptor` (auto-generated from the Arrow schema when `None`).
///
/// Wraps `send_batch_internal` with: optional Arrow debug-file output,
/// an observability span + metrics, and the configured retry policy.
/// Always returns `Ok(TransmissionResult)` — batch-level failure is
/// reported via `TransmissionResult::error`, never as `Err`.
pub async fn send_batch_with_descriptor(
&self,
batch: RecordBatch,
descriptor: Option<prost_types::DescriptorProto>,
) -> Result<TransmissionResult, ZerobusError> {
let start_time = std::time::Instant::now();
let batch_size_bytes = batch.get_array_memory_size();
debug!(
"Sending batch with {} rows, {} bytes",
batch.num_rows(),
batch_size_bytes
);
// Best-effort Arrow debug dump; failures are logged, not propagated.
if self.config.debug_arrow_enabled {
if let Some(ref debug_writer) = self.debug_writer {
if let Err(e) = debug_writer.write_arrow(&batch).await {
warn!("Failed to write Arrow debug file: {}", e);
}
}
}
// Span is held for the duration of the call via its guard.
let _span = self
.observability
.as_ref()
.map(|obs| obs.start_send_batch_span(&self.config.table_name));
// Each retry attempt clones the batch/descriptor and a cheap (Arc-based)
// clone of self, then runs one full internal transmission attempt.
let (result, attempts) = self
.retry_config
.execute_with_retry_tracked(|| {
let batch = batch.clone();
let descriptor = descriptor.clone();
let wrapper = self.clone();
async move { wrapper.send_batch_internal(batch, descriptor).await }
})
.await;
let latency_ms = start_time.elapsed().as_millis() as u64;
if let Some(obs) = &self.observability {
let success = result.is_ok();
obs.record_batch_sent(batch_size_bytes, success, latency_ms)
.await;
}
let total_rows = batch.num_rows();
// NOTE(review): for an empty batch this reports success: true and drops
// `result` unchecked — an Err (e.g. backoff rejection) is silently
// discarded here. Confirm whether empty batches should surface errors.
if total_rows == 0 {
return Ok(TransmissionResult {
success: true, error: None,
attempts,
latency_ms: Some(latency_ms),
batch_size_bytes,
failed_rows: None,
successful_rows: None,
total_rows: 0,
successful_count: 0,
failed_count: 0,
});
}
match result {
Ok(batch_result) => {
let mut all_failed_rows = batch_result.failed_rows;
let successful_rows = batch_result.successful_rows;
let successful_count = successful_rows.len();
let failed_count = all_failed_rows.len();
// "Success" here means at least one row made it through.
let overall_success = successful_count > 0;
all_failed_rows.sort_by_key(|(idx, _)| *idx);
// Feed the table-level failure-rate tracker used for backoff.
crate::wrapper::zerobus::update_failure_rate(
&self.config.table_name,
total_rows,
&all_failed_rows,
);
Ok(TransmissionResult {
success: overall_success,
error: None, attempts,
latency_ms: Some(latency_ms),
batch_size_bytes,
failed_rows: if all_failed_rows.is_empty() {
None
} else {
Some(all_failed_rows)
},
successful_rows: if successful_rows.is_empty() {
None
} else {
Some(successful_rows)
},
total_rows,
successful_count,
failed_count,
})
}
// Whole-batch failure after retries: returned as a successful Result
// carrying the error, so callers always get a TransmissionResult.
Err(e) => {
error!("Failed to send batch after retries: {}", e);
Ok(TransmissionResult {
success: false,
error: Some(e),
attempts,
latency_ms: Some(latency_ms),
batch_size_bytes,
failed_rows: None, successful_rows: None,
total_rows,
successful_count: 0,
failed_count: 0, })
}
}
}
/// One full transmission attempt: lazily initializes the SDK, resolves and
/// validates the Protobuf descriptor, converts the batch to per-row
/// Protobuf bytes, then streams rows in sub-batches (bounded by count and
/// bytes), recreating the stream up to `MAX_STREAM_RECREATE_ATTEMPTS`
/// times on "stream closed" failures. Returns per-row outcomes; only
/// setup-level failures (SDK/stream creation, missing credentials,
/// batch-level backoff) are returned as `Err`.
async fn send_batch_internal(
&self,
batch: RecordBatch,
descriptor: Option<prost_types::DescriptorProto>,
) -> Result<BatchTransmissionResult, ZerobusError> {
if self.config.zerobus_writer_disabled {
debug!(
"Writer disabled mode enabled - skipping SDK initialization and Zerobus SDK calls"
);
} else {
// Lazily create the SDK exactly once, checked under the mutex.
{
let mut sdk_guard = self.sdk.lock().await;
if sdk_guard.is_none() {
let unity_catalog_url = self
.config
.unity_catalog_url
.as_ref()
.ok_or_else(|| {
ZerobusError::ConfigurationError(
"unity_catalog_url is required".to_string(),
)
})?
.clone();
let sdk = crate::wrapper::zerobus::create_sdk(
self.config.zerobus_endpoint.clone(),
unity_catalog_url,
)
.await?;
*sdk_guard = Some(sdk);
}
}
}
// Resolve the descriptor: validate the caller-provided one, or generate
// (and validate) one from the Arrow schema.
let descriptor = if let Some(provided_descriptor) = descriptor {
crate::wrapper::conversion::validate_protobuf_descriptor(&provided_descriptor)
.map_err(|e| {
ZerobusError::ConfigurationError(format!("Invalid Protobuf descriptor: {}", e))
})?;
let descriptor_name = provided_descriptor.name.as_deref().unwrap_or("unknown");
info!("🔍 [DEBUG] Using provided Protobuf descriptor: name='{}', fields={}, nested_types={}",
descriptor_name, provided_descriptor.field.len(), provided_descriptor.nested_type.len());
provided_descriptor
} else {
debug!("Auto-generating Protobuf descriptor from Arrow schema");
let generated =
crate::wrapper::conversion::generate_protobuf_descriptor(batch.schema().as_ref())
.map_err(|e| {
ZerobusError::ConversionError(format!(
"Failed to generate Protobuf descriptor: {}",
e
))
})?;
crate::wrapper::conversion::validate_protobuf_descriptor(&generated).map_err(|e| {
ZerobusError::ConversionError(format!(
"Generated Protobuf descriptor failed validation: {}",
e
))
})?;
let descriptor_name = generated.name.as_deref().unwrap_or("unknown");
info!("🔍 [DEBUG] Auto-generated Protobuf descriptor: name='{}', fields={}, nested_types={}",
descriptor_name, generated.field.len(), generated.nested_type.len());
generated
};
// Write the descriptor to the debug sink at most once per wrapper,
// guarded by `descriptor_written`.
if self.config.debug_arrow_enabled || self.config.debug_protobuf_enabled {
if let Some(ref debug_writer) = self.debug_writer {
let mut written_guard = self.descriptor_written.lock().await;
if !*written_guard {
if let Err(e) = debug_writer
.write_descriptor(&self.config.table_name, &descriptor)
.await
{
warn!("Failed to write Protobuf descriptor to debug file: {}", e);
} else {
*written_guard = true;
}
}
}
}
// Convert rows to Protobuf; per-row conversion failures are collected,
// not fatal.
let conversion_result =
crate::wrapper::conversion::record_batch_to_protobuf_bytes(&batch, &descriptor);
let conversion_errors = conversion_result.failed_rows;
// Best-effort Protobuf debug dump, flushing after the last message.
if self.config.debug_protobuf_enabled {
if let Some(ref debug_writer) = self.debug_writer {
info!(
"Writing {} protobuf messages to debug file",
conversion_result.successful_bytes.len()
);
let num_rows = conversion_result.successful_bytes.len();
for (idx, (_, bytes)) in conversion_result.successful_bytes.iter().enumerate() {
let flush_immediately = idx == num_rows - 1;
if let Err(e) = debug_writer.write_protobuf(bytes, flush_immediately).await {
warn!("Failed to write Protobuf debug file: {}", e);
} else if flush_immediately {
info!(
"✅ Flushed protobuf debug file after batch ({} messages)",
num_rows
);
}
}
} else {
warn!("⚠️ Debug writer is None - protobuf debug files will not be written. Check debug_protobuf_enabled and debug_output_dir config.");
}
}
// Writer-disabled mode: every converted row counts as "successful".
if self.config.zerobus_writer_disabled {
debug!(
"Writer disabled mode enabled - skipping Zerobus SDK calls. Debug files written successfully."
);
let successful_indices: Vec<usize> = conversion_result
.successful_bytes
.iter()
.map(|(idx, _)| *idx)
.collect();
return Ok(BatchTransmissionResult {
successful_rows: successful_indices,
failed_rows: conversion_errors,
});
}
// NOTE: this guard is held for the rest of the function, serializing
// concurrent send attempts on the same wrapper.
let sdk_guard = self.sdk.lock().await;
let sdk = sdk_guard.as_ref().ok_or_else(|| {
ZerobusError::ConfigurationError(
"SDK not initialized - this should not happen".to_string(),
)
})?;
let client_id = self
.config
.client_id
.as_ref()
.ok_or_else(|| ZerobusError::ConfigurationError("client_id is required".to_string()))?
.expose_secret()
.clone();
let client_secret = self
.config
.client_secret
.as_ref()
.ok_or_else(|| {
ZerobusError::ConfigurationError("client_secret is required".to_string())
})?
.expose_secret()
.clone();
// Batch-level backoff gates: abort the attempt before opening a stream.
{
use crate::wrapper::zerobus::{check_error_6006_backoff, check_failure_rate_backoff};
check_error_6006_backoff(&self.config.table_name).await?;
check_failure_rate_backoff(&self.config.table_name).await?;
}
let mut retry_count = 0;
const MAX_STREAM_RECREATE_ATTEMPTS: u32 = 3;
let mut transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
let mut successful_indices: Vec<usize> = Vec::new();
// Outer loop = one stream lifetime per iteration; on "stream closed"
// the stream is dropped and the whole row set is retried.
// NOTE(review): a retry resends ALL rows, including ones already
// acknowledged in the failed attempt — confirm the server deduplicates,
// otherwise this can ingest duplicates.
loop {
let mut stream_guard = self.stream.lock().await;
if stream_guard.is_none() {
info!(
"Stream not found, creating new stream for table: {}",
self.config.table_name
);
let stream = crate::wrapper::zerobus::ensure_stream(
sdk,
self.config.table_name.clone(),
descriptor.clone(),
client_id.clone(),
client_secret.clone(),
)
.await?;
*stream_guard = Some(stream);
info!("✅ Stream created successfully");
}
if stream_guard.is_none() {
return Err(ZerobusError::ConnectionError(
"Stream was None after creation - this should not happen".to_string(),
));
}
drop(stream_guard);
let mut attempt_transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
let mut attempt_successful_indices: Vec<usize> = Vec::new();
let mut all_succeeded = true;
let mut failed_at_idx = 0;
// Sub-batch bounds: flush pending futures every 1000 rows or 10 MiB.
const BATCH_SIZE: usize = 1000; const BATCH_SIZE_BYTES: usize = 10 * 1024 * 1024; type IngestFuture = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<i64, databricks_zerobus_ingest_sdk::ZerobusError>,
> + Send,
>,
>;
let mut pending_futures: Vec<(usize, IngestFuture)> = Vec::new();
let mut total_bytes_buffered = 0usize;
let mut should_break_outer = false;
for (original_row_idx, bytes) in conversion_result.successful_bytes.iter() {
let idx = *original_row_idx;
// Re-check backoff gates per row; when tripped, drop the stream
// and mark all not-yet-sent rows as failed.
{
use crate::wrapper::zerobus::{
check_error_6006_backoff, check_failure_rate_backoff,
};
if let Err(_backoff_err) =
check_error_6006_backoff(&self.config.table_name).await
{
let mut stream_guard = self.stream.lock().await;
*stream_guard = None;
drop(stream_guard);
// NOTE(review): `idx` is the ORIGINAL row index but is used
// here as a POSITION into `successful_bytes`; when conversion
// dropped rows the two diverge and the wrong remaining rows
// get marked failed — confirm and align on positions.
for remaining_idx in idx..conversion_result.successful_bytes.len() {
if let Some((orig_idx, _)) =
conversion_result.successful_bytes.get(remaining_idx)
{
attempt_transmission_errors.push((
*orig_idx,
ZerobusError::ConnectionError(
"Backoff period active - row processing stopped"
.to_string(),
),
));
}
}
all_succeeded = false;
failed_at_idx = idx;
break;
}
if let Err(_backoff_err) =
check_failure_rate_backoff(&self.config.table_name).await
{
let mut stream_guard = self.stream.lock().await;
*stream_guard = None;
drop(stream_guard);
// NOTE(review): same original-index-as-position concern as the
// 6006 backoff branch above.
for remaining_idx in idx..conversion_result.successful_bytes.len() {
if let Some((orig_idx, _)) =
conversion_result.successful_bytes.get(remaining_idx)
{
attempt_transmission_errors.push((
*orig_idx,
ZerobusError::ConnectionError(
"High failure rate backoff active - row processing stopped"
.to_string(),
),
));
}
}
all_succeeded = false;
failed_at_idx = idx;
break;
}
}
// Recreate the stream mid-batch if another path cleared it.
let mut stream_guard = self.stream.lock().await;
if stream_guard.is_none() {
info!(
"Stream was cleared, recreating for table: {}",
self.config.table_name
);
let stream = crate::wrapper::zerobus::ensure_stream(
sdk,
self.config.table_name.clone(),
descriptor.clone(),
client_id.clone(),
client_secret.clone(),
)
.await?;
*stream_guard = Some(stream);
}
let stream = stream_guard.as_mut().ok_or_else(|| {
ZerobusError::ConnectionError(
"Stream was None after recreation - this should not happen".to_string(),
)
})?;
// ingest_record returns a future that resolves to the ack; acks are
// awaited in bulk once the sub-batch bounds are hit.
match stream.ingest_record(bytes.clone()).await {
Ok(ingest_future) => {
drop(stream_guard);
pending_futures.push((idx, Box::pin(ingest_future)));
total_bytes_buffered += bytes.len();
if pending_futures.len() >= BATCH_SIZE
|| total_bytes_buffered >= BATCH_SIZE_BYTES
{
// Flush before awaiting acks; on flush failure, fail every
// pending row and abandon this attempt.
{
let mut stream_guard = self.stream.lock().await;
if let Some(ref mut stream) = *stream_guard {
if let Err(e) = stream.flush().await {
error!(
"Failed to flush Zerobus stream during batch: {}",
e
);
for (pending_idx, _) in pending_futures.drain(..) {
attempt_transmission_errors.push((
pending_idx,
ZerobusError::ConnectionError(format!(
"Flush failed during batch processing: {}",
e
)),
));
}
all_succeeded = false;
failed_at_idx = idx;
break;
}
}
}
// Await acks for the flushed sub-batch.
for (pending_idx, mut future) in pending_futures.drain(..) {
match future.as_mut().await {
Ok(_ack_id) => {
debug!(
"✅ Successfully sent record to Zerobus stream (row {}, ack_id={})",
pending_idx, _ack_id
);
attempt_successful_indices.push(pending_idx);
}
Err(e) => {
let err_msg = format!("{}", e);
if err_msg.contains("Stream is closed")
|| err_msg.contains("Stream closed")
{
let is_first = pending_idx == 0;
error!(
"Stream closed: row={}, first_record={}, error={}",
pending_idx, is_first, err_msg
);
if is_first {
error!("Diagnostics: Stream closed during batch processing");
error!("Possible causes:");
error!(" 1. Schema mismatch between descriptor and table");
error!(" 2. Validation error");
error!(" 3. Server-side issue");
}
// Drop the stream so the outer loop recreates it.
let mut stream_guard = self.stream.lock().await;
*stream_guard = None;
drop(stream_guard);
attempt_transmission_errors.push((
pending_idx,
ZerobusError::ConnectionError(format!(
"Stream closed: row={}, error={}",
pending_idx, err_msg
)),
));
all_succeeded = false;
failed_at_idx = pending_idx;
break;
} else {
// Non-stream error: record it and keep awaiting
// the remaining acks.
attempt_transmission_errors.push((
pending_idx,
ZerobusError::TransmissionError(format!(
"Record acknowledgment failed: row={}, error={}",
pending_idx, e
)),
));
all_succeeded = false;
}
}
}
}
total_bytes_buffered = 0;
// NOTE(review): `failed_at_idx > 0` never triggers when the
// failure happened at row index 0, so a row-0 stream failure
// does not break the outer row loop here — confirm intent.
if !all_succeeded && failed_at_idx > 0 {
should_break_outer = true;
}
}
}
Err(e) => {
let err_msg = format!("{}", e);
if err_msg.contains("Stream is closed") || err_msg.contains("Stream closed")
{
let is_first = idx == 0;
error!(
"Stream closed: row={}, first_record={}, error={}",
idx, is_first, err_msg
);
if is_first {
error!("Diagnostics: This is the FIRST record - stream closed immediately");
error!("Possible causes:");
error!(" 1. Schema mismatch between descriptor and table");
error!(" 2. Validation error on first record");
error!(" 3. Table schema not yet propagated");
error!(
"Descriptor info: fields={}, nested_types={}",
descriptor.field.len(),
descriptor.nested_type.len()
);
}
*stream_guard = None;
drop(stream_guard);
let stream_error = ZerobusError::ConnectionError(format!(
"Stream closed: row={}, error={}",
idx, err_msg
));
attempt_transmission_errors.push((idx, stream_error));
all_succeeded = false;
failed_at_idx = idx;
should_break_outer = true;
break;
} else {
// Non-stream submit error: record and continue with next row.
let transmission_error = ZerobusError::ConnectionError(format!(
"Record creation failed: row={}, error={}",
idx, e
));
attempt_transmission_errors.push((idx, transmission_error));
all_succeeded = false;
}
}
}
}
// Drain any futures left over after the row loop: flush (best-effort)
// then await their acks.
if !pending_futures.is_empty() {
{
let mut stream_guard = self.stream.lock().await;
if let Some(ref mut stream) = *stream_guard {
match stream.flush().await {
Ok(_) => {
debug!(
"✅ Flushed Zerobus stream for {} remaining pending futures",
pending_futures.len()
);
}
Err(e) => {
warn!("Failed to flush Zerobus stream for remaining records (stream may be closed): {}", e);
}
}
} else {
warn!("Stream is None when trying to flush remaining records - records may be lost");
for (pending_idx, _) in pending_futures.drain(..) {
attempt_transmission_errors.push((
pending_idx,
ZerobusError::ConnectionError(
"Stream was closed before flushing remaining records"
.to_string(),
),
));
}
all_succeeded = false;
}
}
for (pending_idx, mut future) in pending_futures.drain(..) {
match future.as_mut().await {
Ok(_ack_id) => {
debug!(
"✅ Successfully acknowledged record (row {}, ack_id={})",
pending_idx, _ack_id
);
attempt_successful_indices.push(pending_idx);
}
Err(e) => {
let err_msg = format!("{}", e);
if err_msg.contains("Stream is closed")
|| err_msg.contains("Stream closed")
{
let mut stream_guard = self.stream.lock().await;
*stream_guard = None;
drop(stream_guard);
attempt_transmission_errors.push((
pending_idx,
ZerobusError::ConnectionError(format!(
"Stream closed before acknowledgment: row={}, error={}",
pending_idx, err_msg
)),
));
all_succeeded = false;
} else {
attempt_transmission_errors.push((
pending_idx,
ZerobusError::TransmissionError(format!(
"Record acknowledgment failed: row={}, error={}",
pending_idx, e
)),
));
all_succeeded = false;
}
}
}
}
}
// NOTE(review): breaking here skips the publication of
// attempt_successful_indices / attempt_transmission_errors into the
// outer accumulators below — this attempt's per-row results are
// discarded and the final result under-reports — confirm intent.
if should_break_outer {
break;
}
if all_succeeded {
// Final flush is best-effort; rows are already acknowledged.
{
let mut stream_guard = self.stream.lock().await;
if let Some(ref mut stream) = *stream_guard {
if let Err(e) = stream.flush().await {
error!("Failed to flush Zerobus stream after batch: {}", e);
} else {
debug!(
"✅ Flushed Zerobus stream after sending {} records",
attempt_successful_indices.len()
);
}
}
}
successful_indices = attempt_successful_indices;
transmission_errors = attempt_transmission_errors;
break;
} else {
retry_count += 1;
if retry_count > MAX_STREAM_RECREATE_ATTEMPTS {
// Exhausted: keep this attempt's results and mark every row
// with no recorded outcome as failed.
let mut final_transmission_errors = attempt_transmission_errors;
let final_successful_indices = attempt_successful_indices;
for (idx, _) in conversion_result.successful_bytes.iter() {
if !final_successful_indices.contains(idx)
&& !final_transmission_errors.iter().any(|(i, _)| i == idx)
{
final_transmission_errors.push((*idx, ZerobusError::ConnectionError(format!(
"Stream recreation exhausted: row={}, possible_causes='schema_mismatch,validation_error,server_issue'",
idx
))));
}
}
successful_indices = final_successful_indices;
transmission_errors = final_transmission_errors;
break;
}
warn!(
"Stream recreation retry: attempt={}/{}, failed_at_row={}",
retry_count, MAX_STREAM_RECREATE_ATTEMPTS, failed_at_idx
);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Discard this attempt's per-row results before retrying all rows.
attempt_successful_indices.clear();
attempt_transmission_errors.clear();
}
}
// Conversion failures and transmission failures are merged into one list.
let mut all_failed_rows = conversion_errors;
all_failed_rows.extend(transmission_errors);
Ok(BatchTransmissionResult {
successful_rows: successful_indices,
failed_rows: all_failed_rows,
})
}
pub async fn flush(&self) -> Result<(), ZerobusError> {
{
let mut stream_guard = self.stream.lock().await;
if let Some(ref mut stream) = *stream_guard {
stream.flush().await.map_err(|e| {
ZerobusError::ConnectionError(format!("Failed to flush Zerobus stream: {}", e))
})?;
debug!("✅ Flushed Zerobus stream");
}
}
if let Some(ref debug_writer) = self.debug_writer {
if let Err(e) = debug_writer.flush().await {
warn!("Failed to flush debug files: {}", e);
}
}
if let Some(ref obs) = self.observability {
obs.flush().await?;
}
Ok(())
}
pub async fn shutdown(&self) -> Result<(), ZerobusError> {
info!("Shutting down ZerobusWrapper");
let mut stream_guard = self.stream.lock().await;
if let Some(mut stream) = stream_guard.take() {
if let Err(e) = stream.close().await {
warn!("Error closing Zerobus stream: {}", e);
} else {
debug!("Stream closed successfully");
}
}
Ok(())
}
}
impl Clone for ZerobusWrapper {
fn clone(&self) -> Self {
Self {
config: Arc::clone(&self.config),
sdk: Arc::clone(&self.sdk),
stream: Arc::clone(&self.stream),
retry_config: self.retry_config.clone(),
observability: self.observability.clone(),
debug_writer: self.debug_writer.as_ref().map(Arc::clone),
descriptor_written: Arc::clone(&self.descriptor_written),
}
}
}