Skip to main content

exarrow_rs/adbc/
connection.rs

1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19    ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20    QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
/// `Session` is a type alias for [`Connection`].
///
/// The alias offers a more intuitive name for database sessions when
/// performing import/export operations. `Session` and `Connection` are the
/// same type and can be used interchangeably.
pub type Session = Connection;
39
40fn blocking_runtime() -> &'static Runtime {
41    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
42    RUNTIME.get_or_init(|| {
43        tokio::runtime::Builder::new_current_thread()
44            .enable_all()
45            .build()
46            .expect("Failed to create tokio runtime for blocking operations")
47    })
48}
49
/// ADBC Connection to an Exasol database.
///
/// The `Connection` type represents an active database connection and provides
/// methods for executing queries, managing transactions, and retrieving metadata.
///
/// # v2.0.0 Breaking Changes
///
/// - Connection now owns the transport directly
/// - `create_statement()` is now synchronous and returns a pure data container
/// - Use `execute_statement()` instead of `Statement::execute()`
/// - Use `prepare()` instead of `Statement::prepare()`
pub struct Connection {
    /// Transport layer for communication (owned by Connection).
    ///
    /// Held behind `Arc<tokio::sync::Mutex<_>>` so clones of the handle can be
    /// passed to result sets and to the import/export SQL executor.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Session information (state, query counter, current schema).
    session: SessionInfo,
    /// Connection parameters this connection was created from.
    params: ConnectionParams,
}
72
73impl Connection {
74    /// Create a connection from connection parameters.
75    ///
76    /// This establishes a connection to the Exasol database using WebSocket transport,
77    /// authenticates the user, and creates a session.
78    ///
79    /// # Arguments
80    ///
81    /// * `params` - Connection parameters
82    ///
83    /// # Returns
84    ///
85    /// A connected `Connection` instance.
86    ///
87    /// # Errors
88    ///
89    /// Returns `ConnectionError` if the connection or authentication fails.
90    pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
91        // Create transport
92        let mut transport = WebSocketTransport::new();
93
94        // Convert ConnectionParams to TransportConnectionParams
95        let transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
96            .with_tls(params.use_tls)
97            .with_validate_server_certificate(params.validate_server_certificate)
98            .with_timeout(params.connection_timeout.as_millis() as u64);
99
100        // Connect
101        transport.connect(&transport_params).await.map_err(|e| {
102            ConnectionError::ConnectionFailed {
103                host: params.host.clone(),
104                port: params.port,
105                message: e.to_string(),
106            }
107        })?;
108
109        // Authenticate
110        let credentials =
111            TransportCredentials::new(params.username.clone(), params.password().to_string());
112        let session_info = transport
113            .authenticate(&credentials)
114            .await
115            .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
116
117        // Create session config from connection params
118        let session_config = SessionConfig {
119            idle_timeout: params.idle_timeout,
120            query_timeout: params.query_timeout,
121            ..Default::default()
122        };
123
124        // Extract session_id once to avoid double clone
125        let session_id = session_info.session_id.clone();
126
127        // Convert SessionInfo to AuthResponseData
128        let auth_response = AuthResponseData {
129            session_id: session_id.clone(),
130            protocol_version: session_info.protocol_version,
131            release_version: session_info.release_version,
132            database_name: session_info.database_name,
133            product_name: session_info.product_name,
134            max_data_message_size: session_info.max_data_message_size,
135            max_identifier_length: 128,    // Default value
136            max_varchar_length: 2_000_000, // Default value
137            identifier_quote_string: "\"".to_string(),
138            time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
139            time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
140        };
141
142        // Create session (owned, not Arc)
143        let session = SessionInfo::new(session_id, auth_response, session_config);
144
145        // Set schema if specified
146        if let Some(schema) = &params.schema {
147            session.set_current_schema(Some(schema.clone())).await;
148        }
149
150        Ok(Self {
151            transport: Arc::new(Mutex::new(transport)),
152            session,
153            params,
154        })
155    }
156
    /// Create a builder for constructing a connection.
    ///
    /// # Returns
    ///
    /// A fresh `ConnectionBuilder` obtained from `ConnectionBuilder::new()`.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::new()
    }
168
169    // ========================================================================
170    // Statement Creation (synchronous - Statement is now a pure data container)
171    // ========================================================================
172
    /// Create a new statement for executing SQL.
    ///
    /// Statement is now a pure data container, so this call is synchronous and
    /// does not touch the transport. Use `execute_statement()` to execute it.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `Statement` instance ready for execution via `execute_statement()`.
    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
        Statement::new(sql)
    }
190
191    // ========================================================================
192    // Statement Execution Methods
193    // ========================================================================
194
195    /// Execute a statement and return results.
196    ///
197    /// # Arguments
198    ///
199    /// * `stmt` - Statement to execute
200    ///
201    /// # Returns
202    ///
203    /// A `ResultSet` containing the query results.
204    ///
205    /// # Errors
206    ///
207    /// Returns `QueryError` if execution fails or times out.
208    ///
209    /// # Example
210    ///
211    pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
212        // Validate session state
213        self.session
214            .validate_ready()
215            .await
216            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
217
218        // Update session state
219        self.session.set_state(SessionState::Executing).await;
220
221        // Increment query counter
222        self.session.increment_query_count();
223
224        // Build final SQL with parameters
225        let final_sql = stmt.build_sql()?;
226
227        // Execute with timeout
228        let timeout_duration = Duration::from_millis(stmt.timeout_ms());
229        let transport = Arc::clone(&self.transport);
230
231        let result = timeout(timeout_duration, async move {
232            let mut transport_guard = transport.lock().await;
233            transport_guard.execute_query(&final_sql).await
234        })
235        .await
236        .map_err(|_| QueryError::Timeout {
237            timeout_ms: stmt.timeout_ms(),
238        })?
239        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
240
241        // Update session state back to ready/in_transaction
242        self.update_session_state_after_query().await;
243
244        // Convert transport result to ResultSet
245        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
246    }
247
248    /// Execute a statement and return the row count (for non-SELECT statements).
249    ///
250    /// # Arguments
251    ///
252    /// * `stmt` - Statement to execute
253    ///
254    /// # Returns
255    ///
256    /// The number of rows affected.
257    ///
258    /// # Errors
259    ///
260    /// Returns `QueryError` if execution fails or if statement is a SELECT.
261    ///
262    /// # Example
263    ///
264    pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
265        let result_set = self.execute_statement(stmt).await?;
266
267        result_set.row_count().ok_or_else(|| {
268            QueryError::NoResultSet("Expected row count, got result set".to_string())
269        })
270    }
271
272    // ========================================================================
273    // Prepared Statement Methods
274    // ========================================================================
275
276    /// Create a prepared statement for parameterized query execution.
277    ///
278    /// This creates a server-side prepared statement that can be executed
279    /// multiple times with different parameter values.
280    ///
281    /// # Arguments
282    ///
283    /// * `sql` - SQL statement with parameter placeholders (?)
284    ///
285    /// # Returns
286    ///
287    /// A `PreparedStatement` ready for parameter binding and execution.
288    ///
289    /// # Errors
290    ///
291    /// Returns `QueryError` if preparation fails.
292    ///
293    /// # Example
294    ///
295    pub async fn prepare(
296        &mut self,
297        sql: impl Into<String>,
298    ) -> Result<PreparedStatement, QueryError> {
299        let sql = sql.into();
300
301        // Validate session state
302        self.session
303            .validate_ready()
304            .await
305            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
306
307        let mut transport = self.transport.lock().await;
308        let handle = transport
309            .create_prepared_statement(&sql)
310            .await
311            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
312
313        Ok(PreparedStatement::new(handle))
314    }
315
316    /// Execute a prepared statement and return results.
317    ///
318    /// # Arguments
319    ///
320    /// * `stmt` - Prepared statement to execute
321    ///
322    /// # Returns
323    ///
324    /// A `ResultSet` containing the query results.
325    ///
326    /// # Errors
327    ///
328    /// Returns `QueryError` if execution fails.
329    ///
330    /// # Example
331    ///
332    pub async fn execute_prepared(
333        &mut self,
334        stmt: &PreparedStatement,
335    ) -> Result<ResultSet, QueryError> {
336        if stmt.is_closed() {
337            return Err(QueryError::StatementClosed);
338        }
339
340        // Validate session state
341        self.session
342            .validate_ready()
343            .await
344            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
345
346        // Update session state
347        self.session.set_state(SessionState::Executing).await;
348
349        // Increment query counter
350        self.session.increment_query_count();
351
352        // Convert parameters to column-major JSON format
353        let params_data = stmt.build_parameters_data()?;
354
355        let mut transport = self.transport.lock().await;
356        let result = transport
357            .execute_prepared_statement(stmt.handle_ref(), params_data)
358            .await
359            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
360
361        drop(transport);
362
363        // Update session state back to ready/in_transaction
364        self.update_session_state_after_query().await;
365
366        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
367    }
368
369    /// Execute a prepared statement and return the number of affected rows.
370    ///
371    /// Use this for INSERT, UPDATE, DELETE statements.
372    ///
373    /// # Arguments
374    ///
375    /// * `stmt` - Prepared statement to execute
376    ///
377    /// # Returns
378    ///
379    /// The number of rows affected.
380    ///
381    /// # Errors
382    ///
383    /// Returns `QueryError` if execution fails or statement returns a result set.
384    pub async fn execute_prepared_update(
385        &mut self,
386        stmt: &PreparedStatement,
387    ) -> Result<i64, QueryError> {
388        if stmt.is_closed() {
389            return Err(QueryError::StatementClosed);
390        }
391
392        // Validate session state
393        self.session
394            .validate_ready()
395            .await
396            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
397
398        // Update session state
399        self.session.set_state(SessionState::Executing).await;
400
401        // Convert parameters to column-major JSON format
402        let params_data = stmt.build_parameters_data()?;
403
404        let mut transport = self.transport.lock().await;
405        let result = transport
406            .execute_prepared_statement(stmt.handle_ref(), params_data)
407            .await
408            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
409
410        drop(transport);
411
412        // Update session state back to ready/in_transaction
413        self.update_session_state_after_query().await;
414
415        match result {
416            QueryResult::RowCount { count } => Ok(count),
417            QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
418        }
419    }
420
421    /// Close a prepared statement and release server-side resources.
422    ///
423    /// # Arguments
424    ///
425    /// * `stmt` - Prepared statement to close (consumed)
426    ///
427    /// # Errors
428    ///
429    /// Returns `QueryError` if closing fails.
430    ///
431    /// # Example
432    ///
433    pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
434        if stmt.is_closed() {
435            return Ok(());
436        }
437
438        let mut transport = self.transport.lock().await;
439        transport
440            .close_prepared_statement(stmt.handle_ref())
441            .await
442            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
443
444        stmt.mark_closed();
445        Ok(())
446    }
447
448    // ========================================================================
449    // Convenience Methods (these internally use execute_statement)
450    // ========================================================================
451
    /// Execute a SQL query and return results.
    ///
    /// This is a convenience method that creates a statement via
    /// `create_statement()` and runs it through `execute_statement()`.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `ResultSet` containing the query results.
    ///
    /// # Errors
    ///
    /// Returns whatever `QueryError` `execute_statement()` reports.
    pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
        let stmt = self.create_statement(sql);
        self.execute_statement(&stmt).await
    }
474
475    /// Execute a SQL query and return all results as RecordBatches.
476    ///
477    /// This is a convenience method that fetches all results into memory.
478    ///
479    /// # Arguments
480    ///
481    /// * `sql` - SQL query text
482    ///
483    /// # Returns
484    ///
485    /// A vector of `RecordBatch` instances.
486    ///
487    /// # Errors
488    ///
489    /// Returns `QueryError` if execution fails.
490    ///
491    /// # Example
492    ///
493    pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
494        let result_set = self.execute(sql).await?;
495        result_set.fetch_all().await
496    }
497
    /// Execute a non-SELECT statement and return the row count.
    ///
    /// Convenience wrapper: builds a statement via `create_statement()` and
    /// runs it through `execute_statement_update()`.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
    ///
    /// # Returns
    ///
    /// The number of rows affected.
    ///
    /// # Errors
    ///
    /// Returns whatever `QueryError` `execute_statement_update()` reports,
    /// including the case where the statement yields a result set instead of
    /// a row count.
    pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
        let stmt = self.create_statement(sql);
        self.execute_statement_update(&stmt).await
    }
518
519    // ========================================================================
520    // Transaction Methods
521    // ========================================================================
522
523    pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
524        // Disable autocommit on the server so statements don't auto-commit
525        self.transport
526            .lock()
527            .await
528            .set_autocommit(false)
529            .await
530            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
531
532        self.session
533            .begin_transaction()
534            .await
535            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
536
537        Ok(())
538    }
539
540    pub async fn commit(&mut self) -> Result<(), QueryError> {
541        if !self.in_transaction() {
542            return Ok(());
543        }
544
545        self.execute_update("COMMIT").await?;
546
547        self.session
548            .commit_transaction()
549            .await
550            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
551
552        Ok(())
553    }
554
555    pub async fn rollback(&mut self) -> Result<(), QueryError> {
556        if !self.in_transaction() {
557            return Ok(());
558        }
559
560        self.execute_update("ROLLBACK").await?;
561
562        self.session
563            .rollback_transaction()
564            .await
565            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
566
567        Ok(())
568    }
569
    /// Check if a transaction is currently active.
    ///
    /// The answer comes from the locally tracked session; no request is sent
    /// to the server.
    ///
    /// # Returns
    ///
    /// `true` if a transaction is active, `false` otherwise.
    pub fn in_transaction(&self) -> bool {
        self.session.in_transaction()
    }
578
579    // ========================================================================
580    // Session and Schema Methods
581    // ========================================================================
582
    /// Get the current schema.
    ///
    /// Reads the value recorded on the session (set by `from_params()` or
    /// `set_schema()`).
    ///
    /// # Returns
    ///
    /// The current schema name, or `None` if no schema is set.
    pub async fn current_schema(&self) -> Option<String> {
        self.session.current_schema().await
    }
591
    /// Set the current schema.
    ///
    /// Issues `OPEN SCHEMA <name>` and, once that succeeds, records the name
    /// on the session.
    ///
    /// # Arguments
    ///
    /// * `schema` - The schema name to set
    ///
    /// # Errors
    ///
    /// Returns `QueryError` if the `OPEN SCHEMA` statement fails; the session's
    /// current schema is left untouched in that case.
    pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
        let schema_name = schema.into();
        // NOTE(review): the name is interpolated verbatim (no quoting or
        // escaping), so callers must not pass untrusted input here.
        self.execute_update(format!("OPEN SCHEMA {}", schema_name))
            .await?;
        self.session.set_current_schema(Some(schema_name)).await;
        Ok(())
    }
611
612    // ========================================================================
613    // Metadata Methods
614    // ========================================================================
615
    /// Get metadata about catalogs.
    ///
    /// The query reports the distinct schema names from `SYS.EXA_ALL_SCHEMAS`
    /// under the column name `CATALOG_NAME`, ordered by name.
    ///
    /// # Returns
    ///
    /// A `ResultSet` containing catalog metadata.
    ///
    /// # Errors
    ///
    /// Returns `QueryError` if the operation fails.
    pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
        self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
            .await
    }
629
630    /// Get metadata about schemas.
631    ///
632    /// # Arguments
633    ///
634    /// * `catalog` - Optional catalog name filter
635    ///
636    /// # Returns
637    ///
638    /// A `ResultSet` containing schema metadata.
639    ///
640    /// # Errors
641    ///
642    /// Returns `QueryError` if the operation fails.
643    pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
644        let sql = if let Some(cat) = catalog {
645            format!(
646                "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
647                cat.replace('\'', "''")
648            )
649        } else {
650            "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
651        };
652        self.execute(sql).await
653    }
654
655    /// Get metadata about tables.
656    ///
657    /// # Arguments
658    ///
659    /// * `catalog` - Optional catalog name filter
660    /// * `schema` - Optional schema name filter
661    /// * `table` - Optional table name filter
662    ///
663    /// # Returns
664    ///
665    /// A `ResultSet` containing table metadata.
666    ///
667    /// # Errors
668    ///
669    /// Returns `QueryError` if the operation fails.
670    pub async fn get_tables(
671        &mut self,
672        catalog: Option<&str>,
673        schema: Option<&str>,
674        table: Option<&str>,
675    ) -> Result<ResultSet, QueryError> {
676        let mut conditions = vec!["OBJECT_TYPE IN ('TABLE', 'VIEW')".to_string()];
677
678        // Exasol has no catalogs — ignore the catalog parameter
679        let _ = catalog;
680        if let Some(sch) = schema {
681            conditions.push(format!("ROOT_NAME = '{}'", sch.replace('\'', "''")));
682        }
683        if let Some(tbl) = table {
684            conditions.push(format!("OBJECT_NAME = '{}'", tbl.replace('\'', "''")));
685        }
686
687        let where_clause = format!("WHERE {}", conditions.join(" AND "));
688
689        let sql = format!(
690            "SELECT ROOT_NAME AS TABLE_SCHEMA, OBJECT_NAME AS TABLE_NAME, OBJECT_TYPE AS TABLE_TYPE FROM SYS.EXA_ALL_OBJECTS {} ORDER BY ROOT_NAME, OBJECT_NAME",
691            where_clause
692        );
693
694        self.execute(sql).await
695    }
696
697    /// Get metadata about columns.
698    ///
699    /// # Arguments
700    ///
701    /// * `catalog` - Optional catalog name filter
702    /// * `schema` - Optional schema name filter
703    /// * `table` - Optional table name filter
704    /// * `column` - Optional column name filter
705    ///
706    /// # Returns
707    ///
708    /// A `ResultSet` containing column metadata.
709    ///
710    /// # Errors
711    ///
712    /// Returns `QueryError` if the operation fails.
713    pub async fn get_columns(
714        &mut self,
715        catalog: Option<&str>,
716        schema: Option<&str>,
717        table: Option<&str>,
718        column: Option<&str>,
719    ) -> Result<ResultSet, QueryError> {
720        let mut conditions = Vec::new();
721
722        if let Some(cat) = catalog {
723            conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
724        }
725        if let Some(sch) = schema {
726            conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
727        }
728        if let Some(tbl) = table {
729            conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
730        }
731        if let Some(col) = column {
732            conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
733        }
734
735        let where_clause = if conditions.is_empty() {
736            String::new()
737        } else {
738            format!("WHERE {}", conditions.join(" AND "))
739        };
740
741        let sql = format!(
742            "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
743             FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
744            where_clause
745        );
746
747        self.execute(sql).await
748    }
749
750    // ========================================================================
751    // Session Information Methods
752    // ========================================================================
753
    /// Get the ID of the session backing this connection.
    ///
    /// # Returns
    ///
    /// The session ID, borrowed from the session.
    pub fn session_id(&self) -> &str {
        self.session.session_id()
    }
762
    /// Get the connection parameters this connection was created with.
    ///
    /// # Returns
    ///
    /// A reference to the connection parameters.
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }
771
    /// Check if the connection is closed.
    ///
    /// The answer is taken from the session's own closed flag.
    ///
    /// # Returns
    ///
    /// `true` if the connection is closed, `false` otherwise.
    pub async fn is_closed(&self) -> bool {
        self.session.is_closed().await
    }
780
781    /// Close the connection.
782    ///
783    /// This closes the session and transport layer.
784    ///
785    /// # Errors
786    ///
787    /// Returns `ConnectionError` if closing fails.
788    ///
789    /// # Example
790    ///
791    pub async fn close(self) -> Result<(), ConnectionError> {
792        // Close session
793        self.session.close().await?;
794
795        // Close transport
796        let mut transport = self.transport.lock().await;
797        transport
798            .close()
799            .await
800            .map_err(|e| ConnectionError::ConnectionFailed {
801                host: self.params.host.clone(),
802                port: self.params.port,
803                message: e.to_string(),
804            })?;
805
806        Ok(())
807    }
808
809    /// Shut down the connection without consuming self.
810    ///
811    /// Unlike `close()`, this can be called through a shared reference,
812    /// making it suitable for use in `Drop` implementations where ownership
813    /// transfer is not possible (e.g., when the connection is behind `Arc<Mutex<>>`).
814    pub async fn shutdown(&self) -> Result<(), ConnectionError> {
815        self.session.close().await?;
816
817        let mut transport = self.transport.lock().await;
818        transport
819            .close()
820            .await
821            .map_err(|e| ConnectionError::ConnectionFailed {
822                host: self.params.host.clone(),
823                port: self.params.port,
824                message: e.to_string(),
825            })?;
826
827        Ok(())
828    }
829
830    // ========================================================================
831    // Import/Export Methods
832    // ========================================================================
833
834    /// Creates an SQL executor closure for import/export operations.
835    ///
836    /// This is a helper method that creates a closure which can execute SQL
837    /// statements and return the row count. The closure captures a cloned
838    /// reference to the transport, allowing it to be called multiple times
839    /// (e.g., for parallel file imports).
840    ///
841    /// # Returns
842    ///
843    /// A closure that takes an SQL string and returns a Future resolving to
844    /// either the affected row count or an error string.
845    fn make_sql_executor(
846        &self,
847    ) -> impl Fn(
848        String,
849    )
850        -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
851        let transport = Arc::clone(&self.transport);
852        move |sql: String| {
853            let transport = Arc::clone(&transport);
854            Box::pin(async move {
855                let mut transport_guard = transport.lock().await;
856                match transport_guard.execute_query(&sql).await {
857                    Ok(QueryResult::RowCount { count }) => Ok(count as u64),
858                    Ok(QueryResult::ResultSet { .. }) => Ok(0),
859                    Err(e) => Err(e.to_string()),
860                }
861            })
862        }
863    }
864
865    /// Import CSV data from a file into an Exasol table.
866    ///
867    /// This method reads CSV data from the specified file and imports it into
868    /// the target table using Exasol's HTTP transport layer.
869    ///
870    /// # Arguments
871    ///
872    /// * `table` - Name of the target table
873    /// * `file_path` - Path to the CSV file
874    /// * `options` - Import options
875    ///
876    /// # Returns
877    ///
878    /// The number of rows imported on success.
879    ///
880    /// # Errors
881    ///
882    /// Returns `ImportError` if the import fails.
883    ///
884    /// # Example
885    ///
886    pub async fn import_csv_from_file(
887        &mut self,
888        table: &str,
889        file_path: &std::path::Path,
890        options: crate::import::csv::CsvImportOptions,
891    ) -> Result<u64, crate::import::ImportError> {
892        // Pass Exasol host/port from connection params to import options
893        let options = options
894            .exasol_host(&self.params.host)
895            .exasol_port(self.params.port);
896
897        crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
898            .await
899    }
900
901    /// Import CSV data from an async reader into an Exasol table.
902    ///
903    /// This method reads CSV data from an async reader and imports it into
904    /// the target table using Exasol's HTTP transport layer.
905    ///
906    /// # Arguments
907    ///
908    /// * `table` - Name of the target table
909    /// * `reader` - Async reader providing CSV data
910    /// * `options` - Import options
911    ///
912    /// # Returns
913    ///
914    /// The number of rows imported on success.
915    ///
916    /// # Errors
917    ///
918    /// Returns `ImportError` if the import fails.
919    pub async fn import_csv_from_stream<R>(
920        &mut self,
921        table: &str,
922        reader: R,
923        options: crate::import::csv::CsvImportOptions,
924    ) -> Result<u64, crate::import::ImportError>
925    where
926        R: tokio::io::AsyncRead + Unpin + Send + 'static,
927    {
928        // Pass Exasol host/port from connection params to import options
929        let options = options
930            .exasol_host(&self.params.host)
931            .exasol_port(self.params.port);
932
933        crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
934            .await
935    }
936
937    /// Import CSV data from an iterator into an Exasol table.
938    ///
939    /// This method converts iterator rows to CSV format and imports them into
940    /// the target table using Exasol's HTTP transport layer.
941    ///
942    /// # Arguments
943    ///
944    /// * `table` - Name of the target table
945    /// * `rows` - Iterator of rows, where each row is an iterator of field values
946    /// * `options` - Import options
947    ///
948    /// # Returns
949    ///
950    /// The number of rows imported on success.
951    ///
952    /// # Errors
953    ///
954    /// Returns `ImportError` if the import fails.
955    ///
956    /// # Example
957    ///
958    pub async fn import_csv_from_iter<I, T, S>(
959        &mut self,
960        table: &str,
961        rows: I,
962        options: crate::import::csv::CsvImportOptions,
963    ) -> Result<u64, crate::import::ImportError>
964    where
965        I: IntoIterator<Item = T> + Send + 'static,
966        T: IntoIterator<Item = S> + Send,
967        S: AsRef<str>,
968    {
969        // Pass Exasol host/port from connection params to import options
970        let options = options
971            .exasol_host(&self.params.host)
972            .exasol_port(self.params.port);
973
974        crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
975    }
976
977    /// Export data from an Exasol table or query to a CSV file.
978    ///
979    /// This method exports data from the specified source to a CSV file
980    /// using Exasol's HTTP transport layer.
981    ///
982    /// # Arguments
983    ///
984    /// * `source` - The data source (table or query)
985    /// * `file_path` - Path to the output file
986    /// * `options` - Export options
987    ///
988    /// # Returns
989    ///
990    /// The number of rows exported on success.
991    ///
992    /// # Errors
993    ///
994    /// Returns `ExportError` if the export fails.
995    ///
996    /// # Example
997    ///
998    pub async fn export_csv_to_file(
999        &mut self,
1000        source: crate::query::export::ExportSource,
1001        file_path: &std::path::Path,
1002        options: crate::export::csv::CsvExportOptions,
1003    ) -> Result<u64, crate::export::csv::ExportError> {
1004        // Pass Exasol host/port from connection params to export options
1005        let options = options
1006            .exasol_host(&self.params.host)
1007            .exasol_port(self.params.port);
1008
1009        let mut transport_guard = self.transport.lock().await;
1010        crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1011    }
1012
1013    /// Export data from an Exasol table or query to an async writer.
1014    ///
1015    /// This method exports data from the specified source to an async writer
1016    /// using Exasol's HTTP transport layer.
1017    ///
1018    /// # Arguments
1019    ///
1020    /// * `source` - The data source (table or query)
1021    /// * `writer` - Async writer to write the CSV data to
1022    /// * `options` - Export options
1023    ///
1024    /// # Returns
1025    ///
1026    /// The number of rows exported on success.
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns `ExportError` if the export fails.
1031    pub async fn export_csv_to_stream<W>(
1032        &mut self,
1033        source: crate::query::export::ExportSource,
1034        writer: W,
1035        options: crate::export::csv::CsvExportOptions,
1036    ) -> Result<u64, crate::export::csv::ExportError>
1037    where
1038        W: tokio::io::AsyncWrite + Unpin,
1039    {
1040        // Pass Exasol host/port from connection params to export options
1041        let options = options
1042            .exasol_host(&self.params.host)
1043            .exasol_port(self.params.port);
1044
1045        let mut transport_guard = self.transport.lock().await;
1046        crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1047    }
1048
1049    /// Export data from an Exasol table or query to an in-memory list of rows.
1050    ///
1051    /// Each row is represented as a vector of string values.
1052    ///
1053    /// # Arguments
1054    ///
1055    /// * `source` - The data source (table or query)
1056    /// * `options` - Export options
1057    ///
1058    /// # Returns
1059    ///
1060    /// A vector of rows, where each row is a vector of column values.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns `ExportError` if the export fails.
1065    ///
1066    /// # Example
1067    ///
1068    pub async fn export_csv_to_list(
1069        &mut self,
1070        source: crate::query::export::ExportSource,
1071        options: crate::export::csv::CsvExportOptions,
1072    ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1073        // Pass Exasol host/port from connection params to export options
1074        let options = options
1075            .exasol_host(&self.params.host)
1076            .exasol_port(self.params.port);
1077
1078        let mut transport_guard = self.transport.lock().await;
1079        crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1080    }
1081
1082    /// Import multiple CSV files in parallel into an Exasol table.
1083    ///
1084    /// This method reads CSV data from multiple files and imports them into
1085    /// the target table using parallel HTTP transport connections. Each file
1086    /// gets its own connection with a unique internal address.
1087    ///
1088    /// For a single file, this method delegates to `import_csv_from_file`.
1089    ///
1090    /// # Arguments
1091    ///
1092    /// * `table` - Name of the target table
1093    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1094    /// * `options` - Import options
1095    ///
1096    /// # Returns
1097    ///
1098    /// The number of rows imported on success.
1099    ///
1100    /// # Errors
1101    ///
1102    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1103    ///
1104    /// # Example
1105    ///
1106    /// ```no_run
1107    /// use exarrow_rs::adbc::Connection;
1108    /// use exarrow_rs::import::CsvImportOptions;
1109    /// use std::path::PathBuf;
1110    ///
1111    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1112    /// let files = vec![
1113    ///     PathBuf::from("/data/part1.csv"),
1114    ///     PathBuf::from("/data/part2.csv"),
1115    /// ];
1116    ///
1117    /// let options = CsvImportOptions::default();
1118    /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1119    /// # Ok(())
1120    /// # }
1121    /// ```
1122    pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1123        &mut self,
1124        table: &str,
1125        paths: S,
1126        options: crate::import::csv::CsvImportOptions,
1127    ) -> Result<u64, crate::import::ImportError> {
1128        // Pass Exasol host/port from connection params to import options
1129        let options = options
1130            .exasol_host(&self.params.host)
1131            .exasol_port(self.params.port);
1132
1133        crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1134    }
1135
1136    /// Import data from a Parquet file into an Exasol table.
1137    ///
1138    /// This method reads a Parquet file, converts the data to CSV format,
1139    /// and imports it into the target table using Exasol's HTTP transport layer.
1140    ///
1141    /// # Arguments
1142    ///
1143    /// * `table` - Name of the target table
1144    /// * `file_path` - Path to the Parquet file
1145    /// * `options` - Import options
1146    ///
1147    /// # Returns
1148    ///
1149    /// The number of rows imported on success.
1150    ///
1151    /// # Errors
1152    ///
1153    /// Returns `ImportError` if the import fails.
1154    ///
1155    /// # Example
1156    ///
1157    pub async fn import_from_parquet(
1158        &mut self,
1159        table: &str,
1160        file_path: &std::path::Path,
1161        options: crate::import::parquet::ParquetImportOptions,
1162    ) -> Result<u64, crate::import::ImportError> {
1163        // Pass Exasol host/port from connection params to import options
1164        let options = options
1165            .with_exasol_host(&self.params.host)
1166            .with_exasol_port(self.params.port);
1167
1168        crate::import::parquet::import_from_parquet(
1169            self.make_sql_executor(),
1170            table,
1171            file_path,
1172            options,
1173        )
1174        .await
1175    }
1176
1177    /// Import multiple Parquet files in parallel into an Exasol table.
1178    ///
1179    /// This method converts each Parquet file to CSV format concurrently,
1180    /// then streams the data through parallel HTTP transport connections.
1181    ///
1182    /// For a single file, this method delegates to `import_from_parquet`.
1183    ///
1184    /// # Arguments
1185    ///
1186    /// * `table` - Name of the target table
1187    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1188    /// * `options` - Import options
1189    ///
1190    /// # Returns
1191    ///
1192    /// The number of rows imported on success.
1193    ///
1194    /// # Errors
1195    ///
1196    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1197    ///
1198    /// # Example
1199    ///
1200    /// ```no_run
1201    /// use exarrow_rs::adbc::Connection;
1202    /// use exarrow_rs::import::ParquetImportOptions;
1203    /// use std::path::PathBuf;
1204    ///
1205    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1206    /// let files = vec![
1207    ///     PathBuf::from("/data/part1.parquet"),
1208    ///     PathBuf::from("/data/part2.parquet"),
1209    /// ];
1210    ///
1211    /// let options = ParquetImportOptions::default();
1212    /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1213    /// # Ok(())
1214    /// # }
1215    /// ```
1216    pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1217        &mut self,
1218        table: &str,
1219        paths: S,
1220        options: crate::import::parquet::ParquetImportOptions,
1221    ) -> Result<u64, crate::import::ImportError> {
1222        // Pass Exasol host/port from connection params to import options
1223        let options = options
1224            .with_exasol_host(&self.params.host)
1225            .with_exasol_port(self.params.port);
1226
1227        crate::import::parquet::import_from_parquet_files(
1228            self.make_sql_executor(),
1229            table,
1230            paths,
1231            options,
1232        )
1233        .await
1234    }
1235
1236    /// Export data from an Exasol table or query to a Parquet file.
1237    ///
1238    /// This method exports data from the specified source to a Parquet file.
1239    /// The data is first received as CSV from Exasol, then converted to Parquet format.
1240    ///
1241    /// # Arguments
1242    ///
1243    /// * `source` - The data source (table or query)
1244    /// * `file_path` - Path to the output Parquet file
1245    /// * `options` - Export options
1246    ///
1247    /// # Returns
1248    ///
1249    /// The number of rows exported on success.
1250    ///
1251    /// # Errors
1252    ///
1253    /// Returns `ExportError` if the export fails.
1254    ///
1255    /// # Example
1256    ///
1257    pub async fn export_to_parquet(
1258        &mut self,
1259        source: crate::query::export::ExportSource,
1260        file_path: &std::path::Path,
1261        options: crate::export::parquet::ParquetExportOptions,
1262    ) -> Result<u64, crate::export::csv::ExportError> {
1263        // Pass Exasol host/port from connection params to export options
1264        let options = options
1265            .exasol_host(&self.params.host)
1266            .exasol_port(self.params.port);
1267
1268        let mut transport_guard = self.transport.lock().await;
1269        crate::export::parquet::export_to_parquet_via_transport(
1270            &mut *transport_guard,
1271            source,
1272            file_path,
1273            options,
1274        )
1275        .await
1276    }
1277
1278    /// Import data from an Arrow RecordBatch into an Exasol table.
1279    ///
1280    /// This method converts the RecordBatch to CSV format and imports it
1281    /// into the target table using Exasol's HTTP transport layer.
1282    ///
1283    /// # Arguments
1284    ///
1285    /// * `table` - Name of the target table
1286    /// * `batch` - The RecordBatch to import
1287    /// * `options` - Import options
1288    ///
1289    /// # Returns
1290    ///
1291    /// The number of rows imported on success.
1292    ///
1293    /// # Errors
1294    ///
1295    /// Returns `ImportError` if the import fails.
1296    ///
1297    /// # Example
1298    ///
1299    pub async fn import_from_record_batch(
1300        &mut self,
1301        table: &str,
1302        batch: &RecordBatch,
1303        options: crate::import::arrow::ArrowImportOptions,
1304    ) -> Result<u64, crate::import::ImportError> {
1305        // Pass Exasol host/port from connection params to import options
1306        let options = options
1307            .exasol_host(&self.params.host)
1308            .exasol_port(self.params.port);
1309
1310        crate::import::arrow::import_from_record_batch(
1311            self.make_sql_executor(),
1312            table,
1313            batch,
1314            options,
1315        )
1316        .await
1317    }
1318
1319    /// Import data from multiple Arrow RecordBatches into an Exasol table.
1320    ///
1321    /// This method converts each RecordBatch to CSV format and imports them
1322    /// into the target table using Exasol's HTTP transport layer.
1323    ///
1324    /// # Arguments
1325    ///
1326    /// * `table` - Name of the target table
1327    /// * `batches` - An iterator of RecordBatches to import
1328    /// * `options` - Import options
1329    ///
1330    /// # Returns
1331    ///
1332    /// The number of rows imported on success.
1333    ///
1334    /// # Errors
1335    ///
1336    /// Returns `ImportError` if the import fails.
1337    pub async fn import_from_record_batches<I>(
1338        &mut self,
1339        table: &str,
1340        batches: I,
1341        options: crate::import::arrow::ArrowImportOptions,
1342    ) -> Result<u64, crate::import::ImportError>
1343    where
1344        I: IntoIterator<Item = RecordBatch>,
1345    {
1346        // Pass Exasol host/port from connection params to import options
1347        let options = options
1348            .exasol_host(&self.params.host)
1349            .exasol_port(self.params.port);
1350
1351        crate::import::arrow::import_from_record_batches(
1352            self.make_sql_executor(),
1353            table,
1354            batches,
1355            options,
1356        )
1357        .await
1358    }
1359
1360    /// Import data from an Arrow IPC file/stream into an Exasol table.
1361    ///
1362    /// This method reads Arrow IPC format data, converts it to CSV,
1363    /// and imports it into the target table using Exasol's HTTP transport layer.
1364    ///
1365    /// # Arguments
1366    ///
1367    /// * `table` - Name of the target table
1368    /// * `reader` - An async reader containing Arrow IPC data
1369    /// * `options` - Import options
1370    ///
1371    /// # Returns
1372    ///
1373    /// The number of rows imported on success.
1374    ///
1375    /// # Errors
1376    ///
1377    /// Returns `ImportError` if the import fails.
1378    ///
1379    /// # Example
1380    ///
1381    pub async fn import_from_arrow_ipc<R>(
1382        &mut self,
1383        table: &str,
1384        reader: R,
1385        options: crate::import::arrow::ArrowImportOptions,
1386    ) -> Result<u64, crate::import::ImportError>
1387    where
1388        R: tokio::io::AsyncRead + Unpin + Send,
1389    {
1390        // Pass Exasol host/port from connection params to import options
1391        let options = options
1392            .exasol_host(&self.params.host)
1393            .exasol_port(self.params.port);
1394
1395        crate::import::arrow::import_from_arrow_ipc(
1396            self.make_sql_executor(),
1397            table,
1398            reader,
1399            options,
1400        )
1401        .await
1402    }
1403
1404    /// Export data from an Exasol table or query to Arrow RecordBatches.
1405    ///
1406    /// This method exports data from the specified source and converts it
1407    /// to Arrow RecordBatches.
1408    ///
1409    /// # Arguments
1410    ///
1411    /// * `source` - The data source (table or query)
1412    /// * `options` - Export options
1413    ///
1414    /// # Returns
1415    ///
1416    /// A vector of RecordBatches on success.
1417    ///
1418    /// # Errors
1419    ///
1420    /// Returns `ExportError` if the export fails.
1421    ///
1422    /// # Example
1423    ///
1424    pub async fn export_to_record_batches(
1425        &mut self,
1426        source: crate::query::export::ExportSource,
1427        options: crate::export::arrow::ArrowExportOptions,
1428    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1429        // Pass Exasol host/port from connection params to export options
1430        let options = options
1431            .exasol_host(&self.params.host)
1432            .exasol_port(self.params.port);
1433
1434        let mut transport_guard = self.transport.lock().await;
1435        crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1436    }
1437
1438    /// Export data from an Exasol table or query to an Arrow IPC file.
1439    ///
1440    /// This method exports data from the specified source to an Arrow IPC file.
1441    ///
1442    /// # Arguments
1443    ///
1444    /// * `source` - The data source (table or query)
1445    /// * `file_path` - Path to the output Arrow IPC file
1446    /// * `options` - Export options
1447    ///
1448    /// # Returns
1449    ///
1450    /// The number of rows exported on success.
1451    ///
1452    /// # Errors
1453    ///
1454    /// Returns `ExportError` if the export fails.
1455    ///
1456    /// # Example
1457    ///
1458    pub async fn export_to_arrow_ipc(
1459        &mut self,
1460        source: crate::query::export::ExportSource,
1461        file_path: &std::path::Path,
1462        options: crate::export::arrow::ArrowExportOptions,
1463    ) -> Result<u64, crate::export::csv::ExportError> {
1464        // Pass Exasol host/port from connection params to export options
1465        let options = options
1466            .exasol_host(&self.params.host)
1467            .exasol_port(self.params.port);
1468
1469        let mut transport_guard = self.transport.lock().await;
1470        crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1471            .await
1472    }
1473
1474    // ========================================================================
1475    // Blocking Import/Export Methods
1476    // ========================================================================
1477
1478    /// Import CSV data from a file into an Exasol table (blocking).
1479    ///
1480    /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1481    /// for use in non-async contexts.
1482    ///
1483    /// # Arguments
1484    ///
1485    /// * `table` - Name of the target table
1486    /// * `file_path` - Path to the CSV file
1487    /// * `options` - Import options
1488    ///
1489    /// # Returns
1490    ///
1491    /// The number of rows imported on success.
1492    ///
1493    /// # Errors
1494    ///
1495    /// Returns `ImportError` if the import fails.
1496    ///
1497    /// # Example
1498    ///
1499    pub fn blocking_import_csv_from_file(
1500        &mut self,
1501        table: &str,
1502        file_path: &std::path::Path,
1503        options: crate::import::csv::CsvImportOptions,
1504    ) -> Result<u64, crate::import::ImportError> {
1505        blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1506    }
1507
1508    /// Import multiple CSV files in parallel into an Exasol table (blocking).
1509    ///
1510    /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1511    /// for use in non-async contexts.
1512    ///
1513    /// # Arguments
1514    ///
1515    /// * `table` - Name of the target table
1516    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1517    /// * `options` - Import options
1518    ///
1519    /// # Returns
1520    ///
1521    /// The number of rows imported on success.
1522    ///
1523    /// # Errors
1524    ///
1525    /// Returns `ImportError` if the import fails.
1526    pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1527        &mut self,
1528        table: &str,
1529        paths: S,
1530        options: crate::import::csv::CsvImportOptions,
1531    ) -> Result<u64, crate::import::ImportError> {
1532        blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1533    }
1534
1535    /// Import data from a Parquet file into an Exasol table (blocking).
1536    ///
1537    /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1538    /// for use in non-async contexts.
1539    ///
1540    /// # Arguments
1541    ///
1542    /// * `table` - Name of the target table
1543    /// * `file_path` - Path to the Parquet file
1544    /// * `options` - Import options
1545    ///
1546    /// # Returns
1547    ///
1548    /// The number of rows imported on success.
1549    ///
1550    /// # Errors
1551    ///
1552    /// Returns `ImportError` if the import fails.
1553    ///
1554    /// # Example
1555    ///
1556    pub fn blocking_import_from_parquet(
1557        &mut self,
1558        table: &str,
1559        file_path: &std::path::Path,
1560        options: crate::import::parquet::ParquetImportOptions,
1561    ) -> Result<u64, crate::import::ImportError> {
1562        blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1563    }
1564
1565    /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1566    ///
1567    /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1568    /// for use in non-async contexts.
1569    ///
1570    /// # Arguments
1571    ///
1572    /// * `table` - Name of the target table
1573    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1574    /// * `options` - Import options
1575    ///
1576    /// # Returns
1577    ///
1578    /// The number of rows imported on success.
1579    ///
1580    /// # Errors
1581    ///
1582    /// Returns `ImportError` if the import fails.
1583    pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1584        &mut self,
1585        table: &str,
1586        paths: S,
1587        options: crate::import::parquet::ParquetImportOptions,
1588    ) -> Result<u64, crate::import::ImportError> {
1589        blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1590    }
1591
1592    /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1593    ///
1594    /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1595    /// for use in non-async contexts.
1596    ///
1597    /// # Arguments
1598    ///
1599    /// * `table` - Name of the target table
1600    /// * `batch` - The RecordBatch to import
1601    /// * `options` - Import options
1602    ///
1603    /// # Returns
1604    ///
1605    /// The number of rows imported on success.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns `ImportError` if the import fails.
1610    ///
1611    /// # Example
1612    ///
1613    pub fn blocking_import_from_record_batch(
1614        &mut self,
1615        table: &str,
1616        batch: &RecordBatch,
1617        options: crate::import::arrow::ArrowImportOptions,
1618    ) -> Result<u64, crate::import::ImportError> {
1619        blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1620    }
1621
1622    /// Import data from an Arrow IPC file into an Exasol table (blocking).
1623    ///
1624    /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1625    /// for use in non-async contexts.
1626    ///
1627    /// Note: This method requires a synchronous reader that implements `std::io::Read`.
1628    /// The data will be read into memory before being imported.
1629    ///
1630    /// # Arguments
1631    ///
1632    /// * `table` - Name of the target table
1633    /// * `file_path` - Path to the Arrow IPC file
1634    /// * `options` - Import options
1635    ///
1636    /// # Returns
1637    ///
1638    /// The number of rows imported on success.
1639    ///
1640    /// # Errors
1641    ///
1642    /// Returns `ImportError` if the import fails.
1643    ///
1644    /// # Example
1645    ///
1646    pub fn blocking_import_from_arrow_ipc(
1647        &mut self,
1648        table: &str,
1649        file_path: &std::path::Path,
1650        options: crate::import::arrow::ArrowImportOptions,
1651    ) -> Result<u64, crate::import::ImportError> {
1652        blocking_runtime().block_on(async {
1653            let file = tokio::fs::File::open(file_path)
1654                .await
1655                .map_err(crate::import::ImportError::IoError)?;
1656            self.import_from_arrow_ipc(table, file, options).await
1657        })
1658    }
1659
1660    /// Export data from an Exasol table or query to a CSV file (blocking).
1661    ///
1662    /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1663    /// for use in non-async contexts.
1664    ///
1665    /// # Arguments
1666    ///
1667    /// * `source` - The data source (table or query)
1668    /// * `file_path` - Path to the output file
1669    /// * `options` - Export options
1670    ///
1671    /// # Returns
1672    ///
1673    /// The number of rows exported on success.
1674    ///
1675    /// # Errors
1676    ///
1677    /// Returns `ExportError` if the export fails.
1678    ///
1679    /// # Example
1680    ///
1681    pub fn blocking_export_csv_to_file(
1682        &mut self,
1683        source: crate::query::export::ExportSource,
1684        file_path: &std::path::Path,
1685        options: crate::export::csv::CsvExportOptions,
1686    ) -> Result<u64, crate::export::csv::ExportError> {
1687        blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1688    }
1689
1690    /// Export data from an Exasol table or query to a Parquet file (blocking).
1691    ///
1692    /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1693    /// for use in non-async contexts.
1694    ///
1695    /// # Arguments
1696    ///
1697    /// * `source` - The data source (table or query)
1698    /// * `file_path` - Path to the output Parquet file
1699    /// * `options` - Export options
1700    ///
1701    /// # Returns
1702    ///
1703    /// The number of rows exported on success.
1704    ///
1705    /// # Errors
1706    ///
1707    /// Returns `ExportError` if the export fails.
1708    ///
1709    /// # Example
1710    ///
1711    pub fn blocking_export_to_parquet(
1712        &mut self,
1713        source: crate::query::export::ExportSource,
1714        file_path: &std::path::Path,
1715        options: crate::export::parquet::ParquetExportOptions,
1716    ) -> Result<u64, crate::export::csv::ExportError> {
1717        blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1718    }
1719
1720    /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1721    ///
1722    /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1723    /// for use in non-async contexts.
1724    ///
1725    /// # Arguments
1726    ///
1727    /// * `source` - The data source (table or query)
1728    /// * `options` - Export options
1729    ///
1730    /// # Returns
1731    ///
1732    /// A vector of RecordBatches on success.
1733    ///
1734    /// # Errors
1735    ///
1736    /// Returns `ExportError` if the export fails.
1737    ///
1738    /// # Example
1739    ///
1740    pub fn blocking_export_to_record_batches(
1741        &mut self,
1742        source: crate::query::export::ExportSource,
1743        options: crate::export::arrow::ArrowExportOptions,
1744    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1745        blocking_runtime().block_on(self.export_to_record_batches(source, options))
1746    }
1747
1748    /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1749    ///
1750    /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1751    /// for use in non-async contexts.
1752    ///
1753    /// # Arguments
1754    ///
1755    /// * `source` - The data source (table or query)
1756    /// * `file_path` - Path to the output Arrow IPC file
1757    /// * `options` - Export options
1758    ///
1759    /// # Returns
1760    ///
1761    /// The number of rows exported on success.
1762    ///
1763    /// # Errors
1764    ///
1765    /// Returns `ExportError` if the export fails.
1766    ///
1767    /// # Example
1768    ///
1769    pub fn blocking_export_to_arrow_ipc(
1770        &mut self,
1771        source: crate::query::export::ExportSource,
1772        file_path: &std::path::Path,
1773        options: crate::export::arrow::ArrowExportOptions,
1774    ) -> Result<u64, crate::export::csv::ExportError> {
1775        blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1776    }
1777
1778    // ========================================================================
1779    // Private Helper Methods
1780    // ========================================================================
1781
1782    /// Update session state after query execution.
1783    async fn update_session_state_after_query(&self) {
1784        if self.session.in_transaction() {
1785            self.session.set_state(SessionState::InTransaction).await;
1786        } else {
1787            self.session.set_state(SessionState::Ready).await;
1788        }
1789        self.session.update_activity().await;
1790    }
1791}
1792
1793impl std::fmt::Debug for Connection {
1794    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1795        f.debug_struct("Connection")
1796            .field("session_id", &self.session.session_id())
1797            .field("host", &self.params.host)
1798            .field("port", &self.params.port)
1799            .field("username", &self.params.username)
1800            .field("in_transaction", &self.session.in_transaction())
1801            .finish()
1802    }
1803}
1804
1805/// Builder for creating Connection instances.
1806pub struct ConnectionBuilder {
1807    /// Connection parameters builder
1808    params_builder: crate::connection::params::ConnectionBuilder,
1809}
1810
1811impl ConnectionBuilder {
1812    /// Create a new ConnectionBuilder.
1813    pub fn new() -> Self {
1814        Self {
1815            params_builder: crate::connection::params::ConnectionBuilder::new(),
1816        }
1817    }
1818
1819    /// Set the database host.
1820    pub fn host(mut self, host: &str) -> Self {
1821        self.params_builder = self.params_builder.host(host);
1822        self
1823    }
1824
1825    /// Set the database port.
1826    pub fn port(mut self, port: u16) -> Self {
1827        self.params_builder = self.params_builder.port(port);
1828        self
1829    }
1830
1831    /// Set the username.
1832    pub fn username(mut self, username: &str) -> Self {
1833        self.params_builder = self.params_builder.username(username);
1834        self
1835    }
1836
1837    /// Set the password.
1838    pub fn password(mut self, password: &str) -> Self {
1839        self.params_builder = self.params_builder.password(password);
1840        self
1841    }
1842
1843    /// Set the default schema.
1844    pub fn schema(mut self, schema: &str) -> Self {
1845        self.params_builder = self.params_builder.schema(schema);
1846        self
1847    }
1848
1849    /// Enable or disable TLS.
1850    pub fn use_tls(mut self, use_tls: bool) -> Self {
1851        self.params_builder = self.params_builder.use_tls(use_tls);
1852        self
1853    }
1854
1855    /// Build and connect.
1856    ///
1857    /// # Returns
1858    ///
1859    /// A connected `Connection` instance.
1860    ///
1861    /// # Errors
1862    ///
1863    /// Returns `ExasolError` if the connection fails.
1864    pub async fn connect(self) -> Result<Connection, ExasolError> {
1865        let params = self.params_builder.build()?;
1866        Ok(Connection::from_params(params).await?)
1867    }
1868}
1869
1870impl Default for ConnectionBuilder {
1871    fn default() -> Self {
1872        Self::new()
1873    }
1874}
1875
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: every builder setter can be chained.
    #[test]
    fn test_connection_builder() {
        // Nothing is connected here; an actual connection requires a running
        // Exasol instance, so this only exercises the fluent setters.
        let start = ConnectionBuilder::new();
        let with_endpoint = start.host("localhost").port(8563);
        let with_credentials = with_endpoint.username("test").password("secret");
        let _configured = with_credentials.schema("MY_SCHEMA").use_tls(false);
    }

    /// Placeholder for checking that `create_statement` is synchronous.
    #[test]
    fn test_create_statement_is_sync() {
        // Intentionally empty: a `Connection` cannot be created without a
        // database, so there is nothing to call `create_statement` on here.
        // The intent is that the API is usable without `.await`.
    }
}
1902
/// Blocking wrapper tests.
///
/// These are compile-time checks: each test coerces an item to an explicitly
/// spelled-out type, so the test only builds while the item exists with that
/// exact signature. No database is contacted.
#[cfg(test)]
mod blocking_tests {
    use super::*;

    #[test]
    fn test_session_type_alias_exists() {
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}

        // The coercions are deliberately crossed: a function declared over
        // `Session` must be usable as a function over `Connection` and vice
        // versa. This only type-checks when both names denote the same type;
        // matching each function with its own declared type would compile
        // even if the alias were replaced by an unrelated type.
        let _: fn(&Connection) = takes_session;
        let _: fn(&Session) = takes_connection;
    }

    #[test]
    fn test_blocking_runtime_exists() {
        // `blocking_runtime()` must hand out a shared, process-wide runtime.
        let runtime: &'static Runtime = blocking_runtime();
        // Touch the handle so the runtime is actually initialised.
        let _ = runtime.handle();
    }

    #[test]
    fn test_connection_has_blocking_import_csv() {
        // Signature check: fails to compile if the method is missing or its
        // parameters / return type change.
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::csv::CsvImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_csv_from_file;
    }

    #[test]
    fn test_connection_has_blocking_import_parquet() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::parquet::ParquetImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
    }

    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        let _: fn(
            &mut Connection,
            &str,
            &RecordBatch,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_from_record_batch;
    }

    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_arrow_ipc;
    }

    #[test]
    fn test_connection_has_blocking_export_csv() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::csv::CsvExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_csv_to_file;
    }

    #[test]
    fn test_connection_has_blocking_export_parquet() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::parquet::ParquetExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_to_parquet;
    }

    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
            Connection::blocking_export_to_record_batches;
    }

    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_to_arrow_ipc;
    }
}