// apfsds_storage/clickhouse_backup.rs
1//! ClickHouse backup client for connection state persistence
2//!
3//! This module provides optional ClickHouse integration for backing up
4//! connection state. The client is enabled via configuration, not feature flags.
5
6use apfsds_protocol::ConnMeta;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11use tokio::sync::RwLock;
12use tracing::{debug, error, info, warn};
13
/// ClickHouse client errors
#[derive(Error, Debug)]
pub enum ClickHouseError {
    /// Could not reach or authenticate with the ClickHouse server.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),

    /// A query, DDL statement, or bulk insert failed; payload is the
    /// stringified driver error.
    #[error("Query failed: {0}")]
    QueryFailed(String),

    /// A record could not be serialized for insertion.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// An operation required the client while backup is disabled in config.
    #[error("Client not enabled")]
    NotEnabled,
}
29
/// ClickHouse client configuration
///
/// `enabled: false` turns the entire backup path into a no-op at runtime,
/// so no compile-time feature flag is needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClickHouseConfig {
    /// Enable ClickHouse backup
    pub enabled: bool,

    /// ClickHouse server URL (HTTP interface, e.g. "http://localhost:8123")
    pub url: String,

    /// Database name
    pub database: String,

    /// Table name for connection records; raft logs go to "<table>_logs"
    pub table: String,

    /// Username (optional; omitted from the client when None)
    pub username: Option<String>,

    /// Password (optional; omitted from the client when None)
    pub password: Option<String>,

    /// Batch size for bulk inserts — buffers flush when they reach this length
    pub batch_size: usize,

    /// Flush interval for the periodic background flush task
    pub flush_interval: Duration,
}
57
58impl Default for ClickHouseConfig {
59    fn default() -> Self {
60        Self {
61            enabled: false,
62            url: "http://localhost:8123".to_string(),
63            database: "apfsds".to_string(),
64            table: "connections".to_string(),
65            username: None,
66            password: None,
67            batch_size: 1000,
68            flush_interval: Duration::from_secs(5),
69        }
70    }
71}
72
/// Connection record for ClickHouse storage
///
/// One row of the connections table (see the DDL in `ensure_table`).
/// NOTE(review): field order matches the DDL column order — confirm that
/// `clickhouse::Row` binds by position before reordering fields.
#[derive(Debug, Clone, Serialize, clickhouse::Row)]
pub struct ConnectionRecord {
    /// Caller-assigned unique connection identifier.
    pub conn_id: u64,
    /// Dotted-quad IPv4 string, rendered from bytes 12..16 of the client
    /// address in `from_conn_meta`.
    pub client_addr: String,
    /// NAT entry local port (first element of `ConnMeta::nat_entry`).
    pub local_port: u16,
    /// NAT entry remote port (second element of `ConnMeta::nat_entry`).
    pub remote_port: u16,
    /// Pod the connection was assigned to.
    pub assigned_pod: u32,
    /// Caller-supplied epoch timestamp; stored in a DateTime64(3) column.
    pub created_at: u64,
}
83
84impl ConnectionRecord {
85    pub fn from_conn_meta(conn_id: u64, meta: &ConnMeta, timestamp: u64) -> Self {
86        let client_addr = format!(
87            "{}.{}.{}.{}",
88            meta.client_addr[12], meta.client_addr[13], meta.client_addr[14], meta.client_addr[15]
89        );
90
91        Self {
92            conn_id,
93            client_addr,
94            local_port: meta.nat_entry.0,
95            remote_port: meta.nat_entry.1,
96            assigned_pod: meta.assigned_pod,
97            created_at: timestamp,
98        }
99    }
100}
101
/// ClickHouse backup client
///
/// Buffers connection and raft-log records in memory and bulk-inserts them
/// when a buffer reaches `batch_size` or a flush is triggered explicitly.
pub struct ClickHouseBackup {
    /// `None` when backup is disabled — every operation then no-ops.
    client: Option<clickhouse::Client>,
    config: ClickHouseConfig,
    /// Connection records awaiting `flush`.
    buffer: RwLock<Vec<ConnectionRecord>>,
    /// Raft log records awaiting `flush_raft_logs`.
    raft_buffer: RwLock<Vec<RaftLogRecord>>,
}
109
110impl ClickHouseBackup {
111    /// Create a new ClickHouse backup client
112    pub fn new(config: ClickHouseConfig) -> Result<Self, ClickHouseError> {
113        let client = if config.enabled {
114            let mut builder = clickhouse::Client::default().with_url(&config.url);
115
116            if let Some(ref user) = config.username {
117                builder = builder.with_user(user);
118            }
119            if let Some(ref pass) = config.password {
120                builder = builder.with_password(pass);
121            }
122
123            builder = builder.with_database(&config.database);
124
125            info!("ClickHouse backup enabled: {}", config.url);
126            Some(builder)
127        } else {
128            info!("ClickHouse backup disabled");
129            None
130        };
131
132        Ok(Self {
133            client,
134            config,
135            buffer: RwLock::new(Vec::new()),
136            raft_buffer: RwLock::new(Vec::new()),
137        })
138    }
139
140    /// Check if backup is enabled
141    pub fn is_enabled(&self) -> bool {
142        self.client.is_some()
143    }
144
145    /// Record a new connection
146    pub async fn record_connection(
147        &self,
148        conn_id: u64,
149        meta: &ConnMeta,
150    ) -> Result<(), ClickHouseError> {
151        if !self.is_enabled() {
152            return Ok(()); // Silently skip if not enabled
153        }
154
155        let timestamp = std::time::SystemTime::now()
156            .duration_since(std::time::UNIX_EPOCH)
157            .unwrap()
158            .as_secs();
159
160        let record = ConnectionRecord::from_conn_meta(conn_id, meta, timestamp);
161
162        let mut buffer = self.buffer.write().await;
163        buffer.push(record);
164
165        // Check if we should flush
166        if buffer.len() >= self.config.batch_size {
167            drop(buffer); // Release lock before flush
168            self.flush().await?;
169        }
170
171        Ok(())
172    }
173
174    /// Flush buffered records to ClickHouse
175    pub async fn flush(&self) -> Result<usize, ClickHouseError> {
176        let client = match &self.client {
177            Some(c) => c,
178            None => return Ok(0),
179        };
180
181        let mut buffer = self.buffer.write().await;
182        if buffer.is_empty() {
183            return Ok(0);
184        }
185
186        let records: Vec<_> = buffer.drain(..).collect();
187        let count = records.len();
188        drop(buffer); // Release lock before insert
189
190        debug!("Flushing {} records to ClickHouse", count);
191
192        let mut insert = client
193            .insert(&self.config.table)
194            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
195
196        for record in records {
197            insert
198                .write(&record)
199                .await
200                .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
201        }
202
203        insert
204            .end()
205            .await
206            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
207
208        info!("Flushed {} records to ClickHouse", count);
209        Ok(count)
210    }
211
212    /// Start background flush task
213    pub fn start_flush_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
214        let interval = self.config.flush_interval;
215
216        tokio::spawn(async move {
217            let mut ticker = tokio::time::interval(interval);
218
219            loop {
220                ticker.tick().await;
221                if let Err(e) = self.flush().await {
222                    warn!("ClickHouse flush error: {}", e);
223                }
224            }
225        })
226    }
227
    /// Create table if not exists
    ///
    /// Issues an idempotent `CREATE TABLE IF NOT EXISTS` for the connections
    /// table. No-op when backup is disabled.
    ///
    /// # Errors
    /// Returns `ClickHouseError::QueryFailed` if the DDL fails to execute.
    pub async fn ensure_table(&self) -> Result<(), ClickHouseError> {
        let client = match &self.client {
            Some(c) => c,
            None => return Ok(()),
        };

        // created_at is DateTime64(3), i.e. millisecond ticks — writers must
        // supply milliseconds (NOTE(review): confirm the unit used by callers).
        // Rows expire after 7 days via the TTL clause.
        let ddl = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {}.{} (
                conn_id UInt64,
                client_addr String,
                local_port UInt16,
                remote_port UInt16,
                assigned_pod UInt32,
                created_at DateTime64(3)
            ) ENGINE = MergeTree()
            ORDER BY (created_at, conn_id)
            TTL toDateTime(created_at) + INTERVAL 7 DAY
            "#,
            self.config.database, self.config.table
        );

        client
            .query(&ddl)
            .execute()
            .await
            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;

        info!(
            "ClickHouse table ensured: {}.{}",
            self.config.database, self.config.table
        );
        Ok(())
    }
263
264    /// Get buffered record count
265    pub async fn buffered_count(&self) -> usize {
266        self.buffer.read().await.len()
267    }
268
269    /// Record a raft log entry
270    pub async fn archive_raft_log(
271        &self,
272        index: u64,
273        term: u64,
274        operation: &str,
275        payload: &str,
276    ) -> Result<(), ClickHouseError> {
277        if !self.is_enabled() {
278            return Ok(());
279        }
280
281        let timestamp = std::time::SystemTime::now()
282            .duration_since(std::time::UNIX_EPOCH)
283            .unwrap()
284            .as_millis() as u64;
285
286        let record = RaftLogRecord {
287            index,
288            term,
289            operation: operation.to_string(),
290            payload: payload.to_string(),
291            created_at: timestamp,
292        };
293
294        let mut buffer = self.raft_buffer.write().await;
295        buffer.push(record);
296
297        if buffer.len() >= self.config.batch_size {
298            drop(buffer);
299            self.flush_raft_logs().await?;
300        }
301
302        Ok(())
303    }
304
305    /// Flush buffered raft logs
306    pub async fn flush_raft_logs(&self) -> Result<usize, ClickHouseError> {
307        let client = match &self.client {
308            Some(c) => c,
309            None => return Ok(0),
310        };
311
312        let mut buffer = self.raft_buffer.write().await;
313        if buffer.is_empty() {
314            return Ok(0);
315        }
316
317        let records: Vec<_> = buffer.drain(..).collect();
318        let count = records.len();
319        drop(buffer);
320
321        let table_name = format!("{}_logs", self.config.table);
322
323        let mut insert = client
324            .insert(&table_name)
325            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
326
327        for record in records {
328            insert
329                .write(&record)
330                .await
331                .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
332        }
333
334        insert
335            .end()
336            .await
337            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;
338
339        Ok(count)
340    }
341
    /// Create tables if not exists
    ///
    /// Ensures both the connections table (via `ensure_table`) and the
    /// `<table>_logs` raft-log table exist. No-op when backup is disabled.
    ///
    /// # Errors
    /// Returns `ClickHouseError::QueryFailed` if either DDL fails to execute.
    pub async fn ensure_tables(&self) -> Result<(), ClickHouseError> {
        let client = match &self.client {
            Some(c) => c,
            None => return Ok(()),
        };

        // Ensure connections table
        self.ensure_table().await?;

        // Ensure raft logs table; created_at is millisecond-tick
        // DateTime64(3), and raft logs are kept longer (30-day TTL)
        // than connection records.
        let table_name = format!("{}_logs", self.config.table);
        let ddl = format!(
            r#"
            CREATE TABLE IF NOT EXISTS {}.{} (
                index UInt64,
                term UInt64,
                operation String,
                payload String,
                created_at DateTime64(3)
            ) ENGINE = MergeTree()
            ORDER BY (created_at, index)
            TTL toDateTime(created_at) + INTERVAL 30 DAY
            "#,
            self.config.database, table_name
        );

        client
            .query(&ddl)
            .execute()
            .await
            .map_err(|e| ClickHouseError::QueryFailed(e.to_string()))?;

        info!(
            "ClickHouse table ensured: {}.{}",
            self.config.database, table_name
        );
        Ok(())
    }
381}
382
/// Raft log record for ClickHouse storage
///
/// One row of the `<table>_logs` table (see the DDL in `ensure_tables`).
/// NOTE(review): field order matches the DDL column order — confirm that
/// `clickhouse::Row` binds by position before reordering fields.
#[derive(Debug, Clone, Serialize, clickhouse::Row)]
pub struct RaftLogRecord {
    /// Raft log index.
    pub index: u64,
    /// Raft term the entry was written in.
    pub term: u64,
    /// Operation name, copied from the caller's string.
    pub operation: String,
    /// Serialized operation payload, copied from the caller's string.
    pub payload: String,
    /// Milliseconds since the UNIX epoch (set in `archive_raft_log`).
    pub created_at: u64,
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must leave backup disabled.
    #[test]
    fn test_config_default_disabled() {
        assert!(!ClickHouseConfig::default().enabled);
    }

    /// A client built from the default (disabled) config reports disabled.
    #[tokio::test]
    async fn test_disabled_client() {
        let backup = ClickHouseBackup::new(ClickHouseConfig::default())
            .expect("constructing a disabled backup client cannot fail");
        assert!(!backup.is_enabled());
    }
}
409}