1use crate::models::FieldMapping;
4use anyhow::Result;
5use serde_json::Value;
6use std::collections::HashMap;
7
/// Kinds of values a time-series field can be declared as.
///
/// The enum carries no payload, so it is cheap to copy, compare and use as a
/// map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeSeriesDataType {
    /// Floating-point field.
    Float,
    /// Signed integer field.
    Integer,
    /// Unsigned integer field.
    UInteger,
    /// Boolean field.
    Boolean,
    /// Text field.
    String,
    /// Raw byte field.
    Bytes,
}
24
25#[derive(Debug, Clone)]
27pub struct TimeSeriesConfig {
28 pub timestamp_field: String,
30 pub metric_fields: Vec<String>,
32 pub tag_fields: Vec<String>,
34 pub retention_policy: Option<String>,
36 pub database: String,
38 pub measurement: String,
40 pub partition_strategy: PartitionStrategy,
42}
43
44impl Default for TimeSeriesConfig {
45 fn default() -> Self {
46 Self {
47 timestamp_field: "timestamp".to_string(),
48 metric_fields: Vec::new(),
49 tag_fields: Vec::new(),
50 retention_policy: None,
51 database: "default".to_string(),
52 measurement: "data".to_string(),
53 partition_strategy: PartitionStrategy::ByTime {
54 interval: TimeInterval::Hours(1),
55 },
56 }
57 }
58}
59
60#[derive(Debug, Clone, PartialEq)]
61pub enum PartitionStrategy {
62 ByTime { interval: TimeInterval },
64 ByTag { tag: String },
66 ByValue { size: f64 },
68}
69
/// A length of time expressed as a whole number of minutes, hours or days.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeInterval {
    /// Interval measured in minutes.
    Minutes(u32),
    /// Interval measured in hours.
    Hours(u32),
    /// Interval measured in days.
    Days(u32),
}
76
77impl TimeInterval {
78 pub fn as_seconds(&self) -> u64 {
79 match self {
80 TimeInterval::Minutes(m) => (*m as u64) * 60,
81 TimeInterval::Hours(h) => (*h as u64) * 3600,
82 TimeInterval::Days(d) => (*d as u64) * 86400,
83 }
84 }
85
86 pub fn as_string(&self) -> String {
87 match self {
88 TimeInterval::Minutes(m) => format!("{}m", m),
89 TimeInterval::Hours(h) => format!("{}h", h),
90 TimeInterval::Days(d) => format!("{}d", d),
91 }
92 }
93}
94
95pub trait TimeSeriesConverter {
97 fn convert(&self, data: &HashMap<String, Value>, mappings: &[FieldMapping]) -> Result<TimeSeriesPoint>;
99
100 fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>>;
102}
103
104pub struct InfluxDbConverter {
106 config: TimeSeriesConfig,
107}
108
109impl InfluxDbConverter {
110 pub fn new(config: TimeSeriesConfig) -> Self {
111 Self { config }
112 }
113
114 pub fn with_default_config() -> Self {
115 Self::new(TimeSeriesConfig::default())
116 }
117
118 pub fn convert_to_line_protocol(
120 &self,
121 data: &HashMap<String, Value>,
122 mappings: &[FieldMapping],
123 ) -> Result<String> {
124 let point = self.convert(data, mappings)?;
125 Ok(self.point_to_line_protocol(&point))
126 }
127
128 fn point_to_line_protocol(&self, point: &TimeSeriesPoint) -> String {
130 let mut output = format!("{},", self.config.measurement);
131
132 let tags: Vec<String> = point.tags.iter()
134 .map(|(k, v)| format!("{}={}", k, v))
135 .collect();
136 output.push_str(&tags.join(","));
137 output.push(' ');
138
139 let fields: Vec<String> = point.fields.iter()
141 .map(|(k, v)| format!("{}={}", k, v))
142 .collect();
143 output.push_str(&fields.join(","));
144 output.push(' ');
145
146 output.push_str(&point.timestamp.to_string());
148
149 output
150 }
151}
152
153impl TimeSeriesConverter for InfluxDbConverter {
154 fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
155 let mut point = TimeSeriesPoint::default();
156
157 point.timestamp = data.get(&self.config.timestamp_field)
159 .and_then(|v| v.as_i64())
160 .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
161
162 for tag_field in &self.config.tag_fields {
164 if let Some(value) = data.get(tag_field) {
165 if let Some(s) = value.as_str() {
166 point.tags.insert(tag_field.clone(), s.to_string());
167 }
168 }
169 }
170
171 for metric_field in &self.config.metric_fields {
173 if let Some(value) = data.get(metric_field) {
174 let metric_value = match value {
175 Value::Number(n) => {
176 if n.is_f64() {
177 MetricValue::Float(n.as_f64().unwrap())
178 } else if n.is_i64() {
179 MetricValue::Integer(n.as_i64().unwrap())
180 } else {
181 MetricValue::Float(n.to_string().parse().unwrap_or(0.0))
182 }
183 }
184 Value::Bool(b) => MetricValue::Boolean(*b),
185 Value::String(s) => MetricValue::String(s.clone()),
186 _ => continue,
187 };
188 point.fields.insert(metric_field.clone(), metric_value);
189 }
190 }
191
192 Ok(point)
193 }
194
195 fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
196 let mut points = Vec::new();
197 for data in data_list {
198 let point = self.convert(data, mappings)?;
199 points.push(point);
200 }
201 Ok(points)
202 }
203}
204
205#[derive(Debug, Clone, Default)]
207pub struct TimeSeriesPoint {
208 pub measurement: String,
210 pub tags: HashMap<String, String>,
212 pub fields: HashMap<String, MetricValue>,
214 pub timestamp: i64,
216}
217
218impl TimeSeriesPoint {
219 pub fn new(measurement: &str) -> Self {
220 Self {
221 measurement: measurement.to_string(),
222 ..Default::default()
223 }
224 }
225
226 pub fn with_tag(mut self, key: &str, value: &str) -> Self {
227 self.tags.insert(key.to_string(), value.to_string());
228 self
229 }
230
231 pub fn with_field(mut self, key: &str, value: MetricValue) -> Self {
232 self.fields.insert(key.to_string(), value);
233 self
234 }
235
236 pub fn with_timestamp(mut self, timestamp: i64) -> Self {
237 self.timestamp = timestamp;
238 self
239 }
240}
241
/// A typed field value stored in a [`TimeSeriesPoint`].
///
/// `PartialEq` is derived so values can be compared directly (for example in
/// assertions); `Eq` is not available because of the `f64` payload.
#[derive(Debug, Clone, PartialEq)]
pub enum MetricValue {
    /// 64-bit floating-point value.
    Float(f64),
    /// Signed 64-bit integer value.
    Integer(i64),
    /// Unsigned 64-bit integer value.
    UInteger(u64),
    /// Boolean value.
    Boolean(bool),
    /// Text value.
    String(String),
}
251
252impl std::fmt::Display for MetricValue {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 match self {
255 MetricValue::Float(v) => write!(f, "{}", v),
256 MetricValue::Integer(v) => write!(f, "{}i", v),
257 MetricValue::UInteger(v) => write!(f, "{}u", v),
258 MetricValue::Boolean(v) => write!(f, "{}", v),
259 MetricValue::String(v) => write!(f, "\"{}\"", v),
260 }
261 }
262}
263
264impl MetricValue {
265 pub fn as_f64(&self) -> f64 {
266 match self {
267 MetricValue::Float(v) => *v,
268 MetricValue::Integer(v) => *v as f64,
269 MetricValue::UInteger(v) => *v as f64,
270 MetricValue::Boolean(v) => if *v { 1.0 } else { 0.0 },
271 MetricValue::String(v) => v.parse().unwrap_or(0.0),
272 }
273 }
274
275 pub fn as_string(&self) -> String {
276 match self {
277 MetricValue::Float(v) => format!("{}", v),
278 MetricValue::Integer(v) => format!("{}i", v),
279 MetricValue::UInteger(v) => format!("{}u", v),
280 MetricValue::Boolean(v) => format!("{}", v),
281 MetricValue::String(v) => format!("\"{}\"", v),
282 }
283 }
284}
285
286pub struct TimescaleDbConverter {
288 config: TimeSeriesConfig,
289}
290
291impl TimescaleDbConverter {
292 pub fn new(config: TimeSeriesConfig) -> Self {
293 Self { config }
294 }
295
296 pub fn with_default_config() -> Self {
297 Self::new(TimeSeriesConfig::default())
298 }
299
300 pub fn generate_hypertable_sql(&self) -> String {
302 let partition_interval = match self.config.partition_strategy {
303 PartitionStrategy::ByTime { interval } => interval.as_string(),
304 _ => "1 day".to_string(),
305 };
306
307 format!(
308 "SELECT create_hypertable('{}.{}', '{}', chunk_time_interval => INTERVAL '{}');",
309 self.config.database,
310 self.config.measurement,
311 self.config.timestamp_field,
312 partition_interval
313 )
314 }
315}
316
317impl TimeSeriesConverter for TimescaleDbConverter {
318 fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
319 let mut point = TimeSeriesPoint::new(&self.config.measurement);
320
321 point.timestamp = data.get(&self.config.timestamp_field)
323 .and_then(|v| v.as_i64())
324 .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
325
326 for tag_field in &self.config.tag_fields {
328 if let Some(value) = data.get(tag_field) {
329 if let Some(s) = value.as_str() {
330 point.tags.insert(tag_field.clone(), s.to_string());
331 }
332 }
333 }
334
335 for metric_field in &self.config.metric_fields {
337 if let Some(value) = data.get(metric_field) {
338 let metric_value = match value {
339 Value::Number(n) => {
340 if n.is_f64() {
341 MetricValue::Float(n.as_f64().unwrap())
342 } else {
343 MetricValue::Integer(n.as_i64().unwrap_or(0))
344 }
345 }
346 Value::Bool(b) => MetricValue::Boolean(*b),
347 Value::String(s) => MetricValue::String(s.clone()),
348 _ => continue,
349 };
350 point.fields.insert(metric_field.clone(), metric_value);
351 }
352 }
353
354 Ok(point)
355 }
356
357 fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
358 let mut points = Vec::new();
359 for data in data_list {
360 let point = self.convert(data, mappings)?;
361 points.push(point);
362 }
363 Ok(points)
364 }
365}
366
367pub struct TaodbConverter {
369 config: TimeSeriesConfig,
370}
371
372impl TaodbConverter {
373 pub fn new(config: TimeSeriesConfig) -> Self {
374 Self { config }
375 }
376
377 pub fn with_default_config() -> Self {
378 Self::new(TimeSeriesConfig::default())
379 }
380
381 pub fn convert_to_taodb(
383 &self,
384 data: &HashMap<String, Value>,
385 _mappings: &[FieldMapping],
386 ) -> Result<TaodbRecord> {
387 let mut record = TaodbRecord::default();
388
389 record.timestamp = data.get(&self.config.timestamp_field)
391 .and_then(|v| v.as_i64())
392 .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
393
394 for tag_field in &self.config.tag_fields {
396 if let Some(value) = data.get(tag_field) {
397 if let Some(s) = value.as_str() {
398 record.tags.insert(tag_field.clone(), s.to_string());
399 }
400 }
401 }
402
403 for metric_field in &self.config.metric_fields {
405 if let Some(value) = data.get(metric_field) {
406 if let Some(n) = value.as_f64() {
407 record.metrics.insert(metric_field.clone(), n);
408 }
409 }
410 }
411
412 Ok(record)
413 }
414
415 pub fn generate_insert_sql(&self, record: &TaodbRecord) -> String {
417 let tags: Vec<String> = record.tags.iter()
418 .map(|(k, v)| format!("tag {}='{}'", k, v))
419 .collect();
420
421 let metrics: Vec<String> = record.metrics.iter()
422 .map(|(k, v)| format!("field {}={}", k, v))
423 .collect();
424
425 format!(
426 "INSERT INTO {} ({}) TAGS ({}) VALUES ({}) {}",
427 self.config.measurement,
428 metrics.join(", "),
429 tags.join(", "),
430 self.config.timestamp_field,
431 record.timestamp
432 )
433 }
434}
435
436impl TimeSeriesConverter for TaodbConverter {
437 fn convert(&self, data: &HashMap<String, Value>, _mappings: &[FieldMapping]) -> Result<TimeSeriesPoint> {
438 let mut point = TimeSeriesPoint::new(&self.config.measurement);
439
440 point.timestamp = data.get(&self.config.timestamp_field)
441 .and_then(|v| v.as_i64())
442 .unwrap_or_else(|| chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0));
443
444 for tag_field in &self.config.tag_fields {
445 if let Some(value) = data.get(tag_field) {
446 if let Some(s) = value.as_str() {
447 point.tags.insert(tag_field.clone(), s.to_string());
448 }
449 }
450 }
451
452 for metric_field in &self.config.metric_fields {
453 if let Some(value) = data.get(metric_field) {
454 let metric_value = match value {
455 Value::Number(n) => {
456 if n.is_f64() {
457 MetricValue::Float(n.as_f64().unwrap())
458 } else {
459 MetricValue::Integer(n.as_i64().unwrap_or(0))
460 }
461 }
462 Value::Bool(b) => MetricValue::Boolean(*b),
463 Value::String(s) => MetricValue::String(s.clone()),
464 _ => continue,
465 };
466 point.fields.insert(metric_field.clone(), metric_value);
467 }
468 }
469
470 Ok(point)
471 }
472
473 fn convert_batch(&self, data_list: &[HashMap<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<TimeSeriesPoint>> {
474 let mut points = Vec::new();
475 for data in data_list {
476 let point = self.convert(data, mappings)?;
477 points.push(point);
478 }
479 Ok(points)
480 }
481}
482
/// A flattened record: one timestamp, string tags and numeric metrics.
#[derive(Debug, Clone, Default)]
pub struct TaodbRecord {
    /// Timestamp as a signed integer; the unit is whatever the producer used.
    pub timestamp: i64,
    /// Tag name to tag value.
    pub tags: HashMap<String, String>,
    /// Metric name to numeric value.
    pub metrics: HashMap<String, f64>,
}
490
491pub struct TimeSeriesBatchWriter {
493 converters: Vec<Box<dyn TimeSeriesConverter>>,
494 batch_size: usize,
495}
496
497impl TimeSeriesBatchWriter {
498 pub fn new(batch_size: usize) -> Self {
499 Self {
500 converters: Vec::new(),
501 batch_size,
502 }
503 }
504
505 pub fn add_converter<C: TimeSeriesConverter + 'static>(&mut self, converter: C) {
506 self.converters.push(Box::new(converter));
507 }
508
509 pub fn write_batch(
511 &self,
512 data_list: &[HashMap<String, Value>],
513 mappings: &[FieldMapping],
514 ) -> Result<BatchWriteResult> {
515 let mut total_points = 0;
516 let mut batches = Vec::new();
517
518 for converter in &self.converters {
519 let points = converter.convert_batch(data_list, mappings)?;
520 total_points += points.len();
521 batches.push(points);
522 }
523
524 Ok(BatchWriteResult {
525 total_points,
526 batch_count: batches.len(),
527 status: WriteStatus::Success,
528 })
529 }
530}
531
532#[derive(Debug, Clone)]
534pub struct BatchWriteResult {
535 pub total_points: usize,
536 pub batch_count: usize,
537 pub status: WriteStatus,
538}
539
/// Outcome of a batch write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteStatus {
    /// Everything was written.
    Success,
    /// Only part of the batch was written.
    PartialSuccess,
    /// Nothing was written.
    Failed,
}
546
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts a record with the InfluxDB converter and checks both the
    /// point contents and the rendered line-protocol text.
    #[test]
    fn test_influxdb_converter() {
        let config = TimeSeriesConfig {
            timestamp_field: "time".to_string(),
            metric_fields: vec!["temperature".to_string(), "humidity".to_string()],
            tag_fields: vec!["location".to_string(), "sensor_id".to_string()],
            ..Default::default()
        };

        let converter = InfluxDbConverter::new(config);

        let mut data = HashMap::new();
        data.insert("time".to_string(), Value::Number(1640000000000000000i64.into()));
        data.insert(
            "temperature".to_string(),
            Value::Number(serde_json::Number::from_f64(25.5).unwrap()),
        );
        data.insert(
            "humidity".to_string(),
            Value::Number(serde_json::Number::from_f64(60.0).unwrap()),
        );
        data.insert("location".to_string(), Value::String("Beijing".to_string()));
        data.insert("sensor_id".to_string(), Value::String("sensor_001".to_string()));

        let point = converter.convert(&data, &[]).unwrap();

        assert_eq!(point.timestamp, 1640000000000000000i64);
        assert_eq!(point.tags.get("location").map(String::as_str), Some("Beijing"));
        assert_eq!(point.tags.get("sensor_id").map(String::as_str), Some("sensor_001"));
        assert_eq!(point.fields.len(), 2);
        assert_eq!(point.fields["temperature"].as_f64(), 25.5);
        assert_eq!(point.fields["humidity"].as_f64(), 60.0);

        // Tag/field order is not asserted; only the presence of each part.
        let line_protocol = converter.point_to_line_protocol(&point);
        assert!(line_protocol.starts_with("data,"), "unexpected line: {}", line_protocol);
        assert!(line_protocol.contains("location=Beijing"));
        assert!(line_protocol.contains("sensor_id=sensor_001"));
        assert!(line_protocol.contains("temperature=25.5"));
        assert!(line_protocol.contains("humidity=60"));
        assert!(line_protocol.ends_with(" 1640000000000000000"));
    }

    /// Checks the exact hypertable statement generated for an hourly
    /// time-partitioned table.
    #[test]
    fn test_timescale_hypertable_sql() {
        let config = TimeSeriesConfig {
            database: "metrics".to_string(),
            measurement: "sensor_data".to_string(),
            timestamp_field: "timestamp".to_string(),
            partition_strategy: PartitionStrategy::ByTime { interval: TimeInterval::Hours(1) },
            ..Default::default()
        };

        let converter = TimescaleDbConverter::new(config);
        let sql = converter.generate_hypertable_sql();

        assert_eq!(
            sql,
            "SELECT create_hypertable('metrics.sensor_data', 'timestamp', \
             chunk_time_interval => INTERVAL '1h');"
        );
    }

    /// Converts a record with the Taodb converter and checks the record and
    /// the insert statement built from it.
    #[test]
    fn test_taodb_converter() {
        let config = TimeSeriesConfig {
            timestamp_field: "ts".to_string(),
            metric_fields: vec!["cpu_usage".to_string()],
            tag_fields: vec!["host".to_string()],
            ..Default::default()
        };

        let converter = TaodbConverter::new(config);

        let mut data = HashMap::new();
        data.insert("ts".to_string(), Value::Number(1640000000000i64.into()));
        data.insert(
            "cpu_usage".to_string(),
            Value::Number(serde_json::Number::from_f64(85.5).unwrap()),
        );
        data.insert("host".to_string(), Value::String("server01".to_string()));

        let record = converter.convert_to_taodb(&data, &[]).unwrap();

        assert_eq!(record.timestamp, 1640000000000i64);
        assert_eq!(record.metrics.get("cpu_usage").copied(), Some(85.5));
        assert_eq!(record.tags.get("host").map(String::as_str), Some("server01"));

        // One tag and one metric, so the statement is fully determined.
        let sql = converter.generate_insert_sql(&record);
        assert_eq!(
            sql,
            "INSERT INTO data (field cpu_usage=85.5) TAGS (tag host='server01') \
             VALUES (ts) 1640000000000"
        );
    }
}