streamling_e2e/resources/
clickhouse.rs1use crate::{E2eError, Result};
4use clickhouse::{Client, Row};
5use serde::Deserialize;
6use tracing::info;
7
8pub struct ClickHouseResource {
10 client: Client,
12 admin_client: Client,
14 pub database: String,
16 url: String,
18 should_drop: bool,
20}
21
22impl ClickHouseResource {
23 pub async fn connect_existing(url: &str, database: &str) -> Result<Self> {
25 let client = Client::default().with_url(url).with_database(database);
27
28 let admin_client = Client::default().with_url(url).with_database("default");
30
31 info!("Connected to existing ClickHouse database: {}", database);
32
33 Ok(Self {
34 client,
35 admin_client,
36 database: database.to_string(),
37 url: url.to_string(),
38 should_drop: false, })
40 }
41
42 pub async fn new(url: &str, database: &str) -> Result<Self> {
44 let admin_client = Client::default().with_url(url).with_database("default");
46
47 admin_client
49 .query(&format!("CREATE DATABASE IF NOT EXISTS `{}`", database))
50 .execute()
51 .await
52 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
53
54 info!("Created ClickHouse database: {}", database);
55
56 let client = Client::default().with_url(url).with_database(database);
58
59 Ok(Self {
60 client,
61 admin_client,
62 database: database.to_string(),
63 url: url.to_string(),
64 should_drop: true, })
66 }
67
68 pub fn client(&self) -> &Client {
70 &self.client
71 }
72
73 pub async fn execute(&self, sql: &str) -> Result<()> {
75 self.client
76 .query(sql)
77 .execute()
78 .await
79 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
80 Ok(())
81 }
82
83 pub async fn count(&self, query: &str) -> Result<u64> {
85 let count: u64 = self
86 .client
87 .query(query)
88 .fetch_one()
89 .await
90 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
91 Ok(count)
92 }
93
94 pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
96 where
97 T: Row + for<'a> Deserialize<'a>,
98 {
99 let rows: Vec<T> = self
100 .client
101 .query(query)
102 .fetch_all()
103 .await
104 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
105 Ok(rows)
106 }
107
108 pub async fn query_one<T>(&self, query: &str) -> Result<T>
110 where
111 T: Row + for<'a> Deserialize<'a>,
112 {
113 let row: T = self
114 .client
115 .query(query)
116 .fetch_one()
117 .await
118 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
119 Ok(row)
120 }
121
122 pub async fn get_column_types(&self, table: &str) -> Result<Vec<(String, String)>> {
124 #[derive(Row, Deserialize)]
125 struct ColumnInfo {
126 name: String,
127 #[serde(rename = "type")]
128 col_type: String,
129 }
130
131 let query = format!(
132 "SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}' ORDER BY position",
133 self.database, table
134 );
135 let columns: Vec<ColumnInfo> = self
136 .client
137 .query(&query)
138 .fetch_all()
139 .await
140 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
141
142 Ok(columns.into_iter().map(|c| (c.name, c.col_type)).collect())
143 }
144
145 pub fn connection_url(&self) -> String {
147 format!("{}?database={}", self.url, self.database)
148 }
149
150 pub async fn list_tables(&self) -> Result<Vec<String>> {
152 #[derive(Row, Deserialize)]
153 struct TableName {
154 name: String,
155 }
156
157 let tables: Vec<TableName> = self
158 .query(&format!(
159 "SELECT name FROM system.tables WHERE database = '{}' ORDER BY name",
160 self.database
161 ))
162 .await?;
163
164 Ok(tables.into_iter().map(|t| t.name).collect())
165 }
166
167 pub async fn get_sample_data_formatted(&self, table: &str, limit: usize) -> Result<String> {
169 let sample_query = format!(
170 "SELECT * FROM {}.{} LIMIT {} FORMAT PrettyCompact",
171 self.database, table, limit
172 );
173
174 let response = reqwest::Client::new()
175 .post(&self.url)
176 .query(&[("database", self.database.as_str())])
177 .body(sample_query)
178 .send()
179 .await
180 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
181
182 if response.status().is_success() {
183 let text = response
184 .text()
185 .await
186 .map_err(|e| E2eError::ClickHouse(e.to_string()))?;
187 Ok(text)
188 } else {
189 let status = response.status();
190 let error_text = response.text().await.unwrap_or_default();
191 Err(E2eError::ClickHouse(format!(
192 "HTTP {}: {}",
193 status, error_text
194 )))
195 }
196 }
197}
198
199impl Drop for ClickHouseResource {
200 fn drop(&mut self) {
201 if !self.should_drop {
203 return;
204 }
205
206 if let Ok(handle) = tokio::runtime::Handle::try_current() {
207 let database = self.database.clone();
208 let admin_client = self.admin_client.clone();
209
210 handle.spawn(async move {
211 let drop_sql = format!("DROP DATABASE IF EXISTS `{}`", database);
212 if let Err(e) = admin_client.query(&drop_sql).execute().await {
213 tracing::warn!("Failed to drop ClickHouse database {}: {}", database, e);
214 } else {
215 info!("Dropped ClickHouse database: {}", database);
216 }
217 });
218 }
219 }
220}