chdb_rust/connection.rs
1//! Connection management for chDB.
2//!
3//! This module provides the [`Connection`] type for managing connections to chDB databases.
4
5use std::ffi::{c_char, CString};
6
7use crate::arrow_stream::{ArrowArray, ArrowSchema, ArrowStream};
8use crate::bindings;
9use crate::error::{Error, Result};
10use crate::format::OutputFormat;
11use crate::query_result::QueryResult;
12
13/// A connection to a chDB database.
14///
15/// A `Connection` represents an active connection to a chDB database instance.
16/// Connections can be created for in-memory databases or persistent databases
17/// stored on disk.
18///
19/// # Thread Safety
20///
21/// `Connection` implements `Send`, meaning it can be safely transferred between threads.
22/// However, the underlying chDB library may have limitations on concurrent access.
23/// It's recommended to use one connection per thread or implement proper synchronization.
24///
25/// # Examples
26///
27/// ```no_run
28/// use chdb_rust::connection::Connection;
29/// use chdb_rust::format::OutputFormat;
30///
31/// // Create an in-memory connection
32/// let conn = Connection::open_in_memory()?;
33///
34/// // Execute a query
35/// let result = conn.query("SELECT 1", OutputFormat::JSONEachRow)?;
36/// println!("{}", result.data_utf8_lossy());
37/// # Ok::<(), chdb_rust::error::Error>(())
38/// ```
39#[derive(Debug)]
40pub struct Connection {
41 // Pointer to chdb_connection (which is *mut chdb_connection_)
42 inner: *mut bindings::chdb_connection,
43}
44
45// Safety: Connection is safe to send between threads
46// The underlying chDB library is thread-safe for query execution
47unsafe impl Send for Connection {}
48
49impl Connection {
50 /// Connect to chDB with the given command-line arguments.
51 ///
52 /// This is a low-level function that allows you to pass arbitrary arguments
53 /// to the chDB connection. For most use cases, prefer [`open_in_memory`](Self::open_in_memory)
54 /// or [`open_with_path`](Self::open_with_path).
55 ///
56 /// # Arguments
57 ///
58 /// * `args` - Array of command-line arguments (e.g., `["clickhouse", "--path=/tmp/db"]`)
59 ///
60 /// # Examples
61 ///
62 /// ```no_run
63 /// use chdb_rust::connection::Connection;
64 ///
65 /// // Connect with custom arguments
66 /// let conn = Connection::open(&["clickhouse", "--path=/tmp/mydb"])?;
67 /// # Ok::<(), chdb_rust::error::Error>(())
68 /// ```
69 ///
70 /// # Errors
71 ///
72 /// Returns [`Error::ConnectionFailed`] if the
73 /// connection cannot be established.
74 pub fn open(args: &[&str]) -> Result<Self> {
75 let c_args: Vec<CString> = args
76 .iter()
77 .map(|s| CString::new(*s))
78 .collect::<std::result::Result<Vec<_>, _>>()?;
79
80 let mut argv: Vec<*mut c_char> = c_args.iter().map(|s| s.as_ptr() as *mut c_char).collect();
81
82 let conn_ptr = unsafe { bindings::chdb_connect(argv.len() as i32, argv.as_mut_ptr()) };
83
84 if conn_ptr.is_null() {
85 return Err(Error::ConnectionFailed);
86 }
87
88 // Check if the connection itself is null
89 let conn = unsafe { *conn_ptr };
90 if conn.is_null() {
91 return Err(Error::ConnectionFailed);
92 }
93
94 Ok(Self { inner: conn_ptr })
95 }
96
97 /// Connect to an in-memory database.
98 ///
99 /// Creates a connection to a temporary in-memory database. Data stored in this
100 /// database will be lost when the connection is closed.
101 ///
102 /// # Examples
103 ///
104 /// ```no_run
105 /// use chdb_rust::connection::Connection;
106 ///
107 /// let conn = Connection::open_in_memory()?;
108 /// # Ok::<(), chdb_rust::error::Error>(())
109 /// ```
110 ///
111 /// # Errors
112 ///
113 /// Returns [`Error::ConnectionFailed`] if the
114 /// connection cannot be established.
115 pub fn open_in_memory() -> Result<Self> {
116 Self::open(&["clickhouse"])
117 }
118
119 /// Connect to a database at the given path.
120 ///
121 /// Creates a connection to a persistent database stored at the specified path.
122 /// The directory will be created if it doesn't exist.
123 ///
124 /// # Arguments
125 ///
126 /// * `path` - The filesystem path where the database should be stored
127 ///
128 /// # Examples
129 ///
130 /// ```no_run
131 /// use chdb_rust::connection::Connection;
132 ///
133 /// let conn = Connection::open_with_path("/tmp/mydb")?;
134 /// # Ok::<(), chdb_rust::error::Error>(())
135 /// ```
136 ///
137 /// # Errors
138 ///
139 /// Returns [`Error::ConnectionFailed`] if the
140 /// connection cannot be established.
141 pub fn open_with_path(path: &str) -> Result<Self> {
142 let path_arg = format!("--path={path}");
143 Self::open(&["clickhouse", &path_arg])
144 }
145
146 /// Execute a query and return the result.
147 ///
148 /// Executes a SQL query against the database and returns the result in the
149 /// specified output format.
150 ///
151 /// # Arguments
152 ///
153 /// * `sql` - The SQL query string to execute
154 /// * `format` - The desired output format for the result
155 ///
156 /// # Returns
157 ///
158 /// Returns a [`QueryResult`] containing the query output, or an [`Error`]
159 /// if the query fails.
160 ///
161 /// # Examples
162 ///
163 /// ```no_run
164 /// use chdb_rust::connection::Connection;
165 /// use chdb_rust::format::OutputFormat;
166 ///
167 /// let conn = Connection::open_in_memory()?;
168 /// let result = conn.query("SELECT 1 + 1 AS sum", OutputFormat::JSONEachRow)?;
169 /// println!("{}", result.data_utf8_lossy());
170 /// # Ok::<(), chdb_rust::error::Error>(())
171 /// ```
172 ///
173 /// # Errors
174 ///
175 /// Returns an error if:
176 /// - The query syntax is invalid
177 /// - The query references non-existent tables or columns
178 /// - The query execution fails for any other reason
179 pub fn query(&self, sql: &str, format: OutputFormat) -> Result<QueryResult> {
180 let query_cstr = CString::new(sql)?;
181 let format_cstr = CString::new(format.as_str())?;
182
183 // chdb_query takes chdb_connection (which is *mut chdb_connection_)
184 let conn = unsafe { *self.inner };
185 let result_ptr =
186 unsafe { bindings::chdb_query(conn, query_cstr.as_ptr(), format_cstr.as_ptr()) };
187
188 if result_ptr.is_null() {
189 return Err(Error::NoResult);
190 }
191
192 let result = QueryResult::new(result_ptr);
193 result.check_error()
194 }
195
196 /// Register an Arrow stream as a table function with the given name.
197 ///
198 /// This function registers an Arrow stream as a virtual table that can be queried
199 /// using SQL. The table will be available for queries until it is unregistered.
200 ///
201 /// # Arguments
202 ///
203 /// * `table_name` - The name to register for the Arrow stream table function
204 /// * `arrow_stream` - The Arrow stream handle to register
205 ///
206 /// # Returns
207 ///
208 /// Returns `Ok(())` on success, or an [`Error`] if registration fails.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use chdb_rust::connection::Connection;
214 /// use chdb_rust::arrow_stream::ArrowStream;
215 ///
216 /// let conn = Connection::open_in_memory()?;
217 ///
218 /// // Assuming you have an Arrow stream handle
219 /// // let arrow_stream = ArrowStream::from_raw(stream_ptr);
220 /// // conn.register_arrow_stream("my_data", &arrow_stream)?;
221 ///
222 /// // Now you can query it
223 /// // let result = conn.query("SELECT * FROM my_data", OutputFormat::JSONEachRow)?;
224 /// # Ok::<(), chdb_rust::error::Error>(())
225 /// ```
226 ///
227 /// # Errors
228 ///
229 /// Returns an error if:
230 /// - The table name contains invalid characters
231 /// - The Arrow stream handle is invalid
232 /// - Registration fails for any other reason
233 pub fn register_arrow_stream(
234 &self,
235 table_name: &str,
236 arrow_stream: &ArrowStream,
237 ) -> Result<()> {
238 let table_name_cstr = CString::new(table_name)?;
239 let conn = unsafe { *self.inner };
240
241 let state = unsafe {
242 bindings::chdb_arrow_scan(conn, table_name_cstr.as_ptr(), arrow_stream.as_raw())
243 };
244
245 if state == bindings::chdb_state_CHDBSuccess {
246 Ok(())
247 } else {
248 Err(Error::QueryError(format!(
249 "Failed to register Arrow stream as table '{}'",
250 table_name
251 )))
252 }
253 }
254
255 /// Register an Arrow array as a table function with the given name.
256 ///
257 /// This function registers an Arrow array (with its schema) as a virtual table
258 /// that can be queried using SQL. The table will be available for queries until
259 /// it is unregistered.
260 ///
261 /// # Arguments
262 ///
263 /// * `table_name` - The name to register for the Arrow array table function
264 /// * `arrow_schema` - The Arrow schema handle describing the array structure
265 /// * `arrow_array` - The Arrow array handle containing the data
266 ///
267 /// # Returns
268 ///
269 /// Returns `Ok(())` on success, or an [`Error`] if registration fails.
270 ///
271 /// # Examples
272 ///
273 /// ```no_run
274 /// use chdb_rust::connection::Connection;
275 /// use chdb_rust::arrow_stream::{ArrowSchema, ArrowArray};
276 ///
277 /// let conn = Connection::open_in_memory()?;
278 ///
279 /// // Assuming you have Arrow schema and array handles
280 /// // let arrow_schema = ArrowSchema::from_raw(schema_ptr);
281 /// // let arrow_array = ArrowArray::from_raw(array_ptr);
282 /// // conn.register_arrow_array("my_data", &arrow_schema, &arrow_array)?;
283 ///
284 /// // Now you can query it
285 /// // let result = conn.query("SELECT * FROM my_data", OutputFormat::JSONEachRow)?;
286 /// # Ok::<(), chdb_rust::error::Error>(())
287 /// ```
288 ///
289 /// # Errors
290 ///
291 /// Returns an error if:
292 /// - The table name contains invalid characters
293 /// - The Arrow schema or array handles are invalid
294 /// - Registration fails for any other reason
295 pub fn register_arrow_array(
296 &self,
297 table_name: &str,
298 arrow_schema: &ArrowSchema,
299 arrow_array: &ArrowArray,
300 ) -> Result<()> {
301 let table_name_cstr = CString::new(table_name)?;
302 let conn = unsafe { *self.inner };
303
304 let state = unsafe {
305 bindings::chdb_arrow_array_scan(
306 conn,
307 table_name_cstr.as_ptr(),
308 arrow_schema.as_raw(),
309 arrow_array.as_raw(),
310 )
311 };
312
313 if state == bindings::chdb_state_CHDBSuccess {
314 Ok(())
315 } else {
316 Err(Error::QueryError(format!(
317 "Failed to register Arrow array as table '{}'",
318 table_name
319 )))
320 }
321 }
322
323 /// Unregister an Arrow stream table function that was previously registered.
324 ///
325 /// This function removes a previously registered Arrow stream table function,
326 /// making it no longer available for queries.
327 ///
328 /// # Arguments
329 ///
330 /// * `table_name` - The name of the Arrow stream table function to unregister
331 ///
332 /// # Returns
333 ///
334 /// Returns `Ok(())` on success, or an [`Error`] if unregistration fails.
335 ///
336 /// # Examples
337 ///
338 /// ```no_run
339 /// use chdb_rust::connection::Connection;
340 /// use chdb_rust::arrow_stream::ArrowStream;
341 ///
342 /// let conn = Connection::open_in_memory()?;
343 ///
344 /// // Register a table
345 /// // let arrow_stream = ArrowStream::from_raw(stream_ptr);
346 /// // conn.register_arrow_stream("my_data", &arrow_stream)?;
347 ///
348 /// // Use it...
349 ///
350 /// // Unregister when done
351 /// // conn.unregister_arrow_table("my_data")?;
352 /// # Ok::<(), chdb_rust::error::Error>(())
353 /// ```
354 ///
355 /// # Errors
356 ///
357 /// Returns an error if:
358 /// - The table name contains invalid characters
359 /// - The table was not previously registered
360 /// - Unregistration fails for any other reason
361 pub fn unregister_arrow_table(&self, table_name: &str) -> Result<()> {
362 let table_name_cstr = CString::new(table_name)?;
363 let conn = unsafe { *self.inner };
364
365 let state =
366 unsafe { bindings::chdb_arrow_unregister_table(conn, table_name_cstr.as_ptr()) };
367
368 if state == bindings::chdb_state_CHDBSuccess {
369 Ok(())
370 } else {
371 Err(Error::QueryError(format!(
372 "Failed to unregister Arrow table '{}'",
373 table_name
374 )))
375 }
376 }
377}
378
379impl Drop for Connection {
380 fn drop(&mut self) {
381 if !self.inner.is_null() {
382 unsafe { bindings::chdb_close_conn(self.inner) };
383 }
384 }
385}