Skip to main content

streamling_e2e/resources/
clickhouse.rs

1//! ClickHouse resource manager for creating isolated databases per test.
2
3use crate::{E2eError, Result};
4use clickhouse::{Client, Row};
5use serde::Deserialize;
6use tracing::info;
7
8/// ClickHouse resource manager
9pub struct ClickHouseResource {
10    /// ClickHouse client
11    client: Client,
12    /// Admin client (connected to default database)
13    admin_client: Client,
14    /// Name of the isolated database
15    pub database: String,
16    /// Base URL
17    url: String,
18    /// Whether to drop the database on cleanup
19    should_drop: bool,
20}
21
22impl ClickHouseResource {
23    /// Connect to an existing ClickHouse database (for inspection, does not drop on cleanup)
24    pub async fn connect_existing(url: &str, database: &str) -> Result<Self> {
25        // Create client connected to the existing database
26        let client = Client::default().with_url(url).with_database(database);
27
28        // Create admin client (won't be used for dropping)
29        let admin_client = Client::default().with_url(url).with_database("default");
30
31        info!("Connected to existing ClickHouse database: {}", database);
32
33        Ok(Self {
34            client,
35            admin_client,
36            database: database.to_string(),
37            url: url.to_string(),
38            should_drop: false, // Don't drop when connecting to existing database
39        })
40    }
41
42    /// Create a new ClickHouse resource with an isolated database
43    pub async fn new(url: &str, database: &str) -> Result<Self> {
44        // Create admin client connected to default database
45        let admin_client = Client::default().with_url(url).with_database("default");
46
47        // Create the isolated database
48        admin_client
49            .query(&format!("CREATE DATABASE IF NOT EXISTS `{}`", database))
50            .execute()
51            .await
52            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
53
54        info!("Created ClickHouse database: {}", database);
55
56        // Create client connected to the isolated database
57        let client = Client::default().with_url(url).with_database(database);
58
59        Ok(Self {
60            client,
61            admin_client,
62            database: database.to_string(),
63            url: url.to_string(),
64            should_drop: true, // Drop when created by test
65        })
66    }
67
68    /// Get the ClickHouse client
69    pub fn client(&self) -> &Client {
70        &self.client
71    }
72
73    /// Execute a SQL statement
74    pub async fn execute(&self, sql: &str) -> Result<()> {
75        self.client
76            .query(sql)
77            .execute()
78            .await
79            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
80        Ok(())
81    }
82
83    /// Execute a count query and return the result
84    pub async fn count(&self, query: &str) -> Result<u64> {
85        let count: u64 = self
86            .client
87            .query(query)
88            .fetch_one()
89            .await
90            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
91        Ok(count)
92    }
93
94    /// Execute a query and fetch all rows
95    pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
96    where
97        T: Row + for<'a> Deserialize<'a>,
98    {
99        let rows: Vec<T> = self
100            .client
101            .query(query)
102            .fetch_all()
103            .await
104            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
105        Ok(rows)
106    }
107
108    /// Execute a query and fetch one row
109    pub async fn query_one<T>(&self, query: &str) -> Result<T>
110    where
111        T: Row + for<'a> Deserialize<'a>,
112    {
113        let row: T = self
114            .client
115            .query(query)
116            .fetch_one()
117            .await
118            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
119        Ok(row)
120    }
121
122    /// Get the column types for a table
123    pub async fn get_column_types(&self, table: &str) -> Result<Vec<(String, String)>> {
124        #[derive(Row, Deserialize)]
125        struct ColumnInfo {
126            name: String,
127            #[serde(rename = "type")]
128            col_type: String,
129        }
130
131        let query = format!(
132            "SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}' ORDER BY position",
133            self.database, table
134        );
135        let columns: Vec<ColumnInfo> = self
136            .client
137            .query(&query)
138            .fetch_all()
139            .await
140            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
141
142        Ok(columns.into_iter().map(|c| (c.name, c.col_type)).collect())
143    }
144
145    /// Get the connection URL for this database
146    pub fn connection_url(&self) -> String {
147        format!("{}?database={}", self.url, self.database)
148    }
149
150    /// List all tables in the database
151    pub async fn list_tables(&self) -> Result<Vec<String>> {
152        #[derive(Row, Deserialize)]
153        struct TableName {
154            name: String,
155        }
156
157        let tables: Vec<TableName> = self
158            .query(&format!(
159                "SELECT name FROM system.tables WHERE database = '{}' ORDER BY name",
160                self.database
161            ))
162            .await?;
163
164        Ok(tables.into_iter().map(|t| t.name).collect())
165    }
166
167    /// Get sample data from a table using HTTP API (for better formatting)
168    pub async fn get_sample_data_formatted(&self, table: &str, limit: usize) -> Result<String> {
169        let sample_query = format!(
170            "SELECT * FROM {}.{} LIMIT {} FORMAT PrettyCompact",
171            self.database, table, limit
172        );
173
174        let response = reqwest::Client::new()
175            .post(&self.url)
176            .query(&[("database", self.database.as_str())])
177            .body(sample_query)
178            .send()
179            .await
180            .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
181
182        if response.status().is_success() {
183            let text = response
184                .text()
185                .await
186                .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
187            Ok(text)
188        } else {
189            let status = response.status();
190            let error_text = response.text().await.unwrap_or_default();
191            Err(E2eError::ClickHouse(format!(
192                "HTTP {}: {}",
193                status, error_text
194            )))
195        }
196    }
197}
198
199impl Drop for ClickHouseResource {
200    fn drop(&mut self) {
201        // Only drop if this resource was created (not connected to existing)
202        if !self.should_drop {
203            return;
204        }
205
206        if let Ok(handle) = tokio::runtime::Handle::try_current() {
207            let database = self.database.clone();
208            let admin_client = self.admin_client.clone();
209
210            handle.spawn(async move {
211                let drop_sql = format!("DROP DATABASE IF EXISTS `{}`", database);
212                if let Err(e) = admin_client.query(&drop_sql).execute().await {
213                    tracing::warn!("Failed to drop ClickHouse database {}: {}", database, e);
214                } else {
215                    info!("Dropped ClickHouse database: {}", database);
216                }
217            });
218        }
219    }
220}