exarrow_rs/adbc/connection.rs
1//! ADBC Connection implementation.
2//!
3//! This module provides the `Connection` type which represents an active
4//! database connection and provides methods for executing queries.
5//!
6//! # New in v2.0.0
7//!
8//! Connection now owns the transport directly and Statement is a pure data container.
9//! Execute statements via Connection methods: `execute_statement()`, `execute_prepared()`.
10
11use crate::adbc::Statement;
12use crate::connection::auth::AuthResponseData;
13use crate::connection::params::ConnectionParams;
14use crate::connection::session::{Session as SessionInfo, SessionConfig, SessionState};
15use crate::error::{ConnectionError, ExasolError, QueryError};
16use crate::query::prepared::PreparedStatement;
17use crate::query::results::ResultSet;
18use crate::transport::protocol::{
19 ConnectionParams as TransportConnectionParams, Credentials as TransportCredentials,
20 QueryResult, TransportProtocol,
21};
22use crate::transport::WebSocketTransport;
23use arrow::array::RecordBatch;
24use std::sync::{Arc, OnceLock};
25use std::time::Duration;
26use tokio::runtime::Runtime;
27use tokio::sync::Mutex;
28use tokio::time::timeout;
29
/// Session is a type alias for Connection.
///
/// This alias provides a more intuitive name for database sessions when
/// performing import/export operations. Both `Session` and `Connection`
/// can be used interchangeably; the alias adds no behavior of its own.
pub type Session = Connection;
39
40fn blocking_runtime() -> &'static Runtime {
41 static RUNTIME: OnceLock<Runtime> = OnceLock::new();
42 RUNTIME.get_or_init(|| {
43 tokio::runtime::Builder::new_current_thread()
44 .enable_all()
45 .build()
46 .expect("Failed to create tokio runtime for blocking operations")
47 })
48}
49
/// ADBC Connection to an Exasol database.
///
/// The `Connection` type represents an active database connection and provides
/// methods for executing queries, managing transactions, and retrieving metadata.
///
/// # v2.0.0 Breaking Changes
///
/// - Connection now owns the transport directly
/// - `create_statement()` is now synchronous and returns a pure data container
/// - Use `execute_statement()` instead of `Statement::execute()`
/// - Use `prepare()` instead of `Statement::prepare()`
///
/// # Example
///
pub struct Connection {
    /// Transport layer for communication (owned by Connection).
    /// Wrapped in `Arc<Mutex<..>>` because result sets and the SQL-executor
    /// closures created by `make_sql_executor()` hold their own clones.
    transport: Arc<Mutex<dyn TransportProtocol>>,
    /// Session information: state machine, current schema, query counter.
    session: SessionInfo,
    /// Connection parameters the connection was established with
    /// (host, port, TLS settings, timeouts, …).
    params: ConnectionParams,
}
72
73impl Connection {
74 /// Create a connection from connection parameters.
75 ///
76 /// This establishes a connection to the Exasol database using WebSocket transport,
77 /// authenticates the user, and creates a session.
78 ///
79 /// # Arguments
80 ///
81 /// * `params` - Connection parameters
82 ///
83 /// # Returns
84 ///
85 /// A connected `Connection` instance.
86 ///
87 /// # Errors
88 ///
89 /// Returns `ConnectionError` if the connection or authentication fails.
90 pub async fn from_params(params: ConnectionParams) -> Result<Self, ConnectionError> {
91 // Create transport
92 let mut transport = WebSocketTransport::new();
93
94 // Convert ConnectionParams to TransportConnectionParams
95 let mut transport_params = TransportConnectionParams::new(params.host.clone(), params.port)
96 .with_tls(params.use_tls)
97 .with_validate_server_certificate(params.validate_server_certificate)
98 .with_timeout(params.connection_timeout.as_millis() as u64);
99 if let Some(ref fp) = params.certificate_fingerprint {
100 transport_params = transport_params.with_certificate_fingerprint(fp.clone());
101 }
102
103 // Connect
104 transport.connect(&transport_params).await.map_err(|e| {
105 ConnectionError::ConnectionFailed {
106 host: params.host.clone(),
107 port: params.port,
108 message: e.to_string(),
109 }
110 })?;
111
112 // Authenticate
113 let credentials =
114 TransportCredentials::new(params.username.clone(), params.password().to_string());
115 let session_info = transport
116 .authenticate(&credentials)
117 .await
118 .map_err(|e| ConnectionError::AuthenticationFailed(e.to_string()))?;
119
120 // Create session config from connection params
121 let session_config = SessionConfig {
122 idle_timeout: params.idle_timeout,
123 query_timeout: params.query_timeout,
124 ..Default::default()
125 };
126
127 // Extract session_id once to avoid double clone
128 let session_id = session_info.session_id.clone();
129
130 // Convert SessionInfo to AuthResponseData
131 let auth_response = AuthResponseData {
132 session_id: session_id.clone(),
133 protocol_version: session_info.protocol_version,
134 release_version: session_info.release_version,
135 database_name: session_info.database_name,
136 product_name: session_info.product_name,
137 max_data_message_size: session_info.max_data_message_size,
138 max_identifier_length: 128, // Default value
139 max_varchar_length: 2_000_000, // Default value
140 identifier_quote_string: "\"".to_string(),
141 time_zone: session_info.time_zone.unwrap_or_else(|| "UTC".to_string()),
142 time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
143 };
144
145 // Create session (owned, not Arc)
146 let session = SessionInfo::new(session_id, auth_response, session_config);
147
148 // Set schema if specified
149 if let Some(schema) = ¶ms.schema {
150 session.set_current_schema(Some(schema.clone())).await;
151 }
152
153 Ok(Self {
154 transport: Arc::new(Mutex::new(transport)),
155 session,
156 params,
157 })
158 }
159
    /// Create a builder for constructing a connection.
    ///
    /// # Returns
    ///
    /// A new `ConnectionBuilder` instance; configure it and call its build
    /// method to obtain a `Connection`.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::new()
    }
171
172 // ========================================================================
173 // Statement Creation (synchronous - Statement is now a pure data container)
174 // ========================================================================
175
    /// Create a new statement for executing SQL.
    ///
    /// Statement is now a pure data container — no server round-trip happens
    /// here. Use `execute_statement()` to execute it.
    ///
    /// # Arguments
    ///
    /// * `sql` - SQL query text
    ///
    /// # Returns
    ///
    /// A `Statement` instance ready for execution via `execute_statement()`.
    pub fn create_statement(&self, sql: impl Into<String>) -> Statement {
        Statement::new(sql)
    }
193
194 // ========================================================================
195 // Statement Execution Methods
196 // ========================================================================
197
198 /// Execute a statement and return results.
199 ///
200 /// # Arguments
201 ///
202 /// * `stmt` - Statement to execute
203 ///
204 /// # Returns
205 ///
206 /// A `ResultSet` containing the query results.
207 ///
208 /// # Errors
209 ///
210 /// Returns `QueryError` if execution fails or times out.
211 ///
212 /// # Example
213 ///
214 pub async fn execute_statement(&mut self, stmt: &Statement) -> Result<ResultSet, QueryError> {
215 // Validate session state
216 self.session
217 .validate_ready()
218 .await
219 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
220
221 // Update session state
222 self.session.set_state(SessionState::Executing).await;
223
224 // Increment query counter
225 self.session.increment_query_count();
226
227 // Build final SQL with parameters
228 let final_sql = stmt.build_sql()?;
229
230 // Execute with timeout
231 let timeout_duration = Duration::from_millis(stmt.timeout_ms());
232 let transport = Arc::clone(&self.transport);
233
234 let result = timeout(timeout_duration, async move {
235 let mut transport_guard = transport.lock().await;
236 transport_guard.execute_query(&final_sql).await
237 })
238 .await
239 .map_err(|_| QueryError::Timeout {
240 timeout_ms: stmt.timeout_ms(),
241 })?
242 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
243
244 // Update session state back to ready/in_transaction
245 self.update_session_state_after_query().await;
246
247 // Convert transport result to ResultSet
248 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
249 }
250
251 /// Execute a statement and return the row count (for non-SELECT statements).
252 ///
253 /// # Arguments
254 ///
255 /// * `stmt` - Statement to execute
256 ///
257 /// # Returns
258 ///
259 /// The number of rows affected.
260 ///
261 /// # Errors
262 ///
263 /// Returns `QueryError` if execution fails or if statement is a SELECT.
264 ///
265 /// # Example
266 ///
267 pub async fn execute_statement_update(&mut self, stmt: &Statement) -> Result<i64, QueryError> {
268 let result_set = self.execute_statement(stmt).await?;
269
270 result_set.row_count().ok_or_else(|| {
271 QueryError::NoResultSet("Expected row count, got result set".to_string())
272 })
273 }
274
275 // ========================================================================
276 // Prepared Statement Methods
277 // ========================================================================
278
279 /// Create a prepared statement for parameterized query execution.
280 ///
281 /// This creates a server-side prepared statement that can be executed
282 /// multiple times with different parameter values.
283 ///
284 /// # Arguments
285 ///
286 /// * `sql` - SQL statement with parameter placeholders (?)
287 ///
288 /// # Returns
289 ///
290 /// A `PreparedStatement` ready for parameter binding and execution.
291 ///
292 /// # Errors
293 ///
294 /// Returns `QueryError` if preparation fails.
295 ///
296 /// # Example
297 ///
298 pub async fn prepare(
299 &mut self,
300 sql: impl Into<String>,
301 ) -> Result<PreparedStatement, QueryError> {
302 let sql = sql.into();
303
304 // Validate session state
305 self.session
306 .validate_ready()
307 .await
308 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
309
310 let mut transport = self.transport.lock().await;
311 let handle = transport
312 .create_prepared_statement(&sql)
313 .await
314 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
315
316 Ok(PreparedStatement::new(handle))
317 }
318
319 /// Execute a prepared statement and return results.
320 ///
321 /// # Arguments
322 ///
323 /// * `stmt` - Prepared statement to execute
324 ///
325 /// # Returns
326 ///
327 /// A `ResultSet` containing the query results.
328 ///
329 /// # Errors
330 ///
331 /// Returns `QueryError` if execution fails.
332 ///
333 /// # Example
334 ///
335 pub async fn execute_prepared(
336 &mut self,
337 stmt: &PreparedStatement,
338 ) -> Result<ResultSet, QueryError> {
339 if stmt.is_closed() {
340 return Err(QueryError::StatementClosed);
341 }
342
343 // Validate session state
344 self.session
345 .validate_ready()
346 .await
347 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
348
349 // Update session state
350 self.session.set_state(SessionState::Executing).await;
351
352 // Increment query counter
353 self.session.increment_query_count();
354
355 // Convert parameters to column-major JSON format
356 let params_data = stmt.build_parameters_data()?;
357
358 let mut transport = self.transport.lock().await;
359 let result = transport
360 .execute_prepared_statement(stmt.handle_ref(), params_data)
361 .await
362 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
363
364 drop(transport);
365
366 // Update session state back to ready/in_transaction
367 self.update_session_state_after_query().await;
368
369 ResultSet::from_transport_result(result, Arc::clone(&self.transport))
370 }
371
372 /// Execute a prepared statement and return the number of affected rows.
373 ///
374 /// Use this for INSERT, UPDATE, DELETE statements.
375 ///
376 /// # Arguments
377 ///
378 /// * `stmt` - Prepared statement to execute
379 ///
380 /// # Returns
381 ///
382 /// The number of rows affected.
383 ///
384 /// # Errors
385 ///
386 /// Returns `QueryError` if execution fails or statement returns a result set.
387 pub async fn execute_prepared_update(
388 &mut self,
389 stmt: &PreparedStatement,
390 ) -> Result<i64, QueryError> {
391 if stmt.is_closed() {
392 return Err(QueryError::StatementClosed);
393 }
394
395 // Validate session state
396 self.session
397 .validate_ready()
398 .await
399 .map_err(|e| QueryError::InvalidState(e.to_string()))?;
400
401 // Update session state
402 self.session.set_state(SessionState::Executing).await;
403
404 // Convert parameters to column-major JSON format
405 let params_data = stmt.build_parameters_data()?;
406
407 let mut transport = self.transport.lock().await;
408 let result = transport
409 .execute_prepared_statement(stmt.handle_ref(), params_data)
410 .await
411 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
412
413 drop(transport);
414
415 // Update session state back to ready/in_transaction
416 self.update_session_state_after_query().await;
417
418 match result {
419 QueryResult::RowCount { count } => Ok(count),
420 QueryResult::ResultSet { .. } => Err(QueryError::UnexpectedResultSet),
421 }
422 }
423
424 /// Close a prepared statement and release server-side resources.
425 ///
426 /// # Arguments
427 ///
428 /// * `stmt` - Prepared statement to close (consumed)
429 ///
430 /// # Errors
431 ///
432 /// Returns `QueryError` if closing fails.
433 ///
434 /// # Example
435 ///
436 pub async fn close_prepared(&mut self, mut stmt: PreparedStatement) -> Result<(), QueryError> {
437 if stmt.is_closed() {
438 return Ok(());
439 }
440
441 let mut transport = self.transport.lock().await;
442 transport
443 .close_prepared_statement(stmt.handle_ref())
444 .await
445 .map_err(|e| QueryError::ExecutionFailed(e.to_string()))?;
446
447 stmt.mark_closed();
448 Ok(())
449 }
450
451 // ========================================================================
452 // Convenience Methods (these internally use execute_statement)
453 // ========================================================================
454
455 /// Execute a SQL query and return results.
456 ///
457 /// This is a convenience method that creates a statement and executes it.
458 ///
459 /// # Arguments
460 ///
461 /// * `sql` - SQL query text
462 ///
463 /// # Returns
464 ///
465 /// A `ResultSet` containing the query results.
466 ///
467 /// # Errors
468 ///
469 /// Returns `QueryError` if execution fails.
470 ///
471 /// # Example
472 ///
473 pub async fn execute(&mut self, sql: impl Into<String>) -> Result<ResultSet, QueryError> {
474 let stmt = self.create_statement(sql);
475 self.execute_statement(&stmt).await
476 }
477
478 /// Execute a SQL query and return all results as RecordBatches.
479 ///
480 /// This is a convenience method that fetches all results into memory.
481 ///
482 /// # Arguments
483 ///
484 /// * `sql` - SQL query text
485 ///
486 /// # Returns
487 ///
488 /// A vector of `RecordBatch` instances.
489 ///
490 /// # Errors
491 ///
492 /// Returns `QueryError` if execution fails.
493 ///
494 /// # Example
495 ///
496 pub async fn query(&mut self, sql: impl Into<String>) -> Result<Vec<RecordBatch>, QueryError> {
497 let result_set = self.execute(sql).await?;
498 result_set.fetch_all().await
499 }
500
501 /// Execute a non-SELECT statement and return the row count.
502 ///
503 /// # Arguments
504 ///
505 /// * `sql` - SQL statement (INSERT, UPDATE, DELETE, etc.)
506 ///
507 /// # Returns
508 ///
509 /// The number of rows affected.
510 ///
511 /// # Errors
512 ///
513 /// Returns `QueryError` if execution fails.
514 ///
515 /// # Example
516 ///
517 pub async fn execute_update(&mut self, sql: impl Into<String>) -> Result<i64, QueryError> {
518 let stmt = self.create_statement(sql);
519 self.execute_statement_update(&stmt).await
520 }
521
522 // ========================================================================
523 // Transaction Methods
524 // ========================================================================
525
526 pub async fn begin_transaction(&mut self) -> Result<(), QueryError> {
527 // Disable autocommit on the server so statements don't auto-commit
528 self.transport
529 .lock()
530 .await
531 .set_autocommit(false)
532 .await
533 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
534
535 self.session
536 .begin_transaction()
537 .await
538 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
539
540 Ok(())
541 }
542
543 pub async fn commit(&mut self) -> Result<(), QueryError> {
544 if !self.in_transaction() {
545 return Ok(());
546 }
547
548 self.execute_update("COMMIT").await?;
549
550 self.session
551 .commit_transaction()
552 .await
553 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
554
555 Ok(())
556 }
557
558 pub async fn rollback(&mut self) -> Result<(), QueryError> {
559 if !self.in_transaction() {
560 return Ok(());
561 }
562
563 self.execute_update("ROLLBACK").await?;
564
565 self.session
566 .rollback_transaction()
567 .await
568 .map_err(|e| QueryError::TransactionError(e.to_string()))?;
569
570 Ok(())
571 }
572
    /// Check if a transaction is currently active.
    ///
    /// Reflects the session-layer transaction flag toggled by
    /// `begin_transaction()`, `commit()`, and `rollback()`.
    ///
    /// # Returns
    ///
    /// `true` if a transaction is active, `false` otherwise.
    pub fn in_transaction(&self) -> bool {
        self.session.in_transaction()
    }
581
582 // ========================================================================
583 // Session and Schema Methods
584 // ========================================================================
585
    /// Get the current schema.
    ///
    /// This is the session-side record, updated by `set_schema()` and by the
    /// `schema` connection parameter at connect time.
    ///
    /// # Returns
    ///
    /// The current schema name, or `None` if no schema is set.
    pub async fn current_schema(&self) -> Option<String> {
        self.session.current_schema().await
    }
594
595 /// Set the current schema.
596 ///
597 /// # Arguments
598 ///
599 /// * `schema` - The schema name to set
600 ///
601 /// # Errors
602 ///
603 /// Returns `QueryError` if the operation fails.
604 ///
605 /// # Example
606 ///
607 pub async fn set_schema(&mut self, schema: impl Into<String>) -> Result<(), QueryError> {
608 let schema_name = schema.into();
609 self.execute_update(format!("OPEN SCHEMA {}", schema_name))
610 .await?;
611 self.session.set_current_schema(Some(schema_name)).await;
612 Ok(())
613 }
614
615 // ========================================================================
616 // Metadata Methods
617 // ========================================================================
618
619 /// Get metadata about catalogs.
620 ///
621 /// # Returns
622 ///
623 /// A `ResultSet` containing catalog metadata.
624 ///
625 /// # Errors
626 ///
627 /// Returns `QueryError` if the operation fails.
628 pub async fn get_catalogs(&mut self) -> Result<ResultSet, QueryError> {
629 self.execute("SELECT DISTINCT SCHEMA_NAME AS CATALOG_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY CATALOG_NAME")
630 .await
631 }
632
633 /// Get metadata about schemas.
634 ///
635 /// # Arguments
636 ///
637 /// * `catalog` - Optional catalog name filter
638 ///
639 /// # Returns
640 ///
641 /// A `ResultSet` containing schema metadata.
642 ///
643 /// # Errors
644 ///
645 /// Returns `QueryError` if the operation fails.
646 pub async fn get_schemas(&mut self, catalog: Option<&str>) -> Result<ResultSet, QueryError> {
647 let sql = if let Some(cat) = catalog {
648 format!(
649 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS WHERE SCHEMA_NAME = '{}' ORDER BY SCHEMA_NAME",
650 cat.replace('\'', "''")
651 )
652 } else {
653 "SELECT SCHEMA_NAME FROM SYS.EXA_ALL_SCHEMAS ORDER BY SCHEMA_NAME".to_string()
654 };
655 self.execute(sql).await
656 }
657
658 /// Get metadata about tables.
659 ///
660 /// # Arguments
661 ///
662 /// * `catalog` - Optional catalog name filter
663 /// * `schema` - Optional schema name filter
664 /// * `table` - Optional table name filter
665 ///
666 /// # Returns
667 ///
668 /// A `ResultSet` containing table metadata.
669 ///
670 /// # Errors
671 ///
672 /// Returns `QueryError` if the operation fails.
673 pub async fn get_tables(
674 &mut self,
675 catalog: Option<&str>,
676 schema: Option<&str>,
677 table: Option<&str>,
678 ) -> Result<ResultSet, QueryError> {
679 let mut conditions = vec!["OBJECT_TYPE IN ('TABLE', 'VIEW')".to_string()];
680
681 // Exasol has no catalogs — ignore the catalog parameter
682 let _ = catalog;
683 if let Some(sch) = schema {
684 conditions.push(format!("ROOT_NAME = '{}'", sch.replace('\'', "''")));
685 }
686 if let Some(tbl) = table {
687 conditions.push(format!("OBJECT_NAME = '{}'", tbl.replace('\'', "''")));
688 }
689
690 let where_clause = format!("WHERE {}", conditions.join(" AND "));
691
692 let sql = format!(
693 "SELECT ROOT_NAME AS TABLE_SCHEMA, OBJECT_NAME AS TABLE_NAME, OBJECT_TYPE AS TABLE_TYPE FROM SYS.EXA_ALL_OBJECTS {} ORDER BY ROOT_NAME, OBJECT_NAME",
694 where_clause
695 );
696
697 self.execute(sql).await
698 }
699
700 /// Get metadata about columns.
701 ///
702 /// # Arguments
703 ///
704 /// * `catalog` - Optional catalog name filter
705 /// * `schema` - Optional schema name filter
706 /// * `table` - Optional table name filter
707 /// * `column` - Optional column name filter
708 ///
709 /// # Returns
710 ///
711 /// A `ResultSet` containing column metadata.
712 ///
713 /// # Errors
714 ///
715 /// Returns `QueryError` if the operation fails.
716 pub async fn get_columns(
717 &mut self,
718 catalog: Option<&str>,
719 schema: Option<&str>,
720 table: Option<&str>,
721 column: Option<&str>,
722 ) -> Result<ResultSet, QueryError> {
723 let mut conditions = Vec::new();
724
725 if let Some(cat) = catalog {
726 conditions.push(format!("COLUMN_SCHEMA = '{}'", cat.replace('\'', "''")));
727 }
728 if let Some(sch) = schema {
729 conditions.push(format!("COLUMN_SCHEMA = '{}'", sch.replace('\'', "''")));
730 }
731 if let Some(tbl) = table {
732 conditions.push(format!("COLUMN_TABLE = '{}'", tbl.replace('\'', "''")));
733 }
734 if let Some(col) = column {
735 conditions.push(format!("COLUMN_NAME = '{}'", col.replace('\'', "''")));
736 }
737
738 let where_clause = if conditions.is_empty() {
739 String::new()
740 } else {
741 format!("WHERE {}", conditions.join(" AND "))
742 };
743
744 let sql = format!(
745 "SELECT COLUMN_SCHEMA, COLUMN_TABLE, COLUMN_NAME, COLUMN_TYPE, COLUMN_NUM_PREC, COLUMN_NUM_SCALE, COLUMN_IS_NULLABLE \
746 FROM SYS.EXA_ALL_COLUMNS {} ORDER BY COLUMN_SCHEMA, COLUMN_TABLE, ORDINAL_POSITION",
747 where_clause
748 );
749
750 self.execute(sql).await
751 }
752
753 // ========================================================================
754 // Session Information Methods
755 // ========================================================================
756
    /// Get session information.
    ///
    /// # Returns
    ///
    /// The server-assigned session ID.
    pub fn session_id(&self) -> &str {
        self.session.session_id()
    }

    /// Get connection parameters.
    ///
    /// # Returns
    ///
    /// A reference to the parameters this connection was created with.
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }

    /// Check if the connection is closed.
    ///
    /// # Returns
    ///
    /// `true` if the underlying session has been closed, `false` otherwise.
    pub async fn is_closed(&self) -> bool {
        self.session.is_closed().await
    }
783
784 /// Close the connection.
785 ///
786 /// This closes the session and transport layer.
787 ///
788 /// # Errors
789 ///
790 /// Returns `ConnectionError` if closing fails.
791 ///
792 /// # Example
793 ///
794 pub async fn close(self) -> Result<(), ConnectionError> {
795 // Close session
796 self.session.close().await?;
797
798 // Close transport
799 let mut transport = self.transport.lock().await;
800 transport
801 .close()
802 .await
803 .map_err(|e| ConnectionError::ConnectionFailed {
804 host: self.params.host.clone(),
805 port: self.params.port,
806 message: e.to_string(),
807 })?;
808
809 Ok(())
810 }
811
812 /// Shut down the connection without consuming self.
813 ///
814 /// Unlike `close()`, this can be called through a shared reference,
815 /// making it suitable for use in `Drop` implementations where ownership
816 /// transfer is not possible (e.g., when the connection is behind `Arc<Mutex<>>`).
817 pub async fn shutdown(&self) -> Result<(), ConnectionError> {
818 self.session.close().await?;
819
820 let mut transport = self.transport.lock().await;
821 transport
822 .close()
823 .await
824 .map_err(|e| ConnectionError::ConnectionFailed {
825 host: self.params.host.clone(),
826 port: self.params.port,
827 message: e.to_string(),
828 })?;
829
830 Ok(())
831 }
832
833 // ========================================================================
834 // Import/Export Methods
835 // ========================================================================
836
837 /// Creates an SQL executor closure for import/export operations.
838 ///
839 /// This is a helper method that creates a closure which can execute SQL
840 /// statements and return the row count. The closure captures a cloned
841 /// reference to the transport, allowing it to be called multiple times
842 /// (e.g., for parallel file imports).
843 ///
844 /// # Returns
845 ///
846 /// A closure that takes an SQL string and returns a Future resolving to
847 /// either the affected row count or an error string.
848 fn make_sql_executor(
849 &self,
850 ) -> impl Fn(
851 String,
852 )
853 -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, String>> + Send>> {
854 let transport = Arc::clone(&self.transport);
855 move |sql: String| {
856 let transport = Arc::clone(&transport);
857 Box::pin(async move {
858 let mut transport_guard = transport.lock().await;
859 match transport_guard.execute_query(&sql).await {
860 Ok(QueryResult::RowCount { count }) => Ok(count as u64),
861 Ok(QueryResult::ResultSet { .. }) => Ok(0),
862 Err(e) => Err(e.to_string()),
863 }
864 })
865 }
866 }
867
868 /// Import CSV data from a file into an Exasol table.
869 ///
870 /// This method reads CSV data from the specified file and imports it into
871 /// the target table using Exasol's HTTP transport layer.
872 ///
873 /// # Arguments
874 ///
875 /// * `table` - Name of the target table
876 /// * `file_path` - Path to the CSV file
877 /// * `options` - Import options
878 ///
879 /// # Returns
880 ///
881 /// The number of rows imported on success.
882 ///
883 /// # Errors
884 ///
885 /// Returns `ImportError` if the import fails.
886 ///
887 /// # Example
888 ///
889 pub async fn import_csv_from_file(
890 &mut self,
891 table: &str,
892 file_path: &std::path::Path,
893 options: crate::import::csv::CsvImportOptions,
894 ) -> Result<u64, crate::import::ImportError> {
895 // Pass Exasol host/port from connection params to import options
896 let options = options
897 .exasol_host(&self.params.host)
898 .exasol_port(self.params.port);
899
900 crate::import::csv::import_from_file(self.make_sql_executor(), table, file_path, options)
901 .await
902 }
903
904 /// Import CSV data from an async reader into an Exasol table.
905 ///
906 /// This method reads CSV data from an async reader and imports it into
907 /// the target table using Exasol's HTTP transport layer.
908 ///
909 /// # Arguments
910 ///
911 /// * `table` - Name of the target table
912 /// * `reader` - Async reader providing CSV data
913 /// * `options` - Import options
914 ///
915 /// # Returns
916 ///
917 /// The number of rows imported on success.
918 ///
919 /// # Errors
920 ///
921 /// Returns `ImportError` if the import fails.
922 pub async fn import_csv_from_stream<R>(
923 &mut self,
924 table: &str,
925 reader: R,
926 options: crate::import::csv::CsvImportOptions,
927 ) -> Result<u64, crate::import::ImportError>
928 where
929 R: tokio::io::AsyncRead + Unpin + Send + 'static,
930 {
931 // Pass Exasol host/port from connection params to import options
932 let options = options
933 .exasol_host(&self.params.host)
934 .exasol_port(self.params.port);
935
936 crate::import::csv::import_from_stream(self.make_sql_executor(), table, reader, options)
937 .await
938 }
939
940 /// Import CSV data from an iterator into an Exasol table.
941 ///
942 /// This method converts iterator rows to CSV format and imports them into
943 /// the target table using Exasol's HTTP transport layer.
944 ///
945 /// # Arguments
946 ///
947 /// * `table` - Name of the target table
948 /// * `rows` - Iterator of rows, where each row is an iterator of field values
949 /// * `options` - Import options
950 ///
951 /// # Returns
952 ///
953 /// The number of rows imported on success.
954 ///
955 /// # Errors
956 ///
957 /// Returns `ImportError` if the import fails.
958 ///
959 /// # Example
960 ///
961 pub async fn import_csv_from_iter<I, T, S>(
962 &mut self,
963 table: &str,
964 rows: I,
965 options: crate::import::csv::CsvImportOptions,
966 ) -> Result<u64, crate::import::ImportError>
967 where
968 I: IntoIterator<Item = T> + Send + 'static,
969 T: IntoIterator<Item = S> + Send,
970 S: AsRef<str>,
971 {
972 // Pass Exasol host/port from connection params to import options
973 let options = options
974 .exasol_host(&self.params.host)
975 .exasol_port(self.params.port);
976
977 crate::import::csv::import_from_iter(self.make_sql_executor(), table, rows, options).await
978 }
979
980 /// Export data from an Exasol table or query to a CSV file.
981 ///
982 /// This method exports data from the specified source to a CSV file
983 /// using Exasol's HTTP transport layer.
984 ///
985 /// # Arguments
986 ///
987 /// * `source` - The data source (table or query)
988 /// * `file_path` - Path to the output file
989 /// * `options` - Export options
990 ///
991 /// # Returns
992 ///
993 /// The number of rows exported on success.
994 ///
995 /// # Errors
996 ///
997 /// Returns `ExportError` if the export fails.
998 ///
999 /// # Example
1000 ///
1001 pub async fn export_csv_to_file(
1002 &mut self,
1003 source: crate::query::export::ExportSource,
1004 file_path: &std::path::Path,
1005 options: crate::export::csv::CsvExportOptions,
1006 ) -> Result<u64, crate::export::csv::ExportError> {
1007 // Pass Exasol host/port from connection params to export options
1008 let options = options
1009 .exasol_host(&self.params.host)
1010 .exasol_port(self.params.port);
1011
1012 let mut transport_guard = self.transport.lock().await;
1013 crate::export::csv::export_to_file(&mut *transport_guard, source, file_path, options).await
1014 }
1015
1016 /// Export data from an Exasol table or query to an async writer.
1017 ///
1018 /// This method exports data from the specified source to an async writer
1019 /// using Exasol's HTTP transport layer.
1020 ///
1021 /// # Arguments
1022 ///
1023 /// * `source` - The data source (table or query)
1024 /// * `writer` - Async writer to write the CSV data to
1025 /// * `options` - Export options
1026 ///
1027 /// # Returns
1028 ///
1029 /// The number of rows exported on success.
1030 ///
1031 /// # Errors
1032 ///
1033 /// Returns `ExportError` if the export fails.
1034 pub async fn export_csv_to_stream<W>(
1035 &mut self,
1036 source: crate::query::export::ExportSource,
1037 writer: W,
1038 options: crate::export::csv::CsvExportOptions,
1039 ) -> Result<u64, crate::export::csv::ExportError>
1040 where
1041 W: tokio::io::AsyncWrite + Unpin,
1042 {
1043 // Pass Exasol host/port from connection params to export options
1044 let options = options
1045 .exasol_host(&self.params.host)
1046 .exasol_port(self.params.port);
1047
1048 let mut transport_guard = self.transport.lock().await;
1049 crate::export::csv::export_to_stream(&mut *transport_guard, source, writer, options).await
1050 }
1051
1052 /// Export data from an Exasol table or query to an in-memory list of rows.
1053 ///
1054 /// Each row is represented as a vector of string values.
1055 ///
1056 /// # Arguments
1057 ///
1058 /// * `source` - The data source (table or query)
1059 /// * `options` - Export options
1060 ///
1061 /// # Returns
1062 ///
1063 /// A vector of rows, where each row is a vector of column values.
1064 ///
1065 /// # Errors
1066 ///
1067 /// Returns `ExportError` if the export fails.
1068 ///
1069 /// # Example
1070 ///
1071 pub async fn export_csv_to_list(
1072 &mut self,
1073 source: crate::query::export::ExportSource,
1074 options: crate::export::csv::CsvExportOptions,
1075 ) -> Result<Vec<Vec<String>>, crate::export::csv::ExportError> {
1076 // Pass Exasol host/port from connection params to export options
1077 let options = options
1078 .exasol_host(&self.params.host)
1079 .exasol_port(self.params.port);
1080
1081 let mut transport_guard = self.transport.lock().await;
1082 crate::export::csv::export_to_list(&mut *transport_guard, source, options).await
1083 }
1084
1085 /// Import multiple CSV files in parallel into an Exasol table.
1086 ///
1087 /// This method reads CSV data from multiple files and imports them into
1088 /// the target table using parallel HTTP transport connections. Each file
1089 /// gets its own connection with a unique internal address.
1090 ///
1091 /// For a single file, this method delegates to `import_csv_from_file`.
1092 ///
1093 /// # Arguments
1094 ///
1095 /// * `table` - Name of the target table
1096 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1097 /// * `options` - Import options
1098 ///
1099 /// # Returns
1100 ///
1101 /// The number of rows imported on success.
1102 ///
1103 /// # Errors
1104 ///
1105 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1106 ///
1107 /// # Example
1108 ///
1109 /// ```no_run
1110 /// use exarrow_rs::adbc::Connection;
1111 /// use exarrow_rs::import::CsvImportOptions;
1112 /// use std::path::PathBuf;
1113 ///
1114 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1115 /// let files = vec![
1116 /// PathBuf::from("/data/part1.csv"),
1117 /// PathBuf::from("/data/part2.csv"),
1118 /// ];
1119 ///
1120 /// let options = CsvImportOptions::default();
1121 /// let rows = conn.import_csv_from_files("my_table", files, options).await?;
1122 /// # Ok(())
1123 /// # }
1124 /// ```
1125 pub async fn import_csv_from_files<S: crate::import::IntoFileSources>(
1126 &mut self,
1127 table: &str,
1128 paths: S,
1129 options: crate::import::csv::CsvImportOptions,
1130 ) -> Result<u64, crate::import::ImportError> {
1131 // Pass Exasol host/port from connection params to import options
1132 let options = options
1133 .exasol_host(&self.params.host)
1134 .exasol_port(self.params.port);
1135
1136 crate::import::csv::import_from_files(self.make_sql_executor(), table, paths, options).await
1137 }
1138
1139 /// Import data from a Parquet file into an Exasol table.
1140 ///
1141 /// This method reads a Parquet file, converts the data to CSV format,
1142 /// and imports it into the target table using Exasol's HTTP transport layer.
1143 ///
1144 /// # Arguments
1145 ///
1146 /// * `table` - Name of the target table
1147 /// * `file_path` - Path to the Parquet file
1148 /// * `options` - Import options
1149 ///
1150 /// # Returns
1151 ///
1152 /// The number of rows imported on success.
1153 ///
1154 /// # Errors
1155 ///
1156 /// Returns `ImportError` if the import fails.
1157 ///
1158 /// # Example
1159 ///
1160 pub async fn import_from_parquet(
1161 &mut self,
1162 table: &str,
1163 file_path: &std::path::Path,
1164 options: crate::import::parquet::ParquetImportOptions,
1165 ) -> Result<u64, crate::import::ImportError> {
1166 // Pass Exasol host/port from connection params to import options
1167 let options = options
1168 .with_exasol_host(&self.params.host)
1169 .with_exasol_port(self.params.port);
1170
1171 crate::import::parquet::import_from_parquet(
1172 self.make_sql_executor(),
1173 table,
1174 file_path,
1175 options,
1176 )
1177 .await
1178 }
1179
1180 /// Import multiple Parquet files in parallel into an Exasol table.
1181 ///
1182 /// This method converts each Parquet file to CSV format concurrently,
1183 /// then streams the data through parallel HTTP transport connections.
1184 ///
1185 /// For a single file, this method delegates to `import_from_parquet`.
1186 ///
1187 /// # Arguments
1188 ///
1189 /// * `table` - Name of the target table
1190 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1191 /// * `options` - Import options
1192 ///
1193 /// # Returns
1194 ///
1195 /// The number of rows imported on success.
1196 ///
1197 /// # Errors
1198 ///
1199 /// Returns `ImportError` if the import fails. Uses fail-fast semantics.
1200 ///
1201 /// # Example
1202 ///
1203 /// ```no_run
1204 /// use exarrow_rs::adbc::Connection;
1205 /// use exarrow_rs::import::ParquetImportOptions;
1206 /// use std::path::PathBuf;
1207 ///
1208 /// # async fn example(conn: &mut Connection) -> Result<(), Box<dyn std::error::Error>> {
1209 /// let files = vec![
1210 /// PathBuf::from("/data/part1.parquet"),
1211 /// PathBuf::from("/data/part2.parquet"),
1212 /// ];
1213 ///
1214 /// let options = ParquetImportOptions::default();
1215 /// let rows = conn.import_parquet_from_files("my_table", files, options).await?;
1216 /// # Ok(())
1217 /// # }
1218 /// ```
1219 pub async fn import_parquet_from_files<S: crate::import::IntoFileSources>(
1220 &mut self,
1221 table: &str,
1222 paths: S,
1223 options: crate::import::parquet::ParquetImportOptions,
1224 ) -> Result<u64, crate::import::ImportError> {
1225 // Pass Exasol host/port from connection params to import options
1226 let options = options
1227 .with_exasol_host(&self.params.host)
1228 .with_exasol_port(self.params.port);
1229
1230 crate::import::parquet::import_from_parquet_files(
1231 self.make_sql_executor(),
1232 table,
1233 paths,
1234 options,
1235 )
1236 .await
1237 }
1238
1239 /// Export data from an Exasol table or query to a Parquet file.
1240 ///
1241 /// This method exports data from the specified source to a Parquet file.
1242 /// The data is first received as CSV from Exasol, then converted to Parquet format.
1243 ///
1244 /// # Arguments
1245 ///
1246 /// * `source` - The data source (table or query)
1247 /// * `file_path` - Path to the output Parquet file
1248 /// * `options` - Export options
1249 ///
1250 /// # Returns
1251 ///
1252 /// The number of rows exported on success.
1253 ///
1254 /// # Errors
1255 ///
1256 /// Returns `ExportError` if the export fails.
1257 ///
1258 /// # Example
1259 ///
1260 pub async fn export_to_parquet(
1261 &mut self,
1262 source: crate::query::export::ExportSource,
1263 file_path: &std::path::Path,
1264 options: crate::export::parquet::ParquetExportOptions,
1265 ) -> Result<u64, crate::export::csv::ExportError> {
1266 // Pass Exasol host/port from connection params to export options
1267 let options = options
1268 .exasol_host(&self.params.host)
1269 .exasol_port(self.params.port);
1270
1271 let mut transport_guard = self.transport.lock().await;
1272 crate::export::parquet::export_to_parquet_via_transport(
1273 &mut *transport_guard,
1274 source,
1275 file_path,
1276 options,
1277 )
1278 .await
1279 }
1280
1281 /// Import data from an Arrow RecordBatch into an Exasol table.
1282 ///
1283 /// This method converts the RecordBatch to CSV format and imports it
1284 /// into the target table using Exasol's HTTP transport layer.
1285 ///
1286 /// # Arguments
1287 ///
1288 /// * `table` - Name of the target table
1289 /// * `batch` - The RecordBatch to import
1290 /// * `options` - Import options
1291 ///
1292 /// # Returns
1293 ///
1294 /// The number of rows imported on success.
1295 ///
1296 /// # Errors
1297 ///
1298 /// Returns `ImportError` if the import fails.
1299 ///
1300 /// # Example
1301 ///
1302 pub async fn import_from_record_batch(
1303 &mut self,
1304 table: &str,
1305 batch: &RecordBatch,
1306 options: crate::import::arrow::ArrowImportOptions,
1307 ) -> Result<u64, crate::import::ImportError> {
1308 // Pass Exasol host/port from connection params to import options
1309 let options = options
1310 .exasol_host(&self.params.host)
1311 .exasol_port(self.params.port);
1312
1313 crate::import::arrow::import_from_record_batch(
1314 self.make_sql_executor(),
1315 table,
1316 batch,
1317 options,
1318 )
1319 .await
1320 }
1321
1322 /// Import data from multiple Arrow RecordBatches into an Exasol table.
1323 ///
1324 /// This method converts each RecordBatch to CSV format and imports them
1325 /// into the target table using Exasol's HTTP transport layer.
1326 ///
1327 /// # Arguments
1328 ///
1329 /// * `table` - Name of the target table
1330 /// * `batches` - An iterator of RecordBatches to import
1331 /// * `options` - Import options
1332 ///
1333 /// # Returns
1334 ///
1335 /// The number of rows imported on success.
1336 ///
1337 /// # Errors
1338 ///
1339 /// Returns `ImportError` if the import fails.
1340 pub async fn import_from_record_batches<I>(
1341 &mut self,
1342 table: &str,
1343 batches: I,
1344 options: crate::import::arrow::ArrowImportOptions,
1345 ) -> Result<u64, crate::import::ImportError>
1346 where
1347 I: IntoIterator<Item = RecordBatch>,
1348 {
1349 // Pass Exasol host/port from connection params to import options
1350 let options = options
1351 .exasol_host(&self.params.host)
1352 .exasol_port(self.params.port);
1353
1354 crate::import::arrow::import_from_record_batches(
1355 self.make_sql_executor(),
1356 table,
1357 batches,
1358 options,
1359 )
1360 .await
1361 }
1362
1363 /// Import data from an Arrow IPC file/stream into an Exasol table.
1364 ///
1365 /// This method reads Arrow IPC format data, converts it to CSV,
1366 /// and imports it into the target table using Exasol's HTTP transport layer.
1367 ///
1368 /// # Arguments
1369 ///
1370 /// * `table` - Name of the target table
1371 /// * `reader` - An async reader containing Arrow IPC data
1372 /// * `options` - Import options
1373 ///
1374 /// # Returns
1375 ///
1376 /// The number of rows imported on success.
1377 ///
1378 /// # Errors
1379 ///
1380 /// Returns `ImportError` if the import fails.
1381 ///
1382 /// # Example
1383 ///
1384 pub async fn import_from_arrow_ipc<R>(
1385 &mut self,
1386 table: &str,
1387 reader: R,
1388 options: crate::import::arrow::ArrowImportOptions,
1389 ) -> Result<u64, crate::import::ImportError>
1390 where
1391 R: tokio::io::AsyncRead + Unpin + Send,
1392 {
1393 // Pass Exasol host/port from connection params to import options
1394 let options = options
1395 .exasol_host(&self.params.host)
1396 .exasol_port(self.params.port);
1397
1398 crate::import::arrow::import_from_arrow_ipc(
1399 self.make_sql_executor(),
1400 table,
1401 reader,
1402 options,
1403 )
1404 .await
1405 }
1406
1407 /// Export data from an Exasol table or query to Arrow RecordBatches.
1408 ///
1409 /// This method exports data from the specified source and converts it
1410 /// to Arrow RecordBatches.
1411 ///
1412 /// # Arguments
1413 ///
1414 /// * `source` - The data source (table or query)
1415 /// * `options` - Export options
1416 ///
1417 /// # Returns
1418 ///
1419 /// A vector of RecordBatches on success.
1420 ///
1421 /// # Errors
1422 ///
1423 /// Returns `ExportError` if the export fails.
1424 ///
1425 /// # Example
1426 ///
1427 pub async fn export_to_record_batches(
1428 &mut self,
1429 source: crate::query::export::ExportSource,
1430 options: crate::export::arrow::ArrowExportOptions,
1431 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1432 // Pass Exasol host/port from connection params to export options
1433 let options = options
1434 .exasol_host(&self.params.host)
1435 .exasol_port(self.params.port);
1436
1437 let mut transport_guard = self.transport.lock().await;
1438 crate::export::arrow::export_to_record_batches(&mut *transport_guard, source, options).await
1439 }
1440
1441 /// Export data from an Exasol table or query to an Arrow IPC file.
1442 ///
1443 /// This method exports data from the specified source to an Arrow IPC file.
1444 ///
1445 /// # Arguments
1446 ///
1447 /// * `source` - The data source (table or query)
1448 /// * `file_path` - Path to the output Arrow IPC file
1449 /// * `options` - Export options
1450 ///
1451 /// # Returns
1452 ///
1453 /// The number of rows exported on success.
1454 ///
1455 /// # Errors
1456 ///
1457 /// Returns `ExportError` if the export fails.
1458 ///
1459 /// # Example
1460 ///
1461 pub async fn export_to_arrow_ipc(
1462 &mut self,
1463 source: crate::query::export::ExportSource,
1464 file_path: &std::path::Path,
1465 options: crate::export::arrow::ArrowExportOptions,
1466 ) -> Result<u64, crate::export::csv::ExportError> {
1467 // Pass Exasol host/port from connection params to export options
1468 let options = options
1469 .exasol_host(&self.params.host)
1470 .exasol_port(self.params.port);
1471
1472 let mut transport_guard = self.transport.lock().await;
1473 crate::export::arrow::export_to_arrow_ipc(&mut *transport_guard, source, file_path, options)
1474 .await
1475 }
1476
1477 // ========================================================================
1478 // Blocking Import/Export Methods
1479 // ========================================================================
1480
1481 /// Import CSV data from a file into an Exasol table (blocking).
1482 ///
1483 /// This is a synchronous wrapper around [`import_csv_from_file`](Self::import_csv_from_file)
1484 /// for use in non-async contexts.
1485 ///
1486 /// # Arguments
1487 ///
1488 /// * `table` - Name of the target table
1489 /// * `file_path` - Path to the CSV file
1490 /// * `options` - Import options
1491 ///
1492 /// # Returns
1493 ///
1494 /// The number of rows imported on success.
1495 ///
1496 /// # Errors
1497 ///
1498 /// Returns `ImportError` if the import fails.
1499 ///
1500 /// # Example
1501 ///
1502 pub fn blocking_import_csv_from_file(
1503 &mut self,
1504 table: &str,
1505 file_path: &std::path::Path,
1506 options: crate::import::csv::CsvImportOptions,
1507 ) -> Result<u64, crate::import::ImportError> {
1508 blocking_runtime().block_on(self.import_csv_from_file(table, file_path, options))
1509 }
1510
1511 /// Import multiple CSV files in parallel into an Exasol table (blocking).
1512 ///
1513 /// This is a synchronous wrapper around [`import_csv_from_files`](Self::import_csv_from_files)
1514 /// for use in non-async contexts.
1515 ///
1516 /// # Arguments
1517 ///
1518 /// * `table` - Name of the target table
1519 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1520 /// * `options` - Import options
1521 ///
1522 /// # Returns
1523 ///
1524 /// The number of rows imported on success.
1525 ///
1526 /// # Errors
1527 ///
1528 /// Returns `ImportError` if the import fails.
1529 pub fn blocking_import_csv_from_files<S: crate::import::IntoFileSources>(
1530 &mut self,
1531 table: &str,
1532 paths: S,
1533 options: crate::import::csv::CsvImportOptions,
1534 ) -> Result<u64, crate::import::ImportError> {
1535 blocking_runtime().block_on(self.import_csv_from_files(table, paths, options))
1536 }
1537
1538 /// Import data from a Parquet file into an Exasol table (blocking).
1539 ///
1540 /// This is a synchronous wrapper around [`import_from_parquet`](Self::import_from_parquet)
1541 /// for use in non-async contexts.
1542 ///
1543 /// # Arguments
1544 ///
1545 /// * `table` - Name of the target table
1546 /// * `file_path` - Path to the Parquet file
1547 /// * `options` - Import options
1548 ///
1549 /// # Returns
1550 ///
1551 /// The number of rows imported on success.
1552 ///
1553 /// # Errors
1554 ///
1555 /// Returns `ImportError` if the import fails.
1556 ///
1557 /// # Example
1558 ///
1559 pub fn blocking_import_from_parquet(
1560 &mut self,
1561 table: &str,
1562 file_path: &std::path::Path,
1563 options: crate::import::parquet::ParquetImportOptions,
1564 ) -> Result<u64, crate::import::ImportError> {
1565 blocking_runtime().block_on(self.import_from_parquet(table, file_path, options))
1566 }
1567
1568 /// Import multiple Parquet files in parallel into an Exasol table (blocking).
1569 ///
1570 /// This is a synchronous wrapper around [`import_parquet_from_files`](Self::import_parquet_from_files)
1571 /// for use in non-async contexts.
1572 ///
1573 /// # Arguments
1574 ///
1575 /// * `table` - Name of the target table
1576 /// * `paths` - File paths (accepts single path, Vec, array, or slice)
1577 /// * `options` - Import options
1578 ///
1579 /// # Returns
1580 ///
1581 /// The number of rows imported on success.
1582 ///
1583 /// # Errors
1584 ///
1585 /// Returns `ImportError` if the import fails.
1586 pub fn blocking_import_parquet_from_files<S: crate::import::IntoFileSources>(
1587 &mut self,
1588 table: &str,
1589 paths: S,
1590 options: crate::import::parquet::ParquetImportOptions,
1591 ) -> Result<u64, crate::import::ImportError> {
1592 blocking_runtime().block_on(self.import_parquet_from_files(table, paths, options))
1593 }
1594
1595 /// Import data from an Arrow RecordBatch into an Exasol table (blocking).
1596 ///
1597 /// This is a synchronous wrapper around [`import_from_record_batch`](Self::import_from_record_batch)
1598 /// for use in non-async contexts.
1599 ///
1600 /// # Arguments
1601 ///
1602 /// * `table` - Name of the target table
1603 /// * `batch` - The RecordBatch to import
1604 /// * `options` - Import options
1605 ///
1606 /// # Returns
1607 ///
1608 /// The number of rows imported on success.
1609 ///
1610 /// # Errors
1611 ///
1612 /// Returns `ImportError` if the import fails.
1613 ///
1614 /// # Example
1615 ///
1616 pub fn blocking_import_from_record_batch(
1617 &mut self,
1618 table: &str,
1619 batch: &RecordBatch,
1620 options: crate::import::arrow::ArrowImportOptions,
1621 ) -> Result<u64, crate::import::ImportError> {
1622 blocking_runtime().block_on(self.import_from_record_batch(table, batch, options))
1623 }
1624
1625 /// Import data from an Arrow IPC file into an Exasol table (blocking).
1626 ///
1627 /// This is a synchronous wrapper around [`import_from_arrow_ipc`](Self::import_from_arrow_ipc)
1628 /// for use in non-async contexts.
1629 ///
1630 /// Note: This method requires a synchronous reader that implements `std::io::Read`.
1631 /// The data will be read into memory before being imported.
1632 ///
1633 /// # Arguments
1634 ///
1635 /// * `table` - Name of the target table
1636 /// * `file_path` - Path to the Arrow IPC file
1637 /// * `options` - Import options
1638 ///
1639 /// # Returns
1640 ///
1641 /// The number of rows imported on success.
1642 ///
1643 /// # Errors
1644 ///
1645 /// Returns `ImportError` if the import fails.
1646 ///
1647 /// # Example
1648 ///
1649 pub fn blocking_import_from_arrow_ipc(
1650 &mut self,
1651 table: &str,
1652 file_path: &std::path::Path,
1653 options: crate::import::arrow::ArrowImportOptions,
1654 ) -> Result<u64, crate::import::ImportError> {
1655 blocking_runtime().block_on(async {
1656 let file = tokio::fs::File::open(file_path)
1657 .await
1658 .map_err(crate::import::ImportError::IoError)?;
1659 self.import_from_arrow_ipc(table, file, options).await
1660 })
1661 }
1662
1663 /// Export data from an Exasol table or query to a CSV file (blocking).
1664 ///
1665 /// This is a synchronous wrapper around [`export_csv_to_file`](Self::export_csv_to_file)
1666 /// for use in non-async contexts.
1667 ///
1668 /// # Arguments
1669 ///
1670 /// * `source` - The data source (table or query)
1671 /// * `file_path` - Path to the output file
1672 /// * `options` - Export options
1673 ///
1674 /// # Returns
1675 ///
1676 /// The number of rows exported on success.
1677 ///
1678 /// # Errors
1679 ///
1680 /// Returns `ExportError` if the export fails.
1681 ///
1682 /// # Example
1683 ///
1684 pub fn blocking_export_csv_to_file(
1685 &mut self,
1686 source: crate::query::export::ExportSource,
1687 file_path: &std::path::Path,
1688 options: crate::export::csv::CsvExportOptions,
1689 ) -> Result<u64, crate::export::csv::ExportError> {
1690 blocking_runtime().block_on(self.export_csv_to_file(source, file_path, options))
1691 }
1692
1693 /// Export data from an Exasol table or query to a Parquet file (blocking).
1694 ///
1695 /// This is a synchronous wrapper around [`export_to_parquet`](Self::export_to_parquet)
1696 /// for use in non-async contexts.
1697 ///
1698 /// # Arguments
1699 ///
1700 /// * `source` - The data source (table or query)
1701 /// * `file_path` - Path to the output Parquet file
1702 /// * `options` - Export options
1703 ///
1704 /// # Returns
1705 ///
1706 /// The number of rows exported on success.
1707 ///
1708 /// # Errors
1709 ///
1710 /// Returns `ExportError` if the export fails.
1711 ///
1712 /// # Example
1713 ///
1714 pub fn blocking_export_to_parquet(
1715 &mut self,
1716 source: crate::query::export::ExportSource,
1717 file_path: &std::path::Path,
1718 options: crate::export::parquet::ParquetExportOptions,
1719 ) -> Result<u64, crate::export::csv::ExportError> {
1720 blocking_runtime().block_on(self.export_to_parquet(source, file_path, options))
1721 }
1722
1723 /// Export data from an Exasol table or query to Arrow RecordBatches (blocking).
1724 ///
1725 /// This is a synchronous wrapper around [`export_to_record_batches`](Self::export_to_record_batches)
1726 /// for use in non-async contexts.
1727 ///
1728 /// # Arguments
1729 ///
1730 /// * `source` - The data source (table or query)
1731 /// * `options` - Export options
1732 ///
1733 /// # Returns
1734 ///
1735 /// A vector of RecordBatches on success.
1736 ///
1737 /// # Errors
1738 ///
1739 /// Returns `ExportError` if the export fails.
1740 ///
1741 /// # Example
1742 ///
1743 pub fn blocking_export_to_record_batches(
1744 &mut self,
1745 source: crate::query::export::ExportSource,
1746 options: crate::export::arrow::ArrowExportOptions,
1747 ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> {
1748 blocking_runtime().block_on(self.export_to_record_batches(source, options))
1749 }
1750
1751 /// Export data from an Exasol table or query to an Arrow IPC file (blocking).
1752 ///
1753 /// This is a synchronous wrapper around [`export_to_arrow_ipc`](Self::export_to_arrow_ipc)
1754 /// for use in non-async contexts.
1755 ///
1756 /// # Arguments
1757 ///
1758 /// * `source` - The data source (table or query)
1759 /// * `file_path` - Path to the output Arrow IPC file
1760 /// * `options` - Export options
1761 ///
1762 /// # Returns
1763 ///
1764 /// The number of rows exported on success.
1765 ///
1766 /// # Errors
1767 ///
1768 /// Returns `ExportError` if the export fails.
1769 ///
1770 /// # Example
1771 ///
1772 pub fn blocking_export_to_arrow_ipc(
1773 &mut self,
1774 source: crate::query::export::ExportSource,
1775 file_path: &std::path::Path,
1776 options: crate::export::arrow::ArrowExportOptions,
1777 ) -> Result<u64, crate::export::csv::ExportError> {
1778 blocking_runtime().block_on(self.export_to_arrow_ipc(source, file_path, options))
1779 }
1780
1781 // ========================================================================
1782 // Private Helper Methods
1783 // ========================================================================
1784
1785 /// Update session state after query execution.
1786 async fn update_session_state_after_query(&self) {
1787 if self.session.in_transaction() {
1788 self.session.set_state(SessionState::InTransaction).await;
1789 } else {
1790 self.session.set_state(SessionState::Ready).await;
1791 }
1792 self.session.update_activity().await;
1793 }
1794}
1795
1796impl std::fmt::Debug for Connection {
1797 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1798 f.debug_struct("Connection")
1799 .field("session_id", &self.session.session_id())
1800 .field("host", &self.params.host)
1801 .field("port", &self.params.port)
1802 .field("username", &self.params.username)
1803 .field("in_transaction", &self.session.in_transaction())
1804 .finish()
1805 }
1806}
1807
/// Builder for creating Connection instances.
///
/// Accumulates connection parameters through a fluent API; the connection is
/// only established when `connect()` is called.
pub struct ConnectionBuilder {
    /// Connection parameters builder; all setter calls delegate to it.
    params_builder: crate::connection::params::ConnectionBuilder,
}
1813
1814impl ConnectionBuilder {
1815 /// Create a new ConnectionBuilder.
1816 pub fn new() -> Self {
1817 Self {
1818 params_builder: crate::connection::params::ConnectionBuilder::new(),
1819 }
1820 }
1821
1822 /// Set the database host.
1823 pub fn host(mut self, host: &str) -> Self {
1824 self.params_builder = self.params_builder.host(host);
1825 self
1826 }
1827
1828 /// Set the database port.
1829 pub fn port(mut self, port: u16) -> Self {
1830 self.params_builder = self.params_builder.port(port);
1831 self
1832 }
1833
1834 /// Set the username.
1835 pub fn username(mut self, username: &str) -> Self {
1836 self.params_builder = self.params_builder.username(username);
1837 self
1838 }
1839
1840 /// Set the password.
1841 pub fn password(mut self, password: &str) -> Self {
1842 self.params_builder = self.params_builder.password(password);
1843 self
1844 }
1845
1846 /// Set the default schema.
1847 pub fn schema(mut self, schema: &str) -> Self {
1848 self.params_builder = self.params_builder.schema(schema);
1849 self
1850 }
1851
1852 /// Enable or disable TLS.
1853 pub fn use_tls(mut self, use_tls: bool) -> Self {
1854 self.params_builder = self.params_builder.use_tls(use_tls);
1855 self
1856 }
1857
1858 /// Build and connect.
1859 ///
1860 /// # Returns
1861 ///
1862 /// A connected `Connection` instance.
1863 ///
1864 /// # Errors
1865 ///
1866 /// Returns `ExasolError` if the connection fails.
1867 pub async fn connect(self) -> Result<Connection, ExasolError> {
1868 let params = self.params_builder.build()?;
1869 Ok(Connection::from_params(params).await?)
1870 }
1871}
1872
1873impl Default for ConnectionBuilder {
1874 fn default() -> Self {
1875 Self::new()
1876 }
1877}
1878
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_builder() {
        // Exercise every fluent setter; a real connection would need a live
        // Exasol instance, so this only verifies the chain compiles and
        // produces a builder.
        let _builder = ConnectionBuilder::new()
            .host("localhost")
            .port(8563)
            .username("test")
            .password("secret")
            .schema("MY_SCHEMA")
            .use_tls(false);
    }

    #[test]
    fn test_create_statement_is_sync() {
        // Compile-time check only: `create_statement` must be callable
        // without `.await`. Constructing a Connection requires a live
        // database, so no runtime assertion is possible here.
    }
}
1905
/// Tests for the blocking (synchronous) wrapper API and the `Session` alias.
#[cfg(test)]
mod blocking_tests {
    use super::*;

    #[test]
    fn test_session_type_alias_exists() {
        // `Session` must be a transparent alias for `Connection`: a function
        // written against one type must coerce to a fn pointer over the other.
        fn takes_session(_session: &Session) {}
        fn takes_connection(_connection: &Connection) {}

        let _session_fn_as_connection: fn(&Connection) = takes_session;
        let _connection_fn_as_session: fn(&Session) = takes_connection;
    }

    #[test]
    fn test_blocking_runtime_exists() {
        // The first call lazily initializes the static runtime; obtaining a
        // handle proves the returned reference is usable.
        let runtime = blocking_runtime();
        let _handle = runtime.handle();
    }

    // Each test below coerces a blocking method to a plain fn pointer with
    // its full signature spelled out; compilation fails if the method is
    // removed or its signature drifts.

    #[test]
    fn test_connection_has_blocking_import_csv() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::csv::CsvImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_csv_from_file;
    }

    #[test]
    fn test_connection_has_blocking_import_parquet() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::parquet::ParquetImportOptions,
        ) -> Result<u64, crate::import::ImportError> = Connection::blocking_import_from_parquet;
    }

    #[test]
    fn test_connection_has_blocking_import_record_batch() {
        let _: fn(
            &mut Connection,
            &str,
            &RecordBatch,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_from_record_batch;
    }

    #[test]
    fn test_connection_has_blocking_import_arrow_ipc() {
        let _: fn(
            &mut Connection,
            &str,
            &std::path::Path,
            crate::import::arrow::ArrowImportOptions,
        ) -> Result<u64, crate::import::ImportError> =
            Connection::blocking_import_from_arrow_ipc;
    }

    #[test]
    fn test_connection_has_blocking_export_csv() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::csv::CsvExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> =
            Connection::blocking_export_csv_to_file;
    }

    #[test]
    fn test_connection_has_blocking_export_parquet() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::parquet::ParquetExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> =
            Connection::blocking_export_to_parquet;
    }

    #[test]
    fn test_connection_has_blocking_export_record_batches() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<Vec<RecordBatch>, crate::export::csv::ExportError> =
            Connection::blocking_export_to_record_batches;
    }

    #[test]
    fn test_connection_has_blocking_export_arrow_ipc() {
        let _: fn(
            &mut Connection,
            crate::query::export::ExportSource,
            &std::path::Path,
            crate::export::arrow::ArrowExportOptions,
        ) -> Result<u64, crate::export::csv::ExportError> =
            Connection::blocking_export_to_arrow_ipc;
    }
}