Skip to main content

exarrow_rs/adbc/
connection.rs

1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19    ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20    QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
/// `Session` is a type alias for [`Connection`].
///
/// The alias offers a more intuitive name for database sessions when
/// performing import/export operations. `Session` and `Connection` name the
/// same type and can be used interchangeably.
pub type Session = Connection;
39
40fn blocking_runtime() -> &'static Runtime {
41    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
42    RUNTIME.get_or_init(|| {
43        tokio::runtime::Builder::new_current_thread()
44            .enable_all()
45            .build()
46            .expect("Failed to create tokio runtime for blocking operations")
47    })
48}
49
/// ADBC Connection to an Exasol database.
///
/// The `Connection` type represents an active database connection and provides
/// methods for executing queries, managing transactions, and retrieving metadata.
///
/// # v2.0.0 Breaking Changes
///
/// - Connection now owns the transport directly
/// - `create_statement()` is now synchronous and returns a pure data container
/// - Use `execute_statement()` instead of `Statement::execute()`
/// - Use `prepare()` instead of `Statement::prepare()`
pub struct Connection {
    /// Transport layer for communication (owned by Connection).
    ///
    /// Kept behind `Arc<Mutex<..>>` because clones of the handle are handed to
    /// result sets and to the import/export SQL executor.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Session information (state, current schema, query counter).
    session: SessionInfo,
    /// Connection parameters this connection was created from.
    params: ConnectionParams,
}
72
73impl Connection {
74    /// Create a connection from connection parameters.
75    ///
76    /// This establishes a connection to the Exasol database using WebSocket transport,
77    /// authenticates the user, and creates a session.
78    ///
79    /// # Arguments
80    ///
81    /// * `params` - Connection parameters
82    ///
83    /// # Returns
84    ///
85    /// A connected `Connection` instance.
86    ///
87    /// # Errors
88    ///
89    /// Returns `ConnectionError` if the connection or authentication fails.
90    pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
91        // Create transport
92        let mut transport = WebSocketTransport::new();
93
94        // Convert ConnectionParams to TransportConnectionParams
95        let mut transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
96            .with_tls(params.use_tls)
97            .with_validate_server_certificate(params.validate_server_certificate)
98            .with_timeout(params.connection_timeout.as_millis() as u64);
99        if let Some(ref fp) = params.certificate_fingerprint {
100            transport_params = transport_params.with_certificate_fingerprint(fp.clone());
101        }
102
103        // Connect
104        transport.connect(&transport_params).await.map_err(|e| {
105            ConnectionError::ConnectionFailed {
106                host: params.host.clone(),
107                port: params.port,
108                message: e.to_string(),
109            }
110        })?;
111
112        // Authenticate
113        let credentials =
114            TransportCredentials::new(params.username.clone(), params.password().to_string());
115        let session_info = transport
116            .authenticate(&credentials)
117            .await
118            .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
119
120        // Create session config from connection params
121        let session_config = SessionConfig {
122            idle_timeout: params.idle_timeout,
123            query_timeout: params.query_timeout,
124            ..Default::default()
125        };
126
127        // Extract session_id once to avoid double clone
128        let session_id = session_info.session_id.clone();
129
130        // Convert SessionInfo to AuthResponseData
131        let auth_response = AuthResponseData {
132            session_id: session_id.clone(),
133            protocol_version: session_info.protocol_version,
134            release_version: session_info.release_version,
135            database_name: session_info.database_name,
136            product_name: session_info.product_name,
137            max_data_message_size: session_info.max_data_message_size,
138            max_identifier_length: 128,    // Default value
139            max_varchar_length: 2_000_000, // Default value
140            identifier_quote_string: "\"".to_string(),
141            time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
142            time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
143        };
144
145        // Create session (owned, not Arc)
146        let session = SessionInfo::new(session_id, auth_response, session_config);
147
148        // Set schema if specified
149        if let Some(schema) = &params.schema {
150            session.set_current_schema(Some(schema.clone())).await;
151        }
152
153        Ok(Self {
154            transport: Arc::new(Mutex::new(transport)),
155            session,
156            params,
157        })
158    }
159
    /// Create a builder for constructing a connection.
    ///
    /// # Returns
    ///
    /// A fresh `ConnectionBuilder` obtained from `ConnectionBuilder::new()`.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::new()
    }
171
172    // ========================================================================
173    // Statement Creation (synchronous - Statement is now a pure data container)
174    // ========================================================================
175
    /// Create a new statement for executing SQL.
    ///
    /// Statement is now a pure data container. Use `execute_statement()` to execute it.
    /// No communication with the database happens here.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `Statement` instance ready for execution via `execute_statement()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let stmt = conn.create_statement("SELECT 1");
    /// let result_set = conn.execute_statement(&stmt).await?;
    /// ```
    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
        Statement::new(sql)
    }
193
194    // ========================================================================
195    // Statement Execution Methods
196    // ========================================================================
197
198    /// Execute a statement and return results.
199    ///
200    /// # Arguments
201    ///
202    /// * `stmt` - Statement to execute
203    ///
204    /// # Returns
205    ///
206    /// A `ResultSet` containing the query results.
207    ///
208    /// # Errors
209    ///
210    /// Returns `QueryError` if execution fails or times out.
211    ///
212    /// # Example
213    ///
214    pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
215        // Validate session state
216        self.session
217            .validate_ready()
218            .await
219            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
220
221        // Update session state
222        self.session.set_state(SessionState::Executing).await;
223
224        // Increment query counter
225        self.session.increment_query_count();
226
227        // Build final SQL with parameters
228        let final_sql = stmt.build_sql()?;
229
230        // Execute with timeout
231        let timeout_duration = Duration::from_millis(stmt.timeout_ms());
232        let transport = Arc::clone(&self.transport);
233
234        let result = timeout(timeout_duration, async move {
235            let mut transport_guard = transport.lock().await;
236            transport_guard.execute_query(&final_sql).await
237        })
238        .await
239        .map_err(|_| QueryError::Timeout {
240            timeout_ms: stmt.timeout_ms(),
241        })?
242        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
243
244        // Update session state back to ready/in_transaction
245        self.update_session_state_after_query().await;
246
247        // Convert transport result to ResultSet
248        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
249    }
250
251    /// Execute a statement and return the row count (for non-SELECT statements).
252    ///
253    /// # Arguments
254    ///
255    /// * `stmt` - Statement to execute
256    ///
257    /// # Returns
258    ///
259    /// The number of rows affected.
260    ///
261    /// # Errors
262    ///
263    /// Returns `QueryError` if execution fails or if statement is a SELECT.
264    ///
265    /// # Example
266    ///
267    pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
268        let result_set = self.execute_statement(stmt).await?;
269
270        result_set.row_count().ok_or_else(|| {
271            QueryError::NoResultSet("Expected row count, got result set".to_string())
272        })
273    }
274
275    // ========================================================================
276    // Prepared Statement Methods
277    // ========================================================================
278
279    /// Create a prepared statement for parameterized query execution.
280    ///
281    /// This creates a server-side prepared statement that can be executed
282    /// multiple times with different parameter values.
283    ///
284    /// # Arguments
285    ///
286    /// * `sql` - SQL statement with parameter placeholders (?)
287    ///
288    /// # Returns
289    ///
290    /// A `PreparedStatement` ready for parameter binding and execution.
291    ///
292    /// # Errors
293    ///
294    /// Returns `QueryError` if preparation fails.
295    ///
296    /// # Example
297    ///
298    pub async fn prepare(
299        &mut self,
300        sql: impl Into<String>,
301    ) -> Result<PreparedStatement, QueryError> {
302        let sql = sql.into();
303
304        // Validate session state
305        self.session
306            .validate_ready()
307            .await
308            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
309
310        let mut transport = self.transport.lock().await;
311        let handle = transport
312            .create_prepared_statement(&sql)
313            .await
314            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
315
316        Ok(PreparedStatement::new(handle))
317    }
318
319    /// Execute a prepared statement and return results.
320    ///
321    /// # Arguments
322    ///
323    /// * `stmt` - Prepared statement to execute
324    ///
325    /// # Returns
326    ///
327    /// A `ResultSet` containing the query results.
328    ///
329    /// # Errors
330    ///
331    /// Returns `QueryError` if execution fails.
332    ///
333    /// # Example
334    ///
335    pub async fn execute_prepared(
336        &mut self,
337        stmt: &PreparedStatement,
338    ) -> Result<ResultSet, QueryError> {
339        if stmt.is_closed() {
340            return Err(QueryError::StatementClosed);
341        }
342
343        // Validate session state
344        self.session
345            .validate_ready()
346            .await
347            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
348
349        // Update session state
350        self.session.set_state(SessionState::Executing).await;
351
352        // Increment query counter
353        self.session.increment_query_count();
354
355        // Convert parameters to column-major JSON format
356        let params_data = stmt.build_parameters_data()?;
357
358        let mut transport = self.transport.lock().await;
359        let result = transport
360            .execute_prepared_statement(stmt.handle_ref(), params_data)
361            .await
362            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
363
364        drop(transport);
365
366        // Update session state back to ready/in_transaction
367        self.update_session_state_after_query().await;
368
369        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
370    }
371
372    /// Execute a prepared statement and return the number of affected rows.
373    ///
374    /// Use this for INSERT, UPDATE, DELETE statements.
375    ///
376    /// # Arguments
377    ///
378    /// * `stmt` - Prepared statement to execute
379    ///
380    /// # Returns
381    ///
382    /// The number of rows affected.
383    ///
384    /// # Errors
385    ///
386    /// Returns `QueryError` if execution fails or statement returns a result set.
387    pub async fn execute_prepared_update(
388        &mut self,
389        stmt: &PreparedStatement,
390    ) -> Result<i64, QueryError> {
391        if stmt.is_closed() {
392            return Err(QueryError::StatementClosed);
393        }
394
395        // Validate session state
396        self.session
397            .validate_ready()
398            .await
399            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
400
401        // Update session state
402        self.session.set_state(SessionState::Executing).await;
403
404        // Convert parameters to column-major JSON format
405        let params_data = stmt.build_parameters_data()?;
406
407        let mut transport = self.transport.lock().await;
408        let result = transport
409            .execute_prepared_statement(stmt.handle_ref(), params_data)
410            .await
411            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
412
413        drop(transport);
414
415        // Update session state back to ready/in_transaction
416        self.update_session_state_after_query().await;
417
418        match result {
419            QueryResult::RowCount { count } => Ok(count),
420            QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
421        }
422    }
423
424    /// Close a prepared statement and release server-side resources.
425    ///
426    /// # Arguments
427    ///
428    /// * `stmt` - Prepared statement to close (consumed)
429    ///
430    /// # Errors
431    ///
432    /// Returns `QueryError` if closing fails.
433    ///
434    /// # Example
435    ///
436    pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
437        if stmt.is_closed() {
438            return Ok(());
439        }
440
441        let mut transport = self.transport.lock().await;
442        transport
443            .close_prepared_statement(stmt.handle_ref())
444            .await
445            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
446
447        stmt.mark_closed();
448        Ok(())
449    }
450
451    // ========================================================================
452    // Convenience Methods (these internally use execute_statement)
453    // ========================================================================
454
455    /// Execute a SQL query and return results.
456    ///
457    /// This is a convenience method that creates a statement and executes it.
458    ///
459    /// # Arguments
460    ///
461    /// * `sql` - SQL query text
462    ///
463    /// # Returns
464    ///
465    /// A `ResultSet` containing the query results.
466    ///
467    /// # Errors
468    ///
469    /// Returns `QueryError` if execution fails.
470    ///
471    /// # Example
472    ///
473    pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
474        let stmt = self.create_statement(sql);
475        self.execute_statement(&stmt).await
476    }
477
478    /// Execute a SQL query and return all results as RecordBatches.
479    ///
480    /// This is a convenience method that fetches all results into memory.
481    ///
482    /// # Arguments
483    ///
484    /// * `sql` - SQL query text
485    ///
486    /// # Returns
487    ///
488    /// A vector of `RecordBatch` instances.
489    ///
490    /// # Errors
491    ///
492    /// Returns `QueryError` if execution fails.
493    ///
494    /// # Example
495    ///
496    pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
497        let result_set = self.execute(sql).await?;
498        result_set.fetch_all().await
499    }
500
501    /// Execute a non-SELECT statement and return the row count.
502    ///
503    /// # Arguments
504    ///
505    /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
506    ///
507    /// # Returns
508    ///
509    /// The number of rows affected.
510    ///
511    /// # Errors
512    ///
513    /// Returns `QueryError` if execution fails.
514    ///
515    /// # Example
516    ///
517    pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
518        let stmt = self.create_statement(sql);
519        self.execute_statement_update(&stmt).await
520    }
521
522    // ========================================================================
523    // Transaction Methods
524    // ========================================================================
525
526    pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
527        // Disable autocommit on the server so statements don't auto-commit
528        self.transport
529            .lock()
530            .await
531            .set_autocommit(false)
532            .await
533            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
534
535        self.session
536            .begin_transaction()
537            .await
538            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
539
540        Ok(())
541    }
542
543    pub async fn commit(&mut self) -> Result<(), QueryError> {
544        if !self.in_transaction() {
545            return Ok(());
546        }
547
548        self.execute_update("COMMIT").await?;
549
550        self.session
551            .commit_transaction()
552            .await
553            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
554
555        Ok(())
556    }
557
558    pub async fn rollback(&mut self) -> Result<(), QueryError> {
559        if !self.in_transaction() {
560            return Ok(());
561        }
562
563        self.execute_update("ROLLBACK").await?;
564
565        self.session
566            .rollback_transaction()
567            .await
568            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
569
570        Ok(())
571    }
572
    /// Check if a transaction is currently active.
    ///
    /// This reports the session's own bookkeeping; no request is sent to the
    /// database.
    ///
    /// # Returns
    ///
    /// `true` if a transaction is active, `false` otherwise.
    pub fn in_transaction(&self) -> bool {
        self.session.in_transaction()
    }
581
582    // ========================================================================
583    // Session and Schema Methods
584    // ========================================================================
585
    /// Get the current schema as recorded on the session.
    ///
    /// # Returns
    ///
    /// The current schema name, or `None` if no schema is set.
    pub async fn current_schema(&self) -> Option<String> {
        self.session.current_schema().await
    }
594
595    /// Set the current schema.
596    ///
597    /// # Arguments
598    ///
599    /// * `schema` - The schema name to set
600    ///
601    /// # Errors
602    ///
603    /// Returns `QueryError` if the operation fails.
604    ///
605    /// # Example
606    ///
607    pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
608        let schema_name = schema.into();
609        self.execute_update(format!("OPEN SCHEMA {}", schema_name))
610            .await?;
611        self.session.set_current_schema(Some(schema_name)).await;
612        Ok(())
613    }
614
615    // ========================================================================
616    // Metadata Methods
617    // ========================================================================
618
619    /// Get metadata about catalogs.
620    ///
621    /// # Returns
622    ///
623    /// A `ResultSet` containing catalog metadata.
624    ///
625    /// # Errors
626    ///
627    /// Returns `QueryError` if the operation fails.
628    pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
629        self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
630            .await
631    }
632
633    /// Get metadata about schemas.
634    ///
635    /// # Arguments
636    ///
637    /// * `catalog` - Optional catalog name filter
638    ///
639    /// # Returns
640    ///
641    /// A `ResultSet` containing schema metadata.
642    ///
643    /// # Errors
644    ///
645    /// Returns `QueryError` if the operation fails.
646    pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
647        let sql = if let Some(cat) = catalog {
648            format!(
649                "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
650                cat.replace('\'', "''")
651            )
652        } else {
653            "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
654        };
655        self.execute(sql).await
656    }
657
658    /// Get metadata about tables.
659    ///
660    /// # Arguments
661    ///
662    /// * `catalog` - Optional catalog name filter
663    /// * `schema` - Optional schema name filter
664    /// * `table` - Optional table name filter
665    ///
666    /// # Returns
667    ///
668    /// A `ResultSet` containing table metadata.
669    ///
670    /// # Errors
671    ///
672    /// Returns `QueryError` if the operation fails.
673    pub async fn get_tables(
674        &mut self,
675        catalog: Option<&str>,
676        schema: Option<&str>,
677        table: Option<&str>,
678    ) -> Result<ResultSet, QueryError> {
679        let mut conditions = vec!["OBJECT_TYPE IN ('TABLE', 'VIEW')".to_string()];
680
681        // Exasol has no catalogs — ignore the catalog parameter
682        let _ = catalog;
683        if let Some(sch) = schema {
684            conditions.push(format!("ROOT_NAME = '{}'", sch.replace('\'', "''")));
685        }
686        if let Some(tbl) = table {
687            conditions.push(format!("OBJECT_NAME = '{}'", tbl.replace('\'', "''")));
688        }
689
690        let where_clause = format!("WHERE {}", conditions.join(" AND "));
691
692        let sql = format!(
693            "SELECT ROOT_NAME AS TABLE_SCHEMA, OBJECT_NAME AS TABLE_NAME, OBJECT_TYPE AS TABLE_TYPE FROM SYS.EXA_ALL_OBJECTS {} ORDER BY ROOT_NAME, OBJECT_NAME",
694            where_clause
695        );
696
697        self.execute(sql).await
698    }
699
700    /// Get metadata about columns.
701    ///
702    /// # Arguments
703    ///
704    /// * `catalog` - Optional catalog name filter
705    /// * `schema` - Optional schema name filter
706    /// * `table` - Optional table name filter
707    /// * `column` - Optional column name filter
708    ///
709    /// # Returns
710    ///
711    /// A `ResultSet` containing column metadata.
712    ///
713    /// # Errors
714    ///
715    /// Returns `QueryError` if the operation fails.
716    pub async fn get_columns(
717        &mut self,
718        catalog: Option<&str>,
719        schema: Option<&str>,
720        table: Option<&str>,
721        column: Option<&str>,
722    ) -> Result<ResultSet, QueryError> {
723        let mut conditions = Vec::new();
724
725        if let Some(cat) = catalog {
726            conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
727        }
728        if let Some(sch) = schema {
729            conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
730        }
731        if let Some(tbl) = table {
732            conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
733        }
734        if let Some(col) = column {
735            conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
736        }
737
738        let where_clause = if conditions.is_empty() {
739            String::new()
740        } else {
741            format!("WHERE {}", conditions.join(" AND "))
742        };
743
744        let sql = format!(
745            "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
746             FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
747            where_clause
748        );
749
750        self.execute(sql).await
751    }
752
753    // ========================================================================
754    // Session Information Methods
755    // ========================================================================
756
    /// Get the ID of the session backing this connection.
    ///
    /// # Returns
    ///
    /// The session ID.
    pub fn session_id(&self) -> &str {
        self.session.session_id()
    }
765
    /// Get the connection parameters this connection was created with.
    ///
    /// # Returns
    ///
    /// A reference to the connection parameters.
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }
774
    /// Check if the connection is closed.
    ///
    /// The answer comes from the session's state; the transport is not consulted.
    ///
    /// # Returns
    ///
    /// `true` if the connection is closed, `false` otherwise.
    pub async fn is_closed(&self) -> bool {
        self.session.is_closed().await
    }
783
784    /// Close the connection.
785    ///
786    /// This closes the session and transport layer.
787    ///
788    /// # Errors
789    ///
790    /// Returns `ConnectionError` if closing fails.
791    ///
792    /// # Example
793    ///
794    pub async fn close(self) -> Result<(), ConnectionError> {
795        // Close session
796        self.session.close().await?;
797
798        // Close transport
799        let mut transport = self.transport.lock().await;
800        transport
801            .close()
802            .await
803            .map_err(|e| ConnectionError::ConnectionFailed {
804                host: self.params.host.clone(),
805                port: self.params.port,
806                message: e.to_string(),
807            })?;
808
809        Ok(())
810    }
811
812    /// Shut down the connection without consuming self.
813    ///
814    /// Unlike `close()`, this can be called through a shared reference,
815    /// making it suitable for use in `Drop` implementations where ownership
816    /// transfer is not possible (e.g., when the connection is behind `Arc<Mutex<>>`).
817    pub async fn shutdown(&self) -> Result<(), ConnectionError> {
818        self.session.close().await?;
819
820        let mut transport = self.transport.lock().await;
821        transport
822            .close()
823            .await
824            .map_err(|e| ConnectionError::ConnectionFailed {
825                host: self.params.host.clone(),
826                port: self.params.port,
827                message: e.to_string(),
828            })?;
829
830        Ok(())
831    }
832
833    // ========================================================================
834    // Import/Export Methods
835    // ========================================================================
836
837    /// Creates an SQL executor closure for import/export operations.
838    ///
839    /// This is a helper method that creates a closure which can execute SQL
840    /// statements and return the row count. The closure captures a cloned
841    /// reference to the transport, allowing it to be called multiple times
842    /// (e.g., for parallel file imports).
843    ///
844    /// # Returns
845    ///
846    /// A closure that takes an SQL string and returns a Future resolving to
847    /// either the affected row count or an error string.
848    fn make_sql_executor(
849        &self,
850    ) -> impl Fn(
851        String,
852    )
853        -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
854        let transport = Arc::clone(&self.transport);
855        move |sql: String| {
856            let transport = Arc::clone(&transport);
857            Box::pin(async move {
858                let mut transport_guard = transport.lock().await;
859                match transport_guard.execute_query(&sql).await {
860                    Ok(QueryResult::RowCount { count }) => Ok(count as u64),
861                    Ok(QueryResult::ResultSet { .. }) => Ok(0),
862                    Err(e) => Err(e.to_string()),
863                }
864            })
865        }
866    }
867
868    /// Import CSV data from a file into an Exasol table.
869    ///
870    /// This method reads CSV data from the specified file and imports it into
871    /// the target table using Exasol's HTTP transport layer.
872    ///
873    /// # Arguments
874    ///
875    /// * `table` - Name of the target table
876    /// * `file_path` - Path to the CSV file
877    /// * `options` - Import options
878    ///
879    /// # Returns
880    ///
881    /// The number of rows imported on success.
882    ///
883    /// # Errors
884    ///
885    /// Returns `ImportError` if the import fails.
886    ///
887    /// # Example
888    ///
889    pub async fn import_csv_from_file(
890        &mut self,
891        table: &str,
892        file_path: &std::path::Path,
893        options: crate::import::csv::CsvImportOptions,
894    ) -> Result<u64, crate::import::ImportError> {
895        // Pass Exasol host/port from connection params to import options
896        let options = options
897            .exasol_host(&self.params.host)
898            .exasol_port(self.params.port);
899
900        crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
901            .await
902    }
903
904    /// Import CSV data from an async reader into an Exasol table.
905    ///
906    /// This method reads CSV data from an async reader and imports it into
907    /// the target table using Exasol's HTTP transport layer.
908    ///
909    /// # Arguments
910    ///
911    /// * `table` - Name of the target table
912    /// * `reader` - Async reader providing CSV data
913    /// * `options` - Import options
914    ///
915    /// # Returns
916    ///
917    /// The number of rows imported on success.
918    ///
919    /// # Errors
920    ///
921    /// Returns `ImportError` if the import fails.
922    pub async fn import_csv_from_stream<R>(
923        &mut self,
924        table: &str,
925        reader: R,
926        options: crate::import::csv::CsvImportOptions,
927    ) -> Result<u64, crate::import::ImportError>
928    where
929        R: tokio::io::AsyncRead + Unpin + Send + 'static,
930    {
931        // Pass Exasol host/port from connection params to import options
932        let options = options
933            .exasol_host(&self.params.host)
934            .exasol_port(self.params.port);
935
936        crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
937            .await
938    }
939
940    /// Import CSV data from an iterator into an Exasol table.
941    ///
942    /// This method converts iterator rows to CSV format and imports them into
943    /// the target table using Exasol's HTTP transport layer.
944    ///
945    /// # Arguments
946    ///
947    /// * `table` - Name of the target table
948    /// * `rows` - Iterator of rows, where each row is an iterator of field values
949    /// * `options` - Import options
950    ///
951    /// # Returns
952    ///
953    /// The number of rows imported on success.
954    ///
955    /// # Errors
956    ///
957    /// Returns `ImportError` if the import fails.
958    ///
959    /// # Example
960    ///
961    pub async fn import_csv_from_iter<I, T, S>(
962        &mut self,
963        table: &str,
964        rows: I,
965        options: crate::import::csv::CsvImportOptions,
966    ) -> Result<u64, crate::import::ImportError>
967    where
968        I: IntoIterator<Item = T> + Send + 'static,
969        T: IntoIterator<Item = S> + Send,
970        S: AsRef<str>,
971    {
972        // Pass Exasol host/port from connection params to import options
973        let options = options
974            .exasol_host(&self.params.host)
975            .exasol_port(self.params.port);
976
977        crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
978    }
979
980    /// Export data from an Exasol table or query to a CSV file.
981    ///
982    /// This method exports data from the specified source to a CSV file
983    /// using Exasol's HTTP transport layer.
984    ///
985    /// # Arguments
986    ///
987    /// * `source` - The data source (table or query)
988    /// * `file_path` - Path to the output file
989    /// * `options` - Export options
990    ///
991    /// # Returns
992    ///
993    /// The number of rows exported on success.
994    ///
995    /// # Errors
996    ///
997    /// Returns `ExportError` if the export fails.
998    ///
999    /// # Example
1000    ///
1001    pub async fn export_csv_to_file(
1002        &mut self,
1003        source: crate::query::export::ExportSource,
1004        file_path: &std::path::Path,
1005        options: crate::export::csv::CsvExportOptions,
1006    ) -> Result<u64, crate::export::csv::ExportError> {
1007        // Pass Exasol host/port from connection params to export options
1008        let options = options
1009            .exasol_host(&self.params.host)
1010            .exasol_port(self.params.port);
1011
1012        let mut transport_guard = self.transport.lock().await;
1013        crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1014    }
1015
1016    /// Export data from an Exasol table or query to an async writer.
1017    ///
1018    /// This method exports data from the specified source to an async writer
1019    /// using Exasol's HTTP transport layer.
1020    ///
1021    /// # Arguments
1022    ///
1023    /// * `source` - The data source (table or query)
1024    /// * `writer` - Async writer to write the CSV data to
1025    /// * `options` - Export options
1026    ///
1027    /// # Returns
1028    ///
1029    /// The number of rows exported on success.
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns `ExportError` if the export fails.
1034    pub async fn export_csv_to_stream<W>(
1035        &mut self,
1036        source: crate::query::export::ExportSource,
1037        writer: W,
1038        options: crate::export::csv::CsvExportOptions,
1039    ) -> Result<u64, crate::export::csv::ExportError>
1040    where
1041        W: tokio::io::AsyncWrite + Unpin,
1042    {
1043        // Pass Exasol host/port from connection params to export options
1044        let options = options
1045            .exasol_host(&self.params.host)
1046            .exasol_port(self.params.port);
1047
1048        let mut transport_guard = self.transport.lock().await;
1049        crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1050    }
1051
1052    /// Export data from an Exasol table or query to an in-memory list of rows.
1053    ///
1054    /// Each row is represented as a vector of string values.
1055    ///
1056    /// # Arguments
1057    ///
1058    /// * `source` - The data source (table or query)
1059    /// * `options` - Export options
1060    ///
1061    /// # Returns
1062    ///
1063    /// A vector of rows, where each row is a vector of column values.
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns `ExportError` if the export fails.
1068    ///
1069    /// # Example
1070    ///
1071    pub async fn export_csv_to_list(
1072        &mut self,
1073        source: crate::query::export::ExportSource,
1074        options: crate::export::csv::CsvExportOptions,
1075    ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1076        // Pass Exasol host/port from connection params to export options
1077        let options = options
1078            .exasol_host(&self.params.host)
1079            .exasol_port(self.params.port);
1080
1081        let mut transport_guard = self.transport.lock().await;
1082        crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1083    }
1084
1085    /// Import multiple CSV files in parallel into an Exasol table.
1086    ///
1087    /// This method reads CSV data from multiple files and imports them into
1088    /// the target table using parallel HTTP transport connections. Each file
1089    /// gets its own connection with a unique internal address.
1090    ///
1091    /// For a single file, this method delegates to `import_csv_from_file`.
1092    ///
1093    /// # Arguments
1094    ///
1095    /// * `table` - Name of the target table
1096    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1097    /// * `options` - Import options
1098    ///
1099    /// # Returns
1100    ///
1101    /// The number of rows imported on success.
1102    ///
1103    /// # Errors
1104    ///
1105    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1106    ///
1107    /// # Example
1108    ///
1109    /// ```no_run
1110    /// use exarrow_rs::adbc::Connection;
1111    /// use exarrow_rs::import::CsvImportOptions;
1112    /// use std::path::PathBuf;
1113    ///
1114    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1115    /// let files = vec![
1116    ///     PathBuf::from("/data/part1.csv"),
1117    ///     PathBuf::from("/data/part2.csv"),
1118    /// ];
1119    ///
1120    /// let options = CsvImportOptions::default();
1121    /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1122    /// # Ok(())
1123    /// # }
1124    /// ```
1125    pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1126        &mut self,
1127        table: &str,
1128        paths: S,
1129        options: crate::import::csv::CsvImportOptions,
1130    ) -> Result<u64, crate::import::ImportError> {
1131        // Pass Exasol host/port from connection params to import options
1132        let options = options
1133            .exasol_host(&self.params.host)
1134            .exasol_port(self.params.port);
1135
1136        crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1137    }
1138
1139    /// Import data from a Parquet file into an Exasol table.
1140    ///
1141    /// This method reads a Parquet file, converts the data to CSV format,
1142    /// and imports it into the target table using Exasol's HTTP transport layer.
1143    ///
1144    /// # Arguments
1145    ///
1146    /// * `table` - Name of the target table
1147    /// * `file_path` - Path to the Parquet file
1148    /// * `options` - Import options
1149    ///
1150    /// # Returns
1151    ///
1152    /// The number of rows imported on success.
1153    ///
1154    /// # Errors
1155    ///
1156    /// Returns `ImportError` if the import fails.
1157    ///
1158    /// # Example
1159    ///
1160    pub async fn import_from_parquet(
1161        &mut self,
1162        table: &str,
1163        file_path: &std::path::Path,
1164        options: crate::import::parquet::ParquetImportOptions,
1165    ) -> Result<u64, crate::import::ImportError> {
1166        // Pass Exasol host/port from connection params to import options
1167        let options = options
1168            .with_exasol_host(&self.params.host)
1169            .with_exasol_port(self.params.port);
1170
1171        crate::import::parquet::import_from_parquet(
1172            self.make_sql_executor(),
1173            table,
1174            file_path,
1175            options,
1176        )
1177        .await
1178    }
1179
1180    /// Import multiple Parquet files in parallel into an Exasol table.
1181    ///
1182    /// This method converts each Parquet file to CSV format concurrently,
1183    /// then streams the data through parallel HTTP transport connections.
1184    ///
1185    /// For a single file, this method delegates to `import_from_parquet`.
1186    ///
1187    /// # Arguments
1188    ///
1189    /// * `table` - Name of the target table
1190    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1191    /// * `options` - Import options
1192    ///
1193    /// # Returns
1194    ///
1195    /// The number of rows imported on success.
1196    ///
1197    /// # Errors
1198    ///
1199    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1200    ///
1201    /// # Example
1202    ///
1203    /// ```no_run
1204    /// use exarrow_rs::adbc::Connection;
1205    /// use exarrow_rs::import::ParquetImportOptions;
1206    /// use std::path::PathBuf;
1207    ///
1208    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1209    /// let files = vec![
1210    ///     PathBuf::from("/data/part1.parquet"),
1211    ///     PathBuf::from("/data/part2.parquet"),
1212    /// ];
1213    ///
1214    /// let options = ParquetImportOptions::default();
1215    /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1216    /// # Ok(())
1217    /// # }
1218    /// ```
1219    pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1220        &mut self,
1221        table: &str,
1222        paths: S,
1223        options: crate::import::parquet::ParquetImportOptions,
1224    ) -> Result<u64, crate::import::ImportError> {
1225        // Pass Exasol host/port from connection params to import options
1226        let options = options
1227            .with_exasol_host(&self.params.host)
1228            .with_exasol_port(self.params.port);
1229
1230        crate::import::parquet::import_from_parquet_files(
1231            self.make_sql_executor(),
1232            table,
1233            paths,
1234            options,
1235        )
1236        .await
1237    }
1238
1239    /// Export data from an Exasol table or query to a Parquet file.
1240    ///
1241    /// This method exports data from the specified source to a Parquet file.
1242    /// The data is first received as CSV from Exasol, then converted to Parquet format.
1243    ///
1244    /// # Arguments
1245    ///
1246    /// * `source` - The data source (table or query)
1247    /// * `file_path` - Path to the output Parquet file
1248    /// * `options` - Export options
1249    ///
1250    /// # Returns
1251    ///
1252    /// The number of rows exported on success.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns `ExportError` if the export fails.
1257    ///
1258    /// # Example
1259    ///
1260    pub async fn export_to_parquet(
1261        &mut self,
1262        source: crate::query::export::ExportSource,
1263        file_path: &std::path::Path,
1264        options: crate::export::parquet::ParquetExportOptions,
1265    ) -> Result<u64, crate::export::csv::ExportError> {
1266        // Pass Exasol host/port from connection params to export options
1267        let options = options
1268            .exasol_host(&self.params.host)
1269            .exasol_port(self.params.port);
1270
1271        let mut transport_guard = self.transport.lock().await;
1272        crate::export::parquet::export_to_parquet_via_transport(
1273            &mut *transport_guard,
1274            source,
1275            file_path,
1276            options,
1277        )
1278        .await
1279    }
1280
1281    /// Import data from an Arrow RecordBatch into an Exasol table.
1282    ///
1283    /// This method converts the RecordBatch to CSV format and imports it
1284    /// into the target table using Exasol's HTTP transport layer.
1285    ///
1286    /// # Arguments
1287    ///
1288    /// * `table` - Name of the target table
1289    /// * `batch` - The RecordBatch to import
1290    /// * `options` - Import options
1291    ///
1292    /// # Returns
1293    ///
1294    /// The number of rows imported on success.
1295    ///
1296    /// # Errors
1297    ///
1298    /// Returns `ImportError` if the import fails.
1299    ///
1300    /// # Example
1301    ///
1302    pub async fn import_from_record_batch(
1303        &mut self,
1304        table: &str,
1305        batch: &RecordBatch,
1306        options: crate::import::arrow::ArrowImportOptions,
1307    ) -> Result<u64, crate::import::ImportError> {
1308        // Pass Exasol host/port from connection params to import options
1309        let options = options
1310            .exasol_host(&self.params.host)
1311            .exasol_port(self.params.port);
1312
1313        crate::import::arrow::import_from_record_batch(
1314            self.make_sql_executor(),
1315            table,
1316            batch,
1317            options,
1318        )
1319        .await
1320    }
1321
1322    /// Import data from multiple Arrow RecordBatches into an Exasol table.
1323    ///
1324    /// This method converts each RecordBatch to CSV format and imports them
1325    /// into the target table using Exasol's HTTP transport layer.
1326    ///
1327    /// # Arguments
1328    ///
1329    /// * `table` - Name of the target table
1330    /// * `batches` - An iterator of RecordBatches to import
1331    /// * `options` - Import options
1332    ///
1333    /// # Returns
1334    ///
1335    /// The number of rows imported on success.
1336    ///
1337    /// # Errors
1338    ///
1339    /// Returns `ImportError` if the import fails.
1340    pub async fn import_from_record_batches<I>(
1341        &mut self,
1342        table: &str,
1343        batches: I,
1344        options: crate::import::arrow::ArrowImportOptions,
1345    ) -> Result<u64, crate::import::ImportError>
1346    where
1347        I: IntoIterator<Item = RecordBatch>,
1348    {
1349        // Pass Exasol host/port from connection params to import options
1350        let options = options
1351            .exasol_host(&self.params.host)
1352            .exasol_port(self.params.port);
1353
1354        crate::import::arrow::import_from_record_batches(
1355            self.make_sql_executor(),
1356            table,
1357            batches,
1358            options,
1359        )
1360        .await
1361    }
1362
1363    /// Import data from an Arrow IPC file/stream into an Exasol table.
1364    ///
1365    /// This method reads Arrow IPC format data, converts it to CSV,
1366    /// and imports it into the target table using Exasol's HTTP transport layer.
1367    ///
1368    /// # Arguments
1369    ///
1370    /// * `table` - Name of the target table
1371    /// * `reader` - An async reader containing Arrow IPC data
1372    /// * `options` - Import options
1373    ///
1374    /// # Returns
1375    ///
1376    /// The number of rows imported on success.
1377    ///
1378    /// # Errors
1379    ///
1380    /// Returns `ImportError` if the import fails.
1381    ///
1382    /// # Example
1383    ///
1384    pub async fn import_from_arrow_ipc<R>(
1385        &mut self,
1386        table: &str,
1387        reader: R,
1388        options: crate::import::arrow::ArrowImportOptions,
1389    ) -> Result<u64, crate::import::ImportError>
1390    where
1391        R: tokio::io::AsyncRead + Unpin + Send,
1392    {
1393        // Pass Exasol host/port from connection params to import options
1394        let options = options
1395            .exasol_host(&self.params.host)
1396            .exasol_port(self.params.port);
1397
1398        crate::import::arrow::import_from_arrow_ipc(
1399            self.make_sql_executor(),
1400            table,
1401            reader,
1402            options,
1403        )
1404        .await
1405    }
1406
1407    /// Export data from an Exasol table or query to Arrow RecordBatches.
1408    ///
1409    /// This method exports data from the specified source and converts it
1410    /// to Arrow RecordBatches.
1411    ///
1412    /// # Arguments
1413    ///
1414    /// * `source` - The data source (table or query)
1415    /// * `options` - Export options
1416    ///
1417    /// # Returns
1418    ///
1419    /// A vector of RecordBatches on success.
1420    ///
1421    /// # Errors
1422    ///
1423    /// Returns `ExportError` if the export fails.
1424    ///
1425    /// # Example
1426    ///
1427    pub async fn export_to_record_batches(
1428        &mut self,
1429        source: crate::query::export::ExportSource,
1430        options: crate::export::arrow::ArrowExportOptions,
1431    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1432        // Pass Exasol host/port from connection params to export options
1433        let options = options
1434            .exasol_host(&self.params.host)
1435            .exasol_port(self.params.port);
1436
1437        let mut transport_guard = self.transport.lock().await;
1438        crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1439    }
1440
1441    /// Export data from an Exasol table or query to an Arrow IPC file.
1442    ///
1443    /// This method exports data from the specified source to an Arrow IPC file.
1444    ///
1445    /// # Arguments
1446    ///
1447    /// * `source` - The data source (table or query)
1448    /// * `file_path` - Path to the output Arrow IPC file
1449    /// * `options` - Export options
1450    ///
1451    /// # Returns
1452    ///
1453    /// The number of rows exported on success.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns `ExportError` if the export fails.
1458    ///
1459    /// # Example
1460    ///
1461    pub async fn export_to_arrow_ipc(
1462        &mut self,
1463        source: crate::query::export::ExportSource,
1464        file_path: &std::path::Path,
1465        options: crate::export::arrow::ArrowExportOptions,
1466    ) -> Result<u64, crate::export::csv::ExportError> {
1467        // Pass Exasol host/port from connection params to export options
1468        let options = options
1469            .exasol_host(&self.params.host)
1470            .exasol_port(self.params.port);
1471
1472        let mut transport_guard = self.transport.lock().await;
1473        crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1474            .await
1475    }
1476
1477    // ========================================================================
1478    // Blocking Import/Export Methods
1479    // ========================================================================
1480
1481    /// Import CSV data from a file into an Exasol table (blocking).
1482    ///
1483    /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1484    /// for use in non-async contexts.
1485    ///
1486    /// # Arguments
1487    ///
1488    /// * `table` - Name of the target table
1489    /// * `file_path` - Path to the CSV file
1490    /// * `options` - Import options
1491    ///
1492    /// # Returns
1493    ///
1494    /// The number of rows imported on success.
1495    ///
1496    /// # Errors
1497    ///
1498    /// Returns `ImportError` if the import fails.
1499    ///
1500    /// # Example
1501    ///
1502    pub fn blocking_import_csv_from_file(
1503        &mut self,
1504        table: &str,
1505        file_path: &std::path::Path,
1506        options: crate::import::csv::CsvImportOptions,
1507    ) -> Result<u64, crate::import::ImportError> {
1508        blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1509    }
1510
1511    /// Import multiple CSV files in parallel into an Exasol table (blocking).
1512    ///
1513    /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1514    /// for use in non-async contexts.
1515    ///
1516    /// # Arguments
1517    ///
1518    /// * `table` - Name of the target table
1519    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1520    /// * `options` - Import options
1521    ///
1522    /// # Returns
1523    ///
1524    /// The number of rows imported on success.
1525    ///
1526    /// # Errors
1527    ///
1528    /// Returns `ImportError` if the import fails.
1529    pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1530        &mut self,
1531        table: &str,
1532        paths: S,
1533        options: crate::import::csv::CsvImportOptions,
1534    ) -> Result<u64, crate::import::ImportError> {
1535        blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1536    }
1537
1538    /// Import data from a Parquet file into an Exasol table (blocking).
1539    ///
1540    /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1541    /// for use in non-async contexts.
1542    ///
1543    /// # Arguments
1544    ///
1545    /// * `table` - Name of the target table
1546    /// * `file_path` - Path to the Parquet file
1547    /// * `options` - Import options
1548    ///
1549    /// # Returns
1550    ///
1551    /// The number of rows imported on success.
1552    ///
1553    /// # Errors
1554    ///
1555    /// Returns `ImportError` if the import fails.
1556    ///
1557    /// # Example
1558    ///
1559    pub fn blocking_import_from_parquet(
1560        &mut self,
1561        table: &str,
1562        file_path: &std::path::Path,
1563        options: crate::import::parquet::ParquetImportOptions,
1564    ) -> Result<u64, crate::import::ImportError> {
1565        blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1566    }
1567
1568    /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1569    ///
1570    /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1571    /// for use in non-async contexts.
1572    ///
1573    /// # Arguments
1574    ///
1575    /// * `table` - Name of the target table
1576    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1577    /// * `options` - Import options
1578    ///
1579    /// # Returns
1580    ///
1581    /// The number of rows imported on success.
1582    ///
1583    /// # Errors
1584    ///
1585    /// Returns `ImportError` if the import fails.
1586    pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1587        &mut self,
1588        table: &str,
1589        paths: S,
1590        options: crate::import::parquet::ParquetImportOptions,
1591    ) -> Result<u64, crate::import::ImportError> {
1592        blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1593    }
1594
1595    /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1596    ///
1597    /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1598    /// for use in non-async contexts.
1599    ///
1600    /// # Arguments
1601    ///
1602    /// * `table` - Name of the target table
1603    /// * `batch` - The RecordBatch to import
1604    /// * `options` - Import options
1605    ///
1606    /// # Returns
1607    ///
1608    /// The number of rows imported on success.
1609    ///
1610    /// # Errors
1611    ///
1612    /// Returns `ImportError` if the import fails.
1613    ///
1614    /// # Example
1615    ///
1616    pub fn blocking_import_from_record_batch(
1617        &mut self,
1618        table: &str,
1619        batch: &RecordBatch,
1620        options: crate::import::arrow::ArrowImportOptions,
1621    ) -> Result<u64, crate::import::ImportError> {
1622        blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1623    }
1624
1625    /// Import data from an Arrow IPC file into an Exasol table (blocking).
1626    ///
1627    /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1628    /// for use in non-async contexts.
1629    ///
    /// Note: Unlike the async variant, this method takes a file path rather than a
    /// reader. The file is opened with `tokio::fs::File` and then passed to
    /// [`import_from_arrow_ipc`](Self::import_from_arrow_ipc).
1632    ///
1633    /// # Arguments
1634    ///
1635    /// * `table` - Name of the target table
1636    /// * `file_path` - Path to the Arrow IPC file
1637    /// * `options` - Import options
1638    ///
1639    /// # Returns
1640    ///
1641    /// The number of rows imported on success.
1642    ///
1643    /// # Errors
1644    ///
1645    /// Returns `ImportError` if the import fails.
1646    ///
1647    /// # Example
1648    ///
1649    pub fn blocking_import_from_arrow_ipc(
1650        &mut self,
1651        table: &str,
1652        file_path: &std::path::Path,
1653        options: crate::import::arrow::ArrowImportOptions,
1654    ) -> Result<u64, crate::import::ImportError> {
1655        blocking_runtime().block_on(async {
1656            let file = tokio::fs::File::open(file_path)
1657                .await
1658                .map_err(crate::import::ImportError::IoError)?;
1659            self.import_from_arrow_ipc(table, file, options).await
1660        })
1661    }
1662
1663    /// Export data from an Exasol table or query to a CSV file (blocking).
1664    ///
1665    /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1666    /// for use in non-async contexts.
1667    ///
1668    /// # Arguments
1669    ///
1670    /// * `source` - The data source (table or query)
1671    /// * `file_path` - Path to the output file
1672    /// * `options` - Export options
1673    ///
1674    /// # Returns
1675    ///
1676    /// The number of rows exported on success.
1677    ///
1678    /// # Errors
1679    ///
1680    /// Returns `ExportError` if the export fails.
1681    ///
1682    /// # Example
1683    ///
1684    pub fn blocking_export_csv_to_file(
1685        &mut self,
1686        source: crate::query::export::ExportSource,
1687        file_path: &std::path::Path,
1688        options: crate::export::csv::CsvExportOptions,
1689    ) -> Result<u64, crate::export::csv::ExportError> {
1690        blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1691    }
1692
1693    /// Export data from an Exasol table or query to a Parquet file (blocking).
1694    ///
1695    /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1696    /// for use in non-async contexts.
1697    ///
1698    /// # Arguments
1699    ///
1700    /// * `source` - The data source (table or query)
1701    /// * `file_path` - Path to the output Parquet file
1702    /// * `options` - Export options
1703    ///
1704    /// # Returns
1705    ///
1706    /// The number of rows exported on success.
1707    ///
1708    /// # Errors
1709    ///
1710    /// Returns `ExportError` if the export fails.
1711    ///
1712    /// # Example
1713    ///
1714    pub fn blocking_export_to_parquet(
1715        &mut self,
1716        source: crate::query::export::ExportSource,
1717        file_path: &std::path::Path,
1718        options: crate::export::parquet::ParquetExportOptions,
1719    ) -> Result<u64, crate::export::csv::ExportError> {
1720        blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1721    }
1722
1723    /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1724    ///
1725    /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1726    /// for use in non-async contexts.
1727    ///
1728    /// # Arguments
1729    ///
1730    /// * `source` - The data source (table or query)
1731    /// * `options` - Export options
1732    ///
1733    /// # Returns
1734    ///
1735    /// A vector of RecordBatches on success.
1736    ///
1737    /// # Errors
1738    ///
1739    /// Returns `ExportError` if the export fails.
1740    ///
1741    /// # Example
1742    ///
1743    pub fn blocking_export_to_record_batches(
1744        &mut self,
1745        source: crate::query::export::ExportSource,
1746        options: crate::export::arrow::ArrowExportOptions,
1747    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1748        blocking_runtime().block_on(self.export_to_record_batches(source, options))
1749    }
1750
1751    /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1752    ///
1753    /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1754    /// for use in non-async contexts.
1755    ///
1756    /// # Arguments
1757    ///
1758    /// * `source` - The data source (table or query)
1759    /// * `file_path` - Path to the output Arrow IPC file
1760    /// * `options` - Export options
1761    ///
1762    /// # Returns
1763    ///
1764    /// The number of rows exported on success.
1765    ///
1766    /// # Errors
1767    ///
1768    /// Returns `ExportError` if the export fails.
1769    ///
1770    /// # Example
1771    ///
1772    pub fn blocking_export_to_arrow_ipc(
1773        &mut self,
1774        source: crate::query::export::ExportSource,
1775        file_path: &std::path::Path,
1776        options: crate::export::arrow::ArrowExportOptions,
1777    ) -> Result<u64, crate::export::csv::ExportError> {
1778        blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1779    }
1780
1781    // ========================================================================
1782    // Private Helper Methods
1783    // ========================================================================
1784
1785    /// Update session state after query execution.
1786    async fn update_session_state_after_query(&self) {
1787        if self.session.in_transaction() {
1788            self.session.set_state(SessionState::InTransaction).await;
1789        } else {
1790            self.session.set_state(SessionState::Ready).await;
1791        }
1792        self.session.update_activity().await;
1793    }
1794}
1795
1796impl std::fmt::Debug for Connection {
1797    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1798        f.debug_struct("Connection")
1799            .field("session_id", &self.session.session_id())
1800            .field("host", &self.params.host)
1801            .field("port", &self.params.port)
1802            .field("username", &self.params.username)
1803            .field("in_transaction", &self.session.in_transaction())
1804            .finish()
1805    }
1806}
1807
1808/// Builder for creating Connection instances.
1809pub struct ConnectionBuilder {
1810    /// Connection parameters builder
1811    params_builder: crate::connection::params::ConnectionBuilder,
1812}
1813
1814impl ConnectionBuilder {
1815    /// Create a new ConnectionBuilder.
1816    pub fn new() -> Self {
1817        Self {
1818            params_builder: crate::connection::params::ConnectionBuilder::new(),
1819        }
1820    }
1821
1822    /// Set the database host.
1823    pub fn host(mut self, host: &str) -> Self {
1824        self.params_builder = self.params_builder.host(host);
1825        self
1826    }
1827
1828    /// Set the database port.
1829    pub fn port(mut self, port: u16) -> Self {
1830        self.params_builder = self.params_builder.port(port);
1831        self
1832    }
1833
1834    /// Set the username.
1835    pub fn username(mut self, username: &str) -> Self {
1836        self.params_builder = self.params_builder.username(username);
1837        self
1838    }
1839
1840    /// Set the password.
1841    pub fn password(mut self, password: &str) -> Self {
1842        self.params_builder = self.params_builder.password(password);
1843        self
1844    }
1845
1846    /// Set the default schema.
1847    pub fn schema(mut self, schema: &str) -> Self {
1848        self.params_builder = self.params_builder.schema(schema);
1849        self
1850    }
1851
1852    /// Enable or disable TLS.
1853    pub fn use_tls(mut self, use_tls: bool) -> Self {
1854        self.params_builder = self.params_builder.use_tls(use_tls);
1855        self
1856    }
1857
1858    /// Build and connect.
1859    ///
1860    /// # Returns
1861    ///
1862    /// A connected `Connection` instance.
1863    ///
1864    /// # Errors
1865    ///
1866    /// Returns `ExasolError` if the connection fails.
1867    pub async fn connect(self) -> Result<Connection, ExasolError> {
1868        let params = self.params_builder.build()?;
1869        Ok(Connection::from_params(params).await?)
1870    }
1871}
1872
1873impl Default for ConnectionBuilder {
1874    fn default() -> Self {
1875        Self::new()
1876    }
1877}
1878
#[cfg(test)]
mod tests {
    use super::*;

    /// Chains every setter once; passes as long as the fluent API type-checks.
    #[test]
    fn test_connection_builder() {
        // `connect()` is deliberately not called: an actual connection
        // requires a running Exasol instance.
        let builder = ConnectionBuilder::new();
        let builder = builder.host("localhost").port(8563);
        let builder = builder.username("test").password("secret");
        let builder = builder.schema("MY_SCHEMA").use_tls(false);
        drop(builder);
    }

    /// Placeholder for checking that `create_statement` is synchronous.
    #[test]
    fn test_create_statement_is_sync() {
        // Nothing is asserted here: a `Connection` cannot be created without
        // a database, so this test only reserves the name for the
        // "callable without `.await`" check.
    }
}
1905
/// Blocking wrapper tests.
///
/// None of these tests talk to a database. Apart from the runtime test, each
/// one pins an API shape at compile time by coercing an item to an explicitly
/// typed function pointer; a signature change makes the test module fail to
/// build.
#[cfg(test)]
mod blocking_tests {
    use super::*;

    /// `Session` and `Connection` must name the same type.
    #[test]
    fn test_session_type_alias_exists() {
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}

        // Cross the names over: a function written against one name is
        // coerced to a pointer typed with the other. This only compiles when
        // `Session` and `Connection` are the very same type, whereas passing
        // each function to a bound using its own name would compile for any
        // two unrelated types.
        let _: fn(&Connection) = takes_session;
        let _: fn(&Session) = takes_connection;
    }

    /// The blocking runtime is a single shared instance that can run futures.
    #[test]
    fn test_blocking_runtime_exists() {
        let runtime = blocking_runtime();
        let _ = runtime.handle();

        // Repeated calls must hand back the same lazily created runtime.
        assert!(std::ptr::eq(runtime, blocking_runtime()));

        // Driving a trivial future proves the runtime is usable from
        // synchronous code.
        assert_eq!(runtime.block_on(async { 21 * 2 }), 42);
    }

    /// Signature check for `Connection::blocking_import_csv_from_file`.
    #[test]
    fn test_connection_has_blocking_import_csv() {
        // Fails to compile if the method is missing or its signature changes.
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::csv::CsvImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_csv_from_file;
    }

    /// Signature check for `Connection::blocking_import_from_parquet`.
    #[test]
    fn test_connection_has_blocking_import_parquet() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::parquet::ParquetImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
    }

    /// Signature check for `Connection::blocking_import_from_record_batch`.
    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        let _: fn(
            &mut Connection,
            &str,
            &RecordBatch,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_from_record_batch;
    }

    /// Signature check for `Connection::blocking_import_from_arrow_ipc`.
    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_arrow_ipc;
    }

    /// Signature check for `Connection::blocking_export_csv_to_file`.
    #[test]
    fn test_connection_has_blocking_export_csv() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::csv::CsvExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_csv_to_file;
    }

    /// Signature check for `Connection::blocking_export_to_parquet`.
    #[test]
    fn test_connection_has_blocking_export_parquet() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::parquet::ParquetExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_to_parquet;
    }

    /// Signature check for `Connection::blocking_export_to_record_batches`.
    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
            Connection::blocking_export_to_record_batches;
    }

    /// Signature check for `Connection::blocking_export_to_arrow_ipc`.
    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> =
            Connection::blocking_export_to_arrow_ipc;
    }
}