1use apfsds_protocol::ConnMeta;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11use tokio::sync::RwLock;
12use tracing::{debug, error, info, warn};
13
#[derive(Error, Debug)]
/// Errors surfaced by the ClickHouse backup client.
///
/// Every variant that carries a `String` holds a human-readable description
/// which is interpolated into the `Display` output.
pub enum ClickHouseError {
    /// Establishing a connection failed.
    ///
    /// NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),

    /// An insert or DDL statement failed; the payload is the stringified
    /// error returned by the `clickhouse` client call.
    #[error("Query failed: {0}")]
    QueryFailed(String),

    /// A value could not be serialized.
    ///
    /// NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// The client is disabled.
    ///
    /// NOTE(review): the visible methods return `Ok` when the client is
    /// disabled rather than this variant — confirm whether it is used elsewhere.
    #[error("Client not enabled")]
    NotEnabled,
}
29
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Settings for [`ClickHouseBackup`].
pub struct ClickHouseConfig {
    /// Master switch. When `false`, `ClickHouseBackup::new` creates no client
    /// and every backup operation returns immediately without doing work.
    pub enabled: bool,

    /// Server URL handed to the `clickhouse` client (`with_url`).
    pub url: String,

    /// Database selected on the client (`with_database`) and used to qualify
    /// table names in the `CREATE TABLE` statements.
    pub database: String,

    /// Name of the connection table. The raft-log table name is derived from
    /// it by appending `_logs`.
    pub table: String,

    /// Optional user name; only applied to the client when present.
    pub username: Option<String>,

    /// Optional password; only applied to the client when present.
    pub password: Option<String>,

    /// Number of buffered records at which a flush is triggered from the
    /// recording call itself.
    pub batch_size: usize,

    /// Period of the background task started by `start_flush_task`.
    pub flush_interval: Duration,
}
57
58impl Default for ClickHouseConfig {
59 fn default() -> Self {
60 Self {
61 enabled: false,
62 url: "http://localhost:8123".to_string(),
63 database: "apfsds".to_string(),
64 table: "connections".to_string(),
65 username: None,
66 password: None,
67 batch_size: 1000,
68 flush_interval: Duration::from_secs(5),
69 }
70 }
71}
72
#[derive(Debug, Clone, Serialize, clickhouse::Row)]
/// One row of the connection table.
///
/// The fields appear in the same order as the columns of the table created by
/// `ClickHouseBackup::ensure_table`; keep the two in sync when editing.
pub struct ConnectionRecord {
    /// Identifier of the connection.
    pub conn_id: u64,
    /// Client address as dotted-decimal text (see `from_conn_meta`).
    pub client_addr: String,
    /// First element of the connection's `nat_entry`.
    pub local_port: u16,
    /// Second element of the connection's `nat_entry`.
    pub remote_port: u16,
    /// Pod the connection was assigned to.
    pub assigned_pod: u32,
    /// Creation timestamp relative to the UNIX epoch, as supplied to
    /// `from_conn_meta`. The target column is declared `DateTime64(3)`.
    pub created_at: u64,
}
83
84impl ConnectionRecord {
85 pub fn from_conn_meta(conn_id: u64, meta: &ConnMeta, timestamp: u64) -> Self {
86 let client_addr = format!(
87 "{}.{}.{}.{}",
88 meta.client_addr[12], meta.client_addr[13], meta.client_addr[14], meta.client_addr[15]
89 );
90
91 Self {
92 conn_id,
93 client_addr,
94 local_port: meta.nat_entry.0,
95 remote_port: meta.nat_entry.1,
96 assigned_pod: meta.assigned_pod,
97 created_at: timestamp,
98 }
99 }
100}
101
/// Buffered writer that archives connection records and raft log entries to
/// ClickHouse.
pub struct ClickHouseBackup {
    /// Configured client; `None` when the backup is disabled in the config.
    client: Option<clickhouse::Client>,
    /// Settings this instance was created with.
    config: ClickHouseConfig,
    /// Connection records waiting to be inserted.
    buffer: RwLock<Vec<ConnectionRecord>>,
    /// Raft log records waiting to be inserted.
    raft_buffer: RwLock<Vec<RaftLogRecord>>,
}
109
110impl ClickHouseBackup {
111 pub fn new(config: ClickHouseConfig) -> Result<Self, ClickHouseError> {
113 let client = if config.enabled {
114 let mut builder = clickhouse::Client::default().with_url(&config.url);
115
116 if let Some(ref user) = config.username {
117 builder = builder.with_user(user);
118 }
119 if let Some(ref pass) = config.password {
120 builder = builder.with_password(pass);
121 }
122
123 builder = builder.with_database(&config.database);
124
125 info!("ClickHouse backup enabled: {}", config.url);
126 Some(builder)
127 } else {
128 info!("ClickHouse backup disabled");
129 None
130 };
131
132 Ok(Self {
133 client,
134 config,
135 buffer: RwLock::new(Vec::new()),
136 raft_buffer: RwLock::new(Vec::new()),
137 })
138 }
139
140 pub fn is_enabled(&self) -> bool {
142 self.client.is_some()
143 }
144
145 pub async fn record_connection(
147 &self,
148 conn_id: u64,
149 meta: &ConnMeta,
150 ) -> Result<(), ClickHouseError> {
151 if !self.is_enabled() {
152 return Ok(()); }
154
155 let timestamp = std::time::SystemTime::now()
156 .duration_since(std::time::UNIX_EPOCH)
157 .unwrap()
158 .as_secs();
159
160 let record = ConnectionRecord::from_conn_meta(conn_id, meta, timestamp);
161
162 let mut buffer = self.buffer.write().await;
163 buffer.push(record);
164
165 if buffer.len() >= self.config.batch_size {
167 drop(buffer); self.flush().await?;
169 }
170
171 Ok(())
172 }
173
174 pub async fn flush(&self) -> Result<usize, ClickHouseError> {
176 let client = match &self.client {
177 Some(c) => c,
178 None => return Ok(0),
179 };
180
181 let mut buffer = self.buffer.write().await;
182 if buffer.is_empty() {
183 return Ok(0);
184 }
185
186 let records: Vec<_> = buffer.drain(..).collect();
187 let count = records.len();
188 drop(buffer); debug!("Flushing {} records to ClickHouse", count);
191
192 let mut insert = client
193 .insert(&self.config.table)
194 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
195
196 for record in records {
197 insert
198 .write(&record)
199 .await
200 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
201 }
202
203 insert
204 .end()
205 .await
206 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
207
208 info!("Flushed {} records to ClickHouse", count);
209 Ok(count)
210 }
211
212 pub fn start_flush_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
214 let interval = self.config.flush_interval;
215
216 tokio::spawn(async move {
217 let mut ticker = tokio::time::interval(interval);
218
219 loop {
220 ticker.tick().await;
221 if let Err(e) = self.flush().await {
222 warn!("ClickHouse flush error: {}", e);
223 }
224 }
225 })
226 }
227
228 pub async fn ensure_table(&self) -> Result<(), ClickHouseError> {
230 let client = match &self.client {
231 Some(c) => c,
232 None => return Ok(()),
233 };
234
235 let ddl = format!(
236 r#"
237 CREATE TABLE IF NOT EXISTS {}.{} (
238 conn_id UInt64,
239 client_addr String,
240 local_port UInt16,
241 remote_port UInt16,
242 assigned_pod UInt32,
243 created_at DateTime64(3)
244 ) ENGINE = MergeTree()
245 ORDER BY (created_at, conn_id)
246 TTL toDateTime(created_at) + INTERVAL 7 DAY
247 "#,
248 self.config.database, self.config.table
249 );
250
251 client
252 .query(&ddl)
253 .execute()
254 .await
255 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
256
257 info!(
258 "ClickHouse table ensured: {}.{}",
259 self.config.database, self.config.table
260 );
261 Ok(())
262 }
263
264 pub async fn buffered_count(&self) -> usize {
266 self.buffer.read().await.len()
267 }
268
269 pub async fn archive_raft_log(
271 &self,
272 index: u64,
273 term: u64,
274 operation: &str,
275 payload: &str,
276 ) -> Result<(), ClickHouseError> {
277 if !self.is_enabled() {
278 return Ok(());
279 }
280
281 let timestamp = std::time::SystemTime::now()
282 .duration_since(std::time::UNIX_EPOCH)
283 .unwrap()
284 .as_millis() as u64;
285
286 let record = RaftLogRecord {
287 index,
288 term,
289 operation: operation.to_string(),
290 payload: payload.to_string(),
291 created_at: timestamp,
292 };
293
294 let mut buffer = self.raft_buffer.write().await;
295 buffer.push(record);
296
297 if buffer.len() >= self.config.batch_size {
298 drop(buffer);
299 self.flush_raft_logs().await?;
300 }
301
302 Ok(())
303 }
304
305 pub async fn flush_raft_logs(&self) -> Result<usize, ClickHouseError> {
307 let client = match &self.client {
308 Some(c) => c,
309 None => return Ok(0),
310 };
311
312 let mut buffer = self.raft_buffer.write().await;
313 if buffer.is_empty() {
314 return Ok(0);
315 }
316
317 let records: Vec<_> = buffer.drain(..).collect();
318 let count = records.len();
319 drop(buffer);
320
321 let table_name = format!("{}_logs", self.config.table);
322
323 let mut insert = client
324 .insert(&table_name)
325 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
326
327 for record in records {
328 insert
329 .write(&record)
330 .await
331 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
332 }
333
334 insert
335 .end()
336 .await
337 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
338
339 Ok(count)
340 }
341
342 pub async fn ensure_tables(&self) -> Result<(), ClickHouseError> {
344 let client = match &self.client {
345 Some(c) => c,
346 None => return Ok(()),
347 };
348
349 self.ensure_table().await?;
351
352 let table_name = format!("{}_logs", self.config.table);
354 let ddl = format!(
355 r#"
356 CREATE TABLE IF NOT EXISTS {}.{} (
357 index UInt64,
358 term UInt64,
359 operation String,
360 payload String,
361 created_at DateTime64(3)
362 ) ENGINE = MergeTree()
363 ORDER BY (created_at, index)
364 TTL toDateTime(created_at) + INTERVAL 30 DAY
365 "#,
366 self.config.database, table_name
367 );
368
369 client
370 .query(&ddl)
371 .execute()
372 .await
373 .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
374
375 info!(
376 "ClickHouse table ensured: {}.{}",
377 self.config.database, table_name
378 );
379 Ok(())
380 }
381}
382
#[derive(Debug, Clone, Serialize, clickhouse::Row)]
/// One row of the raft-log table (`<table>_logs`).
///
/// The fields appear in the same order as the columns of the table created by
/// `ClickHouseBackup::ensure_tables`; keep the two in sync when editing.
pub struct RaftLogRecord {
    /// Raft log index of the entry.
    pub index: u64,
    /// Raft term of the entry.
    pub term: u64,
    /// Operation name, copied from the caller-supplied string.
    pub operation: String,
    /// Operation payload, copied from the caller-supplied string.
    pub payload: String,
    /// Milliseconds since the UNIX epoch at the time the entry was archived.
    pub created_at: u64,
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must leave the backup switched off.
    #[test]
    fn test_config_default_disabled() {
        let ClickHouseConfig { enabled, .. } = ClickHouseConfig::default();
        assert!(!enabled);
    }

    /// A backup built from the default configuration reports itself disabled.
    #[tokio::test]
    async fn test_disabled_client() {
        let backup = ClickHouseBackup::new(ClickHouseConfig::default()).unwrap();
        let enabled = backup.is_enabled();
        assert!(!enabled);
    }
}
409}