exarrow_rs/import/parallel.rs

//! Parallel import infrastructure for multi-file operations.
//!
//! This module provides the `ParallelTransportPool` for managing multiple HTTP transport
//! connections for parallel file imports, and utilities for streaming multiple files
//! concurrently.

use std::path::PathBuf;

use tokio::task::JoinHandle;

use crate::query::import::Compression;
use crate::transport::HttpTransportClient;

use super::ImportError;

/// Default chunk size for HTTP chunked transfer encoding (64 KiB).
const CHUNK_SIZE: usize = 64 * 1024;

/// Entry describing a file for parallel import.
///
/// Each entry contains the address and file name needed to build
/// the multi-FILE IMPORT SQL statement.
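///
/// # Example
///
/// ```ignore
/// // Illustrative sketch; assumes this module is importable at this path.
/// use exarrow_rs::import::parallel::ImportFileEntry;
///
/// let entry = ImportFileEntry::new(
///     "10.0.0.5:8563".to_string(),
///     "001.csv".to_string(),
///     None,
/// );
/// assert_eq!(entry.address, "10.0.0.5:8563");
/// ```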
#[derive(Debug, Clone)]
pub struct ImportFileEntry {
    /// Internal address from the EXA handshake (format: "host:port")
    pub address: String,
    /// File name for this entry (e.g., "001.csv", "002.csv")
    pub file_name: String,
    /// Optional public key fingerprint for TLS
    pub public_key: Option<String>,
}

impl ImportFileEntry {
    /// Creates a new import file entry.
    pub fn new(address: String, file_name: String, public_key: Option<String>) -> Self {
        Self {
            address,
            file_name,
            public_key,
        }
    }
}

/// Pool of parallel HTTP transport connections for multi-file import.
///
/// This struct manages multiple HTTP connections, each of which performs the EXA
/// tunneling handshake to obtain a unique internal address for the IMPORT SQL.
pub struct ParallelTransportPool {
    /// HTTP transport clients (one per file)
    connections: Vec<HttpTransportClient>,
    /// File entries for SQL query building
    entries: Vec<ImportFileEntry>,
}

impl std::fmt::Debug for ParallelTransportPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParallelTransportPool")
            .field("connection_count", &self.connections.len())
            .field("entries", &self.entries)
            .finish()
    }
}

impl ParallelTransportPool {
    /// Establishes N parallel HTTP connections with the EXA handshake.
    ///
    /// This method creates `file_count` HTTP transport connections in parallel,
    /// each performing the EXA tunneling handshake to obtain a unique internal address.
    ///
    /// # Arguments
    ///
    /// * `host` - The Exasol host to connect to
    /// * `port` - The port to connect to
    /// * `use_tls` - Whether to use TLS encryption
    /// * `file_count` - Number of parallel connections to establish
    ///
    /// # Returns
    ///
    /// A `ParallelTransportPool` with established connections.
    ///
    /// # Errors
    ///
    /// Returns `ImportError::InvalidConfig` if `file_count` is zero, and
    /// `ImportError::ParallelImportError` if any connection task fails or panics.
    /// Collection is fail-fast: the first error is returned immediately, while
    /// already-spawned connection tasks are detached and left to finish on their own.
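    ///
    /// # Example
    ///
    /// ```ignore
    /// // Illustrative sketch; assumes a reachable Exasol node and an async
    /// // context. The host name is hypothetical.
    /// let pool = ParallelTransportPool::connect("exasol.example.com", 8563, true, 4).await?;
    /// assert_eq!(pool.len(), 4);
    /// for entry in pool.file_entries() {
    ///     println!("{} -> {}", entry.address, entry.file_name);
    /// }
    /// ```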
    pub async fn connect(
        host: &str,
        port: u16,
        use_tls: bool,
        file_count: usize,
    ) -> Result<Self, ImportError> {
        if file_count == 0 {
            return Err(ImportError::InvalidConfig(
                "file_count must be at least 1".to_string(),
            ));
        }

        // Spawn connection tasks in parallel
        let mut connect_handles: Vec<JoinHandle<Result<HttpTransportClient, ImportError>>> =
            Vec::with_capacity(file_count);

        for _ in 0..file_count {
            let host = host.to_string();
            let handle = tokio::spawn(async move {
                HttpTransportClient::connect(&host, port, use_tls)
                    .await
                    .map_err(|e| {
                        ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
                    })
            });
            connect_handles.push(handle);
        }

        // Collect results with fail-fast semantics
        let mut connections = Vec::with_capacity(file_count);
        let mut entries = Vec::with_capacity(file_count);

        for (idx, handle) in connect_handles.into_iter().enumerate() {
            let client = handle
                .await
                .map_err(|e| {
                    ImportError::ParallelImportError(format!("Connection task {idx} panicked: {e}"))
                })?
                .map_err(|e| {
                    ImportError::ParallelImportError(format!("Connection {idx} failed: {e}"))
                })?;

            // Generate a file entry with a unique file name
            let file_name = format!("{:03}.csv", idx + 1);
            let entry = ImportFileEntry::new(
                client.internal_address().to_string(),
                file_name,
                client.public_key_fingerprint().map(String::from),
            );

            connections.push(client);
            entries.push(entry);
        }

        Ok(Self {
            connections,
            entries,
        })
    }

    /// Returns the file entries for SQL query building.
    ///
    /// These entries contain the internal addresses and file names
    /// needed to construct the multi-FILE IMPORT SQL statement.
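    ///
    /// The resulting statement has roughly this shape (illustrative only; it is
    /// built by the caller, not by this pool):
    ///
    /// ```text
    /// IMPORT INTO my_table FROM CSV
    ///     AT 'http://10.0.0.5:8563' FILE '001.csv'
    ///     AT 'http://10.0.0.6:8563' FILE '002.csv'
    /// ```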
    #[must_use]
    pub fn file_entries(&self) -> &[ImportFileEntry] {
        &self.entries
    }

    /// Returns the number of connections in the pool.
    #[must_use]
    pub fn len(&self) -> usize {
        self.connections.len()
    }

    /// Returns `true` if the pool has no connections.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.connections.is_empty()
    }

    /// Consumes the pool and returns its connections for streaming.
    ///
    /// This method takes ownership of the pool and returns the
    /// individual connections for use in parallel streaming.
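    ///
    /// The connection at index `i` corresponds to `file_entries()[i]`, so each
    /// connection can be paired with the file name it will serve.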
    #[must_use]
    pub fn into_connections(self) -> Vec<HttpTransportClient> {
        self.connections
    }
}

/// Streams multiple files through HTTP connections in parallel.
///
/// This function takes ownership of the connections and file data,
/// streaming each file through its corresponding connection concurrently.
///
/// # Arguments
///
/// * `connections` - HTTP transport clients (one per file)
/// * `file_data` - File data to stream (one `Vec<u8>` per file)
/// * `_compression` - Compression marker; the data must already be compressed
///   accordingly, so this parameter is currently unused
///
/// # Returns
///
/// `Ok(())` on success.
///
/// # Errors
///
/// Returns `ImportError::InvalidConfig` if the connection and file counts differ,
/// and `ImportError::ParallelImportError` on the first streaming failure (fail-fast).
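///
/// # Example
///
/// ```ignore
/// // End-to-end sketch; assumes a reachable Exasol node and an async context.
/// // The IMPORT SQL built from `pool.file_entries()` must be executed on a
/// // separate control connection while this future runs.
/// let pool = ParallelTransportPool::connect("exasol.example.com", 8563, true, 2).await?;
/// let files = vec![b"1,a\n".to_vec(), b"2,b\n".to_vec()];
/// stream_files_parallel(pool.into_connections(), files, Compression::None).await?;
/// ```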
pub async fn stream_files_parallel(
    connections: Vec<HttpTransportClient>,
    file_data: Vec<Vec<u8>>,
    _compression: Compression,
) -> Result<(), ImportError> {
    if connections.len() != file_data.len() {
        return Err(ImportError::InvalidConfig(format!(
            "Connection count ({}) != file data count ({})",
            connections.len(),
            file_data.len()
        )));
    }

    // Spawn streaming tasks for each connection/file pair
    let mut stream_handles: Vec<JoinHandle<Result<(), ImportError>>> =
        Vec::with_capacity(connections.len());

    for (idx, (mut client, data)) in connections.into_iter().zip(file_data).enumerate() {
        let handle = tokio::spawn(async move {
            // Wait for the HTTP GET request from Exasol
            client.handle_import_request().await.map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "File {idx} failed to handle import request: {e}"
                ))
            })?;

            // Stream data in chunks
            for chunk in data.chunks(CHUNK_SIZE) {
                client.write_chunked_body(chunk).await.map_err(|e| {
                    ImportError::ParallelImportError(format!("File {idx} streaming failed: {e}"))
                })?;
            }

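            // `write_chunked_body` is expected to frame each slice per HTTP/1.1
            // chunked transfer encoding (`<hex length>\r\n<data>\r\n`), and
            // `write_final_chunk` to emit the zero-length terminator
            // (`0\r\n\r\n`) that ends the body.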
            // Send final chunk
            client.write_final_chunk().await.map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "File {idx} failed to send final chunk: {e}"
                ))
            })?;

            Ok(())
        });

        stream_handles.push(handle);
    }

    // Wait for all streams to complete with fail-fast semantics
    for (idx, handle) in stream_handles.into_iter().enumerate() {
        handle
            .await
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Stream task {idx} panicked: {e}"))
            })?
            .map_err(|e| ImportError::ParallelImportError(format!("Stream {idx} failed: {e}")))?;
    }

    Ok(())
}

/// Converts multiple Parquet files to CSV format in parallel.
///
/// This function spawns blocking tasks to convert each Parquet file
/// to CSV format concurrently.
///
/// # Arguments
///
/// * `paths` - Paths to the Parquet files
/// * `batch_size` - Batch size for the Parquet reader
/// * `null_value` - String representation of NULL values
/// * `column_separator` - Character placed between CSV columns (e.g., `,`)
/// * `column_delimiter` - Quote character enclosing CSV fields (e.g., `"`)
///
/// # Returns
///
/// Vector of CSV data (one `Vec<u8>` per file).
///
/// # Errors
///
/// Returns `ImportError` on the first conversion failure (fail-fast).
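///
/// # Example
///
/// ```ignore
/// // Illustrative sketch; the file names are hypothetical.
/// let csv_files = convert_parquet_files_to_csv(
///     vec!["part-0001.parquet".into(), "part-0002.parquet".into()],
///     8192,          // rows per record batch
///     String::new(), // render NULL as an empty string
///     ',',
///     '"',
/// )
/// .await?;
/// assert_eq!(csv_files.len(), 2);
/// ```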
pub async fn convert_parquet_files_to_csv(
    paths: Vec<PathBuf>,
    batch_size: usize,
    null_value: String,
    column_separator: char,
    column_delimiter: char,
) -> Result<Vec<Vec<u8>>, ImportError> {
    use crate::import::parquet::{record_batch_to_csv, ParquetImportOptions};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    // Spawn blocking conversion tasks in parallel
    let mut conversion_handles: Vec<JoinHandle<Result<Vec<u8>, ImportError>>> =
        Vec::with_capacity(paths.len());

    for path in paths {
        let null_value = null_value.clone();
        let handle = tokio::task::spawn_blocking(move || {
            // Read Parquet file
            let file = std::fs::File::open(&path).map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to open Parquet file {}: {e}",
                    path.display()
                ))
            })?;

            let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to read Parquet file {}: {e}",
                    path.display()
                ))
            })?;

            let reader = builder.with_batch_size(batch_size).build().map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to build Parquet reader for {}: {e}",
                    path.display()
                ))
            })?;

            // Create options for CSV conversion
            let options = ParquetImportOptions::default()
                .with_null_value(&null_value)
                .with_column_separator(column_separator)
                .with_column_delimiter(column_delimiter);

            // Convert all batches to CSV
            let mut csv_data = Vec::new();
            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    ImportError::ParallelImportError(format!(
                        "Failed to read batch from {}: {e}",
                        path.display()
                    ))
                })?;

                let csv_rows = record_batch_to_csv(&batch, &options).map_err(|e| {
                    ImportError::ParallelImportError(format!(
                        "Failed to convert batch to CSV from {}: {e}",
                        path.display()
                    ))
                })?;

                for row in csv_rows {
                    csv_data.extend_from_slice(row.as_bytes());
                    csv_data.push(b'\n');
                }
            }

            Ok(csv_data)
        });

        conversion_handles.push(handle);
    }

    // Collect results with fail-fast semantics
    let mut results = Vec::with_capacity(conversion_handles.len());
    for (idx, handle) in conversion_handles.into_iter().enumerate() {
        let csv_data = handle
            .await
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Conversion task {idx} panicked: {e}"))
            })?
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Conversion {idx} failed: {e}"))
            })?;
        results.push(csv_data);
    }

    Ok(results)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_import_file_entry_new() {
        let entry = ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            Some("sha256//abc123".to_string()),
        );

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "001.csv");
        assert_eq!(entry.public_key, Some("sha256//abc123".to_string()));
    }

    #[test]
    fn test_import_file_entry_no_tls() {
        let entry = ImportFileEntry::new("10.0.0.5:8563".to_string(), "002.csv".to_string(), None);

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "002.csv");
        assert!(entry.public_key.is_none());
    }

    #[tokio::test]
    async fn test_parallel_transport_pool_zero_count_error() {
        let result = ParallelTransportPool::connect("localhost", 8563, false, 0).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ImportError::InvalidConfig(_)));
    }

    #[tokio::test]
    async fn test_stream_files_parallel_mismatched_counts() {
        let connections = vec![];
        let file_data = vec![vec![1, 2, 3]];

        let result = stream_files_parallel(connections, file_data, Compression::None).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ImportError::InvalidConfig(_)));
    }

    #[tokio::test]
    async fn test_stream_files_parallel_empty() {
        let connections = vec![];
        let file_data = vec![];

        let result = stream_files_parallel(connections, file_data, Compression::None).await;
        assert!(result.is_ok());
    }
}