exarrow_rs/adbc/connection.rs

1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19    ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20    QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
30/// Session is a type alias for Connection.
31///
32/// This alias provides a more intuitive name for database sessions when
33/// performing import/export operations. Both `Session` and `Connection`
34/// can be used interchangeably.
35///
36/// # Example
37///
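/// A minimal sketch; it assumes the `ConnectionParams` value is built elsewhere and that
/// both names are re-exported from `exarrow_rs::adbc`:
///
/// ```no_run
/// # async fn example(params: exarrow_rs::connection::params::ConnectionParams) -> Result<(), Box<dyn std::error::Error>> {
/// use exarrow_rs::adbc::{Connection, Session};
///
/// // `Session` names the same type as `Connection`.
/// let session: Session = Connection::from_params(params).await?;
/// session.close().await?;
/// # Ok(())
/// # }
/// ```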
38pub type Session = Connection;
39
40/// Global tokio runtime for blocking operations.
41///
42/// This runtime is lazily initialized on first use and is shared across
43/// all blocking import/export operations. It provides a way to call
44/// async methods from synchronous code.
45fn blocking_runtime() -> &'static Runtime {
46    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
47    RUNTIME.get_or_init(|| {
48        tokio::runtime::Builder::new_multi_thread()
49            .enable_all()
50            .build()
51            .expect("Failed to create tokio runtime for blocking operations")
52    })
53}
54
55/// ADBC Connection to an Exasol database.
56///
57/// The `Connection` type represents an active database connection and provides
58/// methods for executing queries, managing transactions, and retrieving metadata.
59///
60/// # v2.0.0 Breaking Changes
61///
62/// - Connection now owns the transport directly
63/// - `create_statement()` is now synchronous and returns a pure data container
64/// - Use `execute_statement()` instead of `Statement::execute()`
65/// - Use `prepare()` instead of `Statement::prepare()`
66///
67/// # Example
68///
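/// A minimal sketch; building the `ConnectionParams` value is assumed to happen elsewhere:
///
/// ```no_run
/// use exarrow_rs::adbc::Connection;
///
/// # async fn example(params: exarrow_rs::connection::params::ConnectionParams) -> Result<(), Box<dyn std::error::Error>> {
/// let mut conn = Connection::from_params(params).await?;
/// let batches = conn.query("SELECT 1").await?;
/// conn.close().await?;
/// # Ok(())
/// # }
/// ```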
69pub struct Connection {
70    /// Transport layer for communication (owned by Connection)
71    transport: Arc<Mutex<dyn TransportProtocol>>,
72    /// Session information
73    session: SessionInfo,
74    /// Connection parameters
75    params: ConnectionParams,
76}
77
78impl Connection {
79    /// Create a connection from connection parameters.
80    ///
81    /// This establishes a connection to the Exasol database using WebSocket transport,
82    /// authenticates the user, and creates a session.
83    ///
84    /// # Arguments
85    ///
86    /// * `params` - Connection parameters
87    ///
88    /// # Returns
89    ///
90    /// A connected `Connection` instance.
91    ///
92    /// # Errors
93    ///
94    /// Returns `ConnectionError` if the connection or authentication fails.
95    pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
96        // Create transport
97        let mut transport = WebSocketTransport::new();
98
99        // Convert ConnectionParams to TransportConnectionParams
100        let transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
101            .with_tls(params.use_tls)
102            .with_validate_server_certificate(params.validate_server_certificate)
103            .with_timeout(params.connection_timeout.as_millis() as u64);
104
105        // Connect
106        transport.connect(&transport_params).await.map_err(|e| {
107            ConnectionError::ConnectionFailed {
108                host: params.host.clone(),
109                port: params.port,
110                message: e.to_string(),
111            }
112        })?;
113
114        // Authenticate
115        let credentials =
116            TransportCredentials::new(params.username.clone(), params.password().to_string());
117        let session_info = transport
118            .authenticate(&credentials)
119            .await
120            .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
121
122        // Create session config from connection params
123        let session_config = SessionConfig {
124            idle_timeout: params.idle_timeout,
125            query_timeout: params.query_timeout,
126            ..Default::default()
127        };
128
129        // Extract session_id once to avoid double clone
130        let session_id = session_info.session_id.clone();
131
132        // Convert SessionInfo to AuthResponseData
133        let auth_response = AuthResponseData {
134            session_id: session_id.clone(),
135            protocol_version: session_info.protocol_version,
136            release_version: session_info.release_version,
137            database_name: session_info.database_name,
138            product_name: session_info.product_name,
139            max_data_message_size: session_info.max_data_message_size,
140            max_identifier_length: 128,    // Default value
141            max_varchar_length: 2_000_000, // Default value
142            identifier_quote_string: "\"".to_string(),
143            time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
144            time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
145        };
146
147        // Create session (owned, not Arc)
148        let session = SessionInfo::new(session_id, auth_response, session_config);
149
150        // Set schema if specified
151        if let Some(schema) = &params.schema {
152            session.set_current_schema(Some(schema.clone())).await;
153        }
154
155        Ok(Self {
156            transport: Arc::new(Mutex::new(transport)),
157            session,
158            params,
159        })
160    }
161
162    /// Create a builder for constructing a connection.
163    ///
164    /// # Returns
165    ///
166    /// A `ConnectionBuilder` instance.
167    ///
168    /// # Example
169    ///
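    /// A minimal sketch; the available setters are documented on `ConnectionBuilder`:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    ///
    /// let builder = Connection::builder();
    /// // Configure the builder and connect from it (see `ConnectionBuilder`).
    /// ```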
170    pub fn builder() -> ConnectionBuilder {
171        ConnectionBuilder::new()
172    }
173
174    // ========================================================================
175    // Statement Creation (synchronous - Statement is now a pure data container)
176    // ========================================================================
177
178    /// Create a new statement for executing SQL.
179    ///
180    /// Statement is now a pure data container. Use `execute_statement()` to execute it.
181    ///
182    /// # Arguments
183    ///
184    /// * `sql` - SQL query text
185    ///
186    /// # Returns
187    ///
188    /// A `Statement` instance ready for execution via `execute_statement()`.
189    ///
190    /// # Example
191    ///
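    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.create_statement("SELECT * FROM my_table");
    /// let result_set = conn.execute_statement(&stmt).await?;
    /// # Ok(())
    /// # }
    /// ```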
192    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
193        Statement::new(sql)
194    }
195
196    // ========================================================================
197    // Statement Execution Methods
198    // ========================================================================
199
200    /// Execute a statement and return results.
201    ///
202    /// # Arguments
203    ///
204    /// * `stmt` - Statement to execute
205    ///
206    /// # Returns
207    ///
208    /// A `ResultSet` containing the query results.
209    ///
210    /// # Errors
211    ///
212    /// Returns `QueryError` if execution fails or times out.
213    ///
214    /// # Example
215    ///
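    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.create_statement("SELECT * FROM my_table");
    /// let result_set = conn.execute_statement(&stmt).await?;
    /// let batches = result_set.fetch_all().await?;
    /// # Ok(())
    /// # }
    /// ```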
216    pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
217        // Validate session state
218        self.session
219            .validate_ready()
220            .await
221            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
222
223        // Update session state
224        self.session.set_state(SessionState::Executing).await;
225
226        // Increment query counter
227        self.session.increment_query_count();
228
229        // Build final SQL with parameters
230        let final_sql = stmt.build_sql()?;
231
232        // Execute with timeout
233        let timeout_duration = Duration::from_millis(stmt.timeout_ms());
234        let transport = Arc::clone(&self.transport);
235
236        let result = timeout(timeout_duration, async move {
237            let mut transport_guard = transport.lock().await;
238            transport_guard.execute_query(&final_sql).await
239        })
240        .await
241        .map_err(|_| QueryError::Timeout {
242            timeout_ms: stmt.timeout_ms(),
243        })?
244        .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
245
246        // Update session state back to ready/in_transaction
247        self.update_session_state_after_query().await;
248
249        // Convert transport result to ResultSet
250        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
251    }
252
253    /// Execute a statement and return the row count (for non-SELECT statements).
254    ///
255    /// # Arguments
256    ///
257    /// * `stmt` - Statement to execute
258    ///
259    /// # Returns
260    ///
261    /// The number of rows affected.
262    ///
263    /// # Errors
264    ///
265    /// Returns `QueryError` if execution fails or if the statement returns a result set (for example, a SELECT).
266    ///
267    /// # Example
268    ///
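    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.create_statement("DELETE FROM my_table WHERE id = 1");
    /// let affected = conn.execute_statement_update(&stmt).await?;
    /// # Ok(())
    /// # }
    /// ```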
269    pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
270        let result_set = self.execute_statement(stmt).await?;
271
272        result_set.row_count().ok_or_else(|| {
273            QueryError::NoResultSet("Expected row count, got result set".to_string())
274        })
275    }
276
277    // ========================================================================
278    // Prepared Statement Methods
279    // ========================================================================
280
281    /// Create a prepared statement for parameterized query execution.
282    ///
283    /// This creates a server-side prepared statement that can be executed
284    /// multiple times with different parameter values.
285    ///
286    /// # Arguments
287    ///
288    /// * `sql` - SQL statement with parameter placeholders (?)
289    ///
290    /// # Returns
291    ///
292    /// A `PreparedStatement` ready for parameter binding and execution.
293    ///
294    /// # Errors
295    ///
296    /// Returns `QueryError` if preparation fails.
297    ///
298    /// # Example
299    ///
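    /// A minimal sketch; parameter binding is done through `PreparedStatement`'s own methods:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.prepare("SELECT * FROM my_table WHERE id = ?").await?;
    /// // Bind parameter values on `stmt`, then execute it via the connection.
    /// let result_set = conn.execute_prepared(&stmt).await?;
    /// conn.close_prepared(stmt).await?;
    /// # Ok(())
    /// # }
    /// ```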
300    pub async fn prepare(
301        &mut self,
302        sql: impl Into<String>,
303    ) -> Result<PreparedStatement, QueryError> {
304        let sql = sql.into();
305
306        // Validate session state
307        self.session
308            .validate_ready()
309            .await
310            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
311
312        let mut transport = self.transport.lock().await;
313        let handle = transport
314            .create_prepared_statement(&sql)
315            .await
316            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
317
318        Ok(PreparedStatement::new(handle))
319    }
320
321    /// Execute a prepared statement and return results.
322    ///
323    /// # Arguments
324    ///
325    /// * `stmt` - Prepared statement to execute
326    ///
327    /// # Returns
328    ///
329    /// A `ResultSet` containing the query results.
330    ///
331    /// # Errors
332    ///
333    /// Returns `QueryError` if execution fails.
334    ///
335    /// # Example
336    ///
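    /// A minimal sketch, assuming an open connection; parameters are bound on `stmt`
    /// through `PreparedStatement` before execution:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.prepare("SELECT * FROM my_table WHERE id = ?").await?;
    /// let result_set = conn.execute_prepared(&stmt).await?;
    /// let batches = result_set.fetch_all().await?;
    /// # Ok(())
    /// # }
    /// ```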
337    pub async fn execute_prepared(
338        &mut self,
339        stmt: &PreparedStatement,
340    ) -> Result<ResultSet, QueryError> {
341        if stmt.is_closed() {
342            return Err(QueryError::StatementClosed);
343        }
344
345        // Validate session state
346        self.session
347            .validate_ready()
348            .await
349            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
350
351        // Update session state
352        self.session.set_state(SessionState::Executing).await;
353
354        // Increment query counter
355        self.session.increment_query_count();
356
357        // Convert parameters to column-major JSON format
358        let params_data = stmt.build_parameters_data()?;
359
360        let mut transport = self.transport.lock().await;
361        let result = transport
362            .execute_prepared_statement(stmt.handle_ref(), params_data)
363            .await
364            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
365
366        drop(transport);
367
368        // Update session state back to ready/in_transaction
369        self.update_session_state_after_query().await;
370
371        ResultSet::from_transport_result(result, Arc::clone(&self.transport))
372    }
373
374    /// Execute a prepared statement and return the number of affected rows.
375    ///
376    /// Use this for INSERT, UPDATE, DELETE statements.
377    ///
378    /// # Arguments
379    ///
380    /// * `stmt` - Prepared statement to execute
381    ///
382    /// # Returns
383    ///
384    /// The number of rows affected.
385    ///
386    /// # Errors
387    ///
388    /// Returns `QueryError` if execution fails or if the statement returns a result set.
389    pub async fn execute_prepared_update(
390        &mut self,
391        stmt: &PreparedStatement,
392    ) -> Result<i64, QueryError> {
393        if stmt.is_closed() {
394            return Err(QueryError::StatementClosed);
395        }
396
397        // Validate session state
398        self.session
399            .validate_ready()
400            .await
401            .map_err(|e| QueryError::InvalidState(e.to_string()))?;
402
403        // Update session state
404        self.session.set_state(SessionState::Executing).await;
405
406        // Convert parameters to column-major JSON format
407        let params_data = stmt.build_parameters_data()?;
408
409        let mut transport = self.transport.lock().await;
410        let result = transport
411            .execute_prepared_statement(stmt.handle_ref(), params_data)
412            .await
413            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
414
415        drop(transport);
416
417        // Update session state back to ready/in_transaction
418        self.update_session_state_after_query().await;
419
420        match result {
421            QueryResult::RowCount { count } => Ok(count),
422            QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
423        }
424    }
425
426    /// Close a prepared statement and release server-side resources.
427    ///
428    /// # Arguments
429    ///
430    /// * `stmt` - Prepared statement to close (consumed)
431    ///
432    /// # Errors
433    ///
434    /// Returns `QueryError` if closing fails.
435    ///
436    /// # Example
437    ///
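    /// A minimal sketch, assuming an open connection:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let stmt = conn.prepare("SELECT 1").await?;
    /// // ... execute the statement as needed ...
    /// conn.close_prepared(stmt).await?;
    /// # Ok(())
    /// # }
    /// ```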
438    pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
439        if stmt.is_closed() {
440            return Ok(());
441        }
442
443        let mut transport = self.transport.lock().await;
444        transport
445            .close_prepared_statement(stmt.handle_ref())
446            .await
447            .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
448
449        stmt.mark_closed();
450        Ok(())
451    }
452
453    // ========================================================================
454    // Convenience Methods (these internally use execute_statement)
455    // ========================================================================
456
457    /// Execute a SQL query and return results.
458    ///
459    /// This is a convenience method that creates a statement and executes it.
460    ///
461    /// # Arguments
462    ///
463    /// * `sql` - SQL query text
464    ///
465    /// # Returns
466    ///
467    /// A `ResultSet` containing the query results.
468    ///
469    /// # Errors
470    ///
471    /// Returns `QueryError` if execution fails.
472    ///
473    /// # Example
474    ///
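    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let result_set = conn.execute("SELECT * FROM my_table").await?;
    /// let batches = result_set.fetch_all().await?;
    /// # Ok(())
    /// # }
    /// ```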
475    pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
476        let stmt = self.create_statement(sql);
477        self.execute_statement(&stmt).await
478    }
479
480    /// Execute a SQL query and return all results as RecordBatches.
481    ///
482    /// This is a convenience method that fetches all results into memory.
483    ///
484    /// # Arguments
485    ///
486    /// * `sql` - SQL query text
487    ///
488    /// # Returns
489    ///
490    /// A vector of `RecordBatch` instances.
491    ///
492    /// # Errors
493    ///
494    /// Returns `QueryError` if execution fails.
495    ///
496    /// # Example
497    ///
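    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let batches = conn.query("SELECT * FROM my_table").await?;
    /// for batch in &batches {
    ///     println!("{} rows", batch.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```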
498    pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
499        let result_set = self.execute(sql).await?;
500        result_set.fetch_all().await
501    }
502
503    /// Execute a non-SELECT statement and return the row count.
504    ///
505    /// # Arguments
506    ///
507    /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
508    ///
509    /// # Returns
510    ///
511    /// The number of rows affected.
512    ///
513    /// # Errors
514    ///
515    /// Returns `QueryError` if execution fails.
516    ///
517    /// # Example
518    ///
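    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let affected = conn.execute_update("DELETE FROM my_table WHERE id = 1").await?;
    /// # Ok(())
    /// # }
    /// ```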
519    pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
520        let stmt = self.create_statement(sql);
521        self.execute_statement_update(&stmt).await
522    }
523
524    // ========================================================================
525    // Transaction Methods
526    // ========================================================================
527
528    /// Begin a transaction.
529    ///
530    /// # Errors
531    ///
532    /// Returns `QueryError` if a transaction is already active or if the operation fails.
533    ///
534    /// # Example
535    ///
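    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// conn.begin_transaction().await?;
    /// conn.execute_update("INSERT INTO my_table VALUES (1)").await?;
    /// conn.commit().await?;
    /// # Ok(())
    /// # }
    /// ```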
536    pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
537        // Exasol doesn't have a BEGIN statement - transactions are implicit.
538        // Starting a transaction just means we're tracking that autocommit is off.
539        // The actual transaction begins with the first DML/query.
540        self.session
541            .begin_transaction()
542            .await
543            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
544
545        Ok(())
546    }
547
548    /// Commit the current transaction.
549    ///
550    /// # Errors
551    ///
552    /// Returns `QueryError` if no transaction is active or if the operation fails.
553    ///
554    /// # Example
555    ///
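    /// A minimal sketch, assuming an open connection:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// conn.begin_transaction().await?;
    /// // ... execute DML statements ...
    /// conn.commit().await?;
    /// # Ok(())
    /// # }
    /// ```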
556    pub async fn commit(&mut self) -> Result<(), QueryError> {
557        // Execute COMMIT statement
558        self.execute_update("COMMIT").await?;
559
560        self.session
561            .commit_transaction()
562            .await
563            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
564
565        Ok(())
566    }
567
568    /// Rollback the current transaction.
569    ///
570    /// # Errors
571    ///
572    /// Returns `QueryError` if no transaction is active or if the operation fails.
573    ///
574    /// # Example
575    ///
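    /// A minimal sketch, assuming an open connection:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// conn.begin_transaction().await?;
    /// // ... execute DML statements ...
    /// conn.rollback().await?;
    /// # Ok(())
    /// # }
    /// ```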
576    pub async fn rollback(&mut self) -> Result<(), QueryError> {
577        // Execute ROLLBACK statement
578        self.execute_update("ROLLBACK").await?;
579
580        self.session
581            .rollback_transaction()
582            .await
583            .map_err(|e| QueryError::TransactionError(e.to_string()))?;
584
585        Ok(())
586    }
587
588    /// Check if a transaction is currently active.
589    ///
590    /// # Returns
591    ///
592    /// `true` if a transaction is active, `false` otherwise.
593    pub fn in_transaction(&self) -> bool {
594        self.session.in_transaction()
595    }
596
597    // ========================================================================
598    // Session and Schema Methods
599    // ========================================================================
600
601    /// Get the current schema.
602    ///
603    /// # Returns
604    ///
605    /// The current schema name, or `None` if no schema is set.
606    pub async fn current_schema(&self) -> Option<String> {
607        self.session.current_schema().await
608    }
609
610    /// Set the current schema.
611    ///
612    /// # Arguments
613    ///
614    /// * `schema` - The schema name to set
615    ///
616    /// # Errors
617    ///
618    /// Returns `QueryError` if the operation fails.
619    ///
620    /// # Example
621    ///
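    /// A minimal sketch, assuming an open connection and a placeholder schema name:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// conn.set_schema("MY_SCHEMA").await?;
    /// assert_eq!(conn.current_schema().await.as_deref(), Some("MY_SCHEMA"));
    /// # Ok(())
    /// # }
    /// ```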
622    pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
623        let schema_name = schema.into();
624        self.execute_update(format!("OPEN SCHEMA {}", schema_name))
625            .await?;
626        self.session.set_current_schema(Some(schema_name)).await;
627        Ok(())
628    }
629
630    // ========================================================================
631    // Metadata Methods
632    // ========================================================================
633
634    /// Get metadata about catalogs.
635    ///
636    /// # Returns
637    ///
638    /// A `ResultSet` containing catalog metadata.
639    ///
640    /// # Errors
641    ///
642    /// Returns `QueryError` if the operation fails.
643    pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
644        self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
645            .await
646    }
647
648    /// Get metadata about schemas.
649    ///
650    /// # Arguments
651    ///
652    /// * `catalog` - Optional catalog name filter
653    ///
654    /// # Returns
655    ///
656    /// A `ResultSet` containing schema metadata.
657    ///
658    /// # Errors
659    ///
660    /// Returns `QueryError` if the operation fails.
661    pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
662        let sql = if let Some(cat) = catalog {
663            format!(
664                "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
665                cat.replace('\'', "''")
666            )
667        } else {
668            "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
669        };
670        self.execute(sql).await
671    }
672
673    /// Get metadata about tables.
674    ///
675    /// # Arguments
676    ///
677    /// * `catalog` - Optional catalog name filter
678    /// * `schema` - Optional schema name filter
679    /// * `table` - Optional table name filter
680    ///
681    /// # Returns
682    ///
683    /// A `ResultSet` containing table metadata.
684    ///
685    /// # Errors
686    ///
687    /// Returns `QueryError` if the operation fails.
688    pub async fn get_tables(
689        &mut self,
690        catalog: Option<&str>,
691        schema: Option<&str>,
692        table: Option<&str>,
693    ) -> Result<ResultSet, QueryError> {
694        let mut conditions = Vec::new();
695
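        // Exasol has no separate catalog level, so a catalog filter is applied to the schema column.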
696        if let Some(cat) = catalog {
697            conditions.push(format!("TABLE_SCHEMA = '{}'", cat.replace('\'', "''")));
698        }
699        if let Some(sch) = schema {
700            conditions.push(format!("TABLE_SCHEMA = '{}'", sch.replace('\'', "''")));
701        }
702        if let Some(tbl) = table {
703            conditions.push(format!("TABLE_NAME = '{}'", tbl.replace('\'', "''")));
704        }
705
706        let where_clause = if conditions.is_empty() {
707            String::new()
708        } else {
709            format!("WHERE {}", conditions.join(" AND "))
710        };
711
712        let sql = format!(
713            "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM SYS.EXA_ALL_TABLES {} ORDER BY TABLE_SCHEMA, TABLE_NAME",
714            where_clause
715        );
716
717        self.execute(sql).await
718    }
719
720    /// Get metadata about columns.
721    ///
722    /// # Arguments
723    ///
724    /// * `catalog` - Optional catalog name filter
725    /// * `schema` - Optional schema name filter
726    /// * `table` - Optional table name filter
727    /// * `column` - Optional column name filter
728    ///
729    /// # Returns
730    ///
731    /// A `ResultSet` containing column metadata.
732    ///
733    /// # Errors
734    ///
735    /// Returns `QueryError` if the operation fails.
736    pub async fn get_columns(
737        &mut self,
738        catalog: Option<&str>,
739        schema: Option<&str>,
740        table: Option<&str>,
741        column: Option<&str>,
742    ) -> Result<ResultSet, QueryError> {
743        let mut conditions = Vec::new();
744
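        // As in `get_tables`, the catalog filter is applied to the schema column (Exasol has no separate catalog level).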
745        if let Some(cat) = catalog {
746            conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
747        }
748        if let Some(sch) = schema {
749            conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
750        }
751        if let Some(tbl) = table {
752            conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
753        }
754        if let Some(col) = column {
755            conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
756        }
757
758        let where_clause = if conditions.is_empty() {
759            String::new()
760        } else {
761            format!("WHERE {}", conditions.join(" AND "))
762        };
763
764        let sql = format!(
765            "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
766             FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
767            where_clause
768        );
769
770        self.execute(sql).await
771    }
772
773    // ========================================================================
774    // Session Information Methods
775    // ========================================================================
776
777    /// Get session information.
778    ///
779    /// # Returns
780    ///
781    /// The session ID.
782    pub fn session_id(&self) -> &str {
783        self.session.session_id()
784    }
785
786    /// Get connection parameters.
787    ///
788    /// # Returns
789    ///
790    /// A reference to the connection parameters.
791    pub fn params(&self) -> &ConnectionParams {
792        &self.params
793    }
794
795    /// Check if the connection is closed.
796    ///
797    /// # Returns
798    ///
799    /// `true` if the connection is closed, `false` otherwise.
800    pub async fn is_closed(&self) -> bool {
801        self.session.is_closed().await
802    }
803
804    /// Close the connection.
805    ///
806    /// This closes the session and transport layer.
807    ///
808    /// # Errors
809    ///
810    /// Returns `ConnectionError` if closing fails.
811    ///
812    /// # Example
813    ///
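    /// A minimal sketch; `conn` is an already-established connection:
    ///
    /// ```no_run
    /// # async fn example(conn: exarrow_rs::adbc::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// conn.close().await?;
    /// # Ok(())
    /// # }
    /// ```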
814    pub async fn close(self) -> Result<(), ConnectionError> {
815        // Close session
816        self.session.close().await?;
817
818        // Close transport
819        let mut transport = self.transport.lock().await;
820        transport
821            .close()
822            .await
823            .map_err(|e| ConnectionError::ConnectionFailed {
824                host: self.params.host.clone(),
825                port: self.params.port,
826                message: e.to_string(),
827            })?;
828
829        Ok(())
830    }
831
832    // ========================================================================
833    // Import/Export Methods
834    // ========================================================================
835
836    /// Creates an SQL executor closure for import/export operations.
837    ///
838    /// This is a helper method that creates a closure which can execute SQL
839    /// statements and return the row count. The closure captures a cloned
840    /// reference to the transport, allowing it to be called multiple times
841    /// (e.g., for parallel file imports).
842    ///
843    /// # Returns
844    ///
845    /// A closure that takes an SQL string and returns a Future resolving to
846    /// either the affected row count or an error string.
847    fn make_sql_executor(
848        &self,
849    ) -> impl Fn(
850        String,
851    )
852        -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
853        let transport = Arc::clone(&self.transport);
854        move |sql: String| {
855            let transport = Arc::clone(&transport);
856            Box::pin(async move {
857                let mut transport_guard = transport.lock().await;
858                match transport_guard.execute_query(&sql).await {
859                    Ok(QueryResult::RowCount { count }) => Ok(count as u64),
860                    Ok(QueryResult::ResultSet { .. }) => Ok(0),
861                    Err(e) => Err(e.to_string()),
862                }
863            })
864        }
865    }
866
867    /// Import CSV data from a file into an Exasol table.
868    ///
869    /// This method reads CSV data from the specified file and imports it into
870    /// the target table using Exasol's HTTP transport layer.
871    ///
872    /// # Arguments
873    ///
874    /// * `table` - Name of the target table
875    /// * `file_path` - Path to the CSV file
876    /// * `options` - Import options
877    ///
878    /// # Returns
879    ///
880    /// The number of rows imported on success.
881    ///
882    /// # Errors
883    ///
884    /// Returns `ImportError` if the import fails.
885    ///
886    /// # Example
887    ///
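    /// A minimal sketch, assuming an open connection and placeholder table/file names:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::CsvImportOptions;
    /// use std::path::Path;
    ///
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let options = CsvImportOptions::default();
    /// let rows = conn.import_csv_from_file("my_table", Path::new("/data/part1.csv"), options).await?;
    /// # Ok(())
    /// # }
    /// ```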
888    pub async fn import_csv_from_file(
889        &mut self,
890        table: &str,
891        file_path: &std::path::Path,
892        options: crate::import::csv::CsvImportOptions,
893    ) -> Result<u64, crate::import::ImportError> {
894        // Pass Exasol host/port from connection params to import options
895        let options = options
896            .exasol_host(&self.params.host)
897            .exasol_port(self.params.port);
898
899        crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
900            .await
901    }
902
903    /// Import CSV data from an async reader into an Exasol table.
904    ///
905    /// This method reads CSV data from an async reader and imports it into
906    /// the target table using Exasol's HTTP transport layer.
907    ///
908    /// # Arguments
909    ///
910    /// * `table` - Name of the target table
911    /// * `reader` - Async reader providing CSV data
912    /// * `options` - Import options
913    ///
914    /// # Returns
915    ///
916    /// The number of rows imported on success.
917    ///
918    /// # Errors
919    ///
920    /// Returns `ImportError` if the import fails.
921    pub async fn import_csv_from_stream<R>(
922        &mut self,
923        table: &str,
924        reader: R,
925        options: crate::import::csv::CsvImportOptions,
926    ) -> Result<u64, crate::import::ImportError>
927    where
928        R: tokio::io::AsyncRead + Unpin + Send + 'static,
929    {
930        // Pass Exasol host/port from connection params to import options
931        let options = options
932            .exasol_host(&self.params.host)
933            .exasol_port(self.params.port);
934
935        crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
936            .await
937    }
938
939    /// Import CSV data from an iterator into an Exasol table.
940    ///
941    /// This method converts iterator rows to CSV format and imports them into
942    /// the target table using Exasol's HTTP transport layer.
943    ///
944    /// # Arguments
945    ///
946    /// * `table` - Name of the target table
947    /// * `rows` - Iterator of rows, where each row is an iterator of field values
948    /// * `options` - Import options
949    ///
950    /// # Returns
951    ///
952    /// The number of rows imported on success.
953    ///
954    /// # Errors
955    ///
956    /// Returns `ImportError` if the import fails.
957    ///
958    /// # Example
959    ///
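    /// A minimal sketch, assuming an open connection and a placeholder table name:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::CsvImportOptions;
    ///
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = vec![vec!["1", "Alice"], vec!["2", "Bob"]];
    /// let count = conn.import_csv_from_iter("my_table", rows, CsvImportOptions::default()).await?;
    /// # Ok(())
    /// # }
    /// ```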
960    pub async fn import_csv_from_iter<I, T, S>(
961        &mut self,
962        table: &str,
963        rows: I,
964        options: crate::import::csv::CsvImportOptions,
965    ) -> Result<u64, crate::import::ImportError>
966    where
967        I: IntoIterator<Item = T> + Send + 'static,
968        T: IntoIterator<Item = S> + Send,
969        S: AsRef<str>,
970    {
971        // Pass Exasol host/port from connection params to import options
972        let options = options
973            .exasol_host(&self.params.host)
974            .exasol_port(self.params.port);
975
976        crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
977    }
978
979    /// Export data from an Exasol table or query to a CSV file.
980    ///
981    /// This method exports data from the specified source to a CSV file
982    /// using Exasol's HTTP transport layer.
983    ///
984    /// # Arguments
985    ///
986    /// * `source` - The data source (table or query)
987    /// * `file_path` - Path to the output file
988    /// * `options` - Export options
989    ///
990    /// # Returns
991    ///
992    /// The number of rows exported on success.
993    ///
994    /// # Errors
995    ///
996    /// Returns `ExportError` if the export fails.
997    ///
998    /// # Example
999    ///
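    /// A minimal sketch; the `ExportSource` and `CsvExportOptions` values are passed in here
    /// because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # use std::path::Path;
    /// # async fn example(conn: &mut Connection, source: exarrow_rs::query::export::ExportSource, options: exarrow_rs::export::csv::CsvExportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.export_csv_to_file(source, Path::new("/data/out.csv"), options).await?;
    /// println!("exported {} rows", rows);
    /// # Ok(())
    /// # }
    /// ```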
1000    pub async fn export_csv_to_file(
1001        &mut self,
1002        source: crate::query::export::ExportSource,
1003        file_path: &std::path::Path,
1004        options: crate::export::csv::CsvExportOptions,
1005    ) -> Result<u64, crate::export::csv::ExportError> {
1006        // Pass Exasol host/port from connection params to export options
1007        let options = options
1008            .exasol_host(&self.params.host)
1009            .exasol_port(self.params.port);
1010
1011        let mut transport_guard = self.transport.lock().await;
1012        crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1013    }
1014
1015    /// Export data from an Exasol table or query to an async writer.
1016    ///
1017    /// This method exports data from the specified source to an async writer
1018    /// using Exasol's HTTP transport layer.
1019    ///
1020    /// # Arguments
1021    ///
1022    /// * `source` - The data source (table or query)
1023    /// * `writer` - Async writer to write the CSV data to
1024    /// * `options` - Export options
1025    ///
1026    /// # Returns
1027    ///
1028    /// The number of rows exported on success.
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns `ExportError` if the export fails.
1033    pub async fn export_csv_to_stream<W>(
1034        &mut self,
1035        source: crate::query::export::ExportSource,
1036        writer: W,
1037        options: crate::export::csv::CsvExportOptions,
1038    ) -> Result<u64, crate::export::csv::ExportError>
1039    where
1040        W: tokio::io::AsyncWrite + Unpin,
1041    {
1042        // Pass Exasol host/port from connection params to export options
1043        let options = options
1044            .exasol_host(&self.params.host)
1045            .exasol_port(self.params.port);
1046
1047        let mut transport_guard = self.transport.lock().await;
1048        crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1049    }
1050
1051    /// Export data from an Exasol table or query to an in-memory list of rows.
1052    ///
1053    /// Each row is represented as a vector of string values.
1054    ///
1055    /// # Arguments
1056    ///
1057    /// * `source` - The data source (table or query)
1058    /// * `options` - Export options
1059    ///
1060    /// # Returns
1061    ///
1062    /// A vector of rows, where each row is a vector of column values.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns `ExportError` if the export fails.
1067    ///
1068    /// # Example
1069    ///
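    /// A minimal sketch; the `ExportSource` and `CsvExportOptions` values are passed in here
    /// because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection, source: exarrow_rs::query::export::ExportSource, options: exarrow_rs::export::csv::CsvExportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.export_csv_to_list(source, options).await?;
    /// for row in &rows {
    ///     println!("{:?}", row);
    /// }
    /// # Ok(())
    /// # }
    /// ```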
1070    pub async fn export_csv_to_list(
1071        &mut self,
1072        source: crate::query::export::ExportSource,
1073        options: crate::export::csv::CsvExportOptions,
1074    ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1075        // Pass Exasol host/port from connection params to export options
1076        let options = options
1077            .exasol_host(&self.params.host)
1078            .exasol_port(self.params.port);
1079
1080        let mut transport_guard = self.transport.lock().await;
1081        crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1082    }
1083
1084    /// Import multiple CSV files in parallel into an Exasol table.
1085    ///
1086    /// This method reads CSV data from multiple files and imports them into
1087    /// the target table using parallel HTTP transport connections. Each file
1088    /// gets its own connection with a unique internal address.
1089    ///
1090    /// For a single file, this method delegates to `import_csv_from_file`.
1091    ///
1092    /// # Arguments
1093    ///
1094    /// * `table` - Name of the target table
1095    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1096    /// * `options` - Import options
1097    ///
1098    /// # Returns
1099    ///
1100    /// The number of rows imported on success.
1101    ///
1102    /// # Errors
1103    ///
1104    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1105    ///
1106    /// # Example
1107    ///
1108    /// ```no_run
1109    /// use exarrow_rs::adbc::Connection;
1110    /// use exarrow_rs::import::CsvImportOptions;
1111    /// use std::path::PathBuf;
1112    ///
1113    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1114    /// let files = vec![
1115    ///     PathBuf::from("/data/part1.csv"),
1116    ///     PathBuf::from("/data/part2.csv"),
1117    /// ];
1118    ///
1119    /// let options = CsvImportOptions::default();
1120    /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1121    /// # Ok(())
1122    /// # }
1123    /// ```
1124    pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1125        &mut self,
1126        table: &str,
1127        paths: S,
1128        options: crate::import::csv::CsvImportOptions,
1129    ) -> Result<u64, crate::import::ImportError> {
1130        // Pass Exasol host/port from connection params to import options
1131        let options = options
1132            .exasol_host(&self.params.host)
1133            .exasol_port(self.params.port);
1134
1135        crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1136    }
1137
1138    /// Import data from a Parquet file into an Exasol table.
1139    ///
1140    /// This method reads a Parquet file, converts the data to CSV format,
1141    /// and imports it into the target table using Exasol's HTTP transport layer.
1142    ///
1143    /// # Arguments
1144    ///
1145    /// * `table` - Name of the target table
1146    /// * `file_path` - Path to the Parquet file
1147    /// * `options` - Import options
1148    ///
1149    /// # Returns
1150    ///
1151    /// The number of rows imported on success.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns `ImportError` if the import fails.
1156    ///
1157    /// # Example
1158    ///
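    /// A minimal sketch, assuming an open connection and placeholder table/file names:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::ParquetImportOptions;
    /// use std::path::Path;
    ///
    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let options = ParquetImportOptions::default();
    /// let rows = conn.import_from_parquet("my_table", Path::new("/data/part1.parquet"), options).await?;
    /// # Ok(())
    /// # }
    /// ```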
1159    pub async fn import_from_parquet(
1160        &mut self,
1161        table: &str,
1162        file_path: &std::path::Path,
1163        options: crate::import::parquet::ParquetImportOptions,
1164    ) -> Result<u64, crate::import::ImportError> {
1165        // Pass Exasol host/port from connection params to import options
1166        let options = options
1167            .with_exasol_host(&self.params.host)
1168            .with_exasol_port(self.params.port);
1169
1170        crate::import::parquet::import_from_parquet(
1171            self.make_sql_executor(),
1172            table,
1173            file_path,
1174            options,
1175        )
1176        .await
1177    }
1178
1179    /// Import multiple Parquet files in parallel into an Exasol table.
1180    ///
1181    /// This method converts each Parquet file to CSV format concurrently,
1182    /// then streams the data through parallel HTTP transport connections.
1183    ///
1184    /// For a single file, this method delegates to `import_from_parquet`.
1185    ///
1186    /// # Arguments
1187    ///
1188    /// * `table` - Name of the target table
1189    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1190    /// * `options` - Import options
1191    ///
1192    /// # Returns
1193    ///
1194    /// The number of rows imported on success.
1195    ///
1196    /// # Errors
1197    ///
1198    /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1199    ///
1200    /// # Example
1201    ///
1202    /// ```no_run
1203    /// use exarrow_rs::adbc::Connection;
1204    /// use exarrow_rs::import::ParquetImportOptions;
1205    /// use std::path::PathBuf;
1206    ///
1207    /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1208    /// let files = vec![
1209    ///     PathBuf::from("/data/part1.parquet"),
1210    ///     PathBuf::from("/data/part2.parquet"),
1211    /// ];
1212    ///
1213    /// let options = ParquetImportOptions::default();
1214    /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1215    /// # Ok(())
1216    /// # }
1217    /// ```
1218    pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1219        &mut self,
1220        table: &str,
1221        paths: S,
1222        options: crate::import::parquet::ParquetImportOptions,
1223    ) -> Result<u64, crate::import::ImportError> {
1224        // Pass Exasol host/port from connection params to import options
1225        let options = options
1226            .with_exasol_host(&self.params.host)
1227            .with_exasol_port(self.params.port);
1228
1229        crate::import::parquet::import_from_parquet_files(
1230            self.make_sql_executor(),
1231            table,
1232            paths,
1233            options,
1234        )
1235        .await
1236    }
1237
1238    /// Export data from an Exasol table or query to a Parquet file.
1239    ///
1240    /// This method exports data from the specified source to a Parquet file.
1241    /// The data is first received as CSV from Exasol, then converted to Parquet format.
1242    ///
1243    /// # Arguments
1244    ///
1245    /// * `source` - The data source (table or query)
1246    /// * `file_path` - Path to the output Parquet file
1247    /// * `options` - Export options
1248    ///
1249    /// # Returns
1250    ///
1251    /// The number of rows exported on success.
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns `ExportError` if the export fails.
1256    ///
1257    /// # Example
1258    ///
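    /// A minimal sketch; the `ExportSource` and `ParquetExportOptions` values are passed in
    /// here because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # use std::path::Path;
    /// # async fn example(conn: &mut Connection, source: exarrow_rs::query::export::ExportSource, options: exarrow_rs::export::parquet::ParquetExportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.export_to_parquet(source, Path::new("/data/out.parquet"), options).await?;
    /// # Ok(())
    /// # }
    /// ```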
1259    pub async fn export_to_parquet(
1260        &mut self,
1261        source: crate::query::export::ExportSource,
1262        file_path: &std::path::Path,
1263        options: crate::export::parquet::ParquetExportOptions,
1264    ) -> Result<u64, crate::export::csv::ExportError> {
1265        // Pass Exasol host/port from connection params to export options
1266        let options = options
1267            .exasol_host(&self.params.host)
1268            .exasol_port(self.params.port);
1269
1270        let mut transport_guard = self.transport.lock().await;
1271        crate::export::parquet::export_to_parquet_via_transport(
1272            &mut *transport_guard,
1273            source,
1274            file_path,
1275            options,
1276        )
1277        .await
1278    }
1279
1280    /// Import data from an Arrow RecordBatch into an Exasol table.
1281    ///
1282    /// This method converts the RecordBatch to CSV format and imports it
1283    /// into the target table using Exasol's HTTP transport layer.
1284    ///
1285    /// # Arguments
1286    ///
1287    /// * `table` - Name of the target table
1288    /// * `batch` - The RecordBatch to import
1289    /// * `options` - Import options
1290    ///
1291    /// # Returns
1292    ///
1293    /// The number of rows imported on success.
1294    ///
1295    /// # Errors
1296    ///
1297    /// Returns `ImportError` if the import fails.
1298    ///
1299    /// # Example
1300    ///
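    /// A minimal sketch; the `RecordBatch` and `ArrowImportOptions` values are passed in here
    /// because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # use arrow::array::RecordBatch;
    /// # async fn example(conn: &mut Connection, batch: &RecordBatch, options: exarrow_rs::import::arrow::ArrowImportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.import_from_record_batch("my_table", batch, options).await?;
    /// # Ok(())
    /// # }
    /// ```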
1301    pub async fn import_from_record_batch(
1302        &mut self,
1303        table: &str,
1304        batch: &RecordBatch,
1305        options: crate::import::arrow::ArrowImportOptions,
1306    ) -> Result<u64, crate::import::ImportError> {
1307        // Pass Exasol host/port from connection params to import options
1308        let options = options
1309            .exasol_host(&self.params.host)
1310            .exasol_port(self.params.port);
1311
1312        crate::import::arrow::import_from_record_batch(
1313            self.make_sql_executor(),
1314            table,
1315            batch,
1316            options,
1317        )
1318        .await
1319    }
1320
1321    /// Import data from multiple Arrow RecordBatches into an Exasol table.
1322    ///
1323    /// This method converts each RecordBatch to CSV format and imports them
1324    /// into the target table using Exasol's HTTP transport layer.
1325    ///
1326    /// # Arguments
1327    ///
1328    /// * `table` - Name of the target table
1329    /// * `batches` - An iterator of RecordBatches to import
1330    /// * `options` - Import options
1331    ///
1332    /// # Returns
1333    ///
1334    /// The number of rows imported on success.
1335    ///
1336    /// # Errors
1337    ///
1338    /// Returns `ImportError` if the import fails.
1339    pub async fn import_from_record_batches<I>(
1340        &mut self,
1341        table: &str,
1342        batches: I,
1343        options: crate::import::arrow::ArrowImportOptions,
1344    ) -> Result<u64, crate::import::ImportError>
1345    where
1346        I: IntoIterator<Item = RecordBatch>,
1347    {
1348        // Pass Exasol host/port from connection params to import options
1349        let options = options
1350            .exasol_host(&self.params.host)
1351            .exasol_port(self.params.port);
1352
1353        crate::import::arrow::import_from_record_batches(
1354            self.make_sql_executor(),
1355            table,
1356            batches,
1357            options,
1358        )
1359        .await
1360    }
1361
1362    /// Import data from an Arrow IPC file/stream into an Exasol table.
1363    ///
1364    /// This method reads Arrow IPC format data, converts it to CSV,
1365    /// and imports it into the target table using Exasol's HTTP transport layer.
1366    ///
1367    /// # Arguments
1368    ///
1369    /// * `table` - Name of the target table
1370    /// * `reader` - An async reader containing Arrow IPC data
1371    /// * `options` - Import options
1372    ///
1373    /// # Returns
1374    ///
1375    /// The number of rows imported on success.
1376    ///
1377    /// # Errors
1378    ///
1379    /// Returns `ImportError` if the import fails.
1380    ///
1381    /// # Example
1382    ///
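    /// A minimal sketch; the `ArrowImportOptions` value is passed in here because its
    /// construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection, options: exarrow_rs::import::arrow::ArrowImportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let file = tokio::fs::File::open("/data/data.arrow").await?;
    /// let rows = conn.import_from_arrow_ipc("my_table", file, options).await?;
    /// # Ok(())
    /// # }
    /// ```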
1383    pub async fn import_from_arrow_ipc<R>(
1384        &mut self,
1385        table: &str,
1386        reader: R,
1387        options: crate::import::arrow::ArrowImportOptions,
1388    ) -> Result<u64, crate::import::ImportError>
1389    where
1390        R: tokio::io::AsyncRead + Unpin + Send,
1391    {
1392        // Pass Exasol host/port from connection params to import options
1393        let options = options
1394            .exasol_host(&self.params.host)
1395            .exasol_port(self.params.port);
1396
1397        crate::import::arrow::import_from_arrow_ipc(
1398            self.make_sql_executor(),
1399            table,
1400            reader,
1401            options,
1402        )
1403        .await
1404    }
1405
1406    /// Export data from an Exasol table or query to Arrow RecordBatches.
1407    ///
1408    /// This method exports data from the specified source and converts it
1409    /// to Arrow RecordBatches.
1410    ///
1411    /// # Arguments
1412    ///
1413    /// * `source` - The data source (table or query)
1414    /// * `options` - Export options
1415    ///
1416    /// # Returns
1417    ///
1418    /// A vector of RecordBatches on success.
1419    ///
1420    /// # Errors
1421    ///
1422    /// Returns `ExportError` if the export fails.
1423    ///
1424    /// # Example
1425    ///
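    /// A minimal sketch; the `ExportSource` and `ArrowExportOptions` values are passed in
    /// here because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # async fn example(conn: &mut Connection, source: exarrow_rs::query::export::ExportSource, options: exarrow_rs::export::arrow::ArrowExportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let batches = conn.export_to_record_batches(source, options).await?;
    /// println!("exported {} batches", batches.len());
    /// # Ok(())
    /// # }
    /// ```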
1426    pub async fn export_to_record_batches(
1427        &mut self,
1428        source: crate::query::export::ExportSource,
1429        options: crate::export::arrow::ArrowExportOptions,
1430    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1431        // Pass Exasol host/port from connection params to export options
1432        let options = options
1433            .exasol_host(&self.params.host)
1434            .exasol_port(self.params.port);
1435
1436        let mut transport_guard = self.transport.lock().await;
1437        crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1438    }
1439
1440    /// Export data from an Exasol table or query to an Arrow IPC file.
1441    ///
1442    /// This method exports data from the specified source to an Arrow IPC file.
1443    ///
1444    /// # Arguments
1445    ///
1446    /// * `source` - The data source (table or query)
1447    /// * `file_path` - Path to the output Arrow IPC file
1448    /// * `options` - Export options
1449    ///
1450    /// # Returns
1451    ///
1452    /// The number of rows exported on success.
1453    ///
1454    /// # Errors
1455    ///
1456    /// Returns `ExportError` if the export fails.
1457    ///
1458    /// # Example
1459    ///
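    /// A minimal sketch; the `ExportSource` and `ArrowExportOptions` values are passed in
    /// here because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # use std::path::Path;
    /// # async fn example(conn: &mut Connection, source: exarrow_rs::query::export::ExportSource, options: exarrow_rs::export::arrow::ArrowExportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.export_to_arrow_ipc(source, Path::new("/data/out.arrow"), options).await?;
    /// # Ok(())
    /// # }
    /// ```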
1460    pub async fn export_to_arrow_ipc(
1461        &mut self,
1462        source: crate::query::export::ExportSource,
1463        file_path: &std::path::Path,
1464        options: crate::export::arrow::ArrowExportOptions,
1465    ) -> Result<u64, crate::export::csv::ExportError> {
1466        // Pass Exasol host/port from connection params to export options
1467        let options = options
1468            .exasol_host(&self.params.host)
1469            .exasol_port(self.params.port);
1470
1471        let mut transport_guard = self.transport.lock().await;
1472        crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1473            .await
1474    }
1475
1476    // ========================================================================
1477    // Blocking Import/Export Methods
1478    // ========================================================================
1479
1480    /// Import CSV data from a file into an Exasol table (blocking).
1481    ///
1482    /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1483    /// for use in non-async contexts.
1484    ///
1485    /// # Arguments
1486    ///
1487    /// * `table` - Name of the target table
1488    /// * `file_path` - Path to the CSV file
1489    /// * `options` - Import options
1490    ///
1491    /// # Returns
1492    ///
1493    /// The number of rows imported on success.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns `ImportError` if the import fails.
1498    ///
1499    /// # Example
1500    ///
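    /// A minimal sketch, assuming an open connection and placeholder table/file names:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::CsvImportOptions;
    /// use std::path::Path;
    ///
    /// # fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.blocking_import_csv_from_file("my_table", Path::new("/data/part1.csv"), CsvImportOptions::default())?;
    /// # Ok(())
    /// # }
    /// ```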
1501    pub fn blocking_import_csv_from_file(
1502        &mut self,
1503        table: &str,
1504        file_path: &std::path::Path,
1505        options: crate::import::csv::CsvImportOptions,
1506    ) -> Result<u64, crate::import::ImportError> {
1507        blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1508    }
1509
1510    /// Import multiple CSV files in parallel into an Exasol table (blocking).
1511    ///
1512    /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1513    /// for use in non-async contexts.
1514    ///
1515    /// # Arguments
1516    ///
1517    /// * `table` - Name of the target table
1518    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1519    /// * `options` - Import options
1520    ///
1521    /// # Returns
1522    ///
1523    /// The number of rows imported on success.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns `ImportError` if the import fails.
1528    pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1529        &mut self,
1530        table: &str,
1531        paths: S,
1532        options: crate::import::csv::CsvImportOptions,
1533    ) -> Result<u64, crate::import::ImportError> {
1534        blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1535    }
1536
1537    /// Import data from a Parquet file into an Exasol table (blocking).
1538    ///
1539    /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1540    /// for use in non-async contexts.
1541    ///
1542    /// # Arguments
1543    ///
1544    /// * `table` - Name of the target table
1545    /// * `file_path` - Path to the Parquet file
1546    /// * `options` - Import options
1547    ///
1548    /// # Returns
1549    ///
1550    /// The number of rows imported on success.
1551    ///
1552    /// # Errors
1553    ///
1554    /// Returns `ImportError` if the import fails.
1555    ///
1556    /// # Example
1557    ///
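    /// A minimal sketch, assuming an open connection and placeholder table/file names:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::ParquetImportOptions;
    /// use std::path::Path;
    ///
    /// # fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.blocking_import_from_parquet("my_table", Path::new("/data/part1.parquet"), ParquetImportOptions::default())?;
    /// # Ok(())
    /// # }
    /// ```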
1558    pub fn blocking_import_from_parquet(
1559        &mut self,
1560        table: &str,
1561        file_path: &std::path::Path,
1562        options: crate::import::parquet::ParquetImportOptions,
1563    ) -> Result<u64, crate::import::ImportError> {
1564        blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1565    }
1566
1567    /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1568    ///
1569    /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1570    /// for use in non-async contexts.
1571    ///
1572    /// # Arguments
1573    ///
1574    /// * `table` - Name of the target table
1575    /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1576    /// * `options` - Import options
1577    ///
1578    /// # Returns
1579    ///
1580    /// The number of rows imported on success.
1581    ///
1582    /// # Errors
1583    ///
1584    /// Returns `ImportError` if the import fails.
1585    pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1586        &mut self,
1587        table: &str,
1588        paths: S,
1589        options: crate::import::parquet::ParquetImportOptions,
1590    ) -> Result<u64, crate::import::ImportError> {
1591        blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1592    }
1593
1594    /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1595    ///
1596    /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1597    /// for use in non-async contexts.
1598    ///
1599    /// # Arguments
1600    ///
1601    /// * `table` - Name of the target table
1602    /// * `batch` - The RecordBatch to import
1603    /// * `options` - Import options
1604    ///
1605    /// # Returns
1606    ///
1607    /// The number of rows imported on success.
1608    ///
1609    /// # Errors
1610    ///
1611    /// Returns `ImportError` if the import fails.
1612    ///
1613    /// # Example
1614    ///
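    /// A minimal sketch; the `RecordBatch` and `ArrowImportOptions` values are passed in here
    /// because their construction is not shown in this module:
    ///
    /// ```no_run
    /// # use exarrow_rs::adbc::Connection;
    /// # use arrow::array::RecordBatch;
    /// # fn example(conn: &mut Connection, batch: &RecordBatch, options: exarrow_rs::import::arrow::ArrowImportOptions) -> Result<(), Box<dyn std::error::Error>> {
    /// let rows = conn.blocking_import_from_record_batch("my_table", batch, options)?;
    /// # Ok(())
    /// # }
    /// ```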
1615    pub fn blocking_import_from_record_batch(
1616        &mut self,
1617        table: &str,
1618        batch: &RecordBatch,
1619        options: crate::import::arrow::ArrowImportOptions,
1620    ) -> Result<u64, crate::import::ImportError> {
1621        blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1622    }
1623
1624    /// Import data from an Arrow IPC file into an Exasol table (blocking).
1625    ///
1626    /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1627    /// for use in non-async contexts.
1628    ///
1629    /// Note: unlike the async [`import_from_arrow_ipc`](Self::import_from_arrow_ipc), this wrapper takes a
1630    /// file path; the file is opened as a `tokio::fs::File` on the internal blocking runtime and handed to the async import.
1631    ///
1632    /// # Arguments
1633    ///
1634    /// * `table` - Name of the target table
1635    /// * `file_path` - Path to the Arrow IPC file
1636    /// * `options` - Import options
1637    ///
1638    /// # Returns
1639    ///
1640    /// The number of rows imported on success.
1641    ///
1642    /// # Errors
1643    ///
1644    /// Returns `ImportError` if the import fails.
1645    ///
1646    /// # Example
1647    ///
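    /// A sketch under the same assumptions as the other examples (crate paths and
    /// placeholder table/file names):
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::import::{arrow::ArrowImportOptions, ImportError};
    /// use std::path::Path;
    ///
    /// fn load(conn: &mut Connection, options: ArrowImportOptions) -> Result<u64, ImportError> {
    ///     conn.blocking_import_from_arrow_ipc("MY_SCHEMA.EVENTS", Path::new("events.arrow"), options)
    /// }
    /// ```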
1648    pub fn blocking_import_from_arrow_ipc(
1649        &mut self,
1650        table: &str,
1651        file_path: &std::path::Path,
1652        options: crate::import::arrow::ArrowImportOptions,
1653    ) -> Result<u64, crate::import::ImportError> {
1654        blocking_runtime().block_on(async {
1655            let file = tokio::fs::File::open(file_path)
1656                .await
1657                .map_err(crate::import::ImportError::IoError)?;
1658            self.import_from_arrow_ipc(table, file, options).await
1659        })
1660    }
1661
1662    /// Export data from an Exasol table or query to a CSV file (blocking).
1663    ///
1664    /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1665    /// for use in non-async contexts.
1666    ///
1667    /// # Arguments
1668    ///
1669    /// * `source` - The data source (table or query)
1670    /// * `file_path` - Path to the output file
1671    /// * `options` - Export options
1672    ///
1673    /// # Returns
1674    ///
1675    /// The number of rows exported on success.
1676    ///
1677    /// # Errors
1678    ///
1679    /// Returns `ExportError` if the export fails.
1680    ///
1681    /// # Example
1682    ///
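    /// A minimal sketch; crate paths are assumed and `source` is a prepared
    /// `ExportSource` (table or query) built elsewhere:
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::export::csv::{CsvExportOptions, ExportError};
    /// use exarrow_rs::query::export::ExportSource;
    /// use std::path::Path;
    ///
    /// fn dump(
    ///     conn: &mut Connection,
    ///     source: ExportSource,
    ///     options: CsvExportOptions,
    /// ) -> Result<u64, ExportError> {
    ///     conn.blocking_export_csv_to_file(source, Path::new("out.csv"), options)
    /// }
    /// ```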
1683    pub fn blocking_export_csv_to_file(
1684        &mut self,
1685        source: crate::query::export::ExportSource,
1686        file_path: &std::path::Path,
1687        options: crate::export::csv::CsvExportOptions,
1688    ) -> Result<u64, crate::export::csv::ExportError> {
1689        blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1690    }
1691
1692    /// Export data from an Exasol table or query to a Parquet file (blocking).
1693    ///
1694    /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1695    /// for use in non-async contexts.
1696    ///
1697    /// # Arguments
1698    ///
1699    /// * `source` - The data source (table or query)
1700    /// * `file_path` - Path to the output Parquet file
1701    /// * `options` - Export options
1702    ///
1703    /// # Returns
1704    ///
1705    /// The number of rows exported on success.
1706    ///
1707    /// # Errors
1708    ///
1709    /// Returns `ExportError` if the export fails.
1710    ///
1711    /// # Example
1712    ///
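    /// A sketch with the same assumptions (crate paths, placeholder file name, prepared `source`):
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::export::{csv::ExportError, parquet::ParquetExportOptions};
    /// use exarrow_rs::query::export::ExportSource;
    /// use std::path::Path;
    ///
    /// fn dump(
    ///     conn: &mut Connection,
    ///     source: ExportSource,
    ///     options: ParquetExportOptions,
    /// ) -> Result<u64, ExportError> {
    ///     conn.blocking_export_to_parquet(source, Path::new("out.parquet"), options)
    /// }
    /// ```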
1713    pub fn blocking_export_to_parquet(
1714        &mut self,
1715        source: crate::query::export::ExportSource,
1716        file_path: &std::path::Path,
1717        options: crate::export::parquet::ParquetExportOptions,
1718    ) -> Result<u64, crate::export::csv::ExportError> {
1719        blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1720    }
1721
1722    /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1723    ///
1724    /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1725    /// for use in non-async contexts.
1726    ///
1727    /// # Arguments
1728    ///
1729    /// * `source` - The data source (table or query)
1730    /// * `options` - Export options
1731    ///
1732    /// # Returns
1733    ///
1734    /// A vector of RecordBatches on success.
1735    ///
1736    /// # Errors
1737    ///
1738    /// Returns `ExportError` if the export fails.
1739    ///
1740    /// # Example
1741    ///
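    /// Sketch of typical usage (crate paths assumed; `source` built elsewhere):
    ///
    /// ```no_run
    /// use arrow::array::RecordBatch;
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::export::{arrow::ArrowExportOptions, csv::ExportError};
    /// use exarrow_rs::query::export::ExportSource;
    ///
    /// fn fetch(
    ///     conn: &mut Connection,
    ///     source: ExportSource,
    ///     options: ArrowExportOptions,
    /// ) -> Result<(), ExportError> {
    ///     let batches: Vec<RecordBatch> = conn.blocking_export_to_record_batches(source, options)?;
    ///     println!("exported {} batches", batches.len());
    ///     Ok(())
    /// }
    /// ```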
1742    pub fn blocking_export_to_record_batches(
1743        &mut self,
1744        source: crate::query::export::ExportSource,
1745        options: crate::export::arrow::ArrowExportOptions,
1746    ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1747        blocking_runtime().block_on(self.export_to_record_batches(source, options))
1748    }
1749
1750    /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1751    ///
1752    /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1753    /// for use in non-async contexts.
1754    ///
1755    /// # Arguments
1756    ///
1757    /// * `source` - The data source (table or query)
1758    /// * `file_path` - Path to the output Arrow IPC file
1759    /// * `options` - Export options
1760    ///
1761    /// # Returns
1762    ///
1763    /// The number of rows exported on success.
1764    ///
1765    /// # Errors
1766    ///
1767    /// Returns `ExportError` if the export fails.
1768    ///
1769    /// # Example
1770    ///
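    /// A sketch along the same lines (crate paths assumed; `source` built elsewhere):
    ///
    /// ```no_run
    /// use exarrow_rs::adbc::Connection;
    /// use exarrow_rs::export::{arrow::ArrowExportOptions, csv::ExportError};
    /// use exarrow_rs::query::export::ExportSource;
    /// use std::path::Path;
    ///
    /// fn dump(
    ///     conn: &mut Connection,
    ///     source: ExportSource,
    ///     options: ArrowExportOptions,
    /// ) -> Result<u64, ExportError> {
    ///     conn.blocking_export_to_arrow_ipc(source, Path::new("out.arrow"), options)
    /// }
    /// ```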
1771    pub fn blocking_export_to_arrow_ipc(
1772        &mut self,
1773        source: crate::query::export::ExportSource,
1774        file_path: &std::path::Path,
1775        options: crate::export::arrow::ArrowExportOptions,
1776    ) -> Result<u64, crate::export::csv::ExportError> {
1777        blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1778    }
1779
1780    // ========================================================================
1781    // Private Helper Methods
1782    // ========================================================================
1783
1784    /// Update session state after query execution.
1785    async fn update_session_state_after_query(&self) {
1786        if self.session.in_transaction() {
1787            self.session.set_state(SessionState::InTransaction).await;
1788        } else {
1789            self.session.set_state(SessionState::Ready).await;
1790        }
1791        self.session.update_activity().await;
1792    }
1793}
1794
1795impl std::fmt::Debug for Connection {
1796    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1797        f.debug_struct("Connection")
1798            .field("session_id", &self.session.session_id())
1799            .field("host", &self.params.host)
1800            .field("port", &self.params.port)
1801            .field("username", &self.params.username)
1802            .field("in_transaction", &self.session.in_transaction())
1803            .finish()
1804    }
1805}
1806
1807/// Builder for creating Connection instances.
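///
/// # Example
///
/// A sketch mirroring the builder chain in the unit tests below; the crate path,
/// host, and credentials are placeholders:
///
/// ```no_run
/// use exarrow_rs::adbc::ConnectionBuilder;
///
/// # async fn connect_example() -> Result<(), exarrow_rs::error::ExasolError> {
/// let _conn = ConnectionBuilder::new()
///     .host("localhost")
///     .port(8563)
///     .username("sys")
///     .password("secret")
///     .schema("MY_SCHEMA")
///     .use_tls(true)
///     .connect()
///     .await?;
/// # Ok(())
/// # }
/// ```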
1808pub struct ConnectionBuilder {
1809    /// Connection parameters builder
1810    params_builder: crate::connection::params::ConnectionBuilder,
1811}
1812
1813impl ConnectionBuilder {
1814    /// Create a new ConnectionBuilder.
1815    pub fn new() -> Self {
1816        Self {
1817            params_builder: crate::connection::params::ConnectionBuilder::new(),
1818        }
1819    }
1820
1821    /// Set the database host.
1822    pub fn host(mut self, host: &str) -> Self {
1823        self.params_builder = self.params_builder.host(host);
1824        self
1825    }
1826
1827    /// Set the database port.
1828    pub fn port(mut self, port: u16) -> Self {
1829        self.params_builder = self.params_builder.port(port);
1830        self
1831    }
1832
1833    /// Set the username.
1834    pub fn username(mut self, username: &str) -> Self {
1835        self.params_builder = self.params_builder.username(username);
1836        self
1837    }
1838
1839    /// Set the password.
1840    pub fn password(mut self, password: &str) -> Self {
1841        self.params_builder = self.params_builder.password(password);
1842        self
1843    }
1844
1845    /// Set the default schema.
1846    pub fn schema(mut self, schema: &str) -> Self {
1847        self.params_builder = self.params_builder.schema(schema);
1848        self
1849    }
1850
1851    /// Enable or disable TLS.
1852    pub fn use_tls(mut self, use_tls: bool) -> Self {
1853        self.params_builder = self.params_builder.use_tls(use_tls);
1854        self
1855    }
1856
1857    /// Build and connect.
1858    ///
1859    /// # Returns
1860    ///
1861    /// A connected `Connection` instance.
1862    ///
1863    /// # Errors
1864    ///
1865    /// Returns `ExasolError` if the connection fails.
1866    pub async fn connect(self) -> Result<Connection, ExasolError> {
1867        let params = self.params_builder.build()?;
1868        Ok(Connection::from_params(params).await?)
1869    }
1870}
1871
1872impl Default for ConnectionBuilder {
1873    fn default() -> Self {
1874        Self::new()
1875    }
1876}
1877
1878#[cfg(test)]
1879mod tests {
1880    use super::*;
1881
1882    #[test]
1883    fn test_connection_builder() {
1884        let _builder = ConnectionBuilder::new()
1885            .host("localhost")
1886            .port(8563)
1887            .username("test")
1888            .password("secret")
1889            .schema("MY_SCHEMA")
1890            .use_tls(false);
1891
1892        // Builder should compile and be valid
1893        // Actual connection requires a running Exasol instance
1894    }
1895
1896    #[test]
1897    fn test_create_statement_is_sync() {
1898        // This test documents that `create_statement` is synchronous in v2.0.0,
1899        // i.e. it can be called without `.await`.
1900        // Note: constructing a real `Connection` requires a running database, so this
1901        // test is a compile-time/documentation placeholder rather than an executable check.
1902    }
1903}
1904
1905/// Tests for the blocking wrappers, the `Session` alias, and the shared blocking runtime.
1906#[cfg(test)]
1907mod blocking_tests {
1908    use super::*;
1909
1910    #[test]
1911    fn test_session_type_alias_exists() {
1912        // Verify that Session is a type alias for Connection
1913        fn takes_session(_session: &Session) {}
1914        fn takes_connection(_connection: &Connection) {}
1915
1916        // This should compile, showing Session = Connection
1917        fn verify_interchangeable<F1, F2>(f1: F1, f2: F2)
1918        where
1919            F1: Fn(&Session),
1920            F2: Fn(&Connection),
1921        {
1922            let _ = (f1, f2);
1923        }
1924
1925        verify_interchangeable(takes_session, takes_connection);
1926    }
1927
1928    #[test]
1929    fn test_blocking_runtime_exists() {
1930        // Verify that blocking_runtime() function exists and returns a Runtime
1931        let runtime = blocking_runtime();
1932        // Obtaining a handle confirms the runtime was initialized without panicking.
1933        let _ = runtime.handle();
1934    }
1935
1936    #[test]
1937    fn test_connection_has_blocking_import_csv() {
1938        // This test verifies the blocking_import_csv_from_file method exists
1939        // by checking the method signature compiles
1940        fn _check_method_exists(_conn: &mut Connection) {
1941            // Method signature check - will fail to compile if method doesn't exist
1942            let _: fn(
1943                &mut Connection,
1944                &str,
1945                &std::path::Path,
1946                crate::import::csv::CsvImportOptions,
1947            ) -> Result<u64, crate::import::ImportError> =
1948                Connection::blocking_import_csv_from_file;
1949        }
1950    }
1951
1952    #[test]
1953    fn test_connection_has_blocking_import_parquet() {
1954        fn _check_method_exists(_conn: &mut Connection) {
1955            let _: fn(
1956                &mut Connection,
1957                &str,
1958                &std::path::Path,
1959                crate::import::parquet::ParquetImportOptions,
1960            ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
1961        }
1962    }
1963
1964    #[test]
1965    fn test_connection_has_blocking_import_record_batch() {
1966        fn _check_method_exists(_conn: &mut Connection) {
1967            let _: fn(
1968                &mut Connection,
1969                &str,
1970                &RecordBatch,
1971                crate::import::arrow::ArrowImportOptions,
1972            ) -> Result<u64, crate::import::ImportError> =
1973                Connection::blocking_import_from_record_batch;
1974        }
1975    }
1976
1977    #[test]
1978    fn test_connection_has_blocking_import_arrow_ipc() {
1979        fn _check_method_exists(_conn: &mut Connection) {
1980            let _: fn(
1981                &mut Connection,
1982                &str,
1983                &std::path::Path,
1984                crate::import::arrow::ArrowImportOptions,
1985            ) -> Result<u64, crate::import::ImportError> =
1986                Connection::blocking_import_from_arrow_ipc;
1987        }
1988    }
1989
1990    #[test]
1991    fn test_connection_has_blocking_export_csv() {
1992        fn _check_method_exists(_conn: &mut Connection) {
1993            let _: fn(
1994                &mut Connection,
1995                crate::query::export::ExportSource,
1996                &std::path::Path,
1997                crate::export::csv::CsvExportOptions,
1998            ) -> Result<u64, crate::export::csv::ExportError> =
1999                Connection::blocking_export_csv_to_file;
2000        }
2001    }
2002
2003    #[test]
2004    fn test_connection_has_blocking_export_parquet() {
2005        fn _check_method_exists(_conn: &mut Connection) {
2006            let _: fn(
2007                &mut Connection,
2008                crate::query::export::ExportSource,
2009                &std::path::Path,
2010                crate::export::parquet::ParquetExportOptions,
2011            ) -> Result<u64, crate::export::csv::ExportError> =
2012                Connection::blocking_export_to_parquet;
2013        }
2014    }
2015
2016    #[test]
2017    fn test_connection_has_blocking_export_record_batches() {
2018        fn _check_method_exists(_conn: &mut Connection) {
2019            let _: fn(
2020                &mut Connection,
2021                crate::query::export::ExportSource,
2022                crate::export::arrow::ArrowExportOptions,
2023            ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
2024                Connection::blocking_export_to_record_batches;
2025        }
2026    }
2027
2028    #[test]
2029    fn test_connection_has_blocking_export_arrow_ipc() {
2030        fn _check_method_exists(_conn: &mut Connection) {
2031            let _: fn(
2032                &mut Connection,
2033                crate::query::export::ExportSource,
2034                &std::path::Path,
2035                crate::export::arrow::ArrowExportOptions,
2036            ) -> Result<u64, crate::export::csv::ExportError> =
2037                Connection::blocking_export_to_arrow_ipc;
2038        }
2039    }
2040}