use std::path::PathBuf;

use tokio::task::JoinHandle;

use crate::query::import::Compression;
use crate::transport::HttpTransportClient;

use super::ImportError;

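/// Size of each chunk written to the HTTP transport stream (64 KiB).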
const CHUNK_SIZE: usize = 64 * 1024;

/// Metadata for one file slot in a parallel IMPORT statement.
#[derive(Debug, Clone)]
pub struct ImportFileEntry {
    /// Internal `host:port` address reported by the Exasol node for this connection.
    pub address: String,
    /// Logical file name referenced in the IMPORT statement (e.g. `001.csv`).
    pub file_name: String,
    /// TLS public-key fingerprint, if the connection uses TLS.
    pub public_key: Option<String>,
}

impl ImportFileEntry {
    /// Creates a new entry from its raw parts.
    pub fn new(address: String, file_name: String, public_key: Option<String>) -> Self {
        Self {
            address,
            file_name,
            public_key,
        }
    }
}

/// A pool of independent HTTP transport connections, one per file to import.
pub struct ParallelTransportPool {
    connections: Vec<HttpTransportClient>,
    entries: Vec<ImportFileEntry>,
}

impl std::fmt::Debug for ParallelTransportPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParallelTransportPool")
            .field("connection_count", &self.connections.len())
            .field("entries", &self.entries)
            .finish()
    }
}

impl ParallelTransportPool {
    /// Opens `file_count` HTTP transport connections to the given Exasol node
    /// concurrently and derives an [`ImportFileEntry`] for each of them.
    ///
    /// Returns an error if `file_count` is zero or if any connection fails.
    pub async fn connect(
        host: &str,
        port: u16,
        use_tls: bool,
        file_count: usize,
    ) -> Result<Self, ImportError> {
        if file_count == 0 {
            return Err(ImportError::InvalidConfig(
                "file_count must be at least 1".to_string(),
            ));
        }

        let mut connect_handles: Vec<JoinHandle<Result<HttpTransportClient, ImportError>>> =
            Vec::with_capacity(file_count);

        for _ in 0..file_count {
            let host = host.to_string();
            let handle = tokio::spawn(async move {
                HttpTransportClient::connect(&host, port, use_tls)
                    .await
                    .map_err(|e| {
                        ImportError::HttpTransportError(format!("Failed to connect to Exasol: {e}"))
                    })
            });
            connect_handles.push(handle);
        }

        let mut connections = Vec::with_capacity(file_count);
        let mut entries = Vec::with_capacity(file_count);

        for (idx, handle) in connect_handles.into_iter().enumerate() {
            let client = handle
                .await
                .map_err(|e| {
                    ImportError::ParallelImportError(format!("Connection task {idx} panicked: {e}"))
                })?
                .map_err(|e| {
                    ImportError::ParallelImportError(format!("Connection {idx} failed: {e}"))
                })?;

            // File names are 1-based and zero-padded: 001.csv, 002.csv, ...
            let file_name = format!("{:03}.csv", idx + 1);
            let entry = ImportFileEntry::new(
                client.internal_address().to_string(),
                file_name,
                client.public_key_fingerprint().map(String::from),
            );

            connections.push(client);
            entries.push(entry);
        }

        Ok(Self {
            connections,
            entries,
        })
    }

    /// The file entries describing each connection, in connection order.
    #[must_use]
    pub fn file_entries(&self) -> &[ImportFileEntry] {
        &self.entries
    }

    /// Number of open connections in the pool.
    #[must_use]
    pub fn len(&self) -> usize {
        self.connections.len()
    }

    /// Returns `true` if the pool holds no connections.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.connections.is_empty()
    }

    /// Consumes the pool, yielding its connections for streaming.
    #[must_use]
    pub fn into_connections(self) -> Vec<HttpTransportClient> {
        self.connections
    }
}
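
// A minimal sketch of how `file_entries()` could be rendered into the AT/FILE
// clauses of an Exasol IMPORT statement. The exact SQL shape, including the
// PUBLIC KEY clause used here for TLS fingerprint pinning, is an assumption and
// not defined by this module; adapt it to whatever builds the statement in this
// crate.
#[allow(dead_code)]
fn import_clauses_sketch(entries: &[ImportFileEntry], use_tls: bool) -> String {
    let scheme = if use_tls { "https" } else { "http" };
    entries
        .iter()
        .map(|e| {
            let mut clause = format!("AT '{scheme}://{}'", e.address);
            if let Some(key) = &e.public_key {
                // Hypothetical fingerprint-pinning clause (assumed syntax).
                clause.push_str(&format!(" PUBLIC KEY '{key}'"));
            }
            clause.push_str(&format!(" FILE '{}'", e.file_name));
            clause
        })
        .collect::<Vec<_>>()
        .join("\n")
}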

/// Streams each file's bytes over its own connection concurrently.
///
/// `connections` and `file_data` must have the same length; entry `i` of
/// `file_data` is written over connection `i`. The `_compression` parameter is
/// accepted for API symmetry but not yet applied to the stream.
pub async fn stream_files_parallel(
    connections: Vec<HttpTransportClient>,
    file_data: Vec<Vec<u8>>,
    _compression: Compression,
) -> Result<(), ImportError> {
    if connections.len() != file_data.len() {
        return Err(ImportError::InvalidConfig(format!(
            "Connection count ({}) != file data count ({})",
            connections.len(),
            file_data.len()
        )));
    }

    let mut stream_handles: Vec<JoinHandle<Result<(), ImportError>>> =
        Vec::with_capacity(connections.len());

    for (idx, (mut client, data)) in connections.into_iter().zip(file_data).enumerate() {
        let handle = tokio::spawn(async move {
            client.handle_import_request().await.map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "File {idx} failed to handle import request: {e}"
                ))
            })?;

            // Write the payload in fixed-size chunks, then terminate the
            // chunked transfer encoding with the final zero-length chunk.
            for chunk in data.chunks(CHUNK_SIZE) {
                client.write_chunked_body(chunk).await.map_err(|e| {
                    ImportError::ParallelImportError(format!("File {idx} streaming failed: {e}"))
                })?;
            }

            client.write_final_chunk().await.map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "File {idx} failed to send final chunk: {e}"
                ))
            })?;

            Ok(())
        });

        stream_handles.push(handle);
    }

    for (idx, handle) in stream_handles.into_iter().enumerate() {
        handle
            .await
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Stream task {idx} panicked: {e}"))
            })?
            .map_err(|e| ImportError::ParallelImportError(format!("Stream {idx} failed: {e}")))?;
    }

    Ok(())
}

/// Converts each Parquet file at `paths` to CSV bytes, running all conversions
/// concurrently on the blocking thread pool. Results are returned in input order.
pub async fn convert_parquet_files_to_csv(
    paths: Vec<PathBuf>,
    batch_size: usize,
    null_value: String,
    column_separator: char,
    column_delimiter: char,
) -> Result<Vec<Vec<u8>>, ImportError> {
    use crate::import::parquet::{record_batch_to_csv, ParquetImportOptions};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    let mut conversion_handles: Vec<JoinHandle<Result<Vec<u8>, ImportError>>> =
        Vec::with_capacity(paths.len());

    for path in paths {
        let null_value = null_value.clone();
        // Parquet decoding is CPU-bound, so it runs on Tokio's blocking pool
        // rather than on the async executor threads.
        let handle = tokio::task::spawn_blocking(move || {
            let file = std::fs::File::open(&path).map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to open Parquet file {}: {e}",
                    path.display()
                ))
            })?;

            let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to read Parquet file {}: {e}",
                    path.display()
                ))
            })?;

            let reader = builder.with_batch_size(batch_size).build().map_err(|e| {
                ImportError::ParallelImportError(format!(
                    "Failed to build Parquet reader for {}: {e}",
                    path.display()
                ))
            })?;

            let options = ParquetImportOptions::default()
                .with_null_value(&null_value)
                .with_column_separator(column_separator)
                .with_column_delimiter(column_delimiter);

            let mut csv_data = Vec::new();
            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    ImportError::ParallelImportError(format!(
                        "Failed to read batch from {}: {e}",
                        path.display()
                    ))
                })?;

                let csv_rows = record_batch_to_csv(&batch, &options).map_err(|e| {
                    ImportError::ParallelImportError(format!(
                        "Failed to convert batch to CSV from {}: {e}",
                        path.display()
                    ))
                })?;

                for row in csv_rows {
                    csv_data.extend_from_slice(row.as_bytes());
                    csv_data.push(b'\n');
                }
            }

            Ok(csv_data)
        });

        conversion_handles.push(handle);
    }

    let mut results = Vec::with_capacity(conversion_handles.len());
    for (idx, handle) in conversion_handles.into_iter().enumerate() {
        let csv_data = handle
            .await
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Conversion task {idx} panicked: {e}"))
            })?
            .map_err(|e| {
                ImportError::ParallelImportError(format!("Conversion {idx} failed: {e}"))
            })?;
        results.push(csv_data);
    }

    Ok(results)
}
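
// Illustrative end-to-end sketch of this module's intended flow: convert the
// Parquet inputs, open one connection per file, and stream the CSV bytes in
// parallel. The host name, port, and CSV options below are placeholders, and
// dispatching the actual IMPORT statement (built from `file_entries()`) is left
// as a comment because that step lives outside this module.
#[allow(dead_code)]
async fn parallel_import_sketch(paths: Vec<PathBuf>) -> Result<(), ImportError> {
    let file_data =
        convert_parquet_files_to_csv(paths, 8192, "NULL".to_string(), ',', '"').await?;
    let pool =
        ParallelTransportPool::connect("exasol.example.com", 8563, true, file_data.len()).await?;
    let _entries = pool.file_entries().to_vec();
    // ... build the IMPORT ... AT ... FILE ... statement from `_entries` and
    // submit it on a separate control connection here ...
    stream_files_parallel(pool.into_connections(), file_data, Compression::None).await
}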

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_import_file_entry_new() {
        let entry = ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            Some("sha256//abc123".to_string()),
        );

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "001.csv");
        assert_eq!(entry.public_key, Some("sha256//abc123".to_string()));
    }

    #[test]
    fn test_import_file_entry_no_tls() {
        let entry = ImportFileEntry::new("10.0.0.5:8563".to_string(), "002.csv".to_string(), None);

        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "002.csv");
        assert!(entry.public_key.is_none());
    }

    #[tokio::test]
    async fn test_parallel_transport_pool_zero_count_error() {
        let result = ParallelTransportPool::connect("localhost", 8563, false, 0).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ImportError::InvalidConfig(_)));
    }

    #[tokio::test]
    async fn test_stream_files_parallel_mismatched_counts() {
        let connections = vec![];
        let file_data = vec![vec![1, 2, 3]];

        let result = stream_files_parallel(connections, file_data, Compression::None).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ImportError::InvalidConfig(_)));
    }

    #[tokio::test]
    async fn test_stream_files_parallel_empty() {
        let connections = vec![];
        let file_data = vec![];

        let result = stream_files_parallel(connections, file_data, Compression::None).await;
        assert!(result.is_ok());
    }
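
    // Added check, derived from the function's own early paths: with no input
    // files, `convert_parquet_files_to_csv` spawns nothing and returns an
    // empty result set without touching the Parquet reader.
    #[tokio::test]
    async fn test_convert_parquet_files_to_csv_empty() {
        let result =
            convert_parquet_files_to_csv(vec![], 1024, "NULL".to_string(), ',', '"').await;
        assert!(result.unwrap().is_empty());
    }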
}