// sqltool/core/timeseries_adapter.rs

//! Time-series database adapters: converters for InfluxDB, TimescaleDB, Taodb and similar time-series databases.
2
3use crate::models::FieldMapping;
4use anyhow::Result;
5use serde_json::Value;
6use std::collections::HashMap;
7
/// Logical data type of a time-series value.
///
/// Variants parallel those of `MetricValue`, plus a raw `Bytes` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeSeriesDataType {
    /// Floating-point value.
    Float,
    /// Integer value.
    Integer,
    /// Unsigned integer value.
    UInteger,
    /// Boolean value.
    Boolean,
    /// String value.
    String,
    /// Byte array.
    Bytes,
}
24
25/// 时序数据库配置
26#[derive(Debug, Clone)]
27pub struct TimeSeriesConfig {
28    /// 时间戳字段
29    pub timestamp_field: String,
30    /// 指标字段列表
31    pub metric_fields: Vec<String>,
32    /// 标签字段列表
33    pub tag_fields: Vec<String>,
34    /// 保留策略
35    pub retention_policy: Option<String>,
36    /// 数据库名称
37    pub database: String,
38    /// 测量(表)名称
39    pub measurement: String,
40    /// 分区策略
41    pub partition_strategy: PartitionStrategy,
42}
43
44impl Default for TimeSeriesConfig {
45    fn default() -> Self {
46        Self {
47            timestamp_field: "timestamp".to_string(),
48            metric_fields: Vec::new(),
49            tag_fields: Vec::new(),
50            retention_policy: None,
51            database: "default".to_string(),
52            measurement: "data".to_string(),
53            partition_strategy: PartitionStrategy::ByTime {
54                interval: TimeInterval::Hours(1),
55            },
56        }
57    }
58}
59
60#[derive(Debug, Clone, PartialEq)]
61pub enum PartitionStrategy {
62    /// 按时间分区
63    ByTime { interval: TimeInterval },
64    /// 按标签值分区
65    ByTag { tag: String },
66    /// 按数值范围分区
67    ByValue { size: f64 },
68}
69
/// A time span counted in minutes, hours or days.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeInterval {
    Minutes(u32),
    Hours(u32),
    Days(u32),
}

impl TimeInterval {
    /// Length of the interval in seconds.
    ///
    /// The multiplication is done in `u64`, so no `u32` count can overflow it.
    pub fn as_seconds(&self) -> u64 {
        let (count, seconds_per_unit) = match *self {
            Self::Minutes(n) => (n, 60_u64),
            Self::Hours(n) => (n, 3_600),
            Self::Days(n) => (n, 86_400),
        };
        u64::from(count) * seconds_per_unit
    }

    /// Compact `<count><unit>` form, e.g. `15m`, `1h`, `7d`.
    pub fn as_string(&self) -> String {
        let (count, unit) = match *self {
            Self::Minutes(n) => (n, 'm'),
            Self::Hours(n) => (n, 'h'),
            Self::Days(n) => (n, 'd'),
        };
        format!("{}{}", count, unit)
    }
}
94
95/// 时序数据库统一接口
96pub trait TimeSeriesConverter {
97    /// 转换数据
98    fn convert(&self, data: &HashMap<String, Value>, mappings: &[FieldMapping]) -> Result<TimeSeriesPoint>;
99    
100    /// 批量转换
101    fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>>;
102}
103
104/// InfluxDB 转换器
105pub struct InfluxDbConverter {
106    config: TimeSeriesConfig,
107}
108
109impl InfluxDbConverter {
110    pub fn new(config: TimeSeriesConfig) -> Self {
111        Self { config }
112    }
113
114    pub fn with_default_config() -> Self {
115        Self::new(TimeSeriesConfig::default())
116    }
117
118    /// 转换为 InfluxDB 行协议
119    pub fn convert_to_line_protocol(
120        &self,
121        data: &HashMap<String, Value>,
122        mappings: &[FieldMapping],
123    ) -> Result<String> {
124        let point = self.convert(data, mappings)?;
125        Ok(self.point_to_line_protocol(&point))
126    }
127
128    /// 将点转换为行协议
129    fn point_to_line_protocol(&self, point: &TimeSeriesPoint) -> String {
130        let mut output = format!("{},", self.config.measurement);
131        
132        // 添加标签
133        let tags: Vec<String> = point.tags.iter()
134            .map(|(k, v)| format!("{}={}", k, v))
135            .collect();
136        output.push_str(&tags.join(","));
137        output.push(' ');
138        
139        // 添加指标
140        let fields: Vec<String> = point.fields.iter()
141            .map(|(k, v)| format!("{}={}", k, v))
142            .collect();
143        output.push_str(&fields.join(","));
144        output.push(' ');
145        
146        // 添加时间戳
147        output.push_str(&point.timestamp.to_string());
148        
149        output
150    }
151}
152
153impl TimeSeriesConverter for InfluxDbConverter {
154    fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
155        let mut point = TimeSeriesPoint::default();
156        
157        // 提取时间戳
158        point.timestamp = data.get(&self.config.timestamp_field)
159            .and_then(|v| v.as_i64())
160            .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
161        
162        // 提取标签(字符串字段作为标签)
163        for tag_field in &self.config.tag_fields {
164            if let Some(value) = data.get(tag_field) {
165                if let Some(s) = value.as_str() {
166                    point.tags.insert(tag_field.clone(), s.to_string());
167                }
168            }
169        }
170        
171        // 提取指标(数值字段作为指标)
172        for metric_field in &self.config.metric_fields {
173            if let Some(value) = data.get(metric_field) {
174                let metric_value = match value {
175                    Value::Number(n) => {
176                        if n.is_f64() {
177                            MetricValue::Float(n.as_f64().unwrap())
178                        } else if n.is_i64() {
179                            MetricValue::Integer(n.as_i64().unwrap())
180                        } else {
181                            MetricValue::Float(n.to_string().parse().unwrap_or(0.0))
182                        }
183                    }
184                    Value::Bool(b) => MetricValue::Boolean(*b),
185                    Value::String(s) => MetricValue::String(s.clone()),
186                    _ => continue,
187                };
188                point.fields.insert(metric_field.clone(), metric_value);
189            }
190        }
191        
192        Ok(point)
193    }
194
195    fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
196        let mut points = Vec::new();
197        for data in data_list {
198            let point = self.convert(data, mappings)?;
199            points.push(point);
200        }
201        Ok(points)
202    }
203}
204
205/// 时序数据点
206#[derive(Debug, Clone, Default)]
207pub struct TimeSeriesPoint {
208    /// 测量名称
209    pub measurement: String,
210    /// 标签
211    pub tags: HashMap<String, String>,
212    /// 指标字段
213    pub fields: HashMap<String, MetricValue>,
214    /// 时间戳(纳秒)
215    pub timestamp: i64,
216}
217
218impl TimeSeriesPoint {
219    pub fn new(measurement: &str) -> Self {
220        Self {
221            measurement: measurement.to_string(),
222            ..Default::default()
223        }
224    }
225
226    pub fn with_tag(mut self, key: &str, value: &str) -> Self {
227        self.tags.insert(key.to_string(), value.to_string());
228        self
229    }
230
231    pub fn with_field(mut self, key: &str, value: MetricValue) -> Self {
232        self.fields.insert(key.to_string(), value);
233        self
234    }
235
236    pub fn with_timestamp(mut self, timestamp: i64) -> Self {
237        self.timestamp = timestamp;
238        self
239    }
240}
241
/// Value of a single metric field.
#[derive(Debug, Clone)]
pub enum MetricValue {
    Float(f64),
    Integer(i64),
    UInteger(u64),
    Boolean(bool),
    String(String),
}

/// Writes the value with line-protocol-style markers. Integers get an `i` suffix and
/// unsigned integers a `u` suffix. Strings are wrapped in double quotes without escaping
/// their contents.
impl std::fmt::Display for MetricValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Float(x) => write!(f, "{}", x),
            Self::Integer(x) => write!(f, "{}i", x),
            Self::UInteger(x) => write!(f, "{}u", x),
            Self::Boolean(flag) => write!(f, "{}", flag),
            Self::String(text) => write!(f, "\"{}\"", text),
        }
    }
}

impl MetricValue {
    /// Numeric view of the value. Booleans map to 1.0 / 0.0; strings are parsed as `f64`,
    /// falling back to 0.0 when they are not numeric.
    pub fn as_f64(&self) -> f64 {
        match self {
            Self::Float(x) => *x,
            Self::Integer(x) => *x as f64,
            Self::UInteger(x) => *x as f64,
            Self::Boolean(flag) => f64::from(u8::from(*flag)),
            Self::String(text) => text.parse::<f64>().unwrap_or(0.0),
        }
    }

    /// Same text as the `Display` implementation.
    pub fn as_string(&self) -> String {
        self.to_string()
    }
}
285
286/// TimescaleDB 转换器
287pub struct TimescaleDbConverter {
288    config: TimeSeriesConfig,
289}
290
291impl TimescaleDbConverter {
292    pub fn new(config: TimeSeriesConfig) -> Self {
293        Self { config }
294    }
295
296    pub fn with_default_config() -> Self {
297        Self::new(TimeSeriesConfig::default())
298    }
299
300    /// 生成创建超表的 SQL
301    pub fn generate_hypertable_sql(&self) -> String {
302        let partition_interval = match self.config.partition_strategy {
303            PartitionStrategy::ByTime { interval } => interval.as_string(),
304            _ => "1 day".to_string(),
305        };
306        
307        format!(
308            "SELECT create_hypertable('{}.{}', '{}', chunk_time_interval => INTERVAL '{}');",
309            self.config.database,
310            self.config.measurement,
311            self.config.timestamp_field,
312            partition_interval
313        )
314    }
315}
316
317impl TimeSeriesConverter for TimescaleDbConverter {
318    fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
319        let mut point = TimeSeriesPoint::new(&self.config.measurement);
320        
321        // 提取时间戳
322        point.timestamp = data.get(&self.config.timestamp_field)
323            .and_then(|v| v.as_i64())
324            .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
325        
326        // 提取标签
327        for tag_field in &self.config.tag_fields {
328            if let Some(value) = data.get(tag_field) {
329                if let Some(s) = value.as_str() {
330                    point.tags.insert(tag_field.clone(), s.to_string());
331                }
332            }
333        }
334        
335        // 提取指标
336        for metric_field in &self.config.metric_fields {
337            if let Some(value) = data.get(metric_field) {
338                let metric_value = match value {
339                    Value::Number(n) => {
340                        if n.is_f64() {
341                            MetricValue::Float(n.as_f64().unwrap())
342                        } else {
343                            MetricValue::Integer(n.as_i64().unwrap_or(0))
344                        }
345                    }
346                    Value::Bool(b) => MetricValue::Boolean(*b),
347                    Value::String(s) => MetricValue::String(s.clone()),
348                    _ => continue,
349                };
350                point.fields.insert(metric_field.clone(), metric_value);
351            }
352        }
353        
354        Ok(point)
355    }
356
357    fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
358        let mut points = Vec::new();
359        for data in data_list {
360            let point = self.convert(data, mappings)?;
361            points.push(point);
362        }
363        Ok(points)
364    }
365}
366
367/// Taodb 转换器
368pub struct TaodbConverter {
369    config: TimeSeriesConfig,
370}
371
372impl TaodbConverter {
373    pub fn new(config: TimeSeriesConfig) -> Self {
374        Self { config }
375    }
376
377    pub fn with_default_config() -> Self {
378        Self::new(TimeSeriesConfig::default())
379    }
380
381    /// 转换为 Taodb 格式
382    pub fn convert_to_taodb(
383        &self,
384        data: &HashMap<String, Value>,
385        _mappings: &[FieldMapping],
386    ) -> Result<TaodbRecord> {
387        let mut record = TaodbRecord::default();
388        
389        // 设置时间戳
390        record.timestamp = data.get(&self.config.timestamp_field)
391            .and_then(|v| v.as_i64())
392            .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
393        
394        // 设置标签
395        for tag_field in &self.config.tag_fields {
396            if let Some(value) = data.get(tag_field) {
397                if let Some(s) = value.as_str() {
398                    record.tags.insert(tag_field.clone(), s.to_string());
399                }
400            }
401        }
402        
403        // 设置指标
404        for metric_field in &self.config.metric_fields {
405            if let Some(value) = data.get(metric_field) {
406                if let Some(n) = value.as_f64() {
407                    record.metrics.insert(metric_field.clone(), n);
408                }
409            }
410        }
411        
412        Ok(record)
413    }
414
415    /// 生成 Taodb 插入语句
416    pub fn generate_insert_sql(&self, record: &TaodbRecord) -> String {
417        let tags: Vec<String> = record.tags.iter()
418            .map(|(k, v)| format!("tag {}='{}'", k, v))
419            .collect();
420        
421        let metrics: Vec<String> = record.metrics.iter()
422            .map(|(k, v)| format!("field {}={}", k, v))
423            .collect();
424        
425        format!(
426            "INSERT INTO {} ({}) TAGS ({}) VALUES ({}) {}",
427            self.config.measurement,
428            metrics.join(", "),
429            tags.join(", "),
430            self.config.timestamp_field,
431            record.timestamp
432        )
433    }
434}
435
436impl TimeSeriesConverter for TaodbConverter {
437    fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
438        let mut point = TimeSeriesPoint::new(&self.config.measurement);
439        
440        point.timestamp = data.get(&self.config.timestamp_field)
441            .and_then(|v| v.as_i64())
442            .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
443        
444        for tag_field in &self.config.tag_fields {
445            if let Some(value) = data.get(tag_field) {
446                if let Some(s) = value.as_str() {
447                    point.tags.insert(tag_field.clone(), s.to_string());
448                }
449            }
450        }
451        
452        for metric_field in &self.config.metric_fields {
453            if let Some(value) = data.get(metric_field) {
454                let metric_value = match value {
455                    Value::Number(n) => {
456                        if n.is_f64() {
457                            MetricValue::Float(n.as_f64().unwrap())
458                        } else {
459                            MetricValue::Integer(n.as_i64().unwrap_or(0))
460                        }
461                    }
462                    Value::Bool(b) => MetricValue::Boolean(*b),
463                    Value::String(s) => MetricValue::String(s.clone()),
464                    _ => continue,
465                };
466                point.fields.insert(metric_field.clone(), metric_value);
467            }
468        }
469        
470        Ok(point)
471    }
472
473    fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
474        let mut points = Vec::new();
475        for data in data_list {
476            let point = self.convert(data, mappings)?;
477            points.push(point);
478        }
479        Ok(points)
480    }
481}
482
/// Flattened Taodb record: a timestamp plus string tags and numeric metrics.
#[derive(Debug, Clone, Default)]
pub struct TaodbRecord {
    /// Record timestamp.
    pub timestamp: i64,
    /// Tag name to tag value.
    pub tags: HashMap<String, String>,
    /// Metric name to numeric value.
    pub metrics: HashMap<String, f64>,
}
490
491/// 时序数据批量写入器
492pub struct TimeSeriesBatchWriter {
493    converters: Vec<Box<dyn TimeSeriesConverter>>,
494    batch_size: usize,
495}
496
497impl TimeSeriesBatchWriter {
498    pub fn new(batch_size: usize) -> Self {
499        Self {
500            converters: Vec::new(),
501            batch_size,
502        }
503    }
504
505    pub fn add_converter<C: TimeSeriesConverter + 'static>(&mut self, converter: C) {
506        self.converters.push(Box::new(converter));
507    }
508
509    /// 批量写入
510    pub fn write_batch(
511        &self,
512        data_list: &[HashMap<String, Value>],
513        mappings: &[FieldMapping],
514    ) -> Result<BatchWriteResult> {
515        let mut total_points = 0;
516        let mut batches = Vec::new();
517        
518        for converter in &self.converters {
519            let points = converter.convert_batch(data_list, mappings)?;
520            total_points += points.len();
521            batches.push(points);
522        }
523        
524        Ok(BatchWriteResult {
525            total_points,
526            batch_count: batches.len(),
527            status: WriteStatus::Success,
528        })
529    }
530}
531
/// Summary of a batch write.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of points produced.
    pub total_points: usize,
    /// Number of batches processed.
    pub batch_count: usize,
    /// Outcome of the write.
    pub status: WriteStatus,
}

/// Outcome of a batch write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteStatus {
    /// Successful write.
    Success,
    /// Partially successful write.
    PartialSuccess,
    /// Failed write.
    Failed,
}
546
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a JSON float value for fixtures.
    fn float(v: f64) -> Value {
        Value::Number(serde_json::Number::from_f64(v).unwrap())
    }

    #[test]
    fn test_influxdb_converter() {
        let config = TimeSeriesConfig {
            timestamp_field: "time".to_string(),
            metric_fields: vec!["temperature".to_string(), "humidity".to_string()],
            tag_fields: vec!["location".to_string(), "sensor_id".to_string()],
            ..Default::default()
        };
        let converter = InfluxDbConverter::new(config);

        let mut data = HashMap::new();
        data.insert("time".to_string(), Value::Number(1640000000000000000i64.into()));
        data.insert("temperature".to_string(), float(25.5));
        data.insert("humidity".to_string(), float(60.0));
        data.insert("location".to_string(), Value::String("Beijing".to_string()));
        data.insert("sensor_id".to_string(), Value::String("sensor_001".to_string()));

        let point = converter.convert(&data, &[]).unwrap();

        assert_eq!(point.timestamp, 1640000000000000000i64);
        assert_eq!(point.tags.get("location").map(String::as_str), Some("Beijing"));
        assert_eq!(point.tags.get("sensor_id").map(String::as_str), Some("sensor_001"));
        assert!(matches!(point.fields.get("temperature"), Some(MetricValue::Float(v)) if *v == 25.5));
        assert!(matches!(point.fields.get("humidity"), Some(MetricValue::Float(v)) if *v == 60.0));

        // Element order within the tag/field sets is not asserted.
        let line = converter.point_to_line_protocol(&point);
        assert!(line.starts_with("data,"), "unexpected line protocol: {}", line);
        for part in &["location=Beijing", "sensor_id=sensor_001", "temperature=25.5", "humidity=60"] {
            assert!(line.contains(*part), "missing `{}` in {}", part, line);
        }
        assert!(line.ends_with(" 1640000000000000000"), "unexpected line protocol: {}", line);
    }

    #[test]
    fn test_timescale_hypertable_sql() {
        let config = TimeSeriesConfig {
            database: "metrics".to_string(),
            measurement: "sensor_data".to_string(),
            timestamp_field: "timestamp".to_string(),
            partition_strategy: PartitionStrategy::ByTime { interval: TimeInterval::Hours(1) },
            ..Default::default()
        };

        let converter = TimescaleDbConverter::new(config);
        let sql = converter.generate_hypertable_sql();

        assert_eq!(
            sql,
            "SELECT create_hypertable('metrics.sensor_data', 'timestamp', chunk_time_interval => INTERVAL '1h');"
        );
    }

    #[test]
    fn test_taodb_converter() {
        let config = TimeSeriesConfig {
            timestamp_field: "ts".to_string(),
            metric_fields: vec!["cpu_usage".to_string()],
            tag_fields: vec!["host".to_string()],
            ..Default::default()
        };
        let converter = TaodbConverter::new(config);

        let mut data = HashMap::new();
        data.insert("ts".to_string(), Value::Number(1640000000000i64.into()));
        data.insert("cpu_usage".to_string(), float(85.5));
        data.insert("host".to_string(), Value::String("server01".to_string()));

        let record = converter.convert_to_taodb(&data, &[]).unwrap();

        assert_eq!(record.timestamp, 1640000000000i64);
        assert_eq!(record.metrics.get("cpu_usage"), Some(&85.5));
        assert_eq!(record.tags.get("host").map(String::as_str), Some("server01"));

        let sql = converter.generate_insert_sql(&record);
        assert!(sql.starts_with("INSERT INTO data "), "unexpected SQL: {}", sql);
        assert!(sql.contains("cpu_usage"), "unexpected SQL: {}", sql);
        assert!(sql.contains("server01"), "unexpected SQL: {}", sql);
        assert!(sql.contains("1640000000000"), "unexpected SQL: {}", sql);
    }

    #[test]
    fn test_time_interval() {
        assert_eq!(TimeInterval::Minutes(5).as_seconds(), 300);
        assert_eq!(TimeInterval::Hours(1).as_seconds(), 3600);
        assert_eq!(TimeInterval::Days(2).as_string(), "2d");
    }

    #[test]
    fn test_convert_batch_preserves_order() {
        let converter = TimescaleDbConverter::new(TimeSeriesConfig {
            timestamp_field: "t".to_string(),
            metric_fields: vec!["v".to_string()],
            ..Default::default()
        });
        let rows: Vec<HashMap<String, Value>> = (0..3i64)
            .map(|i| {
                let mut row = HashMap::new();
                row.insert("t".to_string(), Value::from(i));
                row.insert("v".to_string(), Value::from(i * 10));
                row
            })
            .collect();

        let points = converter.convert_batch(&rows, &[]).unwrap();

        let timestamps: Vec<i64> = points.iter().map(|p| p.timestamp).collect();
        assert_eq!(timestamps, vec![0, 1, 2]);
        assert!(points.iter().all(|p| p.measurement == "data"));
    }

    #[test]
    fn test_batch_writer_counts_points() {
        let mut writer = TimeSeriesBatchWriter::new(2);
        writer.add_converter(InfluxDbConverter::with_default_config());
        writer.add_converter(TaodbConverter::with_default_config());

        let rows: Vec<HashMap<String, Value>> = vec![HashMap::new(); 3];
        let result = writer.write_batch(&rows, &[]).unwrap();

        assert_eq!(result.total_points, 6);
        assert_eq!(result.status, WriteStatus::Success);
    }
}