clickhouse_datafusion/utils/
create.rs

1#![cfg_attr(feature = "mocks", expect(unused_imports))]
2use clickhouse_arrow::CreateOptions;
3use datafusion::arrow::datatypes::SchemaRef;
4use datafusion::error::{DataFusionError, Result};
5use datafusion::sql::TableReference;
6use tracing::{debug, error};
7
8use crate::connection::ClickHouseConnectionPool;
9#[cfg(not(feature = "mocks"))]
10use crate::utils;
11
12/// Create a database in the remote `ClickHouse` instance.
13///
14/// # Errors
15/// - Returns an error if the `ClickHouse` endpoint is unreachable
16/// - Returns an error if the `ClickHouse` database fails to be created
17pub async fn create_database(database: &str, pool: &ClickHouseConnectionPool) -> Result<()> {
18    // Exit early if database is default
19    if database == "default" {
20        return Ok(());
21    }
22
23    #[cfg(not(feature = "mocks"))]
24    {
25        let conn = pool.pool().get().await.map_err(utils::map_external_err)?;
26
27        // Ensure db exists
28        conn.create_database(Some(database), None).await.map_err(|error| {
29            error!(?error, database, "Database creation failed");
30            DataFusionError::External(
31                format!("Failed to create database for {database}: {error}").into(),
32            )
33        })?;
34    }
35
36    #[cfg(feature = "mocks")]
37    let _ = pool.connect().await?;
38
39    Ok(())
40}
41
42/// Create a table in the remote `ClickHouse` instance.
43///
44/// # Errors
45/// - Returns an error if the `ClickHouse` endpoint is unreachable
46/// - Returns an error if the `ClickHouse` schema fails to be created
47#[cfg(not(feature = "mocks"))]
48pub async fn create_schema(
49    table_ref: &TableReference,
50    schema: &SchemaRef,
51    table_create_options: &CreateOptions,
52    pool: &ClickHouseConnectionPool,
53    if_not_exists: bool,
54) -> Result<()> {
55    let conn = pool.pool().get().await.map_err(utils::map_external_err)?;
56
57    // Ensure db exists
58    if if_not_exists {
59        debug!(?table_ref, "Creating database schema");
60        conn.create_database(table_ref.schema(), None).await.map_err(|error| {
61            error!(?error, name = %table_ref, "Database creation failed");
62            DataFusionError::External(
63                format!("Failed to create database for {table_ref}: {error}").into(),
64            )
65        })?;
66    }
67
68    // TODO: !IMPORTANT! Handle table_partition_cols from cmd
69    let table = table_ref.table();
70    let schema_name = table_ref.schema();
71    debug!(table, ?schema_name, "Creating database table");
72    conn.create_table(schema_name, table, schema, table_create_options, None).await.map_err(
73        |error| {
74            error!(?error, %table_ref, "Table creation failed");
75            DataFusionError::External(
76                format!("Failed to create table for {table_ref}: {error}").into(),
77            )
78        },
79    )?;
80
81    Ok(())
82}
83
84#[cfg(feature = "mocks")]
85#[expect(clippy::missing_errors_doc)]
86#[expect(clippy::unused_async)]
87pub async fn create_schema(
88    _table_ref: &TableReference,
89    _schema: &SchemaRef,
90    _table_create_options: &CreateOptions,
91    _pool: &ClickHouseConnectionPool,
92    _if_not_exists: bool,
93) -> Result<()> {
94    Ok(())
95}
96
97// The following tests are present to bridge a gap in coverage around mocks
98#[cfg(all(test, feature = "test-utils", feature = "mocks"))]
99mod tests {
100    use std::sync::Arc;
101
102    use clickhouse_arrow::CreateOptions;
103    use datafusion::arrow::datatypes::Schema;
104
105    use super::*;
106    use crate::connection::ClickHouseConnectionPool;
107
108    #[tokio::test]
109    async fn test_create_utils_mocked() {
110        let pool = Arc::new(ClickHouseConnectionPool::new("pool".to_string(), ()));
111
112        let result = create_database("test", &pool).await;
113        assert!(result.is_ok());
114
115        let result = create_schema(
116            &TableReference::from("test"),
117            &Arc::new(Schema::empty()),
118            &CreateOptions::default(),
119            &pool,
120            false,
121        )
122        .await;
123        assert!(result.is_ok());
124    }
125}