clickhouse_datafusion/utils/
create.rs1#![cfg_attr(feature = "mocks", expect(unused_imports))]
2use clickhouse_arrow::CreateOptions;
3use datafusion::arrow::datatypes::SchemaRef;
4use datafusion::error::{DataFusionError, Result};
5use datafusion::sql::TableReference;
6use tracing::{debug, error};
7
8use crate::connection::ClickHouseConnectionPool;
9#[cfg(not(feature = "mocks"))]
10use crate::utils;
11
12pub async fn create_database(database: &str, pool: &ClickHouseConnectionPool) -> Result<()> {
18 if database == "default" {
20 return Ok(());
21 }
22
23 #[cfg(not(feature = "mocks"))]
24 {
25 let conn = pool.pool().get().await.map_err(utils::map_external_err)?;
26
27 conn.create_database(Some(database), None).await.map_err(|error| {
29 error!(?error, database, "Database creation failed");
30 DataFusionError::External(
31 format!("Failed to create database for {database}: {error}").into(),
32 )
33 })?;
34 }
35
36 #[cfg(feature = "mocks")]
37 let _ = pool.connect().await?;
38
39 Ok(())
40}
41
42#[cfg(not(feature = "mocks"))]
48pub async fn create_schema(
49 table_ref: &TableReference,
50 schema: &SchemaRef,
51 table_create_options: &CreateOptions,
52 pool: &ClickHouseConnectionPool,
53 if_not_exists: bool,
54) -> Result<()> {
55 let conn = pool.pool().get().await.map_err(utils::map_external_err)?;
56
57 if if_not_exists {
59 debug!(?table_ref, "Creating database schema");
60 conn.create_database(table_ref.schema(), None).await.map_err(|error| {
61 error!(?error, name = %table_ref, "Database creation failed");
62 DataFusionError::External(
63 format!("Failed to create database for {table_ref}: {error}").into(),
64 )
65 })?;
66 }
67
68 let table = table_ref.table();
70 let schema_name = table_ref.schema();
71 debug!(table, ?schema_name, "Creating database table");
72 conn.create_table(schema_name, table, schema, table_create_options, None).await.map_err(
73 |error| {
74 error!(?error, %table_ref, "Table creation failed");
75 DataFusionError::External(
76 format!("Failed to create table for {table_ref}: {error}").into(),
77 )
78 },
79 )?;
80
81 Ok(())
82}
83
/// Mock stand-in for `create_schema`, compiled only when the `mocks` feature is enabled.
///
/// Every argument is ignored and the call always succeeds with `Ok(())`.
// No `# Errors` doc section on purpose: adding one would leave the
// `clippy::missing_errors_doc` expectation below unfulfilled.
#[cfg(feature = "mocks")]
#[expect(clippy::missing_errors_doc)]
#[expect(clippy::unused_async)]
pub async fn create_schema(
    _table_ref: &TableReference,
    _schema: &SchemaRef,
    _table_create_options: &CreateOptions,
    _pool: &ClickHouseConnectionPool,
    _if_not_exists: bool,
) -> Result<()> {
    Ok(())
}
96
#[cfg(all(test, feature = "test-utils", feature = "mocks"))]
mod tests {
    use std::sync::Arc;

    use clickhouse_arrow::CreateOptions;
    use datafusion::arrow::datatypes::Schema;

    use super::*;
    use crate::connection::ClickHouseConnectionPool;

    /// Smoke test: with the `mocks` feature enabled, both helpers succeed against a mock pool.
    #[tokio::test]
    async fn test_create_utils_mocked() {
        let mock_pool = Arc::new(ClickHouseConnectionPool::new(String::from("pool"), ()));

        // A non-`default` name, so the early-return guard is not what makes this pass.
        assert!(create_database("test", &mock_pool).await.is_ok());

        let table = TableReference::from("test");
        let empty_schema = Arc::new(Schema::empty());
        let options = CreateOptions::default();

        let outcome = create_schema(&table, &empty_schema, &options, &mock_pool, false).await;
        assert!(outcome.is_ok());
    }
}