// exarrow_rs/adbc/connection.rs
1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19 ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20 QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
/// Session is a type alias for Connection.
///
/// This alias provides a more intuitive name for database sessions when
/// performing import/export operations. Both `Session` and `Connection`
/// can be used interchangeably.
pub type Session = Connection;
39
40fn blocking_runtime() -> &'static Runtime {
41 static RUNTIME: OnceLock<Runtime> = OnceLock::new();
42 RUNTIME.get_or_init(|| {
43 tokio::runtime::Builder::new_current_thread()
44 .enable_all()
45 .build()
46 .expect("Failed to create tokio runtime for blocking operations")
47 })
48}
49
/// ADBC Connection to an Exasol database.
///
/// The `Connection` type represents an active database connection and provides
/// methods for executing queries, managing transactions, and retrieving metadata.
///
/// # v2.0.0 Breaking Changes
///
/// - Connection now owns the transport directly
/// - `create_statement()` is now synchronous and returns a pure data container
/// - Use `execute_statement()` instead of `Statement::execute()`
/// - Use `prepare()` instead of `Statement::prepare()`
pub struct Connection {
    /// Transport layer for communication (owned by Connection). Wrapped in
    /// `Arc<Mutex<_>>` so result sets and SQL-executor closures can share it.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Session bookkeeping: state, current schema, transaction flag, query count.
    session: SessionInfo,
    /// Parameters the connection was created with (host, port, timeouts, etc.).
    params: ConnectionParams,
}
72
73impl Connection {
74 /// Create a connection from connection parameters.
75 ///
76 /// This establishes a connection to the Exasol database using WebSocket transport,
77 /// authenticates the user, and creates a session.
78 ///
79 /// # Arguments
80 ///
81 /// * `params` - Connection parameters
82 ///
83 /// # Returns
84 ///
85 /// A connected `Connection` instance.
86 ///
87 /// # Errors
88 ///
89 /// Returns `ConnectionError` if the connection or authentication fails.
90 pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
91 // Create transport
92 let mut transport = WebSocketTransport::new();
93
94 // Convert ConnectionParams to TransportConnectionParams
95 let transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
96 .with_tls(params.use_tls)
97 .with_validate_server_certificate(params.validate_server_certificate)
98 .with_timeout(params.connection_timeout.as_millis() as u64);
99
100 // Connect
101 transport.connect(&transport_params).await.map_err(|e| {
102 ConnectionError::ConnectionFailed {
103 host: params.host.clone(),
104 port: params.port,
105 message: e.to_string(),
106 }
107 })?;
108
109 // Authenticate
110 let credentials =
111 TransportCredentials::new(params.username.clone(), params.password().to_string());
112 let session_info = transport
113 .authenticate(&credentials)
114 .await
115 .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
116
117 // Create session config from connection params
118 let session_config = SessionConfig {
119 idle_timeout: params.idle_timeout,
120 query_timeout: params.query_timeout,
121 ..Default::default()
122 };
123
124 // Extract session_id once to avoid double clone
125 let session_id = session_info.session_id.clone();
126
127 // Convert SessionInfo to AuthResponseData
128 let auth_response = AuthResponseData {
129 session_id: session_id.clone(),
130 protocol_version: session_info.protocol_version,
131 release_version: session_info.release_version,
132 database_name: session_info.database_name,
133 product_name: session_info.product_name,
134 max_data_message_size: session_info.max_data_message_size,
135 max_identifier_length: 128, // Default value
136 max_varchar_length: 2_000_000, // Default value
137 identifier_quote_string: "\"".to_string(),
138 time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
139 time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
140 };
141
142 // Create session (owned, not Arc)
143 let session = SessionInfo::new(session_id, auth_response, session_config);
144
145 // Set schema if specified
146 if let Some(schema) = ¶ms.schema {
147 session.set_current_schema(Some(schema.clone())).await;
148 }
149
150 Ok(Self {
151 transport: Arc::new(Mutex::new(transport)),
152 session,
153 params,
154 })
155 }
156
    /// Create a builder for constructing a connection.
    ///
    /// # Returns
    ///
    /// A fresh `ConnectionBuilder` instance for fluent configuration.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::new()
    }
168
169 // ========================================================================
170 // Statement Creation (synchronous - Statement is now a pure data container)
171 // ========================================================================
172
    /// Create a new statement for executing SQL.
    ///
    /// Statement is a pure data container; no server round-trip happens here.
    /// Use `execute_statement()` to actually run it.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `Statement` instance ready for execution via `execute_statement()`.
    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
        Statement::new(sql)
    }
190
191 // ========================================================================
192 // Statement Execution Methods
193 // ========================================================================
194
195 /// Execute a statement and return results.
196 ///
197 /// # Arguments
198 ///
199 /// * `stmt` - Statement to execute
200 ///
201 /// # Returns
202 ///
203 /// A `ResultSet` containing the query results.
204 ///
205 /// # Errors
206 ///
207 /// Returns `QueryError` if execution fails or times out.
208 ///
209 /// # Example
210 ///
211 pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
212 // Validate session state
213 self.session
214 .validate_ready()
215 .await
216 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
217
218 // Update session state
219 self.session.set_state(SessionState::Executing).await;
220
221 // Increment query counter
222 self.session.increment_query_count();
223
224 // Build final SQL with parameters
225 let final_sql = stmt.build_sql()?;
226
227 // Execute with timeout
228 let timeout_duration = Duration::from_millis(stmt.timeout_ms());
229 let transport = Arc::clone(&self.transport);
230
231 let result = timeout(timeout_duration, async move {
232 let mut transport_guard = transport.lock().await;
233 transport_guard.execute_query(&final_sql).await
234 })
235 .await
236 .map_err(|_| QueryError::Timeout {
237 timeout_ms: stmt.timeout_ms(),
238 })?
239 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
240
241 // Update session state back to ready/in_transaction
242 self.update_session_state_after_query().await;
243
244 // Convert transport result to ResultSet
245 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
246 }
247
248 /// Execute a statement and return the row count (for non-SELECT statements).
249 ///
250 /// # Arguments
251 ///
252 /// * `stmt` - Statement to execute
253 ///
254 /// # Returns
255 ///
256 /// The number of rows affected.
257 ///
258 /// # Errors
259 ///
260 /// Returns `QueryError` if execution fails or if statement is a SELECT.
261 ///
262 /// # Example
263 ///
264 pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
265 let result_set = self.execute_statement(stmt).await?;
266
267 result_set.row_count().ok_or_else(|| {
268 QueryError::NoResultSet("Expected row count, got result set".to_string())
269 })
270 }
271
272 // ========================================================================
273 // Prepared Statement Methods
274 // ========================================================================
275
276 /// Create a prepared statement for parameterized query execution.
277 ///
278 /// This creates a server-side prepared statement that can be executed
279 /// multiple times with different parameter values.
280 ///
281 /// # Arguments
282 ///
283 /// * `sql` - SQL statement with parameter placeholders (?)
284 ///
285 /// # Returns
286 ///
287 /// A `PreparedStatement` ready for parameter binding and execution.
288 ///
289 /// # Errors
290 ///
291 /// Returns `QueryError` if preparation fails.
292 ///
293 /// # Example
294 ///
295 pub async fn prepare(
296 &mut self,
297 sql: impl Into<String>,
298 ) -> Result<PreparedStatement, QueryError> {
299 let sql = sql.into();
300
301 // Validate session state
302 self.session
303 .validate_ready()
304 .await
305 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
306
307 let mut transport = self.transport.lock().await;
308 let handle = transport
309 .create_prepared_statement(&sql)
310 .await
311 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
312
313 Ok(PreparedStatement::new(handle))
314 }
315
316 /// Execute a prepared statement and return results.
317 ///
318 /// # Arguments
319 ///
320 /// * `stmt` - Prepared statement to execute
321 ///
322 /// # Returns
323 ///
324 /// A `ResultSet` containing the query results.
325 ///
326 /// # Errors
327 ///
328 /// Returns `QueryError` if execution fails.
329 ///
330 /// # Example
331 ///
332 pub async fn execute_prepared(
333 &mut self,
334 stmt: &PreparedStatement,
335 ) -> Result<ResultSet, QueryError> {
336 if stmt.is_closed() {
337 return Err(QueryError::StatementClosed);
338 }
339
340 // Validate session state
341 self.session
342 .validate_ready()
343 .await
344 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
345
346 // Update session state
347 self.session.set_state(SessionState::Executing).await;
348
349 // Increment query counter
350 self.session.increment_query_count();
351
352 // Convert parameters to column-major JSON format
353 let params_data = stmt.build_parameters_data()?;
354
355 let mut transport = self.transport.lock().await;
356 let result = transport
357 .execute_prepared_statement(stmt.handle_ref(), params_data)
358 .await
359 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
360
361 drop(transport);
362
363 // Update session state back to ready/in_transaction
364 self.update_session_state_after_query().await;
365
366 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
367 }
368
369 /// Execute a prepared statement and return the number of affected rows.
370 ///
371 /// Use this for INSERT, UPDATE, DELETE statements.
372 ///
373 /// # Arguments
374 ///
375 /// * `stmt` - Prepared statement to execute
376 ///
377 /// # Returns
378 ///
379 /// The number of rows affected.
380 ///
381 /// # Errors
382 ///
383 /// Returns `QueryError` if execution fails or statement returns a result set.
384 pub async fn execute_prepared_update(
385 &mut self,
386 stmt: &PreparedStatement,
387 ) -> Result<i64, QueryError> {
388 if stmt.is_closed() {
389 return Err(QueryError::StatementClosed);
390 }
391
392 // Validate session state
393 self.session
394 .validate_ready()
395 .await
396 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
397
398 // Update session state
399 self.session.set_state(SessionState::Executing).await;
400
401 // Convert parameters to column-major JSON format
402 let params_data = stmt.build_parameters_data()?;
403
404 let mut transport = self.transport.lock().await;
405 let result = transport
406 .execute_prepared_statement(stmt.handle_ref(), params_data)
407 .await
408 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
409
410 drop(transport);
411
412 // Update session state back to ready/in_transaction
413 self.update_session_state_after_query().await;
414
415 match result {
416 QueryResult::RowCount { count } => Ok(count),
417 QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
418 }
419 }
420
421 /// Close a prepared statement and release server-side resources.
422 ///
423 /// # Arguments
424 ///
425 /// * `stmt` - Prepared statement to close (consumed)
426 ///
427 /// # Errors
428 ///
429 /// Returns `QueryError` if closing fails.
430 ///
431 /// # Example
432 ///
433 pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
434 if stmt.is_closed() {
435 return Ok(());
436 }
437
438 let mut transport = self.transport.lock().await;
439 transport
440 .close_prepared_statement(stmt.handle_ref())
441 .await
442 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
443
444 stmt.mark_closed();
445 Ok(())
446 }
447
448 // ========================================================================
449 // Convenience Methods (these internally use execute_statement)
450 // ========================================================================
451
452 /// Execute a SQL query and return results.
453 ///
454 /// This is a convenience method that creates a statement and executes it.
455 ///
456 /// # Arguments
457 ///
458 /// * `sql` - SQL query text
459 ///
460 /// # Returns
461 ///
462 /// A `ResultSet` containing the query results.
463 ///
464 /// # Errors
465 ///
466 /// Returns `QueryError` if execution fails.
467 ///
468 /// # Example
469 ///
470 pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
471 let stmt = self.create_statement(sql);
472 self.execute_statement(&stmt).await
473 }
474
475 /// Execute a SQL query and return all results as RecordBatches.
476 ///
477 /// This is a convenience method that fetches all results into memory.
478 ///
479 /// # Arguments
480 ///
481 /// * `sql` - SQL query text
482 ///
483 /// # Returns
484 ///
485 /// A vector of `RecordBatch` instances.
486 ///
487 /// # Errors
488 ///
489 /// Returns `QueryError` if execution fails.
490 ///
491 /// # Example
492 ///
493 pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
494 let result_set = self.execute(sql).await?;
495 result_set.fetch_all().await
496 }
497
498 /// Execute a non-SELECT statement and return the row count.
499 ///
500 /// # Arguments
501 ///
502 /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
503 ///
504 /// # Returns
505 ///
506 /// The number of rows affected.
507 ///
508 /// # Errors
509 ///
510 /// Returns `QueryError` if execution fails.
511 ///
512 /// # Example
513 ///
514 pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
515 let stmt = self.create_statement(sql);
516 self.execute_statement_update(&stmt).await
517 }
518
519 // ========================================================================
520 // Transaction Methods
521 // ========================================================================
522
523 pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
524 // Disable autocommit on the server so statements don't auto-commit
525 self.transport
526 .lock()
527 .await
528 .set_autocommit(false)
529 .await
530 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
531
532 self.session
533 .begin_transaction()
534 .await
535 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
536
537 Ok(())
538 }
539
540 pub async fn commit(&mut self) -> Result<(), QueryError> {
541 if !self.in_transaction() {
542 return Ok(());
543 }
544
545 self.execute_update("COMMIT").await?;
546
547 self.session
548 .commit_transaction()
549 .await
550 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
551
552 Ok(())
553 }
554
555 pub async fn rollback(&mut self) -> Result<(), QueryError> {
556 if !self.in_transaction() {
557 return Ok(());
558 }
559
560 self.execute_update("ROLLBACK").await?;
561
562 self.session
563 .rollback_transaction()
564 .await
565 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
566
567 Ok(())
568 }
569
    /// Check if a transaction is currently active.
    ///
    /// Reads the session's transaction flag; this does not query the server.
    ///
    /// # Returns
    ///
    /// `true` if a transaction is active, `false` otherwise.
    pub fn in_transaction(&self) -> bool {
        self.session.in_transaction()
    }
578
579 // ========================================================================
580 // Session and Schema Methods
581 // ========================================================================
582
    /// Get the current schema.
    ///
    /// Returns the schema tracked by the session; this does not query the server.
    ///
    /// # Returns
    ///
    /// The current schema name, or `None` if no schema is set.
    pub async fn current_schema(&self) -> Option<String> {
        self.session.current_schema().await
    }
591
592 /// Set the current schema.
593 ///
594 /// # Arguments
595 ///
596 /// * `schema` - The schema name to set
597 ///
598 /// # Errors
599 ///
600 /// Returns `QueryError` if the operation fails.
601 ///
602 /// # Example
603 ///
604 pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
605 let schema_name = schema.into();
606 self.execute_update(format!("OPEN SCHEMA {}", schema_name))
607 .await?;
608 self.session.set_current_schema(Some(schema_name)).await;
609 Ok(())
610 }
611
612 // ========================================================================
613 // Metadata Methods
614 // ========================================================================
615
616 /// Get metadata about catalogs.
617 ///
618 /// # Returns
619 ///
620 /// A `ResultSet` containing catalog metadata.
621 ///
622 /// # Errors
623 ///
624 /// Returns `QueryError` if the operation fails.
625 pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
626 self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
627 .await
628 }
629
630 /// Get metadata about schemas.
631 ///
632 /// # Arguments
633 ///
634 /// * `catalog` - Optional catalog name filter
635 ///
636 /// # Returns
637 ///
638 /// A `ResultSet` containing schema metadata.
639 ///
640 /// # Errors
641 ///
642 /// Returns `QueryError` if the operation fails.
643 pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
644 let sql = if let Some(cat) = catalog {
645 format!(
646 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
647 cat.replace('\'', "''")
648 )
649 } else {
650 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
651 };
652 self.execute(sql).await
653 }
654
655 /// Get metadata about tables.
656 ///
657 /// # Arguments
658 ///
659 /// * `catalog` - Optional catalog name filter
660 /// * `schema` - Optional schema name filter
661 /// * `table` - Optional table name filter
662 ///
663 /// # Returns
664 ///
665 /// A `ResultSet` containing table metadata.
666 ///
667 /// # Errors
668 ///
669 /// Returns `QueryError` if the operation fails.
670 pub async fn get_tables(
671 &mut self,
672 catalog: Option<&str>,
673 schema: Option<&str>,
674 table: Option<&str>,
675 ) -> Result<ResultSet, QueryError> {
676 let mut conditions = vec!["OBJECT_TYPE IN ('TABLE', 'VIEW')".to_string()];
677
678 // Exasol has no catalogs — ignore the catalog parameter
679 let _ = catalog;
680 if let Some(sch) = schema {
681 conditions.push(format!("ROOT_NAME = '{}'", sch.replace('\'', "''")));
682 }
683 if let Some(tbl) = table {
684 conditions.push(format!("OBJECT_NAME = '{}'", tbl.replace('\'', "''")));
685 }
686
687 let where_clause = format!("WHERE {}", conditions.join(" AND "));
688
689 let sql = format!(
690 "SELECT ROOT_NAME AS TABLE_SCHEMA, OBJECT_NAME AS TABLE_NAME, OBJECT_TYPE AS TABLE_TYPE FROM SYS.EXA_ALL_OBJECTS {} ORDER BY ROOT_NAME, OBJECT_NAME",
691 where_clause
692 );
693
694 self.execute(sql).await
695 }
696
697 /// Get metadata about columns.
698 ///
699 /// # Arguments
700 ///
701 /// * `catalog` - Optional catalog name filter
702 /// * `schema` - Optional schema name filter
703 /// * `table` - Optional table name filter
704 /// * `column` - Optional column name filter
705 ///
706 /// # Returns
707 ///
708 /// A `ResultSet` containing column metadata.
709 ///
710 /// # Errors
711 ///
712 /// Returns `QueryError` if the operation fails.
713 pub async fn get_columns(
714 &mut self,
715 catalog: Option<&str>,
716 schema: Option<&str>,
717 table: Option<&str>,
718 column: Option<&str>,
719 ) -> Result<ResultSet, QueryError> {
720 let mut conditions = Vec::new();
721
722 if let Some(cat) = catalog {
723 conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
724 }
725 if let Some(sch) = schema {
726 conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
727 }
728 if let Some(tbl) = table {
729 conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
730 }
731 if let Some(col) = column {
732 conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
733 }
734
735 let where_clause = if conditions.is_empty() {
736 String::new()
737 } else {
738 format!("WHERE {}", conditions.join(" AND "))
739 };
740
741 let sql = format!(
742 "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
743 FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
744 where_clause
745 );
746
747 self.execute(sql).await
748 }
749
750 // ========================================================================
751 // Session Information Methods
752 // ========================================================================
753
    /// Get the session ID the server assigned at connect time.
    ///
    /// # Returns
    ///
    /// The session ID.
    pub fn session_id(&self) -> &str {
        self.session.session_id()
    }
762
    /// Get connection parameters.
    ///
    /// # Returns
    ///
    /// A reference to the parameters this connection was created with.
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }
771
    /// Check if the connection is closed.
    ///
    /// Reflects the session state only; it does not probe the transport.
    ///
    /// # Returns
    ///
    /// `true` if the connection is closed, `false` otherwise.
    pub async fn is_closed(&self) -> bool {
        self.session.is_closed().await
    }
780
781 /// Close the connection.
782 ///
783 /// This closes the session and transport layer.
784 ///
785 /// # Errors
786 ///
787 /// Returns `ConnectionError` if closing fails.
788 ///
789 /// # Example
790 ///
791 pub async fn close(self) -> Result<(), ConnectionError> {
792 // Close session
793 self.session.close().await?;
794
795 // Close transport
796 let mut transport = self.transport.lock().await;
797 transport
798 .close()
799 .await
800 .map_err(|e| ConnectionError::ConnectionFailed {
801 host: self.params.host.clone(),
802 port: self.params.port,
803 message: e.to_string(),
804 })?;
805
806 Ok(())
807 }
808
809 /// Shut down the connection without consuming self.
810 ///
811 /// Unlike `close()`, this can be called through a shared reference,
812 /// making it suitable for use in `Drop` implementations where ownership
813 /// transfer is not possible (e.g., when the connection is behind `Arc<Mutex<>>`).
814 pub async fn shutdown(&self) -> Result<(), ConnectionError> {
815 self.session.close().await?;
816
817 let mut transport = self.transport.lock().await;
818 transport
819 .close()
820 .await
821 .map_err(|e| ConnectionError::ConnectionFailed {
822 host: self.params.host.clone(),
823 port: self.params.port,
824 message: e.to_string(),
825 })?;
826
827 Ok(())
828 }
829
830 // ========================================================================
831 // Import/Export Methods
832 // ========================================================================
833
    /// Creates an SQL executor closure for import/export operations.
    ///
    /// This is a helper method that creates a closure which can execute SQL
    /// statements and return the row count. The closure captures a cloned
    /// reference to the transport, allowing it to be called multiple times
    /// (e.g., for parallel file imports).
    ///
    /// # Returns
    ///
    /// A closure that takes an SQL string and returns a Future resolving to
    /// either the affected row count or an error string.
    fn make_sql_executor(
        &self,
    ) -> impl Fn(
        String,
    )
        -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
        let transport = Arc::clone(&self.transport);
        move |sql: String| {
            // Clone the Arc per invocation so the closure stays `Fn` (callable
            // repeatedly) while each returned future owns its own handle.
            let transport = Arc::clone(&transport);
            Box::pin(async move {
                let mut transport_guard = transport.lock().await;
                match transport_guard.execute_query(&sql).await {
                    Ok(QueryResult::RowCount { count }) => Ok(count as u64),
                    // NOTE(review): a statement that yields a result set is
                    // reported as 0 affected rows — confirm this is intended
                    // for the import/export callers.
                    Ok(QueryResult::ResultSet { .. }) => Ok(0),
                    Err(e) => Err(e.to_string()),
                }
            })
        }
    }
864
865 /// Import CSV data from a file into an Exasol table.
866 ///
867 /// This method reads CSV data from the specified file and imports it into
868 /// the target table using Exasol's HTTP transport layer.
869 ///
870 /// # Arguments
871 ///
872 /// * `table` - Name of the target table
873 /// * `file_path` - Path to the CSV file
874 /// * `options` - Import options
875 ///
876 /// # Returns
877 ///
878 /// The number of rows imported on success.
879 ///
880 /// # Errors
881 ///
882 /// Returns `ImportError` if the import fails.
883 ///
884 /// # Example
885 ///
886 pub async fn import_csv_from_file(
887 &mut self,
888 table: &str,
889 file_path: &std::path::Path,
890 options: crate::import::csv::CsvImportOptions,
891 ) -> Result<u64, crate::import::ImportError> {
892 // Pass Exasol host/port from connection params to import options
893 let options = options
894 .exasol_host(&self.params.host)
895 .exasol_port(self.params.port);
896
897 crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
898 .await
899 }
900
901 /// Import CSV data from an async reader into an Exasol table.
902 ///
903 /// This method reads CSV data from an async reader and imports it into
904 /// the target table using Exasol's HTTP transport layer.
905 ///
906 /// # Arguments
907 ///
908 /// * `table` - Name of the target table
909 /// * `reader` - Async reader providing CSV data
910 /// * `options` - Import options
911 ///
912 /// # Returns
913 ///
914 /// The number of rows imported on success.
915 ///
916 /// # Errors
917 ///
918 /// Returns `ImportError` if the import fails.
919 pub async fn import_csv_from_stream<R>(
920 &mut self,
921 table: &str,
922 reader: R,
923 options: crate::import::csv::CsvImportOptions,
924 ) -> Result<u64, crate::import::ImportError>
925 where
926 R: tokio::io::AsyncRead + Unpin + Send + 'static,
927 {
928 // Pass Exasol host/port from connection params to import options
929 let options = options
930 .exasol_host(&self.params.host)
931 .exasol_port(self.params.port);
932
933 crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
934 .await
935 }
936
937 /// Import CSV data from an iterator into an Exasol table.
938 ///
939 /// This method converts iterator rows to CSV format and imports them into
940 /// the target table using Exasol's HTTP transport layer.
941 ///
942 /// # Arguments
943 ///
944 /// * `table` - Name of the target table
945 /// * `rows` - Iterator of rows, where each row is an iterator of field values
946 /// * `options` - Import options
947 ///
948 /// # Returns
949 ///
950 /// The number of rows imported on success.
951 ///
952 /// # Errors
953 ///
954 /// Returns `ImportError` if the import fails.
955 ///
956 /// # Example
957 ///
958 pub async fn import_csv_from_iter<I, T, S>(
959 &mut self,
960 table: &str,
961 rows: I,
962 options: crate::import::csv::CsvImportOptions,
963 ) -> Result<u64, crate::import::ImportError>
964 where
965 I: IntoIterator<Item = T> + Send + 'static,
966 T: IntoIterator<Item = S> + Send,
967 S: AsRef<str>,
968 {
969 // Pass Exasol host/port from connection params to import options
970 let options = options
971 .exasol_host(&self.params.host)
972 .exasol_port(self.params.port);
973
974 crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
975 }
976
977 /// Export data from an Exasol table or query to a CSV file.
978 ///
979 /// This method exports data from the specified source to a CSV file
980 /// using Exasol's HTTP transport layer.
981 ///
982 /// # Arguments
983 ///
984 /// * `source` - The data source (table or query)
985 /// * `file_path` - Path to the output file
986 /// * `options` - Export options
987 ///
988 /// # Returns
989 ///
990 /// The number of rows exported on success.
991 ///
992 /// # Errors
993 ///
994 /// Returns `ExportError` if the export fails.
995 ///
996 /// # Example
997 ///
998 pub async fn export_csv_to_file(
999 &mut self,
1000 source: crate::query::export::ExportSource,
1001 file_path: &std::path::Path,
1002 options: crate::export::csv::CsvExportOptions,
1003 ) -> Result<u64, crate::export::csv::ExportError> {
1004 // Pass Exasol host/port from connection params to export options
1005 let options = options
1006 .exasol_host(&self.params.host)
1007 .exasol_port(self.params.port);
1008
1009 let mut transport_guard = self.transport.lock().await;
1010 crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1011 }
1012
1013 /// Export data from an Exasol table or query to an async writer.
1014 ///
1015 /// This method exports data from the specified source to an async writer
1016 /// using Exasol's HTTP transport layer.
1017 ///
1018 /// # Arguments
1019 ///
1020 /// * `source` - The data source (table or query)
1021 /// * `writer` - Async writer to write the CSV data to
1022 /// * `options` - Export options
1023 ///
1024 /// # Returns
1025 ///
1026 /// The number of rows exported on success.
1027 ///
1028 /// # Errors
1029 ///
1030 /// Returns `ExportError` if the export fails.
1031 pub async fn export_csv_to_stream<W>(
1032 &mut self,
1033 source: crate::query::export::ExportSource,
1034 writer: W,
1035 options: crate::export::csv::CsvExportOptions,
1036 ) -> Result<u64, crate::export::csv::ExportError>
1037 where
1038 W: tokio::io::AsyncWrite + Unpin,
1039 {
1040 // Pass Exasol host/port from connection params to export options
1041 let options = options
1042 .exasol_host(&self.params.host)
1043 .exasol_port(self.params.port);
1044
1045 let mut transport_guard = self.transport.lock().await;
1046 crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1047 }
1048
1049 /// Export data from an Exasol table or query to an in-memory list of rows.
1050 ///
1051 /// Each row is represented as a vector of string values.
1052 ///
1053 /// # Arguments
1054 ///
1055 /// * `source` - The data source (table or query)
1056 /// * `options` - Export options
1057 ///
1058 /// # Returns
1059 ///
1060 /// A vector of rows, where each row is a vector of column values.
1061 ///
1062 /// # Errors
1063 ///
1064 /// Returns `ExportError` if the export fails.
1065 ///
1066 /// # Example
1067 ///
1068 pub async fn export_csv_to_list(
1069 &mut self,
1070 source: crate::query::export::ExportSource,
1071 options: crate::export::csv::CsvExportOptions,
1072 ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1073 // Pass Exasol host/port from connection params to export options
1074 let options = options
1075 .exasol_host(&self.params.host)
1076 .exasol_port(self.params.port);
1077
1078 let mut transport_guard = self.transport.lock().await;
1079 crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1080 }
1081
1082 /// Import multiple CSV files in parallel into an Exasol table.
1083 ///
1084 /// This method reads CSV data from multiple files and imports them into
1085 /// the target table using parallel HTTP transport connections. Each file
1086 /// gets its own connection with a unique internal address.
1087 ///
1088 /// For a single file, this method delegates to `import_csv_from_file`.
1089 ///
1090 /// # Arguments
1091 ///
1092 /// * `table` - Name of the target table
1093 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1094 /// * `options` - Import options
1095 ///
1096 /// # Returns
1097 ///
1098 /// The number of rows imported on success.
1099 ///
1100 /// # Errors
1101 ///
1102 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1103 ///
1104 /// # Example
1105 ///
1106 /// ```no_run
1107 /// use exarrow_rs::adbc::Connection;
1108 /// use exarrow_rs::import::CsvImportOptions;
1109 /// use std::path::PathBuf;
1110 ///
1111 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1112 /// let files = vec![
1113 /// PathBuf::from("/data/part1.csv"),
1114 /// PathBuf::from("/data/part2.csv"),
1115 /// ];
1116 ///
1117 /// let options = CsvImportOptions::default();
1118 /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1119 /// # Ok(())
1120 /// # }
1121 /// ```
1122 pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1123 &mut self,
1124 table: &str,
1125 paths: S,
1126 options: crate::import::csv::CsvImportOptions,
1127 ) -> Result<u64, crate::import::ImportError> {
1128 // Pass Exasol host/port from connection params to import options
1129 let options = options
1130 .exasol_host(&self.params.host)
1131 .exasol_port(self.params.port);
1132
1133 crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1134 }
1135
1136 /// Import data from a Parquet file into an Exasol table.
1137 ///
1138 /// This method reads a Parquet file, converts the data to CSV format,
1139 /// and imports it into the target table using Exasol's HTTP transport layer.
1140 ///
1141 /// # Arguments
1142 ///
1143 /// * `table` - Name of the target table
1144 /// * `file_path` - Path to the Parquet file
1145 /// * `options` - Import options
1146 ///
1147 /// # Returns
1148 ///
1149 /// The number of rows imported on success.
1150 ///
1151 /// # Errors
1152 ///
1153 /// Returns `ImportError` if the import fails.
1154 ///
1155 /// # Example
1156 ///
1157 pub async fn import_from_parquet(
1158 &mut self,
1159 table: &str,
1160 file_path: &std::path::Path,
1161 options: crate::import::parquet::ParquetImportOptions,
1162 ) -> Result<u64, crate::import::ImportError> {
1163 // Pass Exasol host/port from connection params to import options
1164 let options = options
1165 .with_exasol_host(&self.params.host)
1166 .with_exasol_port(self.params.port);
1167
1168 crate::import::parquet::import_from_parquet(
1169 self.make_sql_executor(),
1170 table,
1171 file_path,
1172 options,
1173 )
1174 .await
1175 }
1176
1177 /// Import multiple Parquet files in parallel into an Exasol table.
1178 ///
1179 /// This method converts each Parquet file to CSV format concurrently,
1180 /// then streams the data through parallel HTTP transport connections.
1181 ///
1182 /// For a single file, this method delegates to `import_from_parquet`.
1183 ///
1184 /// # Arguments
1185 ///
1186 /// * `table` - Name of the target table
1187 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1188 /// * `options` - Import options
1189 ///
1190 /// # Returns
1191 ///
1192 /// The number of rows imported on success.
1193 ///
1194 /// # Errors
1195 ///
1196 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1197 ///
1198 /// # Example
1199 ///
1200 /// ```no_run
1201 /// use exarrow_rs::adbc::Connection;
1202 /// use exarrow_rs::import::ParquetImportOptions;
1203 /// use std::path::PathBuf;
1204 ///
1205 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1206 /// let files = vec![
1207 /// PathBuf::from("/data/part1.parquet"),
1208 /// PathBuf::from("/data/part2.parquet"),
1209 /// ];
1210 ///
1211 /// let options = ParquetImportOptions::default();
1212 /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1213 /// # Ok(())
1214 /// # }
1215 /// ```
1216 pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1217 &mut self,
1218 table: &str,
1219 paths: S,
1220 options: crate::import::parquet::ParquetImportOptions,
1221 ) -> Result<u64, crate::import::ImportError> {
1222 // Pass Exasol host/port from connection params to import options
1223 let options = options
1224 .with_exasol_host(&self.params.host)
1225 .with_exasol_port(self.params.port);
1226
1227 crate::import::parquet::import_from_parquet_files(
1228 self.make_sql_executor(),
1229 table,
1230 paths,
1231 options,
1232 )
1233 .await
1234 }
1235
1236 /// Export data from an Exasol table or query to a Parquet file.
1237 ///
1238 /// This method exports data from the specified source to a Parquet file.
1239 /// The data is first received as CSV from Exasol, then converted to Parquet format.
1240 ///
1241 /// # Arguments
1242 ///
1243 /// * `source` - The data source (table or query)
1244 /// * `file_path` - Path to the output Parquet file
1245 /// * `options` - Export options
1246 ///
1247 /// # Returns
1248 ///
1249 /// The number of rows exported on success.
1250 ///
1251 /// # Errors
1252 ///
1253 /// Returns `ExportError` if the export fails.
1254 ///
1255 /// # Example
1256 ///
1257 pub async fn export_to_parquet(
1258 &mut self,
1259 source: crate::query::export::ExportSource,
1260 file_path: &std::path::Path,
1261 options: crate::export::parquet::ParquetExportOptions,
1262 ) -> Result<u64, crate::export::csv::ExportError> {
1263 // Pass Exasol host/port from connection params to export options
1264 let options = options
1265 .exasol_host(&self.params.host)
1266 .exasol_port(self.params.port);
1267
1268 let mut transport_guard = self.transport.lock().await;
1269 crate::export::parquet::export_to_parquet_via_transport(
1270 &mut *transport_guard,
1271 source,
1272 file_path,
1273 options,
1274 )
1275 .await
1276 }
1277
1278 /// Import data from an Arrow RecordBatch into an Exasol table.
1279 ///
1280 /// This method converts the RecordBatch to CSV format and imports it
1281 /// into the target table using Exasol's HTTP transport layer.
1282 ///
1283 /// # Arguments
1284 ///
1285 /// * `table` - Name of the target table
1286 /// * `batch` - The RecordBatch to import
1287 /// * `options` - Import options
1288 ///
1289 /// # Returns
1290 ///
1291 /// The number of rows imported on success.
1292 ///
1293 /// # Errors
1294 ///
1295 /// Returns `ImportError` if the import fails.
1296 ///
1297 /// # Example
1298 ///
1299 pub async fn import_from_record_batch(
1300 &mut self,
1301 table: &str,
1302 batch: &RecordBatch,
1303 options: crate::import::arrow::ArrowImportOptions,
1304 ) -> Result<u64, crate::import::ImportError> {
1305 // Pass Exasol host/port from connection params to import options
1306 let options = options
1307 .exasol_host(&self.params.host)
1308 .exasol_port(self.params.port);
1309
1310 crate::import::arrow::import_from_record_batch(
1311 self.make_sql_executor(),
1312 table,
1313 batch,
1314 options,
1315 )
1316 .await
1317 }
1318
1319 /// Import data from multiple Arrow RecordBatches into an Exasol table.
1320 ///
1321 /// This method converts each RecordBatch to CSV format and imports them
1322 /// into the target table using Exasol's HTTP transport layer.
1323 ///
1324 /// # Arguments
1325 ///
1326 /// * `table` - Name of the target table
1327 /// * `batches` - An iterator of RecordBatches to import
1328 /// * `options` - Import options
1329 ///
1330 /// # Returns
1331 ///
1332 /// The number of rows imported on success.
1333 ///
1334 /// # Errors
1335 ///
1336 /// Returns `ImportError` if the import fails.
1337 pub async fn import_from_record_batches<I>(
1338 &mut self,
1339 table: &str,
1340 batches: I,
1341 options: crate::import::arrow::ArrowImportOptions,
1342 ) -> Result<u64, crate::import::ImportError>
1343 where
1344 I: IntoIterator<Item = RecordBatch>,
1345 {
1346 // Pass Exasol host/port from connection params to import options
1347 let options = options
1348 .exasol_host(&self.params.host)
1349 .exasol_port(self.params.port);
1350
1351 crate::import::arrow::import_from_record_batches(
1352 self.make_sql_executor(),
1353 table,
1354 batches,
1355 options,
1356 )
1357 .await
1358 }
1359
1360 /// Import data from an Arrow IPC file/stream into an Exasol table.
1361 ///
1362 /// This method reads Arrow IPC format data, converts it to CSV,
1363 /// and imports it into the target table using Exasol's HTTP transport layer.
1364 ///
1365 /// # Arguments
1366 ///
1367 /// * `table` - Name of the target table
1368 /// * `reader` - An async reader containing Arrow IPC data
1369 /// * `options` - Import options
1370 ///
1371 /// # Returns
1372 ///
1373 /// The number of rows imported on success.
1374 ///
1375 /// # Errors
1376 ///
1377 /// Returns `ImportError` if the import fails.
1378 ///
1379 /// # Example
1380 ///
1381 pub async fn import_from_arrow_ipc<R>(
1382 &mut self,
1383 table: &str,
1384 reader: R,
1385 options: crate::import::arrow::ArrowImportOptions,
1386 ) -> Result<u64, crate::import::ImportError>
1387 where
1388 R: tokio::io::AsyncRead + Unpin + Send,
1389 {
1390 // Pass Exasol host/port from connection params to import options
1391 let options = options
1392 .exasol_host(&self.params.host)
1393 .exasol_port(self.params.port);
1394
1395 crate::import::arrow::import_from_arrow_ipc(
1396 self.make_sql_executor(),
1397 table,
1398 reader,
1399 options,
1400 )
1401 .await
1402 }
1403
1404 /// Export data from an Exasol table or query to Arrow RecordBatches.
1405 ///
1406 /// This method exports data from the specified source and converts it
1407 /// to Arrow RecordBatches.
1408 ///
1409 /// # Arguments
1410 ///
1411 /// * `source` - The data source (table or query)
1412 /// * `options` - Export options
1413 ///
1414 /// # Returns
1415 ///
1416 /// A vector of RecordBatches on success.
1417 ///
1418 /// # Errors
1419 ///
1420 /// Returns `ExportError` if the export fails.
1421 ///
1422 /// # Example
1423 ///
1424 pub async fn export_to_record_batches(
1425 &mut self,
1426 source: crate::query::export::ExportSource,
1427 options: crate::export::arrow::ArrowExportOptions,
1428 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1429 // Pass Exasol host/port from connection params to export options
1430 let options = options
1431 .exasol_host(&self.params.host)
1432 .exasol_port(self.params.port);
1433
1434 let mut transport_guard = self.transport.lock().await;
1435 crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1436 }
1437
1438 /// Export data from an Exasol table or query to an Arrow IPC file.
1439 ///
1440 /// This method exports data from the specified source to an Arrow IPC file.
1441 ///
1442 /// # Arguments
1443 ///
1444 /// * `source` - The data source (table or query)
1445 /// * `file_path` - Path to the output Arrow IPC file
1446 /// * `options` - Export options
1447 ///
1448 /// # Returns
1449 ///
1450 /// The number of rows exported on success.
1451 ///
1452 /// # Errors
1453 ///
1454 /// Returns `ExportError` if the export fails.
1455 ///
1456 /// # Example
1457 ///
1458 pub async fn export_to_arrow_ipc(
1459 &mut self,
1460 source: crate::query::export::ExportSource,
1461 file_path: &std::path::Path,
1462 options: crate::export::arrow::ArrowExportOptions,
1463 ) -> Result<u64, crate::export::csv::ExportError> {
1464 // Pass Exasol host/port from connection params to export options
1465 let options = options
1466 .exasol_host(&self.params.host)
1467 .exasol_port(self.params.port);
1468
1469 let mut transport_guard = self.transport.lock().await;
1470 crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1471 .await
1472 }
1473
1474 // ========================================================================
1475 // Blocking Import/Export Methods
1476 // ========================================================================
1477
1478 /// Import CSV data from a file into an Exasol table (blocking).
1479 ///
1480 /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1481 /// for use in non-async contexts.
1482 ///
1483 /// # Arguments
1484 ///
1485 /// * `table` - Name of the target table
1486 /// * `file_path` - Path to the CSV file
1487 /// * `options` - Import options
1488 ///
1489 /// # Returns
1490 ///
1491 /// The number of rows imported on success.
1492 ///
1493 /// # Errors
1494 ///
1495 /// Returns `ImportError` if the import fails.
1496 ///
1497 /// # Example
1498 ///
1499 pub fn blocking_import_csv_from_file(
1500 &mut self,
1501 table: &str,
1502 file_path: &std::path::Path,
1503 options: crate::import::csv::CsvImportOptions,
1504 ) -> Result<u64, crate::import::ImportError> {
1505 blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1506 }
1507
1508 /// Import multiple CSV files in parallel into an Exasol table (blocking).
1509 ///
1510 /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1511 /// for use in non-async contexts.
1512 ///
1513 /// # Arguments
1514 ///
1515 /// * `table` - Name of the target table
1516 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1517 /// * `options` - Import options
1518 ///
1519 /// # Returns
1520 ///
1521 /// The number of rows imported on success.
1522 ///
1523 /// # Errors
1524 ///
1525 /// Returns `ImportError` if the import fails.
1526 pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1527 &mut self,
1528 table: &str,
1529 paths: S,
1530 options: crate::import::csv::CsvImportOptions,
1531 ) -> Result<u64, crate::import::ImportError> {
1532 blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1533 }
1534
1535 /// Import data from a Parquet file into an Exasol table (blocking).
1536 ///
1537 /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1538 /// for use in non-async contexts.
1539 ///
1540 /// # Arguments
1541 ///
1542 /// * `table` - Name of the target table
1543 /// * `file_path` - Path to the Parquet file
1544 /// * `options` - Import options
1545 ///
1546 /// # Returns
1547 ///
1548 /// The number of rows imported on success.
1549 ///
1550 /// # Errors
1551 ///
1552 /// Returns `ImportError` if the import fails.
1553 ///
1554 /// # Example
1555 ///
1556 pub fn blocking_import_from_parquet(
1557 &mut self,
1558 table: &str,
1559 file_path: &std::path::Path,
1560 options: crate::import::parquet::ParquetImportOptions,
1561 ) -> Result<u64, crate::import::ImportError> {
1562 blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1563 }
1564
1565 /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1566 ///
1567 /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1568 /// for use in non-async contexts.
1569 ///
1570 /// # Arguments
1571 ///
1572 /// * `table` - Name of the target table
1573 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1574 /// * `options` - Import options
1575 ///
1576 /// # Returns
1577 ///
1578 /// The number of rows imported on success.
1579 ///
1580 /// # Errors
1581 ///
1582 /// Returns `ImportError` if the import fails.
1583 pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1584 &mut self,
1585 table: &str,
1586 paths: S,
1587 options: crate::import::parquet::ParquetImportOptions,
1588 ) -> Result<u64, crate::import::ImportError> {
1589 blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1590 }
1591
1592 /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1593 ///
1594 /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1595 /// for use in non-async contexts.
1596 ///
1597 /// # Arguments
1598 ///
1599 /// * `table` - Name of the target table
1600 /// * `batch` - The RecordBatch to import
1601 /// * `options` - Import options
1602 ///
1603 /// # Returns
1604 ///
1605 /// The number of rows imported on success.
1606 ///
1607 /// # Errors
1608 ///
1609 /// Returns `ImportError` if the import fails.
1610 ///
1611 /// # Example
1612 ///
1613 pub fn blocking_import_from_record_batch(
1614 &mut self,
1615 table: &str,
1616 batch: &RecordBatch,
1617 options: crate::import::arrow::ArrowImportOptions,
1618 ) -> Result<u64, crate::import::ImportError> {
1619 blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1620 }
1621
1622 /// Import data from an Arrow IPC file into an Exasol table (blocking).
1623 ///
1624 /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1625 /// for use in non-async contexts.
1626 ///
1627 /// Note: This method requires a synchronous reader that implements `std::io::Read`.
1628 /// The data will be read into memory before being imported.
1629 ///
1630 /// # Arguments
1631 ///
1632 /// * `table` - Name of the target table
1633 /// * `file_path` - Path to the Arrow IPC file
1634 /// * `options` - Import options
1635 ///
1636 /// # Returns
1637 ///
1638 /// The number of rows imported on success.
1639 ///
1640 /// # Errors
1641 ///
1642 /// Returns `ImportError` if the import fails.
1643 ///
1644 /// # Example
1645 ///
1646 pub fn blocking_import_from_arrow_ipc(
1647 &mut self,
1648 table: &str,
1649 file_path: &std::path::Path,
1650 options: crate::import::arrow::ArrowImportOptions,
1651 ) -> Result<u64, crate::import::ImportError> {
1652 blocking_runtime().block_on(async {
1653 let file = tokio::fs::File::open(file_path)
1654 .await
1655 .map_err(crate::import::ImportError::IoError)?;
1656 self.import_from_arrow_ipc(table, file, options).await
1657 })
1658 }
1659
1660 /// Export data from an Exasol table or query to a CSV file (blocking).
1661 ///
1662 /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1663 /// for use in non-async contexts.
1664 ///
1665 /// # Arguments
1666 ///
1667 /// * `source` - The data source (table or query)
1668 /// * `file_path` - Path to the output file
1669 /// * `options` - Export options
1670 ///
1671 /// # Returns
1672 ///
1673 /// The number of rows exported on success.
1674 ///
1675 /// # Errors
1676 ///
1677 /// Returns `ExportError` if the export fails.
1678 ///
1679 /// # Example
1680 ///
1681 pub fn blocking_export_csv_to_file(
1682 &mut self,
1683 source: crate::query::export::ExportSource,
1684 file_path: &std::path::Path,
1685 options: crate::export::csv::CsvExportOptions,
1686 ) -> Result<u64, crate::export::csv::ExportError> {
1687 blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1688 }
1689
1690 /// Export data from an Exasol table or query to a Parquet file (blocking).
1691 ///
1692 /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1693 /// for use in non-async contexts.
1694 ///
1695 /// # Arguments
1696 ///
1697 /// * `source` - The data source (table or query)
1698 /// * `file_path` - Path to the output Parquet file
1699 /// * `options` - Export options
1700 ///
1701 /// # Returns
1702 ///
1703 /// The number of rows exported on success.
1704 ///
1705 /// # Errors
1706 ///
1707 /// Returns `ExportError` if the export fails.
1708 ///
1709 /// # Example
1710 ///
1711 pub fn blocking_export_to_parquet(
1712 &mut self,
1713 source: crate::query::export::ExportSource,
1714 file_path: &std::path::Path,
1715 options: crate::export::parquet::ParquetExportOptions,
1716 ) -> Result<u64, crate::export::csv::ExportError> {
1717 blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1718 }
1719
1720 /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1721 ///
1722 /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1723 /// for use in non-async contexts.
1724 ///
1725 /// # Arguments
1726 ///
1727 /// * `source` - The data source (table or query)
1728 /// * `options` - Export options
1729 ///
1730 /// # Returns
1731 ///
1732 /// A vector of RecordBatches on success.
1733 ///
1734 /// # Errors
1735 ///
1736 /// Returns `ExportError` if the export fails.
1737 ///
1738 /// # Example
1739 ///
1740 pub fn blocking_export_to_record_batches(
1741 &mut self,
1742 source: crate::query::export::ExportSource,
1743 options: crate::export::arrow::ArrowExportOptions,
1744 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1745 blocking_runtime().block_on(self.export_to_record_batches(source, options))
1746 }
1747
1748 /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1749 ///
1750 /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1751 /// for use in non-async contexts.
1752 ///
1753 /// # Arguments
1754 ///
1755 /// * `source` - The data source (table or query)
1756 /// * `file_path` - Path to the output Arrow IPC file
1757 /// * `options` - Export options
1758 ///
1759 /// # Returns
1760 ///
1761 /// The number of rows exported on success.
1762 ///
1763 /// # Errors
1764 ///
1765 /// Returns `ExportError` if the export fails.
1766 ///
1767 /// # Example
1768 ///
1769 pub fn blocking_export_to_arrow_ipc(
1770 &mut self,
1771 source: crate::query::export::ExportSource,
1772 file_path: &std::path::Path,
1773 options: crate::export::arrow::ArrowExportOptions,
1774 ) -> Result<u64, crate::export::csv::ExportError> {
1775 blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1776 }
1777
1778 // ========================================================================
1779 // Private Helper Methods
1780 // ========================================================================
1781
1782 /// Update session state after query execution.
1783 async fn update_session_state_after_query(&self) {
1784 if self.session.in_transaction() {
1785 self.session.set_state(SessionState::InTransaction).await;
1786 } else {
1787 self.session.set_state(SessionState::Ready).await;
1788 }
1789 self.session.update_activity().await;
1790 }
1791}
1792
1793impl std::fmt::Debug for Connection {
1794 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1795 f.debug_struct("Connection")
1796 .field("session_id", &self.session.session_id())
1797 .field("host", &self.params.host)
1798 .field("port", &self.params.port)
1799 .field("username", &self.params.username)
1800 .field("in_transaction", &self.session.in_transaction())
1801 .finish()
1802 }
1803}
1804
/// Builder for creating [`Connection`] instances.
///
/// Thin fluent wrapper over the lower-level
/// [`crate::connection::params::ConnectionBuilder`]; configure host, port,
/// credentials, schema and TLS, then call `connect()` to establish the
/// connection.
pub struct ConnectionBuilder {
    /// Accumulates the connection parameters; validated by `build()` inside
    /// `connect()`.
    params_builder: crate::connection::params::ConnectionBuilder,
}
1810
1811impl ConnectionBuilder {
1812 /// Create a new ConnectionBuilder.
1813 pub fn new() -> Self {
1814 Self {
1815 params_builder: crate::connection::params::ConnectionBuilder::new(),
1816 }
1817 }
1818
1819 /// Set the database host.
1820 pub fn host(mut self, host: &str) -> Self {
1821 self.params_builder = self.params_builder.host(host);
1822 self
1823 }
1824
1825 /// Set the database port.
1826 pub fn port(mut self, port: u16) -> Self {
1827 self.params_builder = self.params_builder.port(port);
1828 self
1829 }
1830
1831 /// Set the username.
1832 pub fn username(mut self, username: &str) -> Self {
1833 self.params_builder = self.params_builder.username(username);
1834 self
1835 }
1836
1837 /// Set the password.
1838 pub fn password(mut self, password: &str) -> Self {
1839 self.params_builder = self.params_builder.password(password);
1840 self
1841 }
1842
1843 /// Set the default schema.
1844 pub fn schema(mut self, schema: &str) -> Self {
1845 self.params_builder = self.params_builder.schema(schema);
1846 self
1847 }
1848
1849 /// Enable or disable TLS.
1850 pub fn use_tls(mut self, use_tls: bool) -> Self {
1851 self.params_builder = self.params_builder.use_tls(use_tls);
1852 self
1853 }
1854
1855 /// Build and connect.
1856 ///
1857 /// # Returns
1858 ///
1859 /// A connected `Connection` instance.
1860 ///
1861 /// # Errors
1862 ///
1863 /// Returns `ExasolError` if the connection fails.
1864 pub async fn connect(self) -> Result<Connection, ExasolError> {
1865 let params = self.params_builder.build()?;
1866 Ok(Connection::from_params(params).await?)
1867 }
1868}
1869
1870impl Default for ConnectionBuilder {
1871 fn default() -> Self {
1872 Self::new()
1873 }
1874}
1875
#[cfg(test)]
mod tests {
    use super::*;

    /// The fluent builder API must accept every setter in a chain.
    /// An actual connection would require a running Exasol instance, so this
    /// is a compile/construct check only.
    #[test]
    fn test_connection_builder() {
        let _builder = ConnectionBuilder::new()
            .host("localhost")
            .port(8563)
            .username("test")
            .password("secret")
            .schema("MY_SCHEMA")
            .use_tls(false);
    }

    /// `create_statement` is synchronous (callable without `.await`).
    /// We cannot build a `Connection` without a database, so this documents
    /// the API expectation; the real check is that the crate compiles.
    #[test]
    fn test_create_statement_is_sync() {}
}
1902
/// Tests pinning the blocking (synchronous) wrapper API surface.
///
/// Most tests here assign a method to an explicitly-typed `fn` pointer; they
/// fail to compile — rather than fail at runtime — if a signature drifts.
#[cfg(test)]
mod blocking_tests {
    use super::*;

    #[test]
    fn test_session_type_alias_exists() {
        // `Session` must remain a type alias for `Connection`.
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}

        // Compiles only if `Session` and `Connection` are the same type.
        fn verify_interchangeable<F1, F2>(f1: F1, f2: F2)
        where
            F1: Fn(&Session),
            F2: Fn(&Connection),
        {
            let _ = (f1, f2);
        }

        verify_interchangeable(takes_session, takes_connection);
    }

    #[test]
    fn test_blocking_runtime_exists() {
        // blocking_runtime() must build (and cache) a usable tokio Runtime.
        let runtime = blocking_runtime();
        // Obtaining a handle proves the runtime was constructed successfully.
        let _ = runtime.handle();
    }

    #[test]
    fn test_connection_has_blocking_import_csv() {
        // Signature pin for blocking_import_csv_from_file.
        fn _check_method_exists(_conn: &mut Connection) {
            // Fails to compile if the method's signature changes.
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::csv::CsvImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_csv_from_file;
        }
    }

    #[test]
    fn test_connection_has_blocking_import_parquet() {
        // Signature pin for blocking_import_from_parquet.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::parquet::ParquetImportOptions,
            ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
        }
    }

    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        // Signature pin for blocking_import_from_record_batch.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &RecordBatch,
                crate::import::arrow::ArrowImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_from_record_batch;
        }
    }

    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        // Signature pin for blocking_import_from_arrow_ipc.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                &str,
                &std::path::Path,
                crate::import::arrow::ArrowImportOptions,
            ) -> Result<u64, crate::import::ImportError> =
                Connection::blocking_import_from_arrow_ipc;
        }
    }

    #[test]
    fn test_connection_has_blocking_export_csv() {
        // Signature pin for blocking_export_csv_to_file.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::csv::CsvExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_csv_to_file;
        }
    }

    #[test]
    fn test_connection_has_blocking_export_parquet() {
        // Signature pin for blocking_export_to_parquet.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::parquet::ParquetExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_to_parquet;
        }
    }

    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        // Signature pin for blocking_export_to_record_batches.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                crate::export::arrow::ArrowExportOptions,
            ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
                Connection::blocking_export_to_record_batches;
        }
    }

    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        // Signature pin for blocking_export_to_arrow_ipc.
        fn _check_method_exists(_conn: &mut Connection) {
            let _: fn(
                &mut Connection,
                crate::query::export::ExportSource,
                &std::path::Path,
                crate::export::arrow::ArrowExportOptions,
            ) -> Result<u64, crate::export::csv::ExportError> =
                Connection::blocking_export_to_arrow_ipc;
        }
    }
}