arrow_zerobus_sdk_wrapper/wrapper/
mod.rs

1//! Main wrapper implementation for Zerobus SDK
2//!
3//! This module provides the core ZerobusWrapper that handles data transmission
4//! to Zerobus with automatic protocol conversion, authentication, and retry logic.
5
6pub mod auth;
7pub mod conversion;
8pub mod debug;
9pub mod protobuf_serialization;
10pub mod retry;
11pub mod zerobus;
12
13use crate::config::WrapperConfiguration;
14use crate::error::ZerobusError;
15use crate::observability::ObservabilityManager;
16use crate::wrapper::retry::RetryConfig;
17use arrow::record_batch::RecordBatch;
18use secrecy::ExposeSecret;
19use std::sync::Arc;
20use tokio::sync::Mutex;
21use tracing::{debug, error, info, warn};
22
/// Internal result from send_batch_internal containing per-row error information
struct BatchTransmissionResult {
    /// 0-based indices (relative to the submitted batch) of rows that were written successfully
    successful_rows: Vec<usize>,
    /// 0-based indices of rows that failed, each paired with the error that caused the failure
    failed_rows: Vec<(usize, ZerobusError)>,
}
30
/// Result of a data transmission operation
///
/// This struct provides comprehensive information about the result of sending a batch
/// to Zerobus, including per-row success/failure tracking and error details.
///
/// # Per-Row Error Tracking
///
/// The struct supports per-row error tracking, allowing identification of which
/// specific rows succeeded or failed during batch transmission. This enables:
///
/// - **Partial batch success**: Some rows can succeed while others fail
/// - **Quarantine workflows**: Extract and quarantine only failed rows
/// - **Error analysis**: Group errors by type, analyze patterns, track statistics
///
/// # Field Semantics
///
/// - **`success`**: `true` if ANY rows succeeded (or the batch was empty), `false` if
///   ALL rows failed or a batch-level error occurred
/// - **`error`**: Batch-level error (e.g., authentication failure, connection error before processing)
///   - `None` if no batch-level error occurred (even if some rows failed)
/// - **`failed_rows`**: Per-row failures
///   - `None` if all rows succeeded, or if a batch-level error occurred
///     (no per-row processing took place)
///   - `Some(vec![...])` for per-row failures; never `Some` with an empty vector
/// - **`successful_rows`**: Per-row successes
///   - `None` if no rows succeeded (all failed, batch-level error, or empty batch)
///   - `Some(vec![...])` for successful rows; never `Some` with an empty vector
/// - **`total_rows`**: Total number of rows in the batch (0 for empty batches)
/// - **`successful_count`**: Number of rows that succeeded (always equals `successful_rows.len()` if `Some`)
/// - **`failed_count`**: Number of rows that failed (always equals `failed_rows.len()` if `Some`)
///
/// Note: `total_rows == successful_count + failed_count` holds only when per-row
/// processing occurred. On a batch-level error both counts are `0` while
/// `total_rows` still reflects the size of the submitted batch.
///
/// # Edge Cases
///
/// - **Empty batch** (`total_rows == 0`): Returns `success=true`, `successful_count=0`, `failed_count=0`
/// - **Batch-level errors**: Authentication/connection errors before processing return `error=Some(...)`, `failed_rows=None`
/// - **All rows failed**: Returns `success=false`, `failed_rows=Some([...])`, `successful_rows=None`
/// - **All rows succeeded**: Returns `success=true`, `failed_rows=None`, `successful_rows=Some([...])`
///
/// # Examples
///
/// ```no_run
/// use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
/// use arrow::record_batch::RecordBatch;
///
/// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
/// # use arrow::array::Int64Array;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use std::sync::Arc;
/// # let config = WrapperConfiguration::new("https://workspace.cloud.databricks.com".to_string(), "table".to_string());
/// # let wrapper = ZerobusWrapper::new(config).await?;
/// # let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
/// # let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
/// let result = wrapper.send_batch(batch.clone()).await?;
///
/// // Check for partial success
/// if result.is_partial_success() {
///     // Extract failed rows for quarantine
///     if let Some(failed_batch) = result.extract_failed_batch(&batch) {
///         // Quarantine failed_batch
///     }
///
///     // Extract successful rows for writing
///     if let Some(successful_batch) = result.extract_successful_batch(&batch) {
///         // Write successful_batch to main table
///     }
/// }
///
/// // Analyze error patterns
/// let stats = result.get_error_statistics();
/// println!("Success rate: {:.1}%", stats.success_rate * 100.0);
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct TransmissionResult {
    /// Whether transmission succeeded
    ///
    /// `true` if ANY rows succeeded (or the batch was empty), `false` if ALL rows
    /// failed or a batch-level error occurred.
    pub success: bool,
    /// Error information if transmission failed at batch level
    ///
    /// Batch-level errors occur before per-row processing (e.g., authentication failure,
    /// connection error). If `Some`, indicates no per-row processing occurred.
    pub error: Option<ZerobusError>,
    /// Number of retry attempts made
    pub attempts: u32,
    /// Transmission latency in milliseconds (if successful)
    pub latency_ms: Option<u64>,
    /// Size of transmitted batch in bytes
    pub batch_size_bytes: usize,
    /// Indices of rows that failed, along with their specific errors
    ///
    /// - `None` if all rows succeeded, or if a batch-level error occurred
    ///   (no per-row processing took place)
    /// - `Some(vec![(row_idx, error), ...])` for per-row failures; never `Some`
    ///   with an empty vector
    ///
    /// Each tuple contains:
    /// - `row_idx`: 0-based index of the failed row in the original batch
    /// - `error`: Specific `ZerobusError` describing why the row failed
    pub failed_rows: Option<Vec<(usize, ZerobusError)>>,
    /// Indices of rows that were successfully written
    ///
    /// - `None` if no rows succeeded (all failed, batch-level error, or empty batch)
    /// - `Some(vec![row_idx, ...])` for successful rows; never `Some` with an
    ///   empty vector
    ///
    /// Each value is a 0-based index of the successful row in the original batch.
    pub successful_rows: Option<Vec<usize>>,
    /// Total number of rows in the batch
    ///
    /// Equals `successful_count + failed_count` when per-row processing occurred;
    /// on a batch-level error both counts are `0` while this still reflects the
    /// submitted batch size. For empty batches, this is `0`.
    pub total_rows: usize,
    /// Number of rows that succeeded
    ///
    /// Always equals `successful_rows.len()` if `successful_rows` is `Some`.
    pub successful_count: usize,
    /// Number of rows that failed
    ///
    /// Always equals `failed_rows.len()` if `failed_rows` is `Some`.
    pub failed_count: usize,
}
154
155impl TransmissionResult {
156    /// Check if this result represents a partial success (some rows succeeded, some failed)
157    ///
158    /// Returns `true` if there are both successful and failed rows.
159    pub fn is_partial_success(&self) -> bool {
160        self.success && self.successful_count > 0 && self.failed_count > 0
161    }
162
163    /// Check if there are any failed rows
164    ///
165    /// Returns `true` if `failed_rows` contains any entries.
166    pub fn has_failed_rows(&self) -> bool {
167        self.failed_rows
168            .as_ref()
169            .map(|rows| !rows.is_empty())
170            .unwrap_or(false)
171    }
172
173    /// Check if there are any successful rows
174    ///
175    /// Returns `true` if `successful_rows` contains any entries.
176    pub fn has_successful_rows(&self) -> bool {
177        self.successful_rows
178            .as_ref()
179            .map(|rows| !rows.is_empty())
180            .unwrap_or(false)
181    }
182
183    /// Get indices of failed rows
184    ///
185    /// Returns a vector of row indices that failed, or empty vector if none failed.
186    pub fn get_failed_row_indices(&self) -> Vec<usize> {
187        self.failed_rows
188            .as_ref()
189            .map(|rows| rows.iter().map(|(idx, _)| *idx).collect())
190            .unwrap_or_default()
191    }
192
193    /// Get indices of successful rows
194    ///
195    /// Returns a vector of row indices that succeeded, or empty vector if none succeeded.
196    pub fn get_successful_row_indices(&self) -> Vec<usize> {
197        self.successful_rows.clone().unwrap_or_default()
198    }
199
200    /// Extract a RecordBatch containing only the failed rows from the original batch
201    ///
202    /// # Arguments
203    ///
204    /// * `original_batch` - The original RecordBatch that was sent
205    ///
206    /// # Returns
207    ///
208    /// Returns `Some(RecordBatch)` containing only the failed rows, or `None` if there are no failed rows.
209    /// Rows are extracted in the order they appear in `failed_rows`.
210    pub fn extract_failed_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
211        let failed_indices = self.get_failed_row_indices();
212        if failed_indices.is_empty() {
213            return None;
214        }
215
216        // Extract rows by index
217        let mut rows_to_extract = failed_indices;
218        rows_to_extract.sort(); // Ensure consistent ordering
219
220        // Use take to extract specific row indices
221        // Note: This requires Arrow's take kernel functionality
222        // For now, we'll use a simple approach: filter the batch
223        let mut arrays = Vec::new();
224        for array in original_batch.columns() {
225            // Use take to extract rows at specific indices
226            let taken = arrow::compute::take(
227                array,
228                &arrow::array::UInt32Array::from(
229                    rows_to_extract
230                        .iter()
231                        .map(|&idx| idx as u32)
232                        .collect::<Vec<_>>(),
233                ),
234                None,
235            )
236            .ok()?;
237            arrays.push(taken);
238        }
239
240        RecordBatch::try_new(original_batch.schema(), arrays).ok()
241    }
242
243    /// Extract a RecordBatch containing only the successful rows from the original batch
244    ///
245    /// # Arguments
246    ///
247    /// * `original_batch` - The original RecordBatch that was sent
248    ///
249    /// # Returns
250    ///
251    /// Returns `Some(RecordBatch)` containing only the successful rows, or `None` if there are no successful rows.
252    /// Rows are extracted in the order they appear in `successful_rows`.
253    pub fn extract_successful_batch(&self, original_batch: &RecordBatch) -> Option<RecordBatch> {
254        let successful_indices = self.get_successful_row_indices();
255        if successful_indices.is_empty() {
256            return None;
257        }
258
259        // Extract rows by index
260        let mut rows_to_extract = successful_indices;
261        rows_to_extract.sort(); // Ensure consistent ordering
262
263        // Use take to extract specific row indices
264        let mut arrays = Vec::new();
265        for array in original_batch.columns() {
266            // Use take to extract rows at specific indices
267            let taken = arrow::compute::take(
268                array,
269                &arrow::array::UInt32Array::from(
270                    rows_to_extract
271                        .iter()
272                        .map(|&idx| idx as u32)
273                        .collect::<Vec<_>>(),
274                ),
275                None,
276            )
277            .ok()?;
278            arrays.push(taken);
279        }
280
281        RecordBatch::try_new(original_batch.schema(), arrays).ok()
282    }
283
284    /// Get indices of failed rows filtered by error type
285    ///
286    /// # Arguments
287    ///
288    /// * `predicate` - A closure that returns `true` for errors that should be included
289    ///
290    /// # Returns
291    ///
292    /// Returns a vector of row indices for failed rows that match the predicate.
293    pub fn get_failed_row_indices_by_error_type<F>(&self, predicate: F) -> Vec<usize>
294    where
295        F: Fn(&ZerobusError) -> bool,
296    {
297        self.failed_rows
298            .as_ref()
299            .map(|rows| {
300                rows.iter()
301                    .filter_map(
302                        |(idx, error)| {
303                            if predicate(error) {
304                                Some(*idx)
305                            } else {
306                                None
307                            }
308                        },
309                    )
310                    .collect()
311            })
312            .unwrap_or_default()
313    }
314
315    /// Group failed rows by error type
316    ///
317    /// # Returns
318    ///
319    /// Returns a HashMap where keys are error type names (e.g., "ConversionError")
320    /// and values are vectors of row indices that failed with that error type.
321    pub fn group_errors_by_type(&self) -> std::collections::HashMap<String, Vec<usize>> {
322        let mut grouped: std::collections::HashMap<String, Vec<usize>> =
323            std::collections::HashMap::new();
324
325        if let Some(failed_rows) = &self.failed_rows {
326            for (row_idx, error) in failed_rows {
327                let error_type = match error {
328                    ZerobusError::ConfigurationError(_) => "ConfigurationError",
329                    ZerobusError::AuthenticationError(_) => "AuthenticationError",
330                    ZerobusError::ConnectionError(_) => "ConnectionError",
331                    ZerobusError::ConversionError(_) => "ConversionError",
332                    ZerobusError::TransmissionError(_) => "TransmissionError",
333                    ZerobusError::RetryExhausted(_) => "RetryExhausted",
334                    ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
335                };
336                grouped
337                    .entry(error_type.to_string())
338                    .or_default()
339                    .push(*row_idx);
340            }
341        }
342
343        grouped
344    }
345
346    /// Get error statistics for this transmission result
347    ///
348    /// # Returns
349    ///
350    /// Returns an `ErrorStatistics` struct containing comprehensive error analysis
351    /// including success/failure rates and error type counts.
352    pub fn get_error_statistics(&self) -> ErrorStatistics {
353        let success_rate = if self.total_rows > 0 {
354            self.successful_count as f64 / self.total_rows as f64
355        } else {
356            0.0
357        };
358
359        let failure_rate = if self.total_rows > 0 {
360            self.failed_count as f64 / self.total_rows as f64
361        } else {
362            0.0
363        };
364
365        let mut error_type_counts: std::collections::HashMap<String, usize> =
366            std::collections::HashMap::new();
367
368        if let Some(failed_rows) = &self.failed_rows {
369            for (_, error) in failed_rows {
370                let error_type = match error {
371                    ZerobusError::ConfigurationError(_) => "ConfigurationError",
372                    ZerobusError::AuthenticationError(_) => "AuthenticationError",
373                    ZerobusError::ConnectionError(_) => "ConnectionError",
374                    ZerobusError::ConversionError(_) => "ConversionError",
375                    ZerobusError::TransmissionError(_) => "TransmissionError",
376                    ZerobusError::RetryExhausted(_) => "RetryExhausted",
377                    ZerobusError::TokenRefreshError(_) => "TokenRefreshError",
378                };
379                *error_type_counts.entry(error_type.to_string()).or_insert(0) += 1;
380            }
381        }
382
383        ErrorStatistics {
384            total_rows: self.total_rows,
385            successful_count: self.successful_count,
386            failed_count: self.failed_count,
387            success_rate,
388            failure_rate,
389            error_type_counts,
390        }
391    }
392
393    /// Get all error messages from failed rows
394    ///
395    /// # Returns
396    ///
397    /// Returns a vector of error message strings for all failed rows.
398    pub fn get_error_messages(&self) -> Vec<String> {
399        self.failed_rows
400            .as_ref()
401            .map(|rows| rows.iter().map(|(_, error)| error.to_string()).collect())
402            .unwrap_or_default()
403    }
404}
405
/// Error statistics for a transmission result
///
/// Produced by `TransmissionResult::get_error_statistics`.
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    /// Total number of rows in the batch
    pub total_rows: usize,
    /// Number of rows that succeeded
    pub successful_count: usize,
    /// Number of rows that failed
    pub failed_count: usize,
    /// Success rate (0.0 to 1.0); 0.0 when the batch was empty
    pub success_rate: f64,
    /// Failure rate (0.0 to 1.0); 0.0 when the batch was empty
    pub failure_rate: f64,
    /// Count of errors keyed by error-type name (e.g., "ConversionError")
    pub error_type_counts: std::collections::HashMap<String, usize>,
}
422
/// Main wrapper for sending data to Zerobus
///
/// Thread-safe wrapper that handles Arrow RecordBatch to Protobuf conversion,
/// authentication, retry logic, and transmission to Zerobus.
///
/// NOTE(review): `send_batch_with_descriptor` calls `self.clone()`, so a `Clone`
/// impl presumably exists elsewhere in this file; clones share all mutable state
/// through the `Arc` fields below — confirm.
pub struct ZerobusWrapper {
    /// Configuration (immutable)
    config: Arc<WrapperConfiguration>,
    /// Zerobus SDK instance (thread-safe); `None` until first use (lazy initialization)
    sdk: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusSdk>>>,
    /// Active stream (lazy initialization)
    stream: Arc<Mutex<Option<databricks_zerobus_ingest_sdk::ZerobusStream>>>,
    /// Retry configuration
    retry_config: RetryConfig,
    /// Observability manager (optional)
    observability: Option<ObservabilityManager>,
    /// Debug writer (optional)
    debug_writer: Option<Arc<crate::wrapper::debug::DebugWriter>>,
    /// Track if we've written the descriptor for this table (once per table)
    descriptor_written: Arc<tokio::sync::Mutex<bool>>,
}
443
444impl ZerobusWrapper {
445    /// Validate and normalize the Zerobus endpoint URL.
446    ///
447    /// # Arguments
448    ///
449    /// * `endpoint` - Raw endpoint string from configuration
450    ///
451    /// # Returns
452    ///
453    /// Returns `Ok(String)` with normalized endpoint, or `Err(ZerobusError)` if validation fails.
454    fn validate_and_normalize_endpoint(endpoint: &str) -> Result<String, ZerobusError> {
455        let normalized_endpoint = endpoint.trim().to_string();
456
457        if normalized_endpoint.is_empty() {
458            return Err(ZerobusError::ConfigurationError(
459                "zerobus_endpoint cannot be empty".to_string(),
460            ));
461        }
462
463        if !normalized_endpoint.starts_with("https://")
464            && !normalized_endpoint.starts_with("http://")
465        {
466            return Err(ZerobusError::ConfigurationError(format!(
467                "zerobus_endpoint must start with 'https://' or 'http://'. Got: '{}'",
468                normalized_endpoint
469            )));
470        }
471
472        Ok(normalized_endpoint)
473    }
474
475    /// Create a new ZerobusWrapper with the provided configuration
476    ///
477    /// # Arguments
478    ///
479    /// * `config` - Configuration for initializing the wrapper
480    ///
481    /// # Returns
482    ///
483    /// Returns `Ok(ZerobusWrapper)` if initialization succeeds, or `Err(ZerobusError)` if:
484    /// - Configuration validation fails
485    /// - SDK initialization fails
486    ///
487    /// # Example
488    ///
489    /// ```no_run
490    /// use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
491    ///
492    /// # async fn example() -> Result<(), arrow_zerobus_sdk_wrapper::ZerobusError> {
493    /// let config = WrapperConfiguration::new(
494    ///     "https://workspace.cloud.databricks.com".to_string(),
495    ///     "my_table".to_string(),
496    /// );
497    /// let wrapper = ZerobusWrapper::new(config).await?;
498    /// # Ok(())
499    /// # }
500    /// ```
501    pub async fn new(config: WrapperConfiguration) -> Result<Self, ZerobusError> {
502        info!("Initializing ZerobusWrapper");
503
504        // Validate configuration
505        config.validate()?;
506
507        // Validate and normalize endpoint (required for both enabled and disabled modes)
508        let normalized_endpoint = Self::validate_and_normalize_endpoint(&config.zerobus_endpoint)?;
509
510        // Skip credential validation if writer is disabled (credentials optional in this mode)
511        if !config.zerobus_writer_disabled {
512            // Get required OAuth credentials
513            let unity_catalog_url = config
514                .unity_catalog_url
515                .as_ref()
516                .ok_or_else(|| {
517                    ZerobusError::ConfigurationError(
518                        "unity_catalog_url is required for SDK".to_string(),
519                    )
520                })?
521                .clone();
522
523            // Validate credentials are present (but don't expose them unnecessarily)
524            let _client_id = config.client_id.as_ref().ok_or_else(|| {
525                ZerobusError::ConfigurationError("client_id is required for SDK".to_string())
526            })?;
527
528            let _client_secret = config.client_secret.as_ref().ok_or_else(|| {
529                ZerobusError::ConfigurationError("client_secret is required for SDK".to_string())
530            })?;
531
532            info!("Zerobus endpoint: {}", normalized_endpoint);
533            info!("Unity Catalog URL: {}", unity_catalog_url);
534        } else {
535            // When writer is disabled, we still validate endpoint format but don't require credentials
536            info!(
537                "Zerobus endpoint: {} (writer disabled mode)",
538                normalized_endpoint
539            );
540        }
541
542        // Initialize SDK (will be created lazily when needed)
543        // For now, we'll store None and create it on first use
544        let sdk = Arc::new(Mutex::new(None));
545
546        // Create retry config from wrapper config
547        let retry_config = RetryConfig::new(
548            config.retry_max_attempts,
549            config.retry_base_delay_ms,
550            config.retry_max_delay_ms,
551        );
552
553        // Initialize observability if enabled
554        let observability = if config.observability_enabled {
555            ObservabilityManager::new_async(config.observability_config.clone()).await
556        } else {
557            None
558        };
559
560        if observability.is_some() {
561            info!("Observability enabled");
562        }
563
564        // Initialize debug writer if any format is enabled
565        // Check new flags first, fall back to legacy flag for backward compatibility
566        let any_debug_enabled =
567            config.debug_arrow_enabled || config.debug_protobuf_enabled || config.debug_enabled;
568
569        // Info logging to diagnose why debug writer isn't being initialized
570        info!(
571            "ZerobusWrapper::new: debug_arrow_enabled={}, debug_protobuf_enabled={}, debug_enabled={}, debug_output_dir={:?}",
572            config.debug_arrow_enabled, config.debug_protobuf_enabled, config.debug_enabled, config.debug_output_dir
573        );
574
575        let debug_writer = if any_debug_enabled {
576            if let Some(output_dir) = &config.debug_output_dir {
577                use crate::wrapper::debug::DebugWriter;
578                use std::time::Duration;
579
580                info!(
581                    "Initializing debug writer with output_dir: {}, table_name: {}, arrow_enabled: {}, protobuf_enabled: {}",
582                    output_dir.display(),
583                    config.table_name,
584                    config.debug_arrow_enabled,
585                    config.debug_protobuf_enabled
586                );
587                match DebugWriter::new(
588                    output_dir.clone(),
589                    config.table_name.clone(),
590                    Duration::from_secs(config.debug_flush_interval_secs),
591                    config.debug_max_file_size,
592                    config.debug_max_files_retained,
593                ) {
594                    Ok(writer) => {
595                        info!(
596                            "Debug file output enabled: {} (Arrow: {}, Protobuf: {})",
597                            output_dir.display(),
598                            config.debug_arrow_enabled,
599                            config.debug_protobuf_enabled
600                        );
601                        Some(Arc::new(writer))
602                    }
603                    Err(e) => {
604                        warn!("Failed to initialize debug writer: {}", e);
605                        None
606                    }
607                }
608            } else {
609                // Collect which debug flags are enabled for more specific warning
610                let mut enabled_flags = Vec::new();
611                if config.debug_arrow_enabled {
612                    enabled_flags.push("debug_arrow_enabled");
613                }
614                if config.debug_protobuf_enabled {
615                    enabled_flags.push("debug_protobuf_enabled");
616                }
617                if config.debug_enabled {
618                    enabled_flags.push("debug_enabled");
619                }
620                warn!(
621                    "Debug flag(s) enabled ({}) but debug_output_dir is None - debug files will not be written",
622                    enabled_flags.join(", ")
623                );
624                None
625            }
626        } else {
627            info!("All debug flags are false - debug files will not be written");
628            None
629        };
630
631        Ok(Self {
632            config: Arc::new(config),
633            sdk,
634            stream: Arc::new(Mutex::new(None)),
635            retry_config,
636            observability,
637            debug_writer,
638            descriptor_written: Arc::new(tokio::sync::Mutex::new(false)),
639        })
640    }
641
    /// Send a data batch to Zerobus
    ///
    /// Converts Arrow RecordBatch to Protobuf format and transmits to Zerobus
    /// with automatic retry on transient failures. The Protobuf descriptor is
    /// auto-generated from the Arrow schema; use `send_batch_with_descriptor`
    /// to supply an explicit descriptor instead (e.g., to ensure correct nested
    /// types).
    ///
    /// # Arguments
    ///
    /// * `batch` - Arrow RecordBatch to send
    ///
    /// # Returns
    ///
    /// Returns `TransmissionResult` indicating success or failure.
    ///
    /// # Errors
    ///
    /// Returns error if transmission fails after all retry attempts.
    pub async fn send_batch(&self, batch: RecordBatch) -> Result<TransmissionResult, ZerobusError> {
        // Delegate to the descriptor-aware variant with no explicit descriptor.
        self.send_batch_with_descriptor(batch, None).await
    }
663
    /// Send a data batch to Zerobus with an optional Protobuf descriptor
    ///
    /// Converts Arrow RecordBatch to Protobuf format and transmits to Zerobus
    /// with automatic retry on transient failures.
    ///
    /// # Arguments
    ///
    /// * `batch` - Arrow RecordBatch to send
    /// * `descriptor` - Optional Protobuf descriptor. If provided, uses this descriptor
    ///   instead of auto-generating from Arrow schema. This ensures correct nested types.
    ///
    /// # Returns
    ///
    /// Returns `TransmissionResult` indicating success or failure. Batch-level
    /// failures (e.g., authentication) are reported inside the `Ok(...)` result
    /// via `error: Some(...)` rather than as an `Err`.
    ///
    /// # Errors
    ///
    /// Returns error if transmission fails after all retry attempts.
    pub async fn send_batch_with_descriptor(
        &self,
        batch: RecordBatch,
        descriptor: Option<prost_types::DescriptorProto>,
    ) -> Result<TransmissionResult, ZerobusError> {
        let start_time = std::time::Instant::now();
        // Arrow in-memory size; reported in metrics and in the result's batch_size_bytes.
        let batch_size_bytes = batch.get_array_memory_size();

        debug!(
            "Sending batch with {} rows, {} bytes",
            batch.num_rows(),
            batch_size_bytes
        );

        // Write Arrow batch to debug file if Arrow debug is enabled
        if self.config.debug_arrow_enabled {
            if let Some(ref debug_writer) = self.debug_writer {
                if let Err(e) = debug_writer.write_arrow(&batch).await {
                    warn!("Failed to write Arrow debug file: {}", e);
                    // Don't fail the operation if debug writing fails
                }
            }
        }

        // Start observability span if enabled; kept alive (bound to _span) for the
        // duration of this call.
        let _span = self
            .observability
            .as_ref()
            .map(|obs| obs.start_send_batch_span(&self.config.table_name));

        // Use retry logic for transmission. The closure must be re-callable per
        // attempt, so each attempt clones the inputs and the wrapper.
        // NOTE(review): assumes RecordBatch/wrapper clones are cheap (Arc-backed
        // column buffers and Arc fields) — confirm.
        let (result, attempts) = self
            .retry_config
            .execute_with_retry_tracked(|| {
                let batch = batch.clone();
                let descriptor = descriptor.clone();
                let wrapper = self.clone();
                async move { wrapper.send_batch_internal(batch, descriptor).await }
            })
            .await;

        let latency_ms = start_time.elapsed().as_millis() as u64;

        // Record metrics if observability is enabled
        if let Some(obs) = &self.observability {
            let success = result.is_ok();
            obs.record_batch_sent(batch_size_bytes, success, latency_ms)
                .await;
        }

        let total_rows = batch.num_rows();

        // Handle empty batch edge case.
        // NOTE(review): this short-circuit runs AFTER the transmission attempt,
        // so an empty batch still goes through the retry path above and any
        // error from that attempt is discarded here — confirm this is intended.
        if total_rows == 0 {
            return Ok(TransmissionResult {
                success: true, // Empty batch is considered successful
                error: None,
                attempts,
                latency_ms: Some(latency_ms),
                batch_size_bytes,
                failed_rows: None,
                successful_rows: None,
                total_rows: 0,
                successful_count: 0,
                failed_count: 0,
            });
        }

        match result {
            Ok(batch_result) => {
                // Per-row outcome from the internal send; failures are mutable so
                // they can be sorted below.
                let mut all_failed_rows = batch_result.failed_rows;
                let successful_rows = batch_result.successful_rows;

                let successful_count = successful_rows.len();
                let failed_count = all_failed_rows.len();

                // Determine overall success: true if ANY rows succeeded
                // Edge case: If all rows failed, success is false
                let overall_success = successful_count > 0;

                // Sort failed rows by index for consistency
                all_failed_rows.sort_by_key(|(idx, _)| *idx);

                // Update failure rate tracking (only counts network/transmission errors)
                crate::wrapper::zerobus::update_failure_rate(
                    &self.config.table_name,
                    total_rows,
                    &all_failed_rows,
                );

                // Empty per-row vectors are normalized to None (see TransmissionResult docs).
                Ok(TransmissionResult {
                    success: overall_success,
                    error: None, // No batch-level error, only per-row errors
                    attempts,
                    latency_ms: Some(latency_ms),
                    batch_size_bytes,
                    failed_rows: if all_failed_rows.is_empty() {
                        None
                    } else {
                        Some(all_failed_rows)
                    },
                    successful_rows: if successful_rows.is_empty() {
                        None
                    } else {
                        Some(successful_rows)
                    },
                    total_rows,
                    successful_count,
                    failed_count,
                })
            }
            Err(e) => {
                error!("Failed to send batch after retries: {}", e);
                // Batch-level error (e.g., authentication, connection before processing)
                // Edge case: Batch-level errors occur before per-row processing
                Ok(TransmissionResult {
                    success: false,
                    error: Some(e),
                    attempts,
                    latency_ms: Some(latency_ms),
                    batch_size_bytes,
                    failed_rows: None, // Batch-level error, no per-row processing occurred
                    successful_rows: None,
                    total_rows,
                    successful_count: 0,
                    failed_count: 0, // Batch-level error, no per-row processing
                })
            }
        }
    }
813
    /// Internal method to send a single batch (without the retry wrapper).
    ///
    /// On success, returns per-row transmission information: the indices of
    /// rows that were sent successfully and the rows that failed, each paired
    /// with its error (conversion failures plus transmission failures).
    /// Batch-level failures — missing configuration (URL, client credentials),
    /// SDK/stream initialization errors, or an active backoff period — are
    /// propagated as `Err` before any per-row results are produced.
816    async fn send_batch_internal(
817        &self,
818        batch: RecordBatch,
819        descriptor: Option<prost_types::DescriptorProto>,
820    ) -> Result<BatchTransmissionResult, ZerobusError> {
821        // CRITICAL: Check if writer is disabled FIRST, before any SDK initialization or credential access
822        // This prevents errors when credentials are not provided (which is allowed when writer is disabled)
823        if self.config.zerobus_writer_disabled {
824            // When writer is disabled, we still perform conversion and write debug files,
825            // but skip all SDK calls. This enables local development and testing without credentials.
826            debug!(
827                "Writer disabled mode enabled - skipping SDK initialization and Zerobus SDK calls"
828            );
829            // Continue to conversion and debug file writing below, then return early
830        } else {
831            // 1. Ensure SDK is initialized (only when writer is NOT disabled)
832            {
833                let mut sdk_guard = self.sdk.lock().await;
834                if sdk_guard.is_none() {
835                    let unity_catalog_url = self
836                        .config
837                        .unity_catalog_url
838                        .as_ref()
839                        .ok_or_else(|| {
840                            ZerobusError::ConfigurationError(
841                                "unity_catalog_url is required".to_string(),
842                            )
843                        })?
844                        .clone();
845
846                    let sdk = crate::wrapper::zerobus::create_sdk(
847                        self.config.zerobus_endpoint.clone(),
848                        unity_catalog_url,
849                    )
850                    .await?;
851                    *sdk_guard = Some(sdk);
852                }
853            }
854        }
855
856        // 2. Get Protobuf descriptor (use provided one or generate from Arrow schema)
857        let descriptor = if let Some(provided_descriptor) = descriptor {
858            // Validate user-provided descriptor to prevent security issues
859            crate::wrapper::conversion::validate_protobuf_descriptor(&provided_descriptor)
860                .map_err(|e| {
861                    ZerobusError::ConfigurationError(format!("Invalid Protobuf descriptor: {}", e))
862                })?;
863            let descriptor_name = provided_descriptor.name.as_deref().unwrap_or("unknown");
864            info!("🔍 [DEBUG] Using provided Protobuf descriptor: name='{}', fields={}, nested_types={}", 
865                  descriptor_name, provided_descriptor.field.len(), provided_descriptor.nested_type.len());
866            provided_descriptor
867        } else {
868            debug!("Auto-generating Protobuf descriptor from Arrow schema");
869            let generated =
870                crate::wrapper::conversion::generate_protobuf_descriptor(batch.schema().as_ref())
871                    .map_err(|e| {
872                    ZerobusError::ConversionError(format!(
873                        "Failed to generate Protobuf descriptor: {}",
874                        e
875                    ))
876                })?;
877            // Validate generated descriptor (should always pass, but safety check)
878            crate::wrapper::conversion::validate_protobuf_descriptor(&generated).map_err(|e| {
879                ZerobusError::ConversionError(format!(
880                    "Generated Protobuf descriptor failed validation: {}",
881                    e
882                ))
883            })?;
884            let descriptor_name = generated.name.as_deref().unwrap_or("unknown");
885            info!("🔍 [DEBUG] Auto-generated Protobuf descriptor: name='{}', fields={}, nested_types={}", 
886                  descriptor_name, generated.field.len(), generated.nested_type.len());
887            generated
888        };
889
890        // Write descriptor to file once per table (if either Arrow or Protobuf debug is enabled)
891        if self.config.debug_arrow_enabled || self.config.debug_protobuf_enabled {
892            if let Some(ref debug_writer) = self.debug_writer {
893                let mut written_guard = self.descriptor_written.lock().await;
894                if !*written_guard {
895                    if let Err(e) = debug_writer
896                        .write_descriptor(&self.config.table_name, &descriptor)
897                        .await
898                    {
899                        warn!("Failed to write Protobuf descriptor to debug file: {}", e);
900                        // Don't fail the operation if descriptor writing fails
901                    } else {
902                        *written_guard = true;
903                    }
904                }
905            }
906        }
907
908        // 3. Convert Arrow RecordBatch to Protobuf bytes (one per row)
909        // This now returns ProtobufConversionResult with per-row conversion errors
910        let conversion_result =
911            crate::wrapper::conversion::record_batch_to_protobuf_bytes(&batch, &descriptor);
912
913        // Track conversion errors (will be merged with transmission errors later)
914        let conversion_errors = conversion_result.failed_rows;
915
916        // Write Protobuf bytes to debug file if Protobuf debug is enabled (only successful conversions)
917        // Flush after each batch to ensure files are immediately available for debugging
918        // CRITICAL: Write protobuf files BEFORE Zerobus write attempts, so we have them even if Zerobus fails
919        if self.config.debug_protobuf_enabled {
920            if let Some(ref debug_writer) = self.debug_writer {
921                info!(
922                    "Writing {} protobuf messages to debug file",
923                    conversion_result.successful_bytes.len()
924                );
925                let num_rows = conversion_result.successful_bytes.len();
926                for (idx, (_, bytes)) in conversion_result.successful_bytes.iter().enumerate() {
927                    // Flush immediately after last row in batch
928                    let flush_immediately = idx == num_rows - 1;
929                    if let Err(e) = debug_writer.write_protobuf(bytes, flush_immediately).await {
930                        warn!("Failed to write Protobuf debug file: {}", e);
931                        // Don't fail the operation if debug writing fails
932                    } else if flush_immediately {
933                        info!(
934                            "✅ Flushed protobuf debug file after batch ({} messages)",
935                            num_rows
936                        );
937                    }
938                }
939            } else {
940                warn!("⚠️  Debug writer is None - protobuf debug files will not be written. Check debug_protobuf_enabled and debug_output_dir config.");
941            }
942        }
943
944        // Check if writer is disabled - if so, skip all SDK calls and return success
945        // Performance: Operations complete in <50ms (excluding file I/O) when writer disabled
946        // This enables performance testing of conversion logic without network overhead
947        if self.config.zerobus_writer_disabled {
948            debug!(
949                "Writer disabled mode enabled - skipping Zerobus SDK calls. Debug files written successfully."
950            );
951            // Return success with conversion results tracked
952            // All successfully converted rows are considered successful when writer is disabled
953            let successful_indices: Vec<usize> = conversion_result
954                .successful_bytes
955                .iter()
956                .map(|(idx, _)| *idx)
957                .collect();
958            return Ok(BatchTransmissionResult {
959                successful_rows: successful_indices,
960                failed_rows: conversion_errors,
961            });
962        }
963
964        // Get SDK reference (lock is released, so we can lock again for stream creation)
965        // This is safe because we only reach here when writer is NOT disabled, so SDK was initialized above
966        let sdk_guard = self.sdk.lock().await;
967        let sdk = sdk_guard.as_ref().ok_or_else(|| {
968            ZerobusError::ConfigurationError(
969                "SDK not initialized - this should not happen".to_string(),
970            )
971        })?;
972
973        // 4. Ensure stream is created
974        // Expose secrets only when needed for API calls
975        let client_id = self
976            .config
977            .client_id
978            .as_ref()
979            .ok_or_else(|| ZerobusError::ConfigurationError("client_id is required".to_string()))?
980            .expose_secret()
981            .clone();
982        let client_secret = self
983            .config
984            .client_secret
985            .as_ref()
986            .ok_or_else(|| {
987                ZerobusError::ConfigurationError("client_secret is required".to_string())
988            })?
989            .expose_secret()
990            .clone();
991
992        // ========================================================================
993        // STEP 5: Check backoff conditions BEFORE attempting any writes
994        // ========================================================================
995        // CRITICAL: Check backoff BEFORE attempting writes, even if stream exists.
996        // This prevents writes during backoff period even if stream was created before
997        // backoff started. We check for:
998        // 1. Error 6006 backoff (server-initiated, pipeline blocked)
999        // 2. High failure rate backoff (client-initiated, >1% failure rate)
1000        //
1001        // Edge case: Backoff can start during batch processing, so we check again
1002        // before each record in the loop below.
1003        {
1004            use crate::wrapper::zerobus::{check_error_6006_backoff, check_failure_rate_backoff};
1005            check_error_6006_backoff(&self.config.table_name).await?;
1006            check_failure_rate_backoff(&self.config.table_name).await?;
1007        }
1008
1009        // ========================================================================
1010        // STEP 6: Write each row to Zerobus with stream recreation on failure
1011        // ========================================================================
1012        // This implements a retry loop that handles stream closure and recreation.
1013        //
1014        // Algorithm:
1015        // 1. Ensure stream exists (create if None)
1016        // 2. For each row in the batch:
1017        //    a. Check backoff again (backoff can start during batch processing)
1018        //    b. Re-acquire stream lock (stream may have been cleared)
1019        //    c. Recreate stream if it was cleared
1020        //    d. Send row to Zerobus
1021        //    e. Handle stream closure errors by clearing stream and retrying
1022        // 3. If all rows succeed, break
1023        // 4. If stream closed, retry up to MAX_STREAM_RECREATE_ATTEMPTS
1024        //
1025        // Edge cases handled:
1026        // - Stream closed immediately after creation (first record fails)
1027        //   → Indicates schema mismatch or validation error
1028        // - Stream closed mid-batch
1029        //   → Clear stream, recreate, and retry from failed row
1030        // - Backoff starts during batch processing
1031        //   → Clear stream, break loop, return error
1032        //
1033        // Performance considerations:
1034        // - Lock is released before async operations to avoid blocking
1035        // - Stream is only recreated when necessary (not for every row)
1036        // - Maximum retry attempts prevent infinite loops
1037        //
1038        // Thread safety:
1039        // - Uses async Mutex to prevent blocking the runtime
1040        // - Lock is held only when accessing/modifying stream
1041        // - Lock is released before network I/O operations
1042        let mut retry_count = 0;
1043        const MAX_STREAM_RECREATE_ATTEMPTS: u32 = 3;
1044
1045        // Track per-row transmission results across retries
1046        // These will be assigned from attempt_* variables after processing completes
1047        let mut transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
1048        let mut successful_indices: Vec<usize> = Vec::new();
1049
1050        loop {
1051            // Ensure stream exists and is valid
1052            let mut stream_guard = self.stream.lock().await;
1053            if stream_guard.is_none() {
1054                info!(
1055                    "Stream not found, creating new stream for table: {}",
1056                    self.config.table_name
1057                );
1058                let stream = crate::wrapper::zerobus::ensure_stream(
1059                    sdk,
1060                    self.config.table_name.clone(),
1061                    descriptor.clone(),
1062                    client_id.clone(),
1063                    client_secret.clone(),
1064                )
1065                .await?;
1066                *stream_guard = Some(stream);
1067                info!("✅ Stream created successfully");
1068            }
1069            // Verify stream exists before dropping lock
1070            if stream_guard.is_none() {
1071                return Err(ZerobusError::ConnectionError(
1072                    "Stream was None after creation - this should not happen".to_string(),
1073                ));
1074            }
1075            drop(stream_guard); // Release lock before sending data
1076
1077            // Try to send all successfully converted rows
1078            // Reset tracking for this retry attempt (but preserve across retries for final result)
1079            let mut attempt_transmission_errors: Vec<(usize, ZerobusError)> = Vec::new();
1080            let mut attempt_successful_indices: Vec<usize> = Vec::new();
1081            let mut all_succeeded = true;
1082            let mut failed_at_idx = 0;
1083
1084            // Batch futures for better throughput: collect futures and await in batches
1085            // This allows the SDK to queue multiple records before flushing, improving performance
1086            const BATCH_SIZE: usize = 1000; // Flush every 1000 records
1087            const BATCH_SIZE_BYTES: usize = 10 * 1024 * 1024; // Or every 10MB
1088                                                              // Store futures with their row indices - using a type-erased future
1089            type IngestFuture = std::pin::Pin<
1090                Box<
1091                    dyn std::future::Future<
1092                            Output = Result<i64, databricks_zerobus_ingest_sdk::ZerobusError>,
1093                        > + Send,
1094                >,
1095            >;
1096            let mut pending_futures: Vec<(usize, IngestFuture)> = Vec::new();
1097            let mut total_bytes_buffered = 0usize;
1098            let mut should_break_outer = false; // Track if we need to break outer retry loop
1099
1100            // Process only successfully converted rows
1101            for (original_row_idx, bytes) in conversion_result.successful_bytes.iter() {
1102                let idx = *original_row_idx;
1103                // ========================================================================
1104                // STEP 6a: Check backoff before each record
1105                // ========================================================================
1106                // Edge case: Backoff can start during batch processing (e.g., another thread
1107                // encountered error 6006 or high failure rate). We check before each record to prevent writes
1108                // during backoff period.
1109                {
1110                    use crate::wrapper::zerobus::{
1111                        check_error_6006_backoff, check_failure_rate_backoff,
1112                    };
1113                    if let Err(_backoff_err) =
1114                        check_error_6006_backoff(&self.config.table_name).await
1115                    {
1116                        // Backoff error: track per-row and break (backoff is batch-level concern)
1117                        // Clear stream so it gets recreated after backoff
1118                        let mut stream_guard = self.stream.lock().await;
1119                        *stream_guard = None;
1120                        drop(stream_guard);
1121                        // Backoff affects remaining rows, but we've processed up to idx
1122                        // Mark remaining rows as affected by backoff
1123                        for remaining_idx in idx..conversion_result.successful_bytes.len() {
1124                            if let Some((orig_idx, _)) =
1125                                conversion_result.successful_bytes.get(remaining_idx)
1126                            {
1127                                attempt_transmission_errors.push((
1128                                    *orig_idx,
1129                                    ZerobusError::ConnectionError(
1130                                        "Backoff period active - row processing stopped"
1131                                            .to_string(),
1132                                    ),
1133                                ));
1134                            }
1135                        }
1136                        all_succeeded = false;
1137                        failed_at_idx = idx;
1138                        break;
1139                    }
1140                    // Also check failure rate backoff
1141                    if let Err(_backoff_err) =
1142                        check_failure_rate_backoff(&self.config.table_name).await
1143                    {
1144                        // Backoff error: track per-row and break (backoff is batch-level concern)
1145                        // Clear stream so it gets recreated after backoff
1146                        let mut stream_guard = self.stream.lock().await;
1147                        *stream_guard = None;
1148                        drop(stream_guard);
1149                        // Backoff affects remaining rows, but we've processed up to idx
1150                        // Mark remaining rows as affected by backoff
1151                        for remaining_idx in idx..conversion_result.successful_bytes.len() {
1152                            if let Some((orig_idx, _)) =
1153                                conversion_result.successful_bytes.get(remaining_idx)
1154                            {
1155                                attempt_transmission_errors.push((
1156                                    *orig_idx,
1157                                    ZerobusError::ConnectionError(
1158                                        "High failure rate backoff active - row processing stopped"
1159                                            .to_string(),
1160                                    ),
1161                                ));
1162                            }
1163                        }
1164                        all_succeeded = false;
1165                        failed_at_idx = idx;
1166                        break;
1167                    }
1168                }
1169
1170                // ========================================================================
1171                // STEP 6b: Re-acquire stream lock and ensure stream exists
1172                // ========================================================================
1173                // We re-acquire the lock for each record because:
1174                // 1. Stream may have been cleared by error handling in previous iteration
1175                // 2. Lock was released before async operations to avoid blocking
1176                // 3. Multiple threads may be sending batches concurrently
1177                //
1178                // Performance: Lock is held only briefly, released before network I/O.
1179                let mut stream_guard = self.stream.lock().await;
1180                if stream_guard.is_none() {
1181                    // Stream was cleared (e.g., by error handling), recreate it
1182                    info!(
1183                        "Stream was cleared, recreating for table: {}",
1184                        self.config.table_name
1185                    );
1186                    let stream = crate::wrapper::zerobus::ensure_stream(
1187                        sdk,
1188                        self.config.table_name.clone(),
1189                        descriptor.clone(),
1190                        client_id.clone(),
1191                        client_secret.clone(),
1192                    )
1193                    .await?;
1194                    *stream_guard = Some(stream);
1195                }
1196                let stream = stream_guard.as_mut().ok_or_else(|| {
1197                    ZerobusError::ConnectionError(
1198                        "Stream was None after recreation - this should not happen".to_string(),
1199                    )
1200                })?;
1201
1202                // ========================================================================
1203                // STEP 6c: Send bytes to Zerobus stream (batched for performance)
1204                // ========================================================================
1205                // The Zerobus SDK's ingest_record returns a Future that resolves when acknowledged.
1206                // We collect futures and await them in batches for better throughput.
1207                //
1208                // Error handling:
1209                // - Stream closed errors: Clear stream, mark failure, break loop to retry
1210                // - Other errors: Track per-row and continue
1211                // - First record failures: Log detailed diagnostics for schema issues
1212                match stream.ingest_record(bytes.clone()).await {
1213                    Ok(ingest_future) => {
1214                        // Release lock before collecting future to avoid blocking
1215                        drop(stream_guard);
1216
1217                        // Collect future for batch processing
1218                        // Box the future to store in Vec (type erasure for different future types)
1219                        pending_futures.push((idx, Box::pin(ingest_future)));
1220                        total_bytes_buffered += bytes.len();
1221
1222                        // Periodically flush and await futures to manage memory and ensure progress
1223                        if pending_futures.len() >= BATCH_SIZE
1224                            || total_bytes_buffered >= BATCH_SIZE_BYTES
1225                        {
1226                            // Flush stream to send buffered records
1227                            {
1228                                let mut stream_guard = self.stream.lock().await;
1229                                if let Some(ref mut stream) = *stream_guard {
1230                                    if let Err(e) = stream.flush().await {
1231                                        error!(
1232                                            "Failed to flush Zerobus stream during batch: {}",
1233                                            e
1234                                        );
1235                                        // Mark all pending futures as failed
1236                                        for (pending_idx, _) in pending_futures.drain(..) {
1237                                            attempt_transmission_errors.push((
1238                                                pending_idx,
1239                                                ZerobusError::ConnectionError(format!(
1240                                                    "Flush failed during batch processing: {}",
1241                                                    e
1242                                                )),
1243                                            ));
1244                                        }
1245                                        all_succeeded = false;
1246                                        failed_at_idx = idx;
1247                                        break;
1248                                    }
1249                                }
1250                            }
1251
1252                            // Await all pending futures and track results
1253                            for (pending_idx, mut future) in pending_futures.drain(..) {
1254                                match future.as_mut().await {
1255                                    Ok(_ack_id) => {
1256                                        debug!(
1257                                            "✅ Successfully sent record to Zerobus stream (row {}, ack_id={})",
1258                                            pending_idx, _ack_id
1259                                        );
1260                                        attempt_successful_indices.push(pending_idx);
1261                                    }
1262                                    Err(e) => {
1263                                        let err_msg = format!("{}", e);
1264                                        // Check if stream is closed
1265                                        if err_msg.contains("Stream is closed")
1266                                            || err_msg.contains("Stream closed")
1267                                        {
1268                                            let is_first = pending_idx == 0;
1269                                            error!(
1270                                                "Stream closed: row={}, first_record={}, error={}",
1271                                                pending_idx, is_first, err_msg
1272                                            );
1273                                            if is_first {
1274                                                error!("Diagnostics: Stream closed during batch processing");
1275                                                error!("Possible causes:");
1276                                                error!("  1. Schema mismatch between descriptor and table");
1277                                                error!("  2. Validation error");
1278                                                error!("  3. Server-side issue");
1279                                            }
1280                                            // Clear stream and break to retry
1281                                            let mut stream_guard = self.stream.lock().await;
1282                                            *stream_guard = None;
1283                                            drop(stream_guard);
1284                                            attempt_transmission_errors.push((
1285                                                pending_idx,
1286                                                ZerobusError::ConnectionError(format!(
1287                                                    "Stream closed: row={}, error={}",
1288                                                    pending_idx, err_msg
1289                                                )),
1290                                            ));
1291                                            all_succeeded = false;
1292                                            failed_at_idx = pending_idx;
1293                                            break;
1294                                        } else {
1295                                            // Non-stream-closure errors
1296                                            attempt_transmission_errors.push((
1297                                                pending_idx,
1298                                                ZerobusError::TransmissionError(format!(
1299                                                    "Record ingestion failed: row={}, error={}",
1300                                                    pending_idx, e
1301                                                )),
1302                                            ));
1303                                            all_succeeded = false;
1304                                        }
1305                                    }
1306                                }
1307                            }
1308                            total_bytes_buffered = 0;
1309
1310                            // If we broke due to stream closure, mark for outer loop break
1311                            // But continue to process remaining pending futures below
1312                            if !all_succeeded && failed_at_idx > 0 {
1313                                should_break_outer = true;
1314                            }
1315                        }
1316                    }
1317                    Err(e) => {
1318                        let err_msg = format!("{}", e);
1319                        // Check if stream is closed (indicates server-side closure)
1320                        if err_msg.contains("Stream is closed") || err_msg.contains("Stream closed")
1321                        {
1322                            // Standardized error logging with context
1323                            let is_first = idx == 0;
1324                            error!(
1325                                "Stream closed: row={}, first_record={}, error={}",
1326                                idx, is_first, err_msg
1327                            );
1328                            if is_first {
1329                                // First record failure indicates schema/validation issues
1330                                error!("Diagnostics: This is the FIRST record - stream closed immediately");
1331                                error!("Possible causes:");
1332                                error!("  1. Schema mismatch between descriptor and table");
1333                                error!("  2. Validation error on first record");
1334                                error!("  3. Table schema not yet propagated");
1335                                error!(
1336                                    "Descriptor info: fields={}, nested_types={}",
1337                                    descriptor.field.len(),
1338                                    descriptor.nested_type.len()
1339                                );
1340                            }
1341                            // Stream closure error: track per-row and continue
1342                            // Clear stream so it gets recreated on next iteration
1343                            *stream_guard = None;
1344                            drop(stream_guard);
1345                            let stream_error = ZerobusError::ConnectionError(format!(
1346                                "Stream closed: row={}, error={}",
1347                                idx, err_msg
1348                            ));
1349                            attempt_transmission_errors.push((idx, stream_error));
1350                            all_succeeded = false;
1351                            failed_at_idx = idx;
1352                            // Mark for outer loop break, but continue to process pending futures
1353                            should_break_outer = true;
1354                            break;
1355                        } else {
1356                            // Non-stream-closure errors: track per-row and continue
1357                            let transmission_error = ZerobusError::ConnectionError(format!(
1358                                "Record creation failed: row={}, error={}",
1359                                idx, e
1360                            ));
1361                            attempt_transmission_errors.push((idx, transmission_error));
1362                            all_succeeded = false;
1363                            // Continue processing remaining rows instead of returning immediately
1364                        }
1365                    }
1366                }
1367            }
1368
1369            // CRITICAL: Flush and await any remaining pending futures before proceeding
1370            // This ensures all queued records are sent and acknowledged, even if we broke early
1371            if !pending_futures.is_empty() {
1372                // Always flush remaining records before awaiting acknowledgments
1373                // This ensures records are sent even if we broke early due to errors
1374                {
1375                    let mut stream_guard = self.stream.lock().await;
1376                    if let Some(ref mut stream) = *stream_guard {
1377                        // Attempt to flush - if stream is closed, this will fail but we still want to await futures
1378                        match stream.flush().await {
1379                            Ok(_) => {
1380                                debug!(
1381                                    "✅ Flushed Zerobus stream for {} remaining pending futures",
1382                                    pending_futures.len()
1383                                );
1384                            }
1385                            Err(e) => {
1386                                warn!("Failed to flush Zerobus stream for remaining records (stream may be closed): {}", e);
1387                                // Don't mark futures as failed yet - await them to get actual acknowledgment status
1388                                // The stream might be closed, but some records may have been sent before closure
1389                            }
1390                        }
1391                    } else {
1392                        warn!("Stream is None when trying to flush remaining records - records may be lost");
1393                        // Mark all pending futures as failed since we can't flush
1394                        for (pending_idx, _) in pending_futures.drain(..) {
1395                            attempt_transmission_errors.push((
1396                                pending_idx,
1397                                ZerobusError::ConnectionError(
1398                                    "Stream was closed before flushing remaining records"
1399                                        .to_string(),
1400                                ),
1401                            ));
1402                        }
1403                        all_succeeded = false;
1404                    }
1405                }
1406
1407                // CRITICAL: Always await all pending futures to get acknowledgment status
1408                // Even if stream is closed, we need to know which records succeeded/failed
1409                for (pending_idx, mut future) in pending_futures.drain(..) {
1410                    match future.as_mut().await {
1411                        Ok(_ack_id) => {
1412                            debug!(
1413                                "✅ Successfully acknowledged record (row {}, ack_id={})",
1414                                pending_idx, _ack_id
1415                            );
1416                            attempt_successful_indices.push(pending_idx);
1417                        }
1418                        Err(e) => {
1419                            let err_msg = format!("{}", e);
1420                            if err_msg.contains("Stream is closed")
1421                                || err_msg.contains("Stream closed")
1422                            {
1423                                // Stream was closed - clear it and mark as failed
1424                                let mut stream_guard = self.stream.lock().await;
1425                                *stream_guard = None;
1426                                drop(stream_guard);
1427                                attempt_transmission_errors.push((
1428                                    pending_idx,
1429                                    ZerobusError::ConnectionError(format!(
1430                                        "Stream closed before acknowledgment: row={}, error={}",
1431                                        pending_idx, err_msg
1432                                    )),
1433                                ));
1434                                all_succeeded = false;
1435                            } else {
1436                                // Other errors (network, timeout, etc.)
1437                                attempt_transmission_errors.push((
1438                                    pending_idx,
1439                                    ZerobusError::TransmissionError(format!(
1440                                        "Record acknowledgment failed: row={}, error={}",
1441                                        pending_idx, e
1442                                    )),
1443                                ));
1444                                all_succeeded = false;
1445                            }
1446                        }
1447                    }
1448                }
1449            }
1450
1451            // If we broke early due to stream closure, exit the retry loop
1452            if should_break_outer {
1453                break;
1454            }
1455
1456            // ========================================================================
1457            // STEP 6d: Handle retry logic
1458            // ========================================================================
1459            // If all rows succeeded, we're done. Otherwise, retry with stream recreation.
1460            // The retry mechanism handles transient stream closure issues.
1461            //
1462            // Edge case: If stream closes repeatedly, it may indicate:
1463            // - Schema mismatch (descriptor doesn't match table schema)
1464            // - Server-side validation errors
1465            // - Network issues causing stream closure
1466            //
1467            // Performance: Small delay (100ms) prevents tight retry loops.
1468            if all_succeeded {
1469                // All rows sent successfully - flush stream to ensure records are transmitted
1470                // CRITICAL: The SDK buffers records internally and requires flush() to send them
1471                {
1472                    let mut stream_guard = self.stream.lock().await;
1473                    if let Some(ref mut stream) = *stream_guard {
1474                        if let Err(e) = stream.flush().await {
1475                            error!("Failed to flush Zerobus stream after batch: {}", e);
1476                            // Don't fail the entire batch if flush fails - records may still be in transit
1477                            // But log the error for monitoring
1478                        } else {
1479                            debug!(
1480                                "✅ Flushed Zerobus stream after sending {} records",
1481                                attempt_successful_indices.len()
1482                            );
1483                        }
1484                    }
1485                }
1486                // Update final results with this attempt's results
1487                successful_indices = attempt_successful_indices;
1488                transmission_errors = attempt_transmission_errors;
1489                break;
1490            } else {
1491                // Some rows failed due to stream closure - retry with stream recreation
1492                retry_count += 1;
1493                if retry_count > MAX_STREAM_RECREATE_ATTEMPTS {
1494                    // Exhausted retry attempts - use what we have from this attempt
1495                    let mut final_transmission_errors = attempt_transmission_errors;
1496                    let final_successful_indices = attempt_successful_indices;
1497                    // Mark remaining rows as failed due to stream closure
1498                    for (idx, _) in conversion_result.successful_bytes.iter() {
1499                        if !final_successful_indices.contains(idx)
1500                            && !final_transmission_errors.iter().any(|(i, _)| i == idx)
1501                        {
1502                            final_transmission_errors.push((*idx, ZerobusError::ConnectionError(format!(
1503                                "Stream recreation exhausted: row={}, possible_causes='schema_mismatch,validation_error,server_issue'",
1504                                idx
1505                            ))));
1506                        }
1507                    }
1508                    successful_indices = final_successful_indices;
1509                    transmission_errors = final_transmission_errors;
1510                    break;
1511                }
1512                warn!(
1513                    "Stream recreation retry: attempt={}/{}, failed_at_row={}",
1514                    retry_count, MAX_STREAM_RECREATE_ATTEMPTS, failed_at_idx
1515                );
1516                // Small delay before retry to avoid tight retry loops
1517                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1518                // Reset attempt tracking for retry - will retry all remaining rows
1519                attempt_successful_indices.clear();
1520                attempt_transmission_errors.clear();
1521                // Note: all_succeeded will be set to true at start of next loop iteration
1522            }
1523        }
1524
1525        // Merge conversion errors with transmission errors
1526        let mut all_failed_rows = conversion_errors;
1527        all_failed_rows.extend(transmission_errors);
1528        Ok(BatchTransmissionResult {
1529            successful_rows: successful_indices,
1530            failed_rows: all_failed_rows,
1531        })
1532    }
1533
1534    /// Flush any pending operations and ensure data is transmitted
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns error if flush operation fails.
1539    pub async fn flush(&self) -> Result<(), ZerobusError> {
1540        // CRITICAL: Flush Zerobus stream to ensure buffered records are sent
1541        // The SDK buffers records internally and requires flush() to transmit them
1542        {
1543            let mut stream_guard = self.stream.lock().await;
1544            if let Some(ref mut stream) = *stream_guard {
1545                stream.flush().await.map_err(|e| {
1546                    ZerobusError::ConnectionError(format!("Failed to flush Zerobus stream: {}", e))
1547                })?;
1548                debug!("✅ Flushed Zerobus stream");
1549            }
1550        }
1551
1552        // Flush debug files if enabled
1553        if let Some(ref debug_writer) = self.debug_writer {
1554            if let Err(e) = debug_writer.flush().await {
1555                warn!("Failed to flush debug files: {}", e);
1556            }
1557        }
1558
1559        // Flush observability if enabled
1560        if let Some(ref obs) = self.observability {
1561            obs.flush().await?;
1562        }
1563
1564        Ok(())
1565    }
1566
1567    /// Shutdown the wrapper gracefully, closing connections and cleaning up resources
1568    ///
1569    /// # Errors
1570    ///
1571    /// Returns error if shutdown fails.
1572    pub async fn shutdown(&self) -> Result<(), ZerobusError> {
1573        info!("Shutting down ZerobusWrapper");
1574
1575        // Close stream if it exists
1576        let mut stream_guard = self.stream.lock().await;
1577        if let Some(mut stream) = stream_guard.take() {
1578            // Close the stream gracefully
1579            // ZerobusStream has a close() method that returns ZerobusResult
1580            if let Err(e) = stream.close().await {
1581                warn!("Error closing Zerobus stream: {}", e);
1582            } else {
1583                debug!("Stream closed successfully");
1584            }
1585        }
1586
1587        Ok(())
1588    }
1589}
1590
1591// Implement Clone for use in async closures
1592impl Clone for ZerobusWrapper {
1593    fn clone(&self) -> Self {
1594        Self {
1595            config: Arc::clone(&self.config),
1596            sdk: Arc::clone(&self.sdk),
1597            stream: Arc::clone(&self.stream),
1598            retry_config: self.retry_config.clone(),
1599            observability: self.observability.clone(),
1600            debug_writer: self.debug_writer.as_ref().map(Arc::clone),
1601            descriptor_written: Arc::clone(&self.descriptor_written),
1602        }
1603    }
1604}
1605
1606// ZerobusWrapper is automatically Send + Sync because all its fields are Send + Sync:
1607// - Arc<WrapperConfiguration>: Send + Sync (Arc is Send + Sync, WrapperConfiguration is Send + Sync)
1608// - Arc<Mutex<Option<ZerobusSdk>>>: Send + Sync (Arc and Mutex are Send + Sync)
1609// - Arc<Mutex<Option<ZerobusStream>>>: Send + Sync
1610// - RetryConfig: Send + Sync (contains only primitive types)
1611// - Option<ObservabilityManager>: Send + Sync (ObservabilityManager is Send + Sync)
1612// - Option<Arc<DebugWriter>>: Send + Sync
1613// - Arc<Mutex<bool>>: Send + Sync
1614// The compiler automatically derives Send + Sync for this struct, so explicit unsafe impl is not needed.