exarrow_rs/adbc/connection.rs
1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19 ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20 QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
/// Alias for [`Connection`], offered under the name `Session`.
///
/// The alias gives import/export code a more intuitive name for a database
/// session. `Session` and `Connection` are the same type and can be used
/// interchangeably.
pub type Session = Connection;
39
40/// Global tokio runtime for blocking operations.
41///
42/// This runtime is lazily initialized on first use and is shared across
43/// all blocking import/export operations. It provides a way to call
44/// async methods from synchronous code.
45fn blocking_runtime() -> &'static Runtime {
46 static RUNTIME: OnceLock<Runtime> = OnceLock::new();
47 RUNTIME.get_or_init(|| {
48 tokio::runtime::Builder::new_multi_thread()
49 .enable_all()
50 .build()
51 .expect("Failed to create tokio runtime for blocking operations")
52 })
53}
54
/// ADBC Connection to an Exasol database.
///
/// The `Connection` type represents an active database connection. Its
/// methods cover statement execution (plain and prepared), transaction
/// control, schema selection, metadata queries, and CSV import/export.
///
/// # v2.0.0 Breaking Changes
///
/// - Connection now owns the transport directly
/// - `create_statement()` is now synchronous and returns a pure data container
/// - Use `execute_statement()` instead of `Statement::execute()`
/// - Use `prepare()` instead of `Statement::prepare()`
pub struct Connection {
    /// Transport layer for communication (owned by Connection).
    ///
    /// Kept behind `Arc<Mutex<..>>` because clones of this handle are handed
    /// to result sets and to the SQL executor closures used by imports.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Session information (state, query counter, current schema).
    session: SessionInfo,
    /// Connection parameters this connection was created from.
    params: ConnectionParams,
}
77
78impl Connection {
79 /// Create a connection from connection parameters.
80 ///
81 /// This establishes a connection to the Exasol database using WebSocket transport,
82 /// authenticates the user, and creates a session.
83 ///
84 /// # Arguments
85 ///
86 /// * `params` - Connection parameters
87 ///
88 /// # Returns
89 ///
90 /// A connected `Connection` instance.
91 ///
92 /// # Errors
93 ///
94 /// Returns `ConnectionError` if the connection or authentication fails.
95 pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
96 // Create transport
97 let mut transport = WebSocketTransport::new();
98
99 // Convert ConnectionParams to TransportConnectionParams
100 let transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
101 .with_tls(params.use_tls)
102 .with_validate_server_certificate(params.validate_server_certificate)
103 .with_timeout(params.connection_timeout.as_millis() as u64);
104
105 // Connect
106 transport.connect(&transport_params).await.map_err(|e| {
107 ConnectionError::ConnectionFailed {
108 host: params.host.clone(),
109 port: params.port,
110 message: e.to_string(),
111 }
112 })?;
113
114 // Authenticate
115 let credentials =
116 TransportCredentials::new(params.username.clone(), params.password().to_string());
117 let session_info = transport
118 .authenticate(&credentials)
119 .await
120 .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
121
122 // Create session config from connection params
123 let session_config = SessionConfig {
124 idle_timeout: params.idle_timeout,
125 query_timeout: params.query_timeout,
126 ..Default::default()
127 };
128
129 // Extract session_id once to avoid double clone
130 let session_id = session_info.session_id.clone();
131
132 // Convert SessionInfo to AuthResponseData
133 let auth_response = AuthResponseData {
134 session_id: session_id.clone(),
135 protocol_version: session_info.protocol_version,
136 release_version: session_info.release_version,
137 database_name: session_info.database_name,
138 product_name: session_info.product_name,
139 max_data_message_size: session_info.max_data_message_size,
140 max_identifier_length: 128, // Default value
141 max_varchar_length: 2_000_000, // Default value
142 identifier_quote_string: "\"".to_string(),
143 time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
144 time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
145 };
146
147 // Create session (owned, not Arc)
148 let session = SessionInfo::new(session_id, auth_response, session_config);
149
150 // Set schema if specified
151 if let Some(schema) = ¶ms.schema {
152 session.set_current_schema(Some(schema.clone())).await;
153 }
154
155 Ok(Self {
156 transport: Arc::new(Mutex::new(transport)),
157 session,
158 params,
159 })
160 }
161
    /// Create a builder for constructing a connection.
    ///
    /// Equivalent to calling `ConnectionBuilder::new()`.
    ///
    /// # Returns
    ///
    /// A new `ConnectionBuilder` instance.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::new()
    }
173
174 // ========================================================================
175 // Statement Creation (synchronous - Statement is now a pure data container)
176 // ========================================================================
177
    /// Create a new statement for executing SQL.
    ///
    /// Statement is a pure data container: this method only wraps the SQL text
    /// via `Statement::new` and does not use the connection or contact the
    /// database. Pass the result to `execute_statement()` to run it.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `Statement` instance ready for execution via `execute_statement()`.
    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
        Statement::new(sql)
    }
195
196 // ========================================================================
197 // Statement Execution Methods
198 // ========================================================================
199
200 /// Execute a statement and return results.
201 ///
202 /// # Arguments
203 ///
204 /// * `stmt` - Statement to execute
205 ///
206 /// # Returns
207 ///
208 /// A `ResultSet` containing the query results.
209 ///
210 /// # Errors
211 ///
212 /// Returns `QueryError` if execution fails or times out.
213 ///
214 /// # Example
215 ///
216 pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
217 // Validate session state
218 self.session
219 .validate_ready()
220 .await
221 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
222
223 // Update session state
224 self.session.set_state(SessionState::Executing).await;
225
226 // Increment query counter
227 self.session.increment_query_count();
228
229 // Build final SQL with parameters
230 let final_sql = stmt.build_sql()?;
231
232 // Execute with timeout
233 let timeout_duration = Duration::from_millis(stmt.timeout_ms());
234 let transport = Arc::clone(&self.transport);
235
236 let result = timeout(timeout_duration, async move {
237 let mut transport_guard = transport.lock().await;
238 transport_guard.execute_query(&final_sql).await
239 })
240 .await
241 .map_err(|_| QueryError::Timeout {
242 timeout_ms: stmt.timeout_ms(),
243 })?
244 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
245
246 // Update session state back to ready/in_transaction
247 self.update_session_state_after_query().await;
248
249 // Convert transport result to ResultSet
250 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
251 }
252
253 /// Execute a statement and return the row count (for non-SELECT statements).
254 ///
255 /// # Arguments
256 ///
257 /// * `stmt` - Statement to execute
258 ///
259 /// # Returns
260 ///
261 /// The number of rows affected.
262 ///
263 /// # Errors
264 ///
265 /// Returns `QueryError` if execution fails or if statement is a SELECT.
266 ///
267 /// # Example
268 ///
269 pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
270 let result_set = self.execute_statement(stmt).await?;
271
272 result_set.row_count().ok_or_else(|| {
273 QueryError::NoResultSet("Expected row count, got result set".to_string())
274 })
275 }
276
277 // ========================================================================
278 // Prepared Statement Methods
279 // ========================================================================
280
281 /// Create a prepared statement for parameterized query execution.
282 ///
283 /// This creates a server-side prepared statement that can be executed
284 /// multiple times with different parameter values.
285 ///
286 /// # Arguments
287 ///
288 /// * `sql` - SQL statement with parameter placeholders (?)
289 ///
290 /// # Returns
291 ///
292 /// A `PreparedStatement` ready for parameter binding and execution.
293 ///
294 /// # Errors
295 ///
296 /// Returns `QueryError` if preparation fails.
297 ///
298 /// # Example
299 ///
300 pub async fn prepare(
301 &mut self,
302 sql: impl Into<String>,
303 ) -> Result<PreparedStatement, QueryError> {
304 let sql = sql.into();
305
306 // Validate session state
307 self.session
308 .validate_ready()
309 .await
310 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
311
312 let mut transport = self.transport.lock().await;
313 let handle = transport
314 .create_prepared_statement(&sql)
315 .await
316 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
317
318 Ok(PreparedStatement::new(handle))
319 }
320
321 /// Execute a prepared statement and return results.
322 ///
323 /// # Arguments
324 ///
325 /// * `stmt` - Prepared statement to execute
326 ///
327 /// # Returns
328 ///
329 /// A `ResultSet` containing the query results.
330 ///
331 /// # Errors
332 ///
333 /// Returns `QueryError` if execution fails.
334 ///
335 /// # Example
336 ///
337 pub async fn execute_prepared(
338 &mut self,
339 stmt: &PreparedStatement,
340 ) -> Result<ResultSet, QueryError> {
341 if stmt.is_closed() {
342 return Err(QueryError::StatementClosed);
343 }
344
345 // Validate session state
346 self.session
347 .validate_ready()
348 .await
349 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
350
351 // Update session state
352 self.session.set_state(SessionState::Executing).await;
353
354 // Increment query counter
355 self.session.increment_query_count();
356
357 // Convert parameters to column-major JSON format
358 let params_data = stmt.build_parameters_data()?;
359
360 let mut transport = self.transport.lock().await;
361 let result = transport
362 .execute_prepared_statement(stmt.handle_ref(), params_data)
363 .await
364 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
365
366 drop(transport);
367
368 // Update session state back to ready/in_transaction
369 self.update_session_state_after_query().await;
370
371 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
372 }
373
374 /// Execute a prepared statement and return the number of affected rows.
375 ///
376 /// Use this for INSERT, UPDATE, DELETE statements.
377 ///
378 /// # Arguments
379 ///
380 /// * `stmt` - Prepared statement to execute
381 ///
382 /// # Returns
383 ///
384 /// The number of rows affected.
385 ///
386 /// # Errors
387 ///
388 /// Returns `QueryError` if execution fails or statement returns a result set.
389 pub async fn execute_prepared_update(
390 &mut self,
391 stmt: &PreparedStatement,
392 ) -> Result<i64, QueryError> {
393 if stmt.is_closed() {
394 return Err(QueryError::StatementClosed);
395 }
396
397 // Validate session state
398 self.session
399 .validate_ready()
400 .await
401 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
402
403 // Update session state
404 self.session.set_state(SessionState::Executing).await;
405
406 // Convert parameters to column-major JSON format
407 let params_data = stmt.build_parameters_data()?;
408
409 let mut transport = self.transport.lock().await;
410 let result = transport
411 .execute_prepared_statement(stmt.handle_ref(), params_data)
412 .await
413 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
414
415 drop(transport);
416
417 // Update session state back to ready/in_transaction
418 self.update_session_state_after_query().await;
419
420 match result {
421 QueryResult::RowCount { count } => Ok(count),
422 QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
423 }
424 }
425
426 /// Close a prepared statement and release server-side resources.
427 ///
428 /// # Arguments
429 ///
430 /// * `stmt` - Prepared statement to close (consumed)
431 ///
432 /// # Errors
433 ///
434 /// Returns `QueryError` if closing fails.
435 ///
436 /// # Example
437 ///
438 pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
439 if stmt.is_closed() {
440 return Ok(());
441 }
442
443 let mut transport = self.transport.lock().await;
444 transport
445 .close_prepared_statement(stmt.handle_ref())
446 .await
447 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
448
449 stmt.mark_closed();
450 Ok(())
451 }
452
453 // ========================================================================
454 // Convenience Methods (these internally use execute_statement)
455 // ========================================================================
456
457 /// Execute a SQL query and return results.
458 ///
459 /// This is a convenience method that creates a statement and executes it.
460 ///
461 /// # Arguments
462 ///
463 /// * `sql` - SQL query text
464 ///
465 /// # Returns
466 ///
467 /// A `ResultSet` containing the query results.
468 ///
469 /// # Errors
470 ///
471 /// Returns `QueryError` if execution fails.
472 ///
473 /// # Example
474 ///
475 pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
476 let stmt = self.create_statement(sql);
477 self.execute_statement(&stmt).await
478 }
479
480 /// Execute a SQL query and return all results as RecordBatches.
481 ///
482 /// This is a convenience method that fetches all results into memory.
483 ///
484 /// # Arguments
485 ///
486 /// * `sql` - SQL query text
487 ///
488 /// # Returns
489 ///
490 /// A vector of `RecordBatch` instances.
491 ///
492 /// # Errors
493 ///
494 /// Returns `QueryError` if execution fails.
495 ///
496 /// # Example
497 ///
498 pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
499 let result_set = self.execute(sql).await?;
500 result_set.fetch_all().await
501 }
502
503 /// Execute a non-SELECT statement and return the row count.
504 ///
505 /// # Arguments
506 ///
507 /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
508 ///
509 /// # Returns
510 ///
511 /// The number of rows affected.
512 ///
513 /// # Errors
514 ///
515 /// Returns `QueryError` if execution fails.
516 ///
517 /// # Example
518 ///
519 pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
520 let stmt = self.create_statement(sql);
521 self.execute_statement_update(&stmt).await
522 }
523
524 // ========================================================================
525 // Transaction Methods
526 // ========================================================================
527
528 /// Begin a transaction.
529 ///
530 /// # Errors
531 ///
532 /// Returns `QueryError` if a transaction is already active or if the operation fails.
533 ///
534 /// # Example
535 ///
536 pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
537 // Exasol doesn't have a BEGIN statement - transactions are implicit.
538 // Starting a transaction just means we're tracking that autocommit is off.
539 // The actual transaction begins with the first DML/query.
540 self.session
541 .begin_transaction()
542 .await
543 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
544
545 Ok(())
546 }
547
548 /// Commit the current transaction.
549 ///
550 /// # Errors
551 ///
552 /// Returns `QueryError` if no transaction is active or if the operation fails.
553 ///
554 /// # Example
555 ///
556 pub async fn commit(&mut self) -> Result<(), QueryError> {
557 // Execute COMMIT statement
558 self.execute_update("COMMIT").await?;
559
560 self.session
561 .commit_transaction()
562 .await
563 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
564
565 Ok(())
566 }
567
568 /// Rollback the current transaction.
569 ///
570 /// # Errors
571 ///
572 /// Returns `QueryError` if no transaction is active or if the operation fails.
573 ///
574 /// # Example
575 ///
576 pub async fn rollback(&mut self) -> Result<(), QueryError> {
577 // Execute ROLLBACK statement
578 self.execute_update("ROLLBACK").await?;
579
580 self.session
581 .rollback_transaction()
582 .await
583 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
584
585 Ok(())
586 }
587
    /// Check if a transaction is currently active.
    ///
    /// This reads the session's local bookkeeping; the database is not queried.
    ///
    /// # Returns
    ///
    /// `true` if the session reports an active transaction, `false` otherwise.
    pub fn in_transaction(&self) -> bool {
        self.session.in_transaction()
    }
596
597 // ========================================================================
598 // Session and Schema Methods
599 // ========================================================================
600
    /// Get the current schema.
    ///
    /// Returns the schema name recorded on the session object; no statement is
    /// sent to the database.
    ///
    /// # Returns
    ///
    /// The current schema name, or `None` if no schema is set.
    pub async fn current_schema(&self) -> Option<String> {
        self.session.current_schema().await
    }
609
610 /// Set the current schema.
611 ///
612 /// # Arguments
613 ///
614 /// * `schema` - The schema name to set
615 ///
616 /// # Errors
617 ///
618 /// Returns `QueryError` if the operation fails.
619 ///
620 /// # Example
621 ///
622 pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
623 let schema_name = schema.into();
624 self.execute_update(format!("OPEN SCHEMA {}", schema_name))
625 .await?;
626 self.session.set_current_schema(Some(schema_name)).await;
627 Ok(())
628 }
629
630 // ========================================================================
631 // Metadata Methods
632 // ========================================================================
633
634 /// Get metadata about catalogs.
635 ///
636 /// # Returns
637 ///
638 /// A `ResultSet` containing catalog metadata.
639 ///
640 /// # Errors
641 ///
642 /// Returns `QueryError` if the operation fails.
643 pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
644 self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
645 .await
646 }
647
648 /// Get metadata about schemas.
649 ///
650 /// # Arguments
651 ///
652 /// * `catalog` - Optional catalog name filter
653 ///
654 /// # Returns
655 ///
656 /// A `ResultSet` containing schema metadata.
657 ///
658 /// # Errors
659 ///
660 /// Returns `QueryError` if the operation fails.
661 pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
662 let sql = if let Some(cat) = catalog {
663 format!(
664 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
665 cat.replace('\'', "''")
666 )
667 } else {
668 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
669 };
670 self.execute(sql).await
671 }
672
673 /// Get metadata about tables.
674 ///
675 /// # Arguments
676 ///
677 /// * `catalog` - Optional catalog name filter
678 /// * `schema` - Optional schema name filter
679 /// * `table` - Optional table name filter
680 ///
681 /// # Returns
682 ///
683 /// A `ResultSet` containing table metadata.
684 ///
685 /// # Errors
686 ///
687 /// Returns `QueryError` if the operation fails.
688 pub async fn get_tables(
689 &mut self,
690 catalog: Option<&str>,
691 schema: Option<&str>,
692 table: Option<&str>,
693 ) -> Result<ResultSet, QueryError> {
694 let mut conditions = Vec::new();
695
696 if let Some(cat) = catalog {
697 conditions.push(format!("TABLE_SCHEMA = '{}'", cat.replace('\'', "''")));
698 }
699 if let Some(sch) = schema {
700 conditions.push(format!("TABLE_SCHEMA = '{}'", sch.replace('\'', "''")));
701 }
702 if let Some(tbl) = table {
703 conditions.push(format!("TABLE_NAME = '{}'", tbl.replace('\'', "''")));
704 }
705
706 let where_clause = if conditions.is_empty() {
707 String::new()
708 } else {
709 format!("WHERE {}", conditions.join(" AND "))
710 };
711
712 let sql = format!(
713 "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM SYS.EXA_ALL_TABLES {} ORDER BY TABLE_SCHEMA, TABLE_NAME",
714 where_clause
715 );
716
717 self.execute(sql).await
718 }
719
720 /// Get metadata about columns.
721 ///
722 /// # Arguments
723 ///
724 /// * `catalog` - Optional catalog name filter
725 /// * `schema` - Optional schema name filter
726 /// * `table` - Optional table name filter
727 /// * `column` - Optional column name filter
728 ///
729 /// # Returns
730 ///
731 /// A `ResultSet` containing column metadata.
732 ///
733 /// # Errors
734 ///
735 /// Returns `QueryError` if the operation fails.
736 pub async fn get_columns(
737 &mut self,
738 catalog: Option<&str>,
739 schema: Option<&str>,
740 table: Option<&str>,
741 column: Option<&str>,
742 ) -> Result<ResultSet, QueryError> {
743 let mut conditions = Vec::new();
744
745 if let Some(cat) = catalog {
746 conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
747 }
748 if let Some(sch) = schema {
749 conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
750 }
751 if let Some(tbl) = table {
752 conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
753 }
754 if let Some(col) = column {
755 conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
756 }
757
758 let where_clause = if conditions.is_empty() {
759 String::new()
760 } else {
761 format!("WHERE {}", conditions.join(" AND "))
762 };
763
764 let sql = format!(
765 "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
766 FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
767 where_clause
768 );
769
770 self.execute(sql).await
771 }
772
773 // ========================================================================
774 // Session Information Methods
775 // ========================================================================
776
    /// Get the session ID.
    ///
    /// # Returns
    ///
    /// The session ID, borrowed from the session object.
    pub fn session_id(&self) -> &str {
        self.session.session_id()
    }
785
    /// Get connection parameters.
    ///
    /// # Returns
    ///
    /// A reference to the connection parameters this connection was created with.
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }
794
    /// Check if the connection is closed.
    ///
    /// The answer comes from the session object; the transport is not consulted.
    ///
    /// # Returns
    ///
    /// `true` if the session reports itself closed, `false` otherwise.
    pub async fn is_closed(&self) -> bool {
        self.session.is_closed().await
    }
803
804 /// Close the connection.
805 ///
806 /// This closes the session and transport layer.
807 ///
808 /// # Errors
809 ///
810 /// Returns `ConnectionError` if closing fails.
811 ///
812 /// # Example
813 ///
814 pub async fn close(self) -> Result<(), ConnectionError> {
815 // Close session
816 self.session.close().await?;
817
818 // Close transport
819 let mut transport = self.transport.lock().await;
820 transport
821 .close()
822 .await
823 .map_err(|e| ConnectionError::ConnectionFailed {
824 host: self.params.host.clone(),
825 port: self.params.port,
826 message: e.to_string(),
827 })?;
828
829 Ok(())
830 }
831
832 // ========================================================================
833 // Import/Export Methods
834 // ========================================================================
835
836 /// Creates an SQL executor closure for import/export operations.
837 ///
838 /// This is a helper method that creates a closure which can execute SQL
839 /// statements and return the row count. The closure captures a cloned
840 /// reference to the transport, allowing it to be called multiple times
841 /// (e.g., for parallel file imports).
842 ///
843 /// # Returns
844 ///
845 /// A closure that takes an SQL string and returns a Future resolving to
846 /// either the affected row count or an error string.
847 fn make_sql_executor(
848 &self,
849 ) -> impl Fn(
850 String,
851 )
852 -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
853 let transport = Arc::clone(&self.transport);
854 move |sql: String| {
855 let transport = Arc::clone(&transport);
856 Box::pin(async move {
857 let mut transport_guard = transport.lock().await;
858 match transport_guard.execute_query(&sql).await {
859 Ok(QueryResult::RowCount { count }) => Ok(count as u64),
860 Ok(QueryResult::ResultSet { .. }) => Ok(0),
861 Err(e) => Err(e.to_string()),
862 }
863 })
864 }
865 }
866
867 /// Import CSV data from a file into an Exasol table.
868 ///
869 /// This method reads CSV data from the specified file and imports it into
870 /// the target table using Exasol's HTTP transport layer.
871 ///
872 /// # Arguments
873 ///
874 /// * `table` - Name of the target table
875 /// * `file_path` - Path to the CSV file
876 /// * `options` - Import options
877 ///
878 /// # Returns
879 ///
880 /// The number of rows imported on success.
881 ///
882 /// # Errors
883 ///
884 /// Returns `ImportError` if the import fails.
885 ///
886 /// # Example
887 ///
888 pub async fn import_csv_from_file(
889 &mut self,
890 table: &str,
891 file_path: &std::path::Path,
892 options: crate::import::csv::CsvImportOptions,
893 ) -> Result<u64, crate::import::ImportError> {
894 // Pass Exasol host/port from connection params to import options
895 let options = options
896 .exasol_host(&self.params.host)
897 .exasol_port(self.params.port);
898
899 crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
900 .await
901 }
902
903 /// Import CSV data from an async reader into an Exasol table.
904 ///
905 /// This method reads CSV data from an async reader and imports it into
906 /// the target table using Exasol's HTTP transport layer.
907 ///
908 /// # Arguments
909 ///
910 /// * `table` - Name of the target table
911 /// * `reader` - Async reader providing CSV data
912 /// * `options` - Import options
913 ///
914 /// # Returns
915 ///
916 /// The number of rows imported on success.
917 ///
918 /// # Errors
919 ///
920 /// Returns `ImportError` if the import fails.
921 pub async fn import_csv_from_stream<R>(
922 &mut self,
923 table: &str,
924 reader: R,
925 options: crate::import::csv::CsvImportOptions,
926 ) -> Result<u64, crate::import::ImportError>
927 where
928 R: tokio::io::AsyncRead + Unpin + Send + 'static,
929 {
930 // Pass Exasol host/port from connection params to import options
931 let options = options
932 .exasol_host(&self.params.host)
933 .exasol_port(self.params.port);
934
935 crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
936 .await
937 }
938
939 /// Import CSV data from an iterator into an Exasol table.
940 ///
941 /// This method converts iterator rows to CSV format and imports them into
942 /// the target table using Exasol's HTTP transport layer.
943 ///
944 /// # Arguments
945 ///
946 /// * `table` - Name of the target table
947 /// * `rows` - Iterator of rows, where each row is an iterator of field values
948 /// * `options` - Import options
949 ///
950 /// # Returns
951 ///
952 /// The number of rows imported on success.
953 ///
954 /// # Errors
955 ///
956 /// Returns `ImportError` if the import fails.
957 ///
958 /// # Example
959 ///
960 pub async fn import_csv_from_iter<I, T, S>(
961 &mut self,
962 table: &str,
963 rows: I,
964 options: crate::import::csv::CsvImportOptions,
965 ) -> Result<u64, crate::import::ImportError>
966 where
967 I: IntoIterator<Item = T> + Send + 'static,
968 T: IntoIterator<Item = S> + Send,
969 S: AsRef<str>,
970 {
971 // Pass Exasol host/port from connection params to import options
972 let options = options
973 .exasol_host(&self.params.host)
974 .exasol_port(self.params.port);
975
976 crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
977 }
978
979 /// Export data from an Exasol table or query to a CSV file.
980 ///
981 /// This method exports data from the specified source to a CSV file
982 /// using Exasol's HTTP transport layer.
983 ///
984 /// # Arguments
985 ///
986 /// * `source` - The data source (table or query)
987 /// * `file_path` - Path to the output file
988 /// * `options` - Export options
989 ///
990 /// # Returns
991 ///
992 /// The number of rows exported on success.
993 ///
994 /// # Errors
995 ///
996 /// Returns `ExportError` if the export fails.
997 ///
998 /// # Example
999 ///
1000 pub async fn export_csv_to_file(
1001 &mut self,
1002 source: crate::query::export::ExportSource,
1003 file_path: &std::path::Path,
1004 options: crate::export::csv::CsvExportOptions,
1005 ) -> Result<u64, crate::export::csv::ExportError> {
1006 // Pass Exasol host/port from connection params to export options
1007 let options = options
1008 .exasol_host(&self.params.host)
1009 .exasol_port(self.params.port);
1010
1011 let mut transport_guard = self.transport.lock().await;
1012 crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1013 }
1014
1015 /// Export data from an Exasol table or query to an async writer.
1016 ///
1017 /// This method exports data from the specified source to an async writer
1018 /// using Exasol's HTTP transport layer.
1019 ///
1020 /// # Arguments
1021 ///
1022 /// * `source` - The data source (table or query)
1023 /// * `writer` - Async writer to write the CSV data to
1024 /// * `options` - Export options
1025 ///
1026 /// # Returns
1027 ///
1028 /// The number of rows exported on success.
1029 ///
1030 /// # Errors
1031 ///
1032 /// Returns `ExportError` if the export fails.
1033 pub async fn export_csv_to_stream<W>(
1034 &mut self,
1035 source: crate::query::export::ExportSource,
1036 writer: W,
1037 options: crate::export::csv::CsvExportOptions,
1038 ) -> Result<u64, crate::export::csv::ExportError>
1039 where
1040 W: tokio::io::AsyncWrite + Unpin,
1041 {
1042 // Pass Exasol host/port from connection params to export options
1043 let options = options
1044 .exasol_host(&self.params.host)
1045 .exasol_port(self.params.port);
1046
1047 let mut transport_guard = self.transport.lock().await;
1048 crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1049 }
1050
1051 /// Export data from an Exasol table or query to an in-memory list of rows.
1052 ///
1053 /// Each row is represented as a vector of string values.
1054 ///
1055 /// # Arguments
1056 ///
1057 /// * `source` - The data source (table or query)
1058 /// * `options` - Export options
1059 ///
1060 /// # Returns
1061 ///
1062 /// A vector of rows, where each row is a vector of column values.
1063 ///
1064 /// # Errors
1065 ///
1066 /// Returns `ExportError` if the export fails.
1067 ///
1068 /// # Example
1069 ///
1070 pub async fn export_csv_to_list(
1071 &mut self,
1072 source: crate::query::export::ExportSource,
1073 options: crate::export::csv::CsvExportOptions,
1074 ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1075 // Pass Exasol host/port from connection params to export options
1076 let options = options
1077 .exasol_host(&self.params.host)
1078 .exasol_port(self.params.port);
1079
1080 let mut transport_guard = self.transport.lock().await;
1081 crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1082 }
1083
1084 /// Import multiple CSV files in parallel into an Exasol table.
1085 ///
1086 /// This method reads CSV data from multiple files and imports them into
1087 /// the target table using parallel HTTP transport connections. Each file
1088 /// gets its own connection with a unique internal address.
1089 ///
1090 /// For a single file, this method delegates to `import_csv_from_file`.
1091 ///
1092 /// # Arguments
1093 ///
1094 /// * `table` - Name of the target table
1095 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1096 /// * `options` - Import options
1097 ///
1098 /// # Returns
1099 ///
1100 /// The number of rows imported on success.
1101 ///
1102 /// # Errors
1103 ///
1104 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1105 ///
1106 /// # Example
1107 ///
1108 /// ```no_run
1109 /// use exarrow_rs::adbc::Connection;
1110 /// use exarrow_rs::import::CsvImportOptions;
1111 /// use std::path::PathBuf;
1112 ///
1113 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1114 /// let files = vec![
1115 /// PathBuf::from("/data/part1.csv"),
1116 /// PathBuf::from("/data/part2.csv"),
1117 /// ];
1118 ///
1119 /// let options = CsvImportOptions::default();
1120 /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1121 /// # Ok(())
1122 /// # }
1123 /// ```
1124 pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1125 &mut self,
1126 table: &str,
1127 paths: S,
1128 options: crate::import::csv::CsvImportOptions,
1129 ) -> Result<u64, crate::import::ImportError> {
1130 // Pass Exasol host/port from connection params to import options
1131 let options = options
1132 .exasol_host(&self.params.host)
1133 .exasol_port(self.params.port);
1134
1135 crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1136 }
1137
1138 /// Import data from a Parquet file into an Exasol table.
1139 ///
1140 /// This method reads a Parquet file, converts the data to CSV format,
1141 /// and imports it into the target table using Exasol's HTTP transport layer.
1142 ///
1143 /// # Arguments
1144 ///
1145 /// * `table` - Name of the target table
1146 /// * `file_path` - Path to the Parquet file
1147 /// * `options` - Import options
1148 ///
1149 /// # Returns
1150 ///
1151 /// The number of rows imported on success.
1152 ///
1153 /// # Errors
1154 ///
1155 /// Returns `ImportError` if the import fails.
1156 ///
1157 /// # Example
1158 ///
1159 pub async fn import_from_parquet(
1160 &mut self,
1161 table: &str,
1162 file_path: &std::path::Path,
1163 options: crate::import::parquet::ParquetImportOptions,
1164 ) -> Result<u64, crate::import::ImportError> {
1165 // Pass Exasol host/port from connection params to import options
1166 let options = options
1167 .with_exasol_host(&self.params.host)
1168 .with_exasol_port(self.params.port);
1169
1170 crate::import::parquet::import_from_parquet(
1171 self.make_sql_executor(),
1172 table,
1173 file_path,
1174 options,
1175 )
1176 .await
1177 }
1178
1179 /// Import multiple Parquet files in parallel into an Exasol table.
1180 ///
1181 /// This method converts each Parquet file to CSV format concurrently,
1182 /// then streams the data through parallel HTTP transport connections.
1183 ///
1184 /// For a single file, this method delegates to `import_from_parquet`.
1185 ///
1186 /// # Arguments
1187 ///
1188 /// * `table` - Name of the target table
1189 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1190 /// * `options` - Import options
1191 ///
1192 /// # Returns
1193 ///
1194 /// The number of rows imported on success.
1195 ///
1196 /// # Errors
1197 ///
1198 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1199 ///
1200 /// # Example
1201 ///
1202 /// ```no_run
1203 /// use exarrow_rs::adbc::Connection;
1204 /// use exarrow_rs::import::ParquetImportOptions;
1205 /// use std::path::PathBuf;
1206 ///
1207 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1208 /// let files = vec![
1209 /// PathBuf::from("/data/part1.parquet"),
1210 /// PathBuf::from("/data/part2.parquet"),
1211 /// ];
1212 ///
1213 /// let options = ParquetImportOptions::default();
1214 /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1215 /// # Ok(())
1216 /// # }
1217 /// ```
1218 pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1219 &mut self,
1220 table: &str,
1221 paths: S,
1222 options: crate::import::parquet::ParquetImportOptions,
1223 ) -> Result<u64, crate::import::ImportError> {
1224 // Pass Exasol host/port from connection params to import options
1225 let options = options
1226 .with_exasol_host(&self.params.host)
1227 .with_exasol_port(self.params.port);
1228
1229 crate::import::parquet::import_from_parquet_files(
1230 self.make_sql_executor(),
1231 table,
1232 paths,
1233 options,
1234 )
1235 .await
1236 }
1237
1238 /// Export data from an Exasol table or query to a Parquet file.
1239 ///
1240 /// This method exports data from the specified source to a Parquet file.
1241 /// The data is first received as CSV from Exasol, then converted to Parquet format.
1242 ///
1243 /// # Arguments
1244 ///
1245 /// * `source` - The data source (table or query)
1246 /// * `file_path` - Path to the output Parquet file
1247 /// * `options` - Export options
1248 ///
1249 /// # Returns
1250 ///
1251 /// The number of rows exported on success.
1252 ///
1253 /// # Errors
1254 ///
1255 /// Returns `ExportError` if the export fails.
1256 ///
1257 /// # Example
1258 ///
1259 pub async fn export_to_parquet(
1260 &mut self,
1261 source: crate::query::export::ExportSource,
1262 file_path: &std::path::Path,
1263 options: crate::export::parquet::ParquetExportOptions,
1264 ) -> Result<u64, crate::export::csv::ExportError> {
1265 // Pass Exasol host/port from connection params to export options
1266 let options = options
1267 .exasol_host(&self.params.host)
1268 .exasol_port(self.params.port);
1269
1270 let mut transport_guard = self.transport.lock().await;
1271 crate::export::parquet::export_to_parquet_via_transport(
1272 &mut *transport_guard,
1273 source,
1274 file_path,
1275 options,
1276 )
1277 .await
1278 }
1279
1280 /// Import data from an Arrow RecordBatch into an Exasol table.
1281 ///
1282 /// This method converts the RecordBatch to CSV format and imports it
1283 /// into the target table using Exasol's HTTP transport layer.
1284 ///
1285 /// # Arguments
1286 ///
1287 /// * `table` - Name of the target table
1288 /// * `batch` - The RecordBatch to import
1289 /// * `options` - Import options
1290 ///
1291 /// # Returns
1292 ///
1293 /// The number of rows imported on success.
1294 ///
1295 /// # Errors
1296 ///
1297 /// Returns `ImportError` if the import fails.
1298 ///
1299 /// # Example
1300 ///
1301 pub async fn import_from_record_batch(
1302 &mut self,
1303 table: &str,
1304 batch: &RecordBatch,
1305 options: crate::import::arrow::ArrowImportOptions,
1306 ) -> Result<u64, crate::import::ImportError> {
1307 // Pass Exasol host/port from connection params to import options
1308 let options = options
1309 .exasol_host(&self.params.host)
1310 .exasol_port(self.params.port);
1311
1312 crate::import::arrow::import_from_record_batch(
1313 self.make_sql_executor(),
1314 table,
1315 batch,
1316 options,
1317 )
1318 .await
1319 }
1320
1321 /// Import data from multiple Arrow RecordBatches into an Exasol table.
1322 ///
1323 /// This method converts each RecordBatch to CSV format and imports them
1324 /// into the target table using Exasol's HTTP transport layer.
1325 ///
1326 /// # Arguments
1327 ///
1328 /// * `table` - Name of the target table
1329 /// * `batches` - An iterator of RecordBatches to import
1330 /// * `options` - Import options
1331 ///
1332 /// # Returns
1333 ///
1334 /// The number of rows imported on success.
1335 ///
1336 /// # Errors
1337 ///
1338 /// Returns `ImportError` if the import fails.
1339 pub async fn import_from_record_batches<I>(
1340 &mut self,
1341 table: &str,
1342 batches: I,
1343 options: crate::import::arrow::ArrowImportOptions,
1344 ) -> Result<u64, crate::import::ImportError>
1345 where
1346 I: IntoIterator<Item = RecordBatch>,
1347 {
1348 // Pass Exasol host/port from connection params to import options
1349 let options = options
1350 .exasol_host(&self.params.host)
1351 .exasol_port(self.params.port);
1352
1353 crate::import::arrow::import_from_record_batches(
1354 self.make_sql_executor(),
1355 table,
1356 batches,
1357 options,
1358 )
1359 .await
1360 }
1361
1362 /// Import data from an Arrow IPC file/stream into an Exasol table.
1363 ///
1364 /// This method reads Arrow IPC format data, converts it to CSV,
1365 /// and imports it into the target table using Exasol's HTTP transport layer.
1366 ///
1367 /// # Arguments
1368 ///
1369 /// * `table` - Name of the target table
1370 /// * `reader` - An async reader containing Arrow IPC data
1371 /// * `options` - Import options
1372 ///
1373 /// # Returns
1374 ///
1375 /// The number of rows imported on success.
1376 ///
1377 /// # Errors
1378 ///
1379 /// Returns `ImportError` if the import fails.
1380 ///
1381 /// # Example
1382 ///
1383 pub async fn import_from_arrow_ipc<R>(
1384 &mut self,
1385 table: &str,
1386 reader: R,
1387 options: crate::import::arrow::ArrowImportOptions,
1388 ) -> Result<u64, crate::import::ImportError>
1389 where
1390 R: tokio::io::AsyncRead + Unpin + Send,
1391 {
1392 // Pass Exasol host/port from connection params to import options
1393 let options = options
1394 .exasol_host(&self.params.host)
1395 .exasol_port(self.params.port);
1396
1397 crate::import::arrow::import_from_arrow_ipc(
1398 self.make_sql_executor(),
1399 table,
1400 reader,
1401 options,
1402 )
1403 .await
1404 }
1405
1406 /// Export data from an Exasol table or query to Arrow RecordBatches.
1407 ///
1408 /// This method exports data from the specified source and converts it
1409 /// to Arrow RecordBatches.
1410 ///
1411 /// # Arguments
1412 ///
1413 /// * `source` - The data source (table or query)
1414 /// * `options` - Export options
1415 ///
1416 /// # Returns
1417 ///
1418 /// A vector of RecordBatches on success.
1419 ///
1420 /// # Errors
1421 ///
1422 /// Returns `ExportError` if the export fails.
1423 ///
1424 /// # Example
1425 ///
1426 pub async fn export_to_record_batches(
1427 &mut self,
1428 source: crate::query::export::ExportSource,
1429 options: crate::export::arrow::ArrowExportOptions,
1430 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1431 // Pass Exasol host/port from connection params to export options
1432 let options = options
1433 .exasol_host(&self.params.host)
1434 .exasol_port(self.params.port);
1435
1436 let mut transport_guard = self.transport.lock().await;
1437 crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1438 }
1439
1440 /// Export data from an Exasol table or query to an Arrow IPC file.
1441 ///
1442 /// This method exports data from the specified source to an Arrow IPC file.
1443 ///
1444 /// # Arguments
1445 ///
1446 /// * `source` - The data source (table or query)
1447 /// * `file_path` - Path to the output Arrow IPC file
1448 /// * `options` - Export options
1449 ///
1450 /// # Returns
1451 ///
1452 /// The number of rows exported on success.
1453 ///
1454 /// # Errors
1455 ///
1456 /// Returns `ExportError` if the export fails.
1457 ///
1458 /// # Example
1459 ///
1460 pub async fn export_to_arrow_ipc(
1461 &mut self,
1462 source: crate::query::export::ExportSource,
1463 file_path: &std::path::Path,
1464 options: crate::export::arrow::ArrowExportOptions,
1465 ) -> Result<u64, crate::export::csv::ExportError> {
1466 // Pass Exasol host/port from connection params to export options
1467 let options = options
1468 .exasol_host(&self.params.host)
1469 .exasol_port(self.params.port);
1470
1471 let mut transport_guard = self.transport.lock().await;
1472 crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1473 .await
1474 }
1475
1476 // ========================================================================
1477 // Blocking Import/Export Methods
1478 // ========================================================================
1479
1480 /// Import CSV data from a file into an Exasol table (blocking).
1481 ///
1482 /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1483 /// for use in non-async contexts.
1484 ///
1485 /// # Arguments
1486 ///
1487 /// * `table` - Name of the target table
1488 /// * `file_path` - Path to the CSV file
1489 /// * `options` - Import options
1490 ///
1491 /// # Returns
1492 ///
1493 /// The number of rows imported on success.
1494 ///
1495 /// # Errors
1496 ///
1497 /// Returns `ImportError` if the import fails.
1498 ///
1499 /// # Example
1500 ///
1501 pub fn blocking_import_csv_from_file(
1502 &mut self,
1503 table: &str,
1504 file_path: &std::path::Path,
1505 options: crate::import::csv::CsvImportOptions,
1506 ) -> Result<u64, crate::import::ImportError> {
1507 blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1508 }
1509
1510 /// Import multiple CSV files in parallel into an Exasol table (blocking).
1511 ///
1512 /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1513 /// for use in non-async contexts.
1514 ///
1515 /// # Arguments
1516 ///
1517 /// * `table` - Name of the target table
1518 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1519 /// * `options` - Import options
1520 ///
1521 /// # Returns
1522 ///
1523 /// The number of rows imported on success.
1524 ///
1525 /// # Errors
1526 ///
1527 /// Returns `ImportError` if the import fails.
1528 pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1529 &mut self,
1530 table: &str,
1531 paths: S,
1532 options: crate::import::csv::CsvImportOptions,
1533 ) -> Result<u64, crate::import::ImportError> {
1534 blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1535 }
1536
1537 /// Import data from a Parquet file into an Exasol table (blocking).
1538 ///
1539 /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1540 /// for use in non-async contexts.
1541 ///
1542 /// # Arguments
1543 ///
1544 /// * `table` - Name of the target table
1545 /// * `file_path` - Path to the Parquet file
1546 /// * `options` - Import options
1547 ///
1548 /// # Returns
1549 ///
1550 /// The number of rows imported on success.
1551 ///
1552 /// # Errors
1553 ///
1554 /// Returns `ImportError` if the import fails.
1555 ///
1556 /// # Example
1557 ///
1558 pub fn blocking_import_from_parquet(
1559 &mut self,
1560 table: &str,
1561 file_path: &std::path::Path,
1562 options: crate::import::parquet::ParquetImportOptions,
1563 ) -> Result<u64, crate::import::ImportError> {
1564 blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1565 }
1566
1567 /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1568 ///
1569 /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1570 /// for use in non-async contexts.
1571 ///
1572 /// # Arguments
1573 ///
1574 /// * `table` - Name of the target table
1575 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1576 /// * `options` - Import options
1577 ///
1578 /// # Returns
1579 ///
1580 /// The number of rows imported on success.
1581 ///
1582 /// # Errors
1583 ///
1584 /// Returns `ImportError` if the import fails.
1585 pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1586 &mut self,
1587 table: &str,
1588 paths: S,
1589 options: crate::import::parquet::ParquetImportOptions,
1590 ) -> Result<u64, crate::import::ImportError> {
1591 blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1592 }
1593
1594 /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1595 ///
1596 /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1597 /// for use in non-async contexts.
1598 ///
1599 /// # Arguments
1600 ///
1601 /// * `table` - Name of the target table
1602 /// * `batch` - The RecordBatch to import
1603 /// * `options` - Import options
1604 ///
1605 /// # Returns
1606 ///
1607 /// The number of rows imported on success.
1608 ///
1609 /// # Errors
1610 ///
1611 /// Returns `ImportError` if the import fails.
1612 ///
1613 /// # Example
1614 ///
1615 pub fn blocking_import_from_record_batch(
1616 &mut self,
1617 table: &str,
1618 batch: &RecordBatch,
1619 options: crate::import::arrow::ArrowImportOptions,
1620 ) -> Result<u64, crate::import::ImportError> {
1621 blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1622 }
1623
1624 /// Import data from an Arrow IPC file into an Exasol table (blocking).
1625 ///
1626 /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1627 /// for use in non-async contexts.
1628 ///
1629 /// Note: This method requires a synchronous reader that implements `std::io::Read`.
1630 /// The data will be read into memory before being imported.
1631 ///
1632 /// # Arguments
1633 ///
1634 /// * `table` - Name of the target table
1635 /// * `file_path` - Path to the Arrow IPC file
1636 /// * `options` - Import options
1637 ///
1638 /// # Returns
1639 ///
1640 /// The number of rows imported on success.
1641 ///
1642 /// # Errors
1643 ///
1644 /// Returns `ImportError` if the import fails.
1645 ///
1646 /// # Example
1647 ///
1648 pub fn blocking_import_from_arrow_ipc(
1649 &mut self,
1650 table: &str,
1651 file_path: &std::path::Path,
1652 options: crate::import::arrow::ArrowImportOptions,
1653 ) -> Result<u64, crate::import::ImportError> {
1654 blocking_runtime().block_on(async {
1655 let file = tokio::fs::File::open(file_path)
1656 .await
1657 .map_err(crate::import::ImportError::IoError)?;
1658 self.import_from_arrow_ipc(table, file, options).await
1659 })
1660 }
1661
1662 /// Export data from an Exasol table or query to a CSV file (blocking).
1663 ///
1664 /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1665 /// for use in non-async contexts.
1666 ///
1667 /// # Arguments
1668 ///
1669 /// * `source` - The data source (table or query)
1670 /// * `file_path` - Path to the output file
1671 /// * `options` - Export options
1672 ///
1673 /// # Returns
1674 ///
1675 /// The number of rows exported on success.
1676 ///
1677 /// # Errors
1678 ///
1679 /// Returns `ExportError` if the export fails.
1680 ///
1681 /// # Example
1682 ///
1683 pub fn blocking_export_csv_to_file(
1684 &mut self,
1685 source: crate::query::export::ExportSource,
1686 file_path: &std::path::Path,
1687 options: crate::export::csv::CsvExportOptions,
1688 ) -> Result<u64, crate::export::csv::ExportError> {
1689 blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1690 }
1691
1692 /// Export data from an Exasol table or query to a Parquet file (blocking).
1693 ///
1694 /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1695 /// for use in non-async contexts.
1696 ///
1697 /// # Arguments
1698 ///
1699 /// * `source` - The data source (table or query)
1700 /// * `file_path` - Path to the output Parquet file
1701 /// * `options` - Export options
1702 ///
1703 /// # Returns
1704 ///
1705 /// The number of rows exported on success.
1706 ///
1707 /// # Errors
1708 ///
1709 /// Returns `ExportError` if the export fails.
1710 ///
1711 /// # Example
1712 ///
1713 pub fn blocking_export_to_parquet(
1714 &mut self,
1715 source: crate::query::export::ExportSource,
1716 file_path: &std::path::Path,
1717 options: crate::export::parquet::ParquetExportOptions,
1718 ) -> Result<u64, crate::export::csv::ExportError> {
1719 blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1720 }
1721
1722 /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1723 ///
1724 /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1725 /// for use in non-async contexts.
1726 ///
1727 /// # Arguments
1728 ///
1729 /// * `source` - The data source (table or query)
1730 /// * `options` - Export options
1731 ///
1732 /// # Returns
1733 ///
1734 /// A vector of RecordBatches on success.
1735 ///
1736 /// # Errors
1737 ///
1738 /// Returns `ExportError` if the export fails.
1739 ///
1740 /// # Example
1741 ///
1742 pub fn blocking_export_to_record_batches(
1743 &mut self,
1744 source: crate::query::export::ExportSource,
1745 options: crate::export::arrow::ArrowExportOptions,
1746 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1747 blocking_runtime().block_on(self.export_to_record_batches(source, options))
1748 }
1749
1750 /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1751 ///
1752 /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1753 /// for use in non-async contexts.
1754 ///
1755 /// # Arguments
1756 ///
1757 /// * `source` - The data source (table or query)
1758 /// * `file_path` - Path to the output Arrow IPC file
1759 /// * `options` - Export options
1760 ///
1761 /// # Returns
1762 ///
1763 /// The number of rows exported on success.
1764 ///
1765 /// # Errors
1766 ///
1767 /// Returns `ExportError` if the export fails.
1768 ///
1769 /// # Example
1770 ///
1771 pub fn blocking_export_to_arrow_ipc(
1772 &mut self,
1773 source: crate::query::export::ExportSource,
1774 file_path: &std::path::Path,
1775 options: crate::export::arrow::ArrowExportOptions,
1776 ) -> Result<u64, crate::export::csv::ExportError> {
1777 blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1778 }
1779
1780 // ========================================================================
1781 // Private Helper Methods
1782 // ========================================================================
1783
1784 /// Update session state after query execution.
1785 async fn update_session_state_after_query(&self) {
1786 if self.session.in_transaction() {
1787 self.session.set_state(SessionState::InTransaction).await;
1788 } else {
1789 self.session.set_state(SessionState::Ready).await;
1790 }
1791 self.session.update_activity().await;
1792 }
1793}
1794
1795impl std::fmt::Debug for Connection {
1796 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1797 f.debug_struct("Connection")
1798 .field("session_id", &self.session.session_id())
1799 .field("host", &self.params.host)
1800 .field("port", &self.params.port)
1801 .field("username", &self.params.username)
1802 .field("in_transaction", &self.session.in_transaction())
1803 .finish()
1804 }
1805}
1806
/// Builder for creating Connection instances.
///
/// Wraps the parameter builder from `crate::connection::params`; each setter
/// on this type forwards to the wrapped builder, and `connect` turns the
/// collected parameters into a live `Connection`.
pub struct ConnectionBuilder {
    /// Connection parameters builder that accumulates host, port,
    /// credentials, schema and TLS settings.
    params_builder: crate::connection::params::ConnectionBuilder,
}
1812
1813impl ConnectionBuilder {
1814 /// Create a new ConnectionBuilder.
1815 pub fn new() -> Self {
1816 Self {
1817 params_builder: crate::connection::params::ConnectionBuilder::new(),
1818 }
1819 }
1820
1821 /// Set the database host.
1822 pub fn host(mut self, host: &str) -> Self {
1823 self.params_builder = self.params_builder.host(host);
1824 self
1825 }
1826
1827 /// Set the database port.
1828 pub fn port(mut self, port: u16) -> Self {
1829 self.params_builder = self.params_builder.port(port);
1830 self
1831 }
1832
1833 /// Set the username.
1834 pub fn username(mut self, username: &str) -> Self {
1835 self.params_builder = self.params_builder.username(username);
1836 self
1837 }
1838
1839 /// Set the password.
1840 pub fn password(mut self, password: &str) -> Self {
1841 self.params_builder = self.params_builder.password(password);
1842 self
1843 }
1844
1845 /// Set the default schema.
1846 pub fn schema(mut self, schema: &str) -> Self {
1847 self.params_builder = self.params_builder.schema(schema);
1848 self
1849 }
1850
1851 /// Enable or disable TLS.
1852 pub fn use_tls(mut self, use_tls: bool) -> Self {
1853 self.params_builder = self.params_builder.use_tls(use_tls);
1854 self
1855 }
1856
1857 /// Build and connect.
1858 ///
1859 /// # Returns
1860 ///
1861 /// A connected `Connection` instance.
1862 ///
1863 /// # Errors
1864 ///
1865 /// Returns `ExasolError` if the connection fails.
1866 pub async fn connect(self) -> Result<Connection, ExasolError> {
1867 let params = self.params_builder.build()?;
1868 Ok(Connection::from_params(params).await?)
1869 }
1870}
1871
1872impl Default for ConnectionBuilder {
1873 fn default() -> Self {
1874 Self::new()
1875 }
1876}
1877
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder setter can be chained. Connecting is not attempted
    /// because that needs a running Exasol instance.
    #[test]
    fn test_connection_builder() {
        let endpoint = ConnectionBuilder::new().host("localhost").port(8563);
        let credentials = endpoint.username("test").password("secret");
        let _configured = credentials.schema("MY_SCHEMA").use_tls(false);
    }

    /// Placeholder: `create_statement` is meant to be callable without
    /// `.await`, but a `Connection` cannot be constructed here without a
    /// database, so the test body is intentionally empty.
    #[test]
    fn test_create_statement_is_sync() {}
}
1904
/// Blocking wrapper tests.
///
/// Apart from the runtime smoke test, these are compile-time checks: each
/// one coerces a method to an explicit `fn` pointer type, so the build fails
/// if a blocking wrapper is removed or its signature changes.
#[cfg(test)]
mod blocking_tests {
    use super::*;

    #[test]
    fn test_session_type_alias_exists() {
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}

        // Cross the types on purpose: a function written against `Session`
        // must be usable where `Connection` is expected and vice versa. This
        // only compiles if the two names refer to the same type.
        let _: fn(&Connection) = takes_session;
        let _: fn(&Session) = takes_connection;
    }

    #[test]
    fn test_blocking_runtime_exists() {
        // Initializes (or reuses) the shared runtime and touches its handle.
        let runtime = blocking_runtime();
        let _ = runtime.handle();
    }

    #[test]
    fn test_connection_has_blocking_import_csv() {
        // Fails to compile if the method is missing or its signature differs.
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::csv::CsvImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_csv_from_file;
    }

    #[test]
    fn test_connection_has_blocking_import_parquet() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::parquet::ParquetImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
    }

    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        let _: fn(
            &mut Connection,
            &str,
            &RecordBatch,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_from_record_batch;
    }

    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_arrow_ipc;
    }

    #[test]
    fn test_connection_has_blocking_export_csv() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::csv::CsvExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_csv_to_file;
    }

    #[test]
    fn test_connection_has_blocking_export_parquet() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::parquet::ParquetExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> = Connection::blocking_export_to_parquet;
    }

    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
            Connection::blocking_export_to_record_batches;
    }

    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> =
            Connection::blocking_export_to_arrow_ipc;
    }
}