Skip to main content

chdb_rust/
connection.rs

1//! Connection management for chDB.
2//!
3//! This module provides the [`Connection`] type for managing connections to chDB databases.
4
5use std::ffi::{c_char, CString};
6
7use crate::arrow_stream::{ArrowArray, ArrowSchema, ArrowStream};
8use crate::bindings;
9use crate::error::{Error, Result};
10use crate::format::OutputFormat;
11use crate::query_result::QueryResult;
12
13/// A connection to a chDB database.
14///
15/// A `Connection` represents an active connection to a chDB database instance.
16/// Connections can be created for in-memory databases or persistent databases
17/// stored on disk.
18///
19/// # Thread Safety
20///
21/// `Connection` implements `Send`, meaning it can be safely transferred between threads.
22/// However, the underlying chDB library may have limitations on concurrent access.
23/// It's recommended to use one connection per thread or implement proper synchronization.
24///
25/// # Examples
26///
27/// ```no_run
28/// use chdb_rust::connection::Connection;
29/// use chdb_rust::format::OutputFormat;
30///
31/// // Create an in-memory connection
32/// let conn = Connection::open_in_memory()?;
33///
34/// // Execute a query
35/// let result = conn.query("SELECT 1", OutputFormat::JSONEachRow)?;
36/// println!("{}", result.data_utf8_lossy());
37/// # Ok::<(), chdb_rust::error::Error>(())
38/// ```
39#[derive(Debug)]
40pub struct Connection {
41    // Pointer to chdb_connection (which is *mut chdb_connection_)
42    inner: *mut bindings::chdb_connection,
43}
44
// SAFETY: `Connection` stores only the raw connection pointer in `inner`, and raw
// pointers are what stop the compiler from deriving `Send` automatically. Moving a
// `Connection` to another thread is sound provided chDB allows a connection to be
// used and closed from a thread other than the one that opened it. The original
// author states that the chDB library is thread-safe for query execution —
// NOTE(review): confirm this against the chDB documentation.
// `Sync` is not implemented here, so a `&Connection` cannot be shared across threads.
unsafe impl Send for Connection {}
48
49impl Connection {
50    /// Connect to chDB with the given command-line arguments.
51    ///
52    /// This is a low-level function that allows you to pass arbitrary arguments
53    /// to the chDB connection. For most use cases, prefer [`open_in_memory`](Self::open_in_memory)
54    /// or [`open_with_path`](Self::open_with_path).
55    ///
56    /// # Arguments
57    ///
58    /// * `args` - Array of command-line arguments (e.g., `["clickhouse", "--path=/tmp/db"]`)
59    ///
60    /// # Examples
61    ///
62    /// ```no_run
63    /// use chdb_rust::connection::Connection;
64    ///
65    /// // Connect with custom arguments
66    /// let conn = Connection::open(&["clickhouse", "--path=/tmp/mydb"])?;
67    /// # Ok::<(), chdb_rust::error::Error>(())
68    /// ```
69    ///
70    /// # Errors
71    ///
72    /// Returns [`Error::ConnectionFailed`] if the
73    /// connection cannot be established.
74    pub fn open(args: &[&str]) -> Result<Self> {
75        let c_args: Vec<CString> = args
76            .iter()
77            .map(|s| CString::new(*s))
78            .collect::<std::result::Result<Vec<_>, _>>()?;
79
80        let mut argv: Vec<*mut c_char> = c_args.iter().map(|s| s.as_ptr() as *mut c_char).collect();
81
82        let conn_ptr = unsafe { bindings::chdb_connect(argv.len() as i32, argv.as_mut_ptr()) };
83
84        if conn_ptr.is_null() {
85            return Err(Error::ConnectionFailed);
86        }
87
88        // Check if the connection itself is null
89        let conn = unsafe { *conn_ptr };
90        if conn.is_null() {
91            return Err(Error::ConnectionFailed);
92        }
93
94        Ok(Self { inner: conn_ptr })
95    }
96
97    /// Connect to an in-memory database.
98    ///
99    /// Creates a connection to a temporary in-memory database. Data stored in this
100    /// database will be lost when the connection is closed.
101    ///
102    /// # Examples
103    ///
104    /// ```no_run
105    /// use chdb_rust::connection::Connection;
106    ///
107    /// let conn = Connection::open_in_memory()?;
108    /// # Ok::<(), chdb_rust::error::Error>(())
109    /// ```
110    ///
111    /// # Errors
112    ///
113    /// Returns [`Error::ConnectionFailed`] if the
114    /// connection cannot be established.
115    pub fn open_in_memory() -> Result<Self> {
116        Self::open(&["clickhouse"])
117    }
118
119    /// Connect to a database at the given path.
120    ///
121    /// Creates a connection to a persistent database stored at the specified path.
122    /// The directory will be created if it doesn't exist.
123    ///
124    /// # Arguments
125    ///
126    /// * `path` - The filesystem path where the database should be stored
127    ///
128    /// # Examples
129    ///
130    /// ```no_run
131    /// use chdb_rust::connection::Connection;
132    ///
133    /// let conn = Connection::open_with_path("/tmp/mydb")?;
134    /// # Ok::<(), chdb_rust::error::Error>(())
135    /// ```
136    ///
137    /// # Errors
138    ///
139    /// Returns [`Error::ConnectionFailed`] if the
140    /// connection cannot be established.
141    pub fn open_with_path(path: &str) -> Result<Self> {
142        let path_arg = format!("--path={path}");
143        Self::open(&["clickhouse", &path_arg])
144    }
145
146    /// Execute a query and return the result.
147    ///
148    /// Executes a SQL query against the database and returns the result in the
149    /// specified output format.
150    ///
151    /// # Arguments
152    ///
153    /// * `sql` - The SQL query string to execute
154    /// * `format` - The desired output format for the result
155    ///
156    /// # Returns
157    ///
158    /// Returns a [`QueryResult`] containing the query output, or an [`Error`]
159    /// if the query fails.
160    ///
161    /// # Examples
162    ///
163    /// ```no_run
164    /// use chdb_rust::connection::Connection;
165    /// use chdb_rust::format::OutputFormat;
166    ///
167    /// let conn = Connection::open_in_memory()?;
168    /// let result = conn.query("SELECT 1 + 1 AS sum", OutputFormat::JSONEachRow)?;
169    /// println!("{}", result.data_utf8_lossy());
170    /// # Ok::<(), chdb_rust::error::Error>(())
171    /// ```
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if:
176    /// - The query syntax is invalid
177    /// - The query references non-existent tables or columns
178    /// - The query execution fails for any other reason
179    pub fn query(&self, sql: &str, format: OutputFormat) -> Result<QueryResult> {
180        let query_cstr = CString::new(sql)?;
181        let format_cstr = CString::new(format.as_str())?;
182
183        // chdb_query takes chdb_connection (which is *mut chdb_connection_)
184        let conn = unsafe { *self.inner };
185        let result_ptr =
186            unsafe { bindings::chdb_query(conn, query_cstr.as_ptr(), format_cstr.as_ptr()) };
187
188        if result_ptr.is_null() {
189            return Err(Error::NoResult);
190        }
191
192        let result = QueryResult::new(result_ptr);
193        result.check_error()
194    }
195
196    /// Register an Arrow stream as a table function with the given name.
197    ///
198    /// This function registers an Arrow stream as a virtual table that can be queried
199    /// using SQL. The table will be available for queries until it is unregistered.
200    ///
201    /// # Arguments
202    ///
203    /// * `table_name` - The name to register for the Arrow stream table function
204    /// * `arrow_stream` - The Arrow stream handle to register
205    ///
206    /// # Returns
207    ///
208    /// Returns `Ok(())` on success, or an [`Error`] if registration fails.
209    ///
210    /// # Examples
211    ///
212    /// ```no_run
213    /// use chdb_rust::connection::Connection;
214    /// use chdb_rust::arrow_stream::ArrowStream;
215    ///
216    /// let conn = Connection::open_in_memory()?;
217    ///
218    /// // Assuming you have an Arrow stream handle
219    /// // let arrow_stream = ArrowStream::from_raw(stream_ptr);
220    /// // conn.register_arrow_stream("my_data", &arrow_stream)?;
221    ///
222    /// // Now you can query it
223    /// // let result = conn.query("SELECT * FROM my_data", OutputFormat::JSONEachRow)?;
224    /// # Ok::<(), chdb_rust::error::Error>(())
225    /// ```
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if:
230    /// - The table name contains invalid characters
231    /// - The Arrow stream handle is invalid
232    /// - Registration fails for any other reason
233    pub fn register_arrow_stream(
234        &self,
235        table_name: &str,
236        arrow_stream: &ArrowStream,
237    ) -> Result<()> {
238        let table_name_cstr = CString::new(table_name)?;
239        let conn = unsafe { *self.inner };
240
241        let state = unsafe {
242            bindings::chdb_arrow_scan(conn, table_name_cstr.as_ptr(), arrow_stream.as_raw())
243        };
244
245        if state == bindings::chdb_state_CHDBSuccess {
246            Ok(())
247        } else {
248            Err(Error::QueryError(format!(
249                "Failed to register Arrow stream as table '{}'",
250                table_name
251            )))
252        }
253    }
254
255    /// Register an Arrow array as a table function with the given name.
256    ///
257    /// This function registers an Arrow array (with its schema) as a virtual table
258    /// that can be queried using SQL. The table will be available for queries until
259    /// it is unregistered.
260    ///
261    /// # Arguments
262    ///
263    /// * `table_name` - The name to register for the Arrow array table function
264    /// * `arrow_schema` - The Arrow schema handle describing the array structure
265    /// * `arrow_array` - The Arrow array handle containing the data
266    ///
267    /// # Returns
268    ///
269    /// Returns `Ok(())` on success, or an [`Error`] if registration fails.
270    ///
271    /// # Examples
272    ///
273    /// ```no_run
274    /// use chdb_rust::connection::Connection;
275    /// use chdb_rust::arrow_stream::{ArrowSchema, ArrowArray};
276    ///
277    /// let conn = Connection::open_in_memory()?;
278    ///
279    /// // Assuming you have Arrow schema and array handles
280    /// // let arrow_schema = ArrowSchema::from_raw(schema_ptr);
281    /// // let arrow_array = ArrowArray::from_raw(array_ptr);
282    /// // conn.register_arrow_array("my_data", &arrow_schema, &arrow_array)?;
283    ///
284    /// // Now you can query it
285    /// // let result = conn.query("SELECT * FROM my_data", OutputFormat::JSONEachRow)?;
286    /// # Ok::<(), chdb_rust::error::Error>(())
287    /// ```
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if:
292    /// - The table name contains invalid characters
293    /// - The Arrow schema or array handles are invalid
294    /// - Registration fails for any other reason
295    pub fn register_arrow_array(
296        &self,
297        table_name: &str,
298        arrow_schema: &ArrowSchema,
299        arrow_array: &ArrowArray,
300    ) -> Result<()> {
301        let table_name_cstr = CString::new(table_name)?;
302        let conn = unsafe { *self.inner };
303
304        let state = unsafe {
305            bindings::chdb_arrow_array_scan(
306                conn,
307                table_name_cstr.as_ptr(),
308                arrow_schema.as_raw(),
309                arrow_array.as_raw(),
310            )
311        };
312
313        if state == bindings::chdb_state_CHDBSuccess {
314            Ok(())
315        } else {
316            Err(Error::QueryError(format!(
317                "Failed to register Arrow array as table '{}'",
318                table_name
319            )))
320        }
321    }
322
323    /// Unregister an Arrow stream table function that was previously registered.
324    ///
325    /// This function removes a previously registered Arrow stream table function,
326    /// making it no longer available for queries.
327    ///
328    /// # Arguments
329    ///
330    /// * `table_name` - The name of the Arrow stream table function to unregister
331    ///
332    /// # Returns
333    ///
334    /// Returns `Ok(())` on success, or an [`Error`] if unregistration fails.
335    ///
336    /// # Examples
337    ///
338    /// ```no_run
339    /// use chdb_rust::connection::Connection;
340    /// use chdb_rust::arrow_stream::ArrowStream;
341    ///
342    /// let conn = Connection::open_in_memory()?;
343    ///
344    /// // Register a table
345    /// // let arrow_stream = ArrowStream::from_raw(stream_ptr);
346    /// // conn.register_arrow_stream("my_data", &arrow_stream)?;
347    ///
348    /// // Use it...
349    ///
350    /// // Unregister when done
351    /// // conn.unregister_arrow_table("my_data")?;
352    /// # Ok::<(), chdb_rust::error::Error>(())
353    /// ```
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if:
358    /// - The table name contains invalid characters
359    /// - The table was not previously registered
360    /// - Unregistration fails for any other reason
361    pub fn unregister_arrow_table(&self, table_name: &str) -> Result<()> {
362        let table_name_cstr = CString::new(table_name)?;
363        let conn = unsafe { *self.inner };
364
365        let state =
366            unsafe { bindings::chdb_arrow_unregister_table(conn, table_name_cstr.as_ptr()) };
367
368        if state == bindings::chdb_state_CHDBSuccess {
369            Ok(())
370        } else {
371            Err(Error::QueryError(format!(
372                "Failed to unregister Arrow table '{}'",
373                table_name
374            )))
375        }
376    }
377}
378
379impl Drop for Connection {
380    fn drop(&mut self) {
381        if !self.inner.is_null() {
382            unsafe { bindings::chdb_close_conn(self.inner) };
383        }
384    }
385}