arrow_zerobus_sdk_wrapper/wrapper/mod.rs
//! Main wrapper implementation for Zerobus SDK
//!
//! This module provides the core ZerobusWrapper that handles data transmission
//! to Zerobus with automatic protocol conversion, authentication, and retry logic.

pub mod auth;
pub mod conversion;
pub mod debug;
pub mod protobuf_serialization;
pub mod retry;
pub mod zerobus;

use crate::config::WrapperConfiguration;
use crate::error::ZerobusError;
use crate::observability::ObservabilityManager;
use crate::wrapper::retry::RetryConfig;
use arrow::record_batch::RecordBatch;
use secrecy::ExposeSecret;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

/// Internal result from send_batch_internal containing per-row error information
struct BatchTransmissionResult {
    /// Successful row indices
    successful_rows: Vec<usize>,
    /// Failed rows with errors
    failed_rows: Vec<(usize, ZerobusError)>,
}

/// Result of a data transmission operation
///
/// This struct provides comprehensive information about the result of sending a batch
/// to Zerobus, including per-row success/failure tracking and error details.
///
/// # Per-Row Error Tracking
///
/// The struct supports per-row error tracking, allowing identification of which
/// specific rows succeeded or failed during batch transmission. This enables:
///
/// - **Partial batch success**: Some rows can succeed while others fail
/// - **Quarantine workflows**: Extract and quarantine only failed rows
/// - **Error analysis**: Group errors by type, analyze patterns, track statistics
///
/// # Field Semantics
///
/// - **`success`**: `true` if ANY rows succeeded (or the batch was empty), `false` if ALL rows failed or a batch-level error occurred
/// - **`error`**: Batch-level error (e.g., authentication failure, connection error before processing)
///   - `None` if no batch-level error occurred (even if some rows failed)
/// - **`failed_rows`**: Per-row failures
///   - `None` if all rows succeeded, or a batch-level error occurred (no per-row processing took place)
///   - `Some(vec![(row_idx, error), ...])` for per-row failures
/// - **`successful_rows`**: Per-row successes
///   - `None` if no rows succeeded
///   - `Some(vec![row_idx, ...])` for successful rows
/// - **`total_rows`**: Total number of rows in the batch (0 for empty batches)
/// - **`successful_count`**: Number of rows that succeeded (always equals `successful_rows.len()` if `Some`)
/// - **`failed_count`**: Number of rows that failed (always equals `failed_rows.len()` if `Some`)
///
/// # Edge Cases
///
/// - **Empty batch** (`total_rows == 0`): Returns `success=true`, `successful_count=0`, `failed_count=0`
/// - **Batch-level errors**: Authentication/connection errors before processing return `error=Some(...)` with `failed_rows=None`, `successful_rows=None`, and both counts set to `0`
/// - **All rows failed**: Returns `success=false`, `failed_rows=Some([...])`, `successful_rows=None`
/// - **All rows succeeded**: Returns `success=true`, `failed_rows=None`, `successful_rows=Some([...])`
///
/// # Examples
///
/// ```no_run
/// use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
/// use arrow::record_batch::RecordBatch;
///
/// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
/// # use arrow::array::Int64Array;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use std::sync::Arc;
/// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
/// # let wrapper = ZerobusWrapper::new(config).await?;
/// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
/// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
/// let result = wrapper.send_batch(batch.clone()).await?;
///
/// // Check for partial success
/// if result.is_partial_success() {
///     // Extract failed rows for quarantine
///     if let Some(failed_batch) = result.extract_failed_batch(&batch) {
///         // Quarantine failed_batch
///     }
///
///     // Extract successful rows for writing
///     if let Some(successful_batch) = result.extract_successful_batch(&batch) {
///         // Write successful_batch to main table
///     }
/// }
///
/// // Analyze error patterns
/// let stats = result.get_error_statistics();
/// println!("Success rate: {:.1}%", stats.success_rate * 100.0);
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct TransmissionResult {
    /// Whether transmission succeeded
    ///
    /// `true` if ANY rows succeeded (or the batch was empty), `false` if ALL rows
    /// failed or a batch-level error occurred.
    pub success: bool,
    /// Error information if transmission failed at batch level
    ///
    /// Batch-level errors occur before per-row processing (e.g., authentication failure,
    /// connection error). If `Some`, indicates no per-row processing occurred.
    pub error: Option<ZerobusError>,
    /// Number of retry attempts made
    pub attempts: u32,
    /// Transmission latency in milliseconds (if successful)
    pub latency_ms: Option<u64>,
    /// Size of transmitted batch in bytes
    pub batch_size_bytes: usize,
    /// Indices of rows that failed, along with their specific errors
    ///
    /// - `None` if all rows succeeded, or a batch-level error occurred (no per-row
    ///   processing took place)
    /// - `Some(vec![(row_idx, error), ...])` for per-row failures
    ///
    /// Each tuple contains:
    /// - `row_idx`: 0-based index of the failed row in the original batch
    /// - `error`: Specific `ZerobusError` describing why the row failed
    pub failed_rows: Option<Vec<(usize, ZerobusError)>>,
    /// Indices of rows that were successfully written
    ///
    /// - `None` if no rows succeeded
    /// - `Some(vec![row_idx, ...])` for successful rows
    ///
    /// Each value is a 0-based index of the successful row in the original batch.
    pub successful_rows: Option<Vec<usize>>,
    /// Total number of rows in the batch
    ///
    /// Equals `successful_count + failed_count`, except after a batch-level error,
    /// where both counts are `0` but `total_rows` still reflects the batch size.
    /// For empty batches, this is `0`.
    pub total_rows: usize,
    /// Number of rows that succeeded
    ///
    /// Always equals `successful_rows.len()` if `successful_rows` is `Some`.
    pub successful_count: usize,
    /// Number of rows that failed
    ///
    /// Always equals `failed_rows.len()` if `failed_rows` is `Some`.
    pub failed_count: usize,
}

impl TransmissionResult {
    /// Check if this result represents a partial success (some rows succeeded, some failed)
    ///
    /// Returns `true` if there are both successful and failed rows.
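    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative setup from the struct-level
    /// example (the config values and batch contents are assumptions):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
    /// let result = wrapper.send_batch(batch).await?;
    /// if result.is_partial_success() {
    ///     println!("{} of {} rows succeeded", result.successful_count, result.total_rows);
    /// }
    /// # Ok(())
    /// # }
    /// ```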
    pub fn is_partial_success(&self) -> bool {
        self.success && self.successful_count > 0 && self.failed_count > 0
    }

    /// Check if there are any failed rows
    ///
    /// Returns `true` if `failed_rows` contains any entries.
    pub fn has_failed_rows(&self) -> bool {
        self.failed_rows
            .as_ref()
            .map(|rows| !rows.is_empty())
            .unwrap_or(false)
    }

    /// Check if there are any successful rows
    ///
    /// Returns `true` if `successful_rows` contains any entries.
    pub fn has_successful_rows(&self) -> bool {
        self.successful_rows
            .as_ref()
            .map(|rows| !rows.is_empty())
            .unwrap_or(false)
    }

    /// Get indices of failed rows
    ///
    /// Returns a vector of row indices that failed, or an empty vector if none failed.
    pub fn get_failed_row_indices(&self) -> Vec<usize> {
        self.failed_rows
            .as_ref()
            .map(|rows| rows.iter().map(|(idx, _)| *idx).collect())
            .unwrap_or_default()
    }

    /// Get indices of successful rows
    ///
    /// Returns a vector of row indices that succeeded, or an empty vector if none succeeded.
    pub fn get_successful_row_indices(&self) -> Vec<usize> {
        self.successful_rows.clone().unwrap_or_default()
    }

    /// Extract a RecordBatch containing only the failed rows from the original batch
    ///
    /// # Arguments
    ///
    /// * `original_batch` - The original RecordBatch that was sent
    ///
    /// # Returns
    ///
    /// Returns `Some(RecordBatch)` containing only the failed rows, or `None` if there are no failed rows.
    /// Rows are extracted in ascending index order.
    pub fn extract_failed_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
        let failed_indices = self.get_failed_row_indices();
        if failed_indices.is_empty() {
            return None;
        }

        // Sort indices so rows are extracted in ascending order
        let mut rows_to_extract = failed_indices;
        rows_to_extract.sort();

        // Use Arrow's take kernel to extract the rows at the given indices
        let mut arrays = Vec::new();
        for array in original_batch.columns() {
            let taken = arrow::compute::take(
                array,
                &arrow::array::UInt32Array::from(
                    rows_to_extract
                        .iter()
                        .map(|&idx| idx as u32)
                        .collect::<Vec<_>>(),
                ),
                None,
            )
            .ok()?;
            arrays.push(taken);
        }

        RecordBatch::try_new(original_batch.schema(), arrays).ok()
    }

    /// Extract a RecordBatch containing only the successful rows from the original batch
    ///
    /// # Arguments
    ///
    /// * `original_batch` - The original RecordBatch that was sent
    ///
    /// # Returns
    ///
    /// Returns `Some(RecordBatch)` containing only the successful rows, or `None` if there are no successful rows.
    /// Rows are extracted in ascending index order.
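    ///
    /// # Example
    ///
    /// A minimal sketch mirroring [`TransmissionResult::extract_failed_batch`]
    /// (setup values are illustrative assumptions):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
    /// let result = wrapper.send_batch(batch.clone()).await?;
    /// if let Some(ok_rows) = result.extract_successful_batch(&batch) {
    ///     println!("{} rows confirmed written", ok_rows.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```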
    pub fn extract_successful_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
        let successful_indices = self.get_successful_row_indices();
        if successful_indices.is_empty() {
            return None;
        }

        // Sort indices so rows are extracted in ascending order
        let mut rows_to_extract = successful_indices;
        rows_to_extract.sort();

        // Use Arrow's take kernel to extract the rows at the given indices
        let mut arrays = Vec::new();
        for array in original_batch.columns() {
            let taken = arrow::compute::take(
                array,
                &arrow::array::UInt32Array::from(
                    rows_to_extract
                        .iter()
                        .map(|&idx| idx as u32)
                        .collect::<Vec<_>>(),
                ),
                None,
            )
            .ok()?;
            arrays.push(taken);
        }

        RecordBatch::try_new(original_batch.schema(), arrays).ok()
    }

    /// Get indices of failed rows filtered by error type
    ///
    /// # Arguments
    ///
    /// * `predicate` - A closure that returns `true` for errors that should be included
    ///
    /// # Returns
    ///
    /// Returns a vector of row indices for failed rows that match the predicate.
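    ///
    /// # Example
    ///
    /// A minimal sketch filtering for conversion failures (setup elided; see the
    /// struct-level example for the assumed wrapper and batch):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration, ZerobusError};
    /// # async fn example() -> Result<(), ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
    /// let result = wrapper.send_batch(batch).await?;
    /// let conversion_failures = result
    ///     .get_failed_row_indices_by_error_type(|e| matches!(e, ZerobusError::ConversionError(_)));
    /// println!("{} rows failed conversion", conversion_failures.len());
    /// # Ok(())
    /// # }
    /// ```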
    pub fn get_failed_row_indices_by_error_type<F>(&self, predicate: F) -> Vec<usize>
    where
        F: Fn(&ZerobusError) -> bool,
    {
        self.failed_rows
            .as_ref()
            .map(|rows| {
                rows.iter()
                    .filter_map(|(idx, error)| if predicate(error) { Some(*idx) } else { None })
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Group failed rows by error type
    ///
    /// # Returns
    ///
    /// Returns a HashMap where keys are error type names (e.g., "ConversionError")
    /// and values are vectors of row indices that failed with that error type.
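    ///
    /// # Example
    ///
    /// A minimal sketch (setup elided; see the struct-level example for the
    /// assumed wrapper and batch):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
    /// let result = wrapper.send_batch(batch).await?;
    /// for (error_type, rows) in result.group_errors_by_type() {
    ///     println!("{}: {} rows", error_type, rows.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```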
    pub fn group_errors_by_type(&self) -> std::collections::HashMap<String, Vec<usize>> {
        let mut grouped: std::collections::HashMap<String, Vec<usize>> =
            std::collections::HashMap::new();

        if let Some(failed_rows) = &self.failed_rows {
            for (row_idx, error) in failed_rows {
                let error_type = match error {
                    ZerobusError::ConfigurationError(_) => "ConfigurationError",
                    ZerobusError::AuthenticationError(_) => "AuthenticationError",
                    ZerobusError::ConnectionError(_) => "ConnectionError",
                    ZerobusError::ConversionError(_) => "ConversionError",
                    ZerobusError::TransmissionError(_) => "TransmissionError",
                    ZerobusError::RetryExhausted(_) => "RetryExhausted",
                    ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
                };
                grouped
                    .entry(error_type.to_string())
                    .or_default()
                    .push(*row_idx);
            }
        }

        grouped
    }

    /// Get error statistics for this transmission result
    ///
    /// # Returns
    ///
    /// Returns an `ErrorStatistics` struct containing comprehensive error analysis
    /// including success/failure rates and error type counts.
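    ///
    /// # Example
    ///
    /// A minimal sketch (setup elided; see the struct-level example for the
    /// assumed wrapper and batch):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
    /// let result = wrapper.send_batch(batch).await?;
    /// let stats = result.get_error_statistics();
    /// println!(
    ///     "{}/{} rows ok ({:.1}% success)",
    ///     stats.successful_count,
    ///     stats.total_rows,
    ///     stats.success_rate * 100.0
    /// );
    /// # Ok(())
    /// # }
    /// ```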
    pub fn get_error_statistics(&self) -> ErrorStatistics {
        let success_rate = if self.total_rows > 0 {
            self.successful_count as f64 / self.total_rows as f64
        } else {
            0.0
        };

        let failure_rate = if self.total_rows > 0 {
            self.failed_count as f64 / self.total_rows as f64
        } else {
            0.0
        };

        let mut error_type_counts: std::collections::HashMap<String, usize> =
            std::collections::HashMap::new();

        if let Some(failed_rows) = &self.failed_rows {
            for (_, error) in failed_rows {
                let error_type = match error {
                    ZerobusError::ConfigurationError(_) => "ConfigurationError",
                    ZerobusError::AuthenticationError(_) => "AuthenticationError",
                    ZerobusError::ConnectionError(_) => "ConnectionError",
                    ZerobusError::ConversionError(_) => "ConversionError",
                    ZerobusError::TransmissionError(_) => "TransmissionError",
                    ZerobusError::RetryExhausted(_) => "RetryExhausted",
                    ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
                };
                *error_type_counts.entry(error_type.to_string()).or_insert(0) += 1;
            }
        }

        ErrorStatistics {
            total_rows: self.total_rows,
            successful_count: self.successful_count,
            failed_count: self.failed_count,
            success_rate,
            failure_rate,
            error_type_counts,
        }
    }

    /// Get all error messages from failed rows
    ///
    /// # Returns
    ///
    /// Returns a vector of error message strings for all failed rows.
    pub fn get_error_messages(&self) -> Vec<String> {
        self.failed_rows
            .as_ref()
            .map(|rows| rows.iter().map(|(_, error)| error.to_string()).collect())
            .unwrap_or_default()
    }
}

/// Error statistics for a transmission result
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    /// Total number of rows in the batch
    pub total_rows: usize,
    /// Number of rows that succeeded
    pub successful_count: usize,
    /// Number of rows that failed
    pub failed_count: usize,
    /// Success rate (0.0 to 1.0)
    pub success_rate: f64,
    /// Failure rate (0.0 to 1.0)
    pub failure_rate: f64,
    /// Count of errors by type
    pub error_type_counts: std::collections::HashMap<String, usize>,
}

/// Main wrapper for sending data to Zerobus
///
/// Thread-safe wrapper that handles Arrow RecordBatch to Protobuf conversion,
/// authentication, retry logic, and transmission to Zerobus.
pub struct ZerobusWrapper {
    /// Configuration (immutable)
    config: Arc<WrapperConfiguration>,
    /// Zerobus SDK instance (thread-safe)
    sdk: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusSdk>>>,
    /// Active stream (lazy initialization)
    stream: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusStream>>>,
    /// Retry configuration
    retry_config: RetryConfig,
    /// Observability manager (optional)
    observability: Option<ObservabilityManager>,
    /// Debug writer (optional)
    debug_writer: Option<Arc<crate::wrapper::debug::DebugWriter>>,
    /// Track if we've written the descriptor for this table (once per table)
    descriptor_written: Arc<Mutex<bool>>,
}

impl ZerobusWrapper {
    /// Validate and normalize the Zerobus endpoint URL.
    ///
    /// # Arguments
    ///
    /// * `endpoint` - Raw endpoint string from configuration
    ///
    /// # Returns
    ///
    /// Returns `Ok(String)` with normalized endpoint, or `Err(ZerobusError)` if validation fails.
    fn validate_and_normalize_endpoint(endpoint: &str) -> Result<String, ZerobusError> {
        let normalized_endpoint = endpoint.trim().to_string();

        if normalized_endpoint.is_empty() {
            return Err(ZerobusError::ConfigurationError(
                "zerobus_endpoint cannot be empty".to_string(),
            ));
        }

        if !normalized_endpoint.starts_with("https://")
            && !normalized_endpoint.starts_with("http://")
        {
            return Err(ZerobusError::ConfigurationError(format!(
                "zerobus_endpoint must start with 'https://' or 'http://'. Got: '{}'",
                normalized_endpoint
            )));
        }

        Ok(normalized_endpoint)
    }

    /// Create a new ZerobusWrapper with the provided configuration
    ///
    /// # Arguments
    ///
    /// * `config` - Configuration for initializing the wrapper
    ///
    /// # Returns
    ///
    /// Returns `Ok(ZerobusWrapper)` if initialization succeeds, or `Err(ZerobusError)` if:
    /// - Configuration validation fails
    /// - SDK initialization fails
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    ///
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// let config = WrapperConfiguration::new(
    ///     "https://workspace.cloud.databricks.com".to_string(),
    ///     "my_table".to_string(),
    /// );
    /// let wrapper = ZerobusWrapper::new(config).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn new(config: WrapperConfiguration) -> Result<Self, ZerobusError> {
        info!("Initializing ZerobusWrapper");

        // Validate configuration
        config.validate()?;

        // Validate and normalize endpoint (required for both enabled and disabled modes)
        let normalized_endpoint = Self::validate_and_normalize_endpoint(&config.zerobus_endpoint)?;

        // Skip credential validation if the writer is disabled (credentials are optional in that mode)
        if !config.zerobus_writer_disabled {
            // Get required OAuth credentials
            let unity_catalog_url = config
                .unity_catalog_url
                .as_ref()
                .ok_or_else(|| {
                    ZerobusError::ConfigurationError(
                        "unity_catalog_url is required for SDK".to_string(),
                    )
                })?
                .clone();

            // Validate credentials are present (but don't expose them unnecessarily)
            let _client_id = config.client_id.as_ref().ok_or_else(|| {
                ZerobusError::ConfigurationError("client_id is required for SDK".to_string())
            })?;

            let _client_secret = config.client_secret.as_ref().ok_or_else(|| {
                ZerobusError::ConfigurationError("client_secret is required for SDK".to_string())
            })?;

            info!("Zerobus endpoint: {}", normalized_endpoint);
            info!("Unity Catalog URL: {}", unity_catalog_url);
        } else {
            // When the writer is disabled, we still validate the endpoint format but don't require credentials
            info!(
                "Zerobus endpoint: {} (writer disabled mode)",
                normalized_endpoint
            );
        }

        // The SDK is created lazily on first use, so store None for now
        let sdk = Arc::new(Mutex::new(None));

        // Create retry config from wrapper config
        let retry_config = RetryConfig::new(
            config.retry_max_attempts,
            config.retry_base_delay_ms,
            config.retry_max_delay_ms,
        );

        // Initialize observability if enabled
        let observability = if config.observability_enabled {
            ObservabilityManager::new_async(config.observability_config.clone()).await
        } else {
            None
        };

        if observability.is_some() {
            info!("Observability enabled");
        }

        // Initialize the debug writer if any format is enabled.
        // Check the new flags first, falling back to the legacy flag for backward compatibility.
        let any_debug_enabled =
            config.debug_arrow_enabled || config.debug_protobuf_enabled || config.debug_enabled;

        // Log the debug configuration to help diagnose cases where the debug writer does not initialize
        info!(
            "ZerobusWrapper::new: debug_arrow_enabled={}, debug_protobuf_enabled={}, debug_enabled={}, debug_output_dir={:?}",
            config.debug_arrow_enabled, config.debug_protobuf_enabled, config.debug_enabled, config.debug_output_dir
        );

        let debug_writer = if any_debug_enabled {
            if let Some(output_dir) = &config.debug_output_dir {
                use crate::wrapper::debug::DebugWriter;
                use std::time::Duration;

                info!(
                    "Initializing debug writer with output_dir: {}, table_name: {}, arrow_enabled: {}, protobuf_enabled: {}",
                    output_dir.display(),
                    config.table_name,
                    config.debug_arrow_enabled,
                    config.debug_protobuf_enabled
                );
                match DebugWriter::new(
                    output_dir.clone(),
                    config.table_name.clone(),
                    Duration::from_secs(config.debug_flush_interval_secs),
                    config.debug_max_file_size,
                    config.debug_max_files_retained,
                ) {
                    Ok(writer) => {
                        info!(
                            "Debug file output enabled: {} (Arrow: {}, Protobuf: {})",
                            output_dir.display(),
                            config.debug_arrow_enabled,
                            config.debug_protobuf_enabled
                        );
                        Some(Arc::new(writer))
                    }
                    Err(e) => {
                        warn!("Failed to initialize debug writer: {}", e);
                        None
                    }
                }
            } else {
                // Collect which debug flags are enabled for a more specific warning
                let mut enabled_flags = Vec::new();
                if config.debug_arrow_enabled {
                    enabled_flags.push("debug_arrow_enabled");
                }
                if config.debug_protobuf_enabled {
                    enabled_flags.push("debug_protobuf_enabled");
                }
                if config.debug_enabled {
                    enabled_flags.push("debug_enabled");
                }
                warn!(
                    "Debug flag(s) enabled ({}) but debug_output_dir is None - debug files will not be written",
                    enabled_flags.join(", ")
                );
                None
            }
        } else {
            info!("All debug flags are false - debug files will not be written");
            None
        };

        Ok(Self {
            config: Arc::new(config),
            sdk,
            stream: Arc::new(Mutex::new(None)),
            retry_config,
            observability,
            debug_writer,
            descriptor_written: Arc::new(Mutex::new(false)),
        })
    }

    /// Send a data batch to Zerobus
    ///
    /// Converts Arrow RecordBatch to Protobuf format and transmits to Zerobus
    /// with automatic retry on transient failures. The Protobuf descriptor is
    /// auto-generated from the Arrow schema; use
    /// [`ZerobusWrapper::send_batch_with_descriptor`] to supply one explicitly.
    ///
    /// # Arguments
    ///
    /// * `batch` - Arrow RecordBatch to send
    ///
    /// # Returns
    ///
    /// Returns `TransmissionResult` indicating success or failure.
    ///
    /// # Errors
    ///
    /// Returns error if transmission fails after all retry attempts.
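    ///
    /// # Example
    ///
    /// A minimal sketch (the config values and batch contents are illustrative):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1, 2]))]).unwrap();
    /// let result = wrapper.send_batch(batch).await?;
    /// println!("{} of {} rows sent", result.successful_count, result.total_rows);
    /// # Ok(())
    /// # }
    /// ```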
    pub async fn send_batch(&self, batch: RecordBatch) -> Result<TransmissionResult, ZerobusError> {
        self.send_batch_with_descriptor(batch, None).await
    }

    /// Send a data batch to Zerobus with an optional Protobuf descriptor
    ///
    /// Converts Arrow RecordBatch to Protobuf format and transmits to Zerobus
    /// with automatic retry on transient failures.
    ///
    /// # Arguments
    ///
    /// * `batch` - Arrow RecordBatch to send
    /// * `descriptor` - Optional Protobuf descriptor. If provided, uses this descriptor
    ///   instead of auto-generating from Arrow schema. This ensures correct nested types.
    ///
    /// # Returns
    ///
    /// Returns `TransmissionResult` indicating success or failure.
    ///
    /// # Errors
    ///
    /// Returns error if transmission fails after all retry attempts.
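    ///
    /// # Example
    ///
    /// A minimal sketch building a descriptor by hand with `prost_types`. The
    /// single-field `Row` message here is an illustrative assumption; a real
    /// descriptor must match the target table's schema:
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # use arrow::array::Int64Array;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use arrow::record_batch::RecordBatch;
    /// # use std::sync::Arc;
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
    /// use prost_types::field_descriptor_proto::{Label, Type};
    /// use prost_types::{DescriptorProto, FieldDescriptorProto};
    ///
    /// // Hand-built descriptor with a single int64 field named "id".
    /// let descriptor = DescriptorProto {
    ///     name: Some("Row".to_string()),
    ///     field: vec![FieldDescriptorProto {
    ///         name: Some("id".to_string()),
    ///         number: Some(1),
    ///         label: Some(Label::Optional as i32),
    ///         r#type: Some(Type::Int64 as i32),
    ///         ..Default::default()
    ///     }],
    ///     ..Default::default()
    /// };
    /// let result = wrapper.send_batch_with_descriptor(batch, Some(descriptor)).await?;
    /// # Ok(())
    /// # }
    /// ```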
    pub async fn send_batch_with_descriptor(
        &self,
        batch: RecordBatch,
        descriptor: Option<prost_types::DescriptorProto>,
    ) -> Result<TransmissionResult, ZerobusError> {
        let start_time = std::time::Instant::now();
        let batch_size_bytes = batch.get_array_memory_size();

        debug!(
            "Sending batch with {} rows, {} bytes",
            batch.num_rows(),
            batch_size_bytes
        );

        // Write Arrow batch to debug file if Arrow debug is enabled
        if self.config.debug_arrow_enabled {
            if let Some(ref debug_writer) = self.debug_writer {
                if let Err(e) = debug_writer.write_arrow(&batch).await {
                    warn!("Failed to write Arrow debug file: {}", e);
                    // Don't fail the operation if debug writing fails
                }
            }
        }

        // Start observability span if enabled
        let _span = self
            .observability
            .as_ref()
            .map(|obs| obs.start_send_batch_span(&self.config.table_name));

        // Use retry logic for transmission
        let (result, attempts) = self
            .retry_config
            .execute_with_retry_tracked(|| {
                let batch = batch.clone();
                let descriptor = descriptor.clone();
                let wrapper = self.clone();
                async move { wrapper.send_batch_internal(batch, descriptor).await }
            })
            .await;

        let latency_ms = start_time.elapsed().as_millis() as u64;

        // Record metrics if observability is enabled
        if let Some(obs) = &self.observability {
            let success = result.is_ok();
            obs.record_batch_sent(batch_size_bytes, success, latency_ms)
                .await;
        }

        let total_rows = batch.num_rows();

        // Handle empty batch edge case
        if total_rows == 0 {
            return Ok(TransmissionResult {
                success: true, // Empty batch is considered successful
                error: None,
                attempts,
                latency_ms: Some(latency_ms),
                batch_size_bytes,
                failed_rows: None,
                successful_rows: None,
                total_rows: 0,
                successful_count: 0,
                failed_count: 0,
            });
        }

        match result {
            Ok(batch_result) => {
                // Unpack per-row results (conversion and transmission errors were
                // already merged in send_batch_internal)
                let mut all_failed_rows = batch_result.failed_rows;
                let successful_rows = batch_result.successful_rows;

                let successful_count = successful_rows.len();
                let failed_count = all_failed_rows.len();

                // Determine overall success: true if ANY rows succeeded.
                // Edge case: if all rows failed, success is false.
                let overall_success = successful_count > 0;

                // Sort failed rows by index for consistency
                all_failed_rows.sort_by_key(|(idx, _)| *idx);

                // Update failure rate tracking (only counts network/transmission errors)
                crate::wrapper::zerobus::update_failure_rate(
                    &self.config.table_name,
                    total_rows,
                    &all_failed_rows,
                );

                Ok(TransmissionResult {
                    success: overall_success,
                    error: None, // No batch-level error, only per-row errors
                    attempts,
                    latency_ms: Some(latency_ms),
                    batch_size_bytes,
                    failed_rows: if all_failed_rows.is_empty() {
                        None
                    } else {
                        Some(all_failed_rows)
                    },
                    successful_rows: if successful_rows.is_empty() {
                        None
                    } else {
                        Some(successful_rows)
                    },
                    total_rows,
                    successful_count,
                    failed_count,
                })
            }
            Err(e) => {
                error!("Failed to send batch after retries: {}", e);
                // Batch-level error (e.g., authentication or connection failure before processing).
                // Edge case: batch-level errors occur before per-row processing.
                Ok(TransmissionResult {
                    success: false,
                    error: Some(e),
                    attempts,
                    latency_ms: Some(latency_ms),
                    batch_size_bytes,
                    failed_rows: None, // Batch-level error, no per-row processing occurred
                    successful_rows: None,
                    total_rows,
                    successful_count: 0,
                    failed_count: 0, // Batch-level error, no per-row processing
                })
            }
        }
    }

    /// Internal method to send a batch (without the retry wrapper)
    ///
    /// Returns per-row transmission information.
    async fn send_batch_internal(
        &self,
        batch: RecordBatch,
        descriptor: Option<prost_types::DescriptorProto>,
    ) -> Result<BatchTransmissionResult, ZerobusError> {
        // CRITICAL: Check if the writer is disabled FIRST, before any SDK initialization or credential access.
        // This prevents errors when credentials are not provided (which is allowed when the writer is disabled).
        if self.config.zerobus_writer_disabled {
            // When the writer is disabled, we still perform conversion and write debug files,
            // but skip all SDK calls. This enables local development and testing without credentials.
            debug!(
                "Writer disabled mode enabled - skipping SDK initialization and Zerobus SDK calls"
            );
            // Continue to conversion and debug file writing below, then return early
        } else {
            // 1. Ensure SDK is initialized (only when the writer is NOT disabled)
            {
                let mut sdk_guard = self.sdk.lock().await;
                if sdk_guard.is_none() {
                    let unity_catalog_url = self
                        .config
                        .unity_catalog_url
                        .as_ref()
                        .ok_or_else(|| {
                            ZerobusError::ConfigurationError(
                                "unity_catalog_url is required".to_string(),
                            )
                        })?
                        .clone();

                    let sdk = crate::wrapper::zerobus::create_sdk(
                        self.config.zerobus_endpoint.clone(),
                        unity_catalog_url,
                    )
                    .await?;
                    *sdk_guard = Some(sdk);
                }
            }
        }

        // 2. Get Protobuf descriptor (use the provided one or generate from the Arrow schema)
        let descriptor = if let Some(provided_descriptor) = descriptor {
            // Validate user-provided descriptor to prevent security issues
            crate::wrapper::conversion::validate_protobuf_descriptor(&provided_descriptor)
                .map_err(|e| {
                    ZerobusError::ConfigurationError(format!("Invalid Protobuf descriptor: {}", e))
                })?;
            let descriptor_name = provided_descriptor.name.as_deref().unwrap_or("unknown");
            info!(
                "🔍 [DEBUG] Using provided Protobuf descriptor: name='{}', fields={}, nested_types={}",
                descriptor_name,
                provided_descriptor.field.len(),
                provided_descriptor.nested_type.len()
            );
            provided_descriptor
        } else {
            debug!("Auto-generating Protobuf descriptor from Arrow schema");
            let generated =
                crate::wrapper::conversion::generate_protobuf_descriptor(batch.schema().as_ref())
                    .map_err(|e| {
                        ZerobusError::ConversionError(format!(
                            "Failed to generate Protobuf descriptor: {}",
                            e
                        ))
                    })?;
            // Validate generated descriptor (should always pass, but safety check)
            crate::wrapper::conversion::validate_protobuf_descriptor(&generated).map_err(|e| {
                ZerobusError::ConversionError(format!(
                    "Generated Protobuf descriptor failed validation: {}",
                    e
                ))
            })?;
            let descriptor_name = generated.name.as_deref().unwrap_or("unknown");
            info!(
                "🔍 [DEBUG] Auto-generated Protobuf descriptor: name='{}', fields={}, nested_types={}",
                descriptor_name,
                generated.field.len(),
                generated.nested_type.len()
            );
            generated
        };

        // Write the descriptor to file once per table (if either Arrow or Protobuf debug is enabled)
        if self.config.debug_arrow_enabled || self.config.debug_protobuf_enabled {
            if let Some(ref debug_writer) = self.debug_writer {
                let mut written_guard = self.descriptor_written.lock().await;
                if !*written_guard {
                    if let Err(e) = debug_writer
                        .write_descriptor(&self.config.table_name, &descriptor)
                        .await
                    {
                        warn!("Failed to write Protobuf descriptor to debug file: {}", e);
                        // Don't fail the operation if descriptor writing fails
                    } else {
                        *written_guard = true;
                    }
                }
            }
        }

        // 3. Convert Arrow RecordBatch to Protobuf bytes (one per row).
        // This returns a ProtobufConversionResult with per-row conversion errors.
        let conversion_result =
            crate::wrapper::conversion::record_batch_to_protobuf_bytes(&batch, &descriptor);

        // Track conversion errors (will be merged with transmission errors later)
        let conversion_errors = conversion_result.failed_rows;

        // Write Protobuf bytes to debug file if Protobuf debug is enabled (only successful conversions).
        // Flush after each batch to ensure files are immediately available for debugging.
        // CRITICAL: Write protobuf files BEFORE Zerobus write attempts, so we have them even if Zerobus fails.
        if self.config.debug_protobuf_enabled {
            if let Some(ref debug_writer) = self.debug_writer {
                info!(
                    "Writing {} protobuf messages to debug file",
                    conversion_result.successful_bytes.len()
                );
                let num_rows = conversion_result.successful_bytes.len();
                for (idx, (_, bytes)) in conversion_result.successful_bytes.iter().enumerate() {
                    // Flush immediately after the last row in the batch
                    let flush_immediately = idx == num_rows - 1;
                    if let Err(e) = debug_writer.write_protobuf(bytes, flush_immediately).await {
                        warn!("Failed to write Protobuf debug file: {}", e);
                        // Don't fail the operation if debug writing fails
                    } else if flush_immediately {
                        info!(
                            "✅ Flushed protobuf debug file after batch ({} messages)",
                            num_rows
                        );
                    }
                }
            } else {
                warn!("⚠️ Debug writer is None - protobuf debug files will not be written. Check debug_protobuf_enabled and debug_output_dir config.");
            }
        }

        // Check if the writer is disabled - if so, skip all SDK calls and return success.
        // Performance: operations complete in <50ms (excluding file I/O) when the writer is disabled.
        // This enables performance testing of the conversion logic without network overhead.
        if self.config.zerobus_writer_disabled {
            debug!(
                "Writer disabled mode enabled - skipping Zerobus SDK calls. Debug files written successfully."
            );
            // Return success with conversion results tracked.
            // All successfully converted rows are considered successful when the writer is disabled.
            let successful_indices: Vec<usize> = conversion_result
                .successful_bytes
                .iter()
                .map(|(idx, _)| *idx)
                .collect();
            return Ok(BatchTransmissionResult {
                successful_rows: successful_indices,
                failed_rows: conversion_errors,
            });
        }

        // Get SDK reference (the earlier lock was released, so we can lock again for stream creation).
        // This is safe because we only reach here when the writer is NOT disabled, so the SDK was initialized above.
        let sdk_guard = self.sdk.lock().await;
        let sdk = sdk_guard.as_ref().ok_or_else(|| {
            ZerobusError::ConfigurationError(
                "SDK not initialized - this should not happen".to_string(),
            )
        })?;

        // 4. Ensure stream is created.
        // Expose secrets only when needed for API calls.
        let client_id = self
            .config
            .client_id
            .as_ref()
            .ok_or_else(|| ZerobusError::ConfigurationError("client_id is required".to_string()))?
            .expose_secret()
            .clone();
        let client_secret = self
            .config
            .client_secret
            .as_ref()
            .ok_or_else(|| {
                ZerobusError::ConfigurationError("client_secret is required".to_string())
            })?
            .expose_secret()
            .clone();

        // ========================================================================
        // STEP 5: Check backoff conditions BEFORE attempting any writes
        // ========================================================================
        // CRITICAL: Check backoff BEFORE attempting writes, even if the stream exists.
        // This prevents writes during a backoff period even if the stream was created
        // before the backoff started. We check for:
        // 1. Error 6006 backoff (server-initiated, pipeline blocked)
        // 2. High failure rate backoff (client-initiated, >1% failure rate)
        //
        // Edge case: backoff can start during batch processing, so we check again
        // before each record in the loop below.
        {
            use crate::wrapper::zerobus::{check_error_6006_backoff, check_failure_rate_backoff};
            check_error_6006_backoff(&self.config.table_name).await?;
            check_failure_rate_backoff(&self.config.table_name).await?;
        }

        // ========================================================================
        // STEP 6: Write each row to Zerobus with stream recreation on failure
        // ========================================================================
        // This implements a retry loop that handles stream closure and recreation.
        //
        // Algorithm:
        // 1. Ensure the stream exists (create if None)
        // 2. For each row in the batch:
        //    a. Check backoff again (backoff can start during batch processing)
        //    b. Re-acquire the stream lock (the stream may have been cleared)
        //    c. Recreate the stream if it was cleared
        //    d. Send the row to Zerobus
        //    e. Handle stream closure errors by clearing the stream and retrying
        // 3. If all rows succeed, break
        // 4. If the stream closed, retry up to MAX_STREAM_RECREATE_ATTEMPTS
        //
        // Edge cases handled:
        // - Stream closed immediately after creation (first record fails)
        //   → Indicates a schema mismatch or validation error
        // - Stream closed mid-batch
        //   → Clear the stream, recreate, and retry from the failed row
        // - Backoff starts during batch processing
        //   → Clear the stream, break the loop, return an error
        //
        // Performance considerations:
        // - The lock is released before async operations to avoid blocking
        // - The stream is only recreated when necessary (not for every row)
        // - Maximum retry attempts prevent infinite loops
        //
        // Thread safety:
        // - Uses an async Mutex to avoid blocking the runtime
        // - The lock is held only when accessing/modifying the stream
        // - The lock is released before network I/O operations
        let mut retry_count = 0;
        const MAX_STREAM_RECREATE_ATTEMPTS: u32 = 3;

        // Track per-row transmission results across retries.
        // These are assigned from the attempt_* variables after processing completes.
        let mut transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
        let mut successful_indices: Vec<usize> = Vec::new();

        loop {
            // Ensure stream exists and is valid
            let mut stream_guard = self.stream.lock().await;
            if stream_guard.is_none() {
                info!(
                    "Stream not found, creating new stream for table: {}",
                    self.config.table_name
                );
                let stream = crate::wrapper::zerobus::ensure_stream(
                    sdk,
                    self.config.table_name.clone(),
                    descriptor.clone(),
                    client_id.clone(),
                    client_secret.clone(),
                )
                .await?;
                *stream_guard = Some(stream);
                info!("✅ Stream created successfully");
            }
            // Verify stream exists before dropping lock
            if stream_guard.is_none() {
                return Err(ZerobusError::ConnectionError(
                    "Stream was None after creation - this should not happen".to_string(),
                ));
            }
            drop(stream_guard); // Release lock before sending data

            // Try to send all successfully converted rows.
            // Reset tracking for this retry attempt (but preserve across retries for the final result).
            let mut attempt_transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
            let mut attempt_successful_indices: Vec<usize> = Vec::new();
            let mut all_succeeded = true;
            let mut failed_at_idx = 0;

            // Batch futures for better throughput: collect futures and await in batches.
            // This allows the SDK to queue multiple records before flushing, improving performance.
            const BATCH_SIZE: usize = 1000; // Flush every 1000 records
            const BATCH_SIZE_BYTES: usize = 10 * 1024 * 1024; // Or every 10MB
            // Store futures with their row indices - using a type-erased future
            type IngestFuture = std::pin::Pin<
                Box<
                    dyn std::future::Future<
                            Output = Result<i64, databricks_zerobus_ingest_sdk::ZerobusError>,
                        > + Send,
                >,
            >;
            let mut pending_futures: Vec<(usize, IngestFuture)> = Vec::new();
            let mut total_bytes_buffered = 0usize;
            let mut should_break_outer = false; // Track if we need to break the outer retry loop

            // Process only successfully converted rows. We enumerate to get `pos`,
            // the position within successful_bytes, which can differ from the
            // original row index when some rows failed conversion.
            for (pos, (original_row_idx, bytes)) in
                conversion_result.successful_bytes.iter().enumerate()
            {
                let idx = *original_row_idx;
                // ========================================================================
                // STEP 6a: Check backoff before each record
                // ========================================================================
                // Edge case: Backoff can start during batch processing (e.g., another thread
                // encountered error 6006 or a high failure rate). We check before each record
                // to prevent writes during the backoff period.
                {
                    use crate::wrapper::zerobus::{
                        check_error_6006_backoff, check_failure_rate_backoff,
                    };
                    if let Err(_backoff_err) =
                        check_error_6006_backoff(&self.config.table_name).await
                    {
                        // Backoff error: track per-row and break (backoff is a batch-level concern).
                        // Clear the stream so it gets recreated after backoff.
                        let mut stream_guard = self.stream.lock().await;
                        *stream_guard = None;
                        drop(stream_guard);
                        // Mark the current row and all remaining rows as affected by backoff.
                        // Iterate from `pos` (the position in successful_bytes), not `idx`
                        // (the original row index), since the two diverge when earlier rows
                        // failed conversion.
                        for remaining_pos in pos..conversion_result.successful_bytes.len() {
                            if let Some((orig_idx, _)) =
                                conversion_result.successful_bytes.get(remaining_pos)
                            {
                                attempt_transmission_errors.push((
                                    *orig_idx,
                                    ZerobusError::ConnectionError(
                                        "Backoff period active - row processing stopped"
                                            .to_string(),
                                    ),
                                ));
                            }
                        }
                        all_succeeded = false;
                        failed_at_idx = idx;
                        break;
                    }
                    // Also check failure rate backoff
                    if let Err(_backoff_err) =
                        check_failure_rate_backoff(&self.config.table_name).await
                    {
                        // Backoff error: track per-row and break (backoff is a batch-level concern).
                        // Clear the stream so it gets recreated after backoff.
                        let mut stream_guard = self.stream.lock().await;
                        *stream_guard = None;
                        drop(stream_guard);
                        // Mark the current row and all remaining rows as affected by backoff
                        for remaining_pos in pos..conversion_result.successful_bytes.len() {
                            if let Some((orig_idx, _)) =
                                conversion_result.successful_bytes.get(remaining_pos)
                            {
                                attempt_transmission_errors.push((
                                    *orig_idx,
                                    ZerobusError::ConnectionError(
                                        "High failure rate backoff active - row processing stopped"
                                            .to_string(),
                                    ),
                                ));
                            }
                        }
                        all_succeeded = false;
                        failed_at_idx = idx;
                        break;
                    }
                }

                // ========================================================================
                // STEP 6b: Re-acquire stream lock and ensure stream exists
                // ========================================================================
                // We re-acquire the lock for each record because:
                // 1. The stream may have been cleared by error handling in a previous iteration
                // 2. The lock was released before async operations to avoid blocking
                // 3. Multiple threads may be sending batches concurrently
                //
                // Performance: the lock is held only briefly, released before network I/O.
                let mut stream_guard = self.stream.lock().await;
                if stream_guard.is_none() {
                    // Stream was cleared (e.g., by error handling), recreate it
                    info!(
                        "Stream was cleared, recreating for table: {}",
                        self.config.table_name
                    );
                    let stream = crate::wrapper::zerobus::ensure_stream(
                        sdk,
                        self.config.table_name.clone(),
                        descriptor.clone(),
                        client_id.clone(),
                        client_secret.clone(),
                    )
                    .await?;
                    *stream_guard = Some(stream);
                }
                let stream = stream_guard.as_mut().ok_or_else(|| {
                    ZerobusError::ConnectionError(
                        "Stream was None after recreation - this should not happen".to_string(),
                    )
                })?;

                // ========================================================================
                // STEP 6c: Send bytes to Zerobus stream (batched for performance)
                // ========================================================================
                // The Zerobus SDK's ingest_record returns a Future that resolves when acknowledged.
                // We collect futures and await them in batches for better throughput.
                //
                // Error handling:
                // - Stream closed errors: clear the stream, mark failure, break the loop to retry
                // - Other errors: track per-row and continue
                // - First record failures: log detailed diagnostics for schema issues
                match stream.ingest_record(bytes.clone()).await {
                    Ok(ingest_future) => {
                        // Release lock before collecting the future to avoid blocking
                        drop(stream_guard);

                        // Collect the future for batch processing.
                        // Box the future to store it in the Vec (type erasure for different future types).
                        pending_futures.push((idx, Box::pin(ingest_future)));
                        total_bytes_buffered += bytes.len();

                        // Periodically flush and await futures to manage memory and ensure progress
                        if pending_futures.len() >= BATCH_SIZE
                            || total_bytes_buffered >= BATCH_SIZE_BYTES
                        {
                            // Flush stream to send buffered records
                            {
                                let mut stream_guard = self.stream.lock().await;
                                if let Some(ref mut stream) = *stream_guard {
                                    if let Err(e) = stream.flush().await {
                                        error!(
                                            "Failed to flush Zerobus stream during batch: {}",
                                            e
                                        );
                                        // Mark all pending futures as failed
                                        for (pending_idx, _) in pending_futures.drain(..) {
                                            attempt_transmission_errors.push((
                                                pending_idx,
                                                ZerobusError::ConnectionError(format!(
                                                    "Flush failed during batch processing: {}",
                                                    e
                                                )),
                                            ));
                                        }
                                        all_succeeded = false;
                                        failed_at_idx = idx;
                                        break;
                                    }
                                }
                            }

                            // Await all pending futures and track results
                            for (pending_idx, mut future) in pending_futures.drain(..) {
                                match future.as_mut().await {
                                    Ok(_ack_id) => {
                                        debug!(
                                            "✅ Successfully sent record to Zerobus stream (row {}, ack_id={})",
                                            pending_idx, _ack_id
                                        );
                                        attempt_successful_indices.push(pending_idx);
                                    }
                                    Err(e) => {
                                        let err_msg = format!("{}", e);
                                        // Check if the stream is closed
                                        if err_msg.contains("Stream is closed")
                                            || err_msg.contains("Stream closed")
                                        {
                                            let is_first = pending_idx == 0;
                                            error!(
                                                "Stream closed: row={}, first_record={}, error={}",
                                                pending_idx, is_first, err_msg
                                            );
                                            if is_first {
                                                error!("Diagnostics: Stream closed during batch processing");
                                                error!("Possible causes:");
                                                error!("  1. Schema mismatch between descriptor and table");
                                                error!("  2. Validation error");
                                                error!("  3. Server-side issue");
                                            }
                                            // Clear the stream and mark the outer retry loop for
                                            // exit. Setting the flag here covers closures on any
                                            // row, including row 0.
                                            let mut stream_guard = self.stream.lock().await;
                                            *stream_guard = None;
                                            drop(stream_guard);
                                            attempt_transmission_errors.push((
                                                pending_idx,
                                                ZerobusError::ConnectionError(format!(
                                                    "Stream closed: row={}, error={}",
                                                    pending_idx, err_msg
                                                )),
                                            ));
                                            all_succeeded = false;
                                            failed_at_idx = pending_idx;
                                            should_break_outer = true;
                                            break;
                                        } else {
                                            // Non-stream-closure errors
                                            attempt_transmission_errors.push((
                                                pending_idx,
                                                ZerobusError::TransmissionError(format!(
                                                    "Record ingestion failed: row={}, error={}",
                                                    pending_idx, e
                                                )),
                                            ));
                                            all_succeeded = false;
                                        }
                                    }
                                }
                            }
                            total_bytes_buffered = 0;
                        }
                    }
                    Err(e) => {
                        let err_msg = format!("{}", e);
                        // Check if the stream is closed (indicates server-side closure)
                        if err_msg.contains("Stream is closed") || err_msg.contains("Stream closed")
                        {
                            // Standardized error logging with context
                            let is_first = idx == 0;
                            error!(
                                "Stream closed: row={}, first_record={}, error={}",
                                idx, is_first, err_msg
                            );
                            if is_first {
                                // First record failure indicates schema/validation issues
                                error!("Diagnostics: This is the FIRST record - stream closed immediately");
                                error!("Possible causes:");
                                error!("  1. Schema mismatch between descriptor and table");
                                error!("  2. Validation error on first record");
                                error!("  3. Table schema not yet propagated");
                                error!(
                                    "Descriptor info: fields={}, nested_types={}",
                                    descriptor.field.len(),
                                    descriptor.nested_type.len()
                                );
                            }
                            // Stream closure error: track per-row, then clear the stream so
                            // it gets recreated on the next iteration.
                            *stream_guard = None;
                            drop(stream_guard);
                            let stream_error = ZerobusError::ConnectionError(format!(
                                "Stream closed: row={}, error={}",
                                idx, err_msg
                            ));
                            attempt_transmission_errors.push((idx, stream_error));
                            all_succeeded = false;
                            failed_at_idx = idx;
                            // Mark the outer retry loop for exit, but still process pending futures below
                            should_break_outer = true;
                            break;
                        } else {
                            // Non-stream-closure errors: track per-row and continue
                            let transmission_error = ZerobusError::ConnectionError(format!(
                                "Record creation failed: row={}, error={}",
                                idx, e
                            ));
                            attempt_transmission_errors.push((idx, transmission_error));
                            all_succeeded = false;
                            // Continue processing remaining rows instead of returning immediately
                        }
                    }
                }
            }

            // CRITICAL: Flush and await any remaining pending futures before proceeding.
            // This ensures all queued records are sent and acknowledged, even if we broke early.
            if !pending_futures.is_empty() {
                // Always flush remaining records before awaiting acknowledgments.
                // This ensures records are sent even if we broke early due to errors.
                {
                    let mut stream_guard = self.stream.lock().await;
                    if let Some(ref mut stream) = *stream_guard {
                        // Attempt to flush - if the stream is closed this will fail, but we still want to await the futures
                        match stream.flush().await {
                            Ok(_) => {
                                debug!(
                                    "✅ Flushed Zerobus stream for {} remaining pending futures",
                                    pending_futures.len()
                                );
                            }
                            Err(e) => {
                                warn!("Failed to flush Zerobus stream for remaining records (stream may be closed): {}", e);
                                // Don't mark futures as failed yet - await them to get the actual acknowledgment status.
                                // The stream might be closed, but some records may have been sent before closure.
                            }
                        }
                    } else {
                        warn!("Stream is None when trying to flush remaining records - records may be lost");
                        // Mark all pending futures as failed since we can't flush
                        for (pending_idx, _) in pending_futures.drain(..) {
                            attempt_transmission_errors.push((
                                pending_idx,
                                ZerobusError::ConnectionError(
                                    "Stream was closed before flushing remaining records"
                                        .to_string(),
                                ),
                            ));
                        }
                        all_succeeded = false;
                    }
                }

                // CRITICAL: Always await all pending futures to get acknowledgment status.
                // Even if the stream is closed, we need to know which records succeeded/failed.
                for (pending_idx, mut future) in pending_futures.drain(..) {
                    match future.as_mut().await {
                        Ok(_ack_id) => {
                            debug!(
                                "✅ Successfully acknowledged record (row {}, ack_id={})",
                                pending_idx, _ack_id
                            );
                            attempt_successful_indices.push(pending_idx);
                        }
                        Err(e) => {
                            let err_msg = format!("{}", e);
                            if err_msg.contains("Stream is closed")
                                || err_msg.contains("Stream closed")
                            {
                                // Stream was closed - clear it and mark as failed
                                let mut stream_guard = self.stream.lock().await;
                                *stream_guard = None;
                                drop(stream_guard);
                                attempt_transmission_errors.push((
                                    pending_idx,
                                    ZerobusError::ConnectionError(format!(
                                        "Stream closed before acknowledgment: row={}, error={}",
                                        pending_idx, err_msg
                                    )),
                                ));
                                all_succeeded = false;
                            } else {
                                // Other errors (network, timeout, etc.)
                                attempt_transmission_errors.push((
                                    pending_idx,
                                    ZerobusError::TransmissionError(format!(
                                        "Record acknowledgment failed: row={}, error={}",
                                        pending_idx, e
                                    )),
                                ));
                                all_succeeded = false;
                            }
                        }
                    }
                }
            }

            // If we broke early due to stream closure, preserve this attempt's per-row
            // results before exiting the retry loop; otherwise the final result would
            // under-report both successes and failures.
            if should_break_outer {
                successful_indices = attempt_successful_indices;
                transmission_errors = attempt_transmission_errors;
                break;
            }

            // ========================================================================
            // STEP 6d: Handle retry logic
            // ========================================================================
            // If all rows succeeded, we're done. Otherwise, retry with stream recreation.
            // The retry mechanism handles transient stream closure issues.
            //
            // Edge case: if the stream closes repeatedly, it may indicate:
            // - Schema mismatch (descriptor doesn't match table schema)
            // - Server-side validation errors
            // - Network issues causing stream closure
            //
            // Performance: a small delay (100ms) prevents tight retry loops.
            if all_succeeded {
                // All rows sent successfully - flush the stream to ensure records are transmitted.
                // CRITICAL: The SDK buffers records internally and requires flush() to send them.
                {
                    let mut stream_guard = self.stream.lock().await;
                    if let Some(ref mut stream) = *stream_guard {
                        if let Err(e) = stream.flush().await {
                            error!("Failed to flush Zerobus stream after batch: {}", e);
                            // Don't fail the entire batch if flush fails - records may still be
                            // in transit, but log the error for monitoring
                        } else {
                            debug!(
                                "✅ Flushed Zerobus stream after sending {} records",
                                attempt_successful_indices.len()
                            );
                        }
                    }
                }
                // Update final results with this attempt's results
                successful_indices = attempt_successful_indices;
                transmission_errors = attempt_transmission_errors;
                break;
            } else {
                // Some rows failed due to stream closure - retry with stream recreation
                retry_count += 1;
                if retry_count > MAX_STREAM_RECREATE_ATTEMPTS {
                    // Exhausted retry attempts - use what we have from this attempt
                    let mut final_transmission_errors = attempt_transmission_errors;
                    let final_successful_indices = attempt_successful_indices;
                    // Mark remaining rows as failed due to stream closure
                    for (idx, _) in conversion_result.successful_bytes.iter() {
                        if !final_successful_indices.contains(idx)
                            && !final_transmission_errors.iter().any(|(i, _)| i == idx)
                        {
                            final_transmission_errors.push((*idx, ZerobusError::ConnectionError(format!(
                                "Stream recreation exhausted: row={}, possible_causes='schema_mismatch,validation_error,server_issue'",
                                idx
                            ))));
                        }
                    }
                    successful_indices = final_successful_indices;
                    transmission_errors = final_transmission_errors;
                    break;
                }
                warn!(
                    "Stream recreation retry: attempt={}/{}, failed_at_row={}",
                    retry_count, MAX_STREAM_RECREATE_ATTEMPTS, failed_at_idx
                );
                // Small delay before retry to avoid tight retry loops
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                // Reset attempt tracking for the retry - all remaining rows will be retried
                attempt_successful_indices.clear();
                attempt_transmission_errors.clear();
                // Note: all_succeeded is re-initialized to true at the start of the next loop iteration
            }
        }

        // Merge conversion errors with transmission errors
        let mut all_failed_rows = conversion_errors;
        all_failed_rows.extend(transmission_errors);
        Ok(BatchTransmissionResult {
            successful_rows: successful_indices,
            failed_rows: all_failed_rows,
        })
    }

    /// Flush any pending operations and ensure data is transmitted
    ///
    /// # Errors
    ///
    /// Returns error if flush operation fails.
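    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a configured wrapper (see [`ZerobusWrapper::new`]):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// // Ensure all buffered records are transmitted before continuing.
    /// wrapper.flush().await?;
    /// # Ok(())
    /// # }
    /// ```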
    pub async fn flush(&self) -> Result<(), ZerobusError> {
        // CRITICAL: Flush the Zerobus stream to ensure buffered records are sent.
        // The SDK buffers records internally and requires flush() to transmit them.
        {
            let mut stream_guard = self.stream.lock().await;
            if let Some(ref mut stream) = *stream_guard {
                stream.flush().await.map_err(|e| {
                    ZerobusError::ConnectionError(format!("Failed to flush Zerobus stream: {}", e))
                })?;
                debug!("✅ Flushed Zerobus stream");
            }
        }

        // Flush debug files if enabled
        if let Some(ref debug_writer) = self.debug_writer {
            if let Err(e) = debug_writer.flush().await {
                warn!("Failed to flush debug files: {}", e);
            }
        }

        // Flush observability if enabled
        if let Some(ref obs) = self.observability {
            obs.flush().await?;
        }

        Ok(())
    }

    /// Shutdown the wrapper gracefully, closing connections and cleaning up resources
    ///
    /// # Errors
    ///
    /// Returns error if shutdown fails.
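    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a configured wrapper (see [`ZerobusWrapper::new`]):
    ///
    /// ```no_run
    /// # use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
    /// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
    /// # let wrapper = ZerobusWrapper::new(config).await?;
    /// // Flush pending data, then close the stream and release resources.
    /// wrapper.flush().await?;
    /// wrapper.shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```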
    pub async fn shutdown(&self) -> Result<(), ZerobusError> {
        info!("Shutting down ZerobusWrapper");

        // Close stream if it exists
        let mut stream_guard = self.stream.lock().await;
        if let Some(mut stream) = stream_guard.take() {
            // Close the stream gracefully.
            // ZerobusStream has a close() method that returns ZerobusResult.
            if let Err(e) = stream.close().await {
                warn!("Error closing Zerobus stream: {}", e);
            } else {
                debug!("Stream closed successfully");
            }
        }

        Ok(())
    }
}

// Implement Clone for use in async closures
impl Clone for ZerobusWrapper {
    fn clone(&self) -> Self {
        Self {
            config: Arc::clone(&self.config),
            sdk: Arc::clone(&self.sdk),
            stream: Arc::clone(&self.stream),
            retry_config: self.retry_config.clone(),
            observability: self.observability.clone(),
            debug_writer: self.debug_writer.as_ref().map(Arc::clone),
            descriptor_written: Arc::clone(&self.descriptor_written),
        }
    }
}

// ZerobusWrapper is automatically Send + Sync because all of its fields are Send + Sync:
// - Arc<WrapperConfiguration>: Send + Sync (Arc is Send + Sync, WrapperConfiguration is Send + Sync)
// - Arc<Mutex<Option<ZerobusSdk>>>: Send + Sync (Arc and Mutex are Send + Sync)
// - Arc<Mutex<Option<ZerobusStream>>>: Send + Sync
// - RetryConfig: Send + Sync (contains only primitive types)
// - Option<ObservabilityManager>: Send + Sync (ObservabilityManager is Send + Sync)
// - Option<Arc<DebugWriter>>: Send + Sync
// - Arc<Mutex<bool>>: Send + Sync
// The compiler automatically derives Send + Sync for this struct, so an explicit unsafe impl is not needed.
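
// A minimal compile-time sketch verifying the auto-derived bounds described above.
// This test module is an illustrative addition (names are arbitrary): the helper
// function compiles only if the type parameter is `Send + Sync`.
#[cfg(test)]
mod send_sync_assertions {
    use super::ZerobusWrapper;

    // Compiles only if `T: Send + Sync`, so calling it is a static assertion.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn zerobus_wrapper_is_send_and_sync() {
        assert_send_sync::<ZerobusWrapper>();
    }
}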