1use crate::error::{TsdbError, TsdbResult};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::fmt::Write as FmtWrite;
29use std::path::Path;
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
37pub enum DataValueType {
38 #[default]
40 Float64,
41 Int64,
43 Float32,
45 Text,
47}
48
49impl DataValueType {
50 pub fn sql_type(&self) -> &'static str {
52 match self {
53 DataValueType::Float64 => "DOUBLE",
54 DataValueType::Int64 => "BIGINT",
55 DataValueType::Float32 => "FLOAT",
56 DataValueType::Text => "VARCHAR",
57 }
58 }
59}
60
61impl std::fmt::Display for DataValueType {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.sql_type())
64 }
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct MetricSchema {
82 pub metric_name: String,
84 pub tag_keys: Vec<String>,
86 pub value_type: DataValueType,
88 pub description: Option<String>,
90}
91
92impl MetricSchema {
93 pub fn builder(metric_name: impl Into<String>) -> MetricSchemaBuilder {
95 MetricSchemaBuilder::new(metric_name)
96 }
97
98 pub fn table_name(&self) -> String {
102 sanitize_sql_identifier(&self.metric_name)
103 }
104
105 pub fn has_tags(&self) -> bool {
107 !self.tag_keys.is_empty()
108 }
109
110 pub fn column_count(&self) -> usize {
112 2 + self.tag_keys.len()
113 }
114}
115
116pub struct MetricSchemaBuilder {
122 metric_name: String,
123 tag_keys: Vec<String>,
124 value_type: DataValueType,
125 description: Option<String>,
126}
127
128impl MetricSchemaBuilder {
129 pub fn new(metric_name: impl Into<String>) -> Self {
131 Self {
132 metric_name: metric_name.into(),
133 tag_keys: Vec::new(),
134 value_type: DataValueType::Float64,
135 description: None,
136 }
137 }
138
139 pub fn with_tag(mut self, key: impl Into<String>) -> Self {
141 self.tag_keys.push(key.into());
142 self
143 }
144
145 pub fn with_value_type(mut self, vt: DataValueType) -> Self {
147 self.value_type = vt;
148 self
149 }
150
151 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
153 self.description = Some(desc.into());
154 self
155 }
156
157 pub fn build(self) -> MetricSchema {
159 MetricSchema {
160 metric_name: self.metric_name,
161 tag_keys: self.tag_keys,
162 value_type: self.value_type,
163 description: self.description,
164 }
165 }
166}
167
168#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
174pub struct SqlDataPoint {
175 pub timestamp_ms: i64,
177 pub value: f64,
179 pub tags: HashMap<String, String>,
181}
182
183impl SqlDataPoint {
184 pub fn new(timestamp_ms: i64, value: f64) -> Self {
186 Self {
187 timestamp_ms,
188 value,
189 tags: HashMap::new(),
190 }
191 }
192
193 pub fn with_tags(timestamp_ms: i64, value: f64, tags: HashMap<String, String>) -> Self {
195 Self {
196 timestamp_ms,
197 value,
198 tags,
199 }
200 }
201}
202
/// Generates SQL text (DDL, inserts and common queries) for a
/// [`MetricSchema`] and its data points.
///
/// Note that the derived `Default` is *not* the same as `SqlExporter::new()`:
/// `default()` yields `batch_size = 0`, `drop_existing = false` and
/// `if_not_exists = false`, whereas `new()` yields `1000`, `false` and
/// `true`.
#[derive(Debug, Clone, Default)]
pub struct SqlExporter {
    /// Maximum number of rows per generated `INSERT` statement. `0` puts
    /// all rows into a single statement.
    pub batch_size: usize,
    /// When `true`, `create_table_sql` emits `DROP TABLE IF EXISTS` before
    /// the `CREATE TABLE` statement.
    pub drop_existing: bool,
    /// When `true`, `create_table_sql` emits `CREATE TABLE IF NOT EXISTS`.
    pub if_not_exists: bool,
}
220
221impl SqlExporter {
222 pub fn new() -> Self {
224 Self {
225 batch_size: 1000,
226 drop_existing: false,
227 if_not_exists: true,
228 }
229 }
230
231 pub fn with_batch_size(mut self, size: usize) -> Self {
233 self.batch_size = size;
234 self
235 }
236
237 pub fn with_drop_existing(mut self, drop: bool) -> Self {
239 self.drop_existing = drop;
240 self
241 }
242
243 pub fn create_table_sql(&self, schema: &MetricSchema) -> String {
249 let mut sql = String::new();
250
251 if let Some(desc) = &schema.description {
252 let _ = writeln!(sql, "-- {desc}");
253 }
254
255 if self.drop_existing {
256 let _ = writeln!(
257 sql,
258 "DROP TABLE IF EXISTS {table};",
259 table = schema.table_name()
260 );
261 }
262
263 let if_not_exists = if self.if_not_exists {
264 "IF NOT EXISTS "
265 } else {
266 ""
267 };
268
269 let _ = write!(
270 sql,
271 "CREATE TABLE {if_not_exists}{table} (\n timestamp_ms BIGINT NOT NULL,\n value {vtype} NOT NULL",
272 table = schema.table_name(),
273 vtype = schema.value_type.sql_type(),
274 );
275
276 for tag in &schema.tag_keys {
277 let _ = write!(sql, ",\n {} VARCHAR", sanitize_sql_identifier(tag));
278 }
279
280 let _ = write!(sql, "\n);");
281 sql
282 }
283
284 pub fn insert_sql(&self, schema: &MetricSchema, points: &[SqlDataPoint]) -> Vec<String> {
289 if points.is_empty() {
290 return vec![];
291 }
292
293 let chunk_size = if self.batch_size == 0 {
294 points.len()
295 } else {
296 self.batch_size
297 };
298
299 let table = schema.table_name();
300 let tag_cols: Vec<String> = schema
301 .tag_keys
302 .iter()
303 .map(|k| sanitize_sql_identifier(k))
304 .collect();
305
306 let col_list = if tag_cols.is_empty() {
308 "timestamp_ms, value".to_string()
309 } else {
310 format!("timestamp_ms, value, {}", tag_cols.join(", "))
311 };
312
313 points
314 .chunks(chunk_size)
315 .map(|chunk| {
316 let mut sql = format!("INSERT INTO {table} ({col_list}) VALUES\n");
317 let mut first = true;
318 for pt in chunk {
319 if !first {
320 sql.push_str(",\n");
321 }
322 first = false;
323 if tag_cols.is_empty() {
324 let _ =
325 write!(sql, " ({}, {})", pt.timestamp_ms, escape_float(pt.value));
326 } else {
327 let tag_vals: Vec<String> = schema
328 .tag_keys
329 .iter()
330 .map(|k| {
331 pt.tags
332 .get(k)
333 .map(|v| format!("'{}'", v.replace('\'', "''")))
334 .unwrap_or_else(|| "NULL".to_string())
335 })
336 .collect();
337 let _ = write!(
338 sql,
339 " ({}, {}, {})",
340 pt.timestamp_ms,
341 escape_float(pt.value),
342 tag_vals.join(", ")
343 );
344 }
345 }
346 sql.push(';');
347 sql
348 })
349 .collect()
350 }
351
352 pub fn export_to_sql_file(
358 &self,
359 schema: &MetricSchema,
360 points: &[SqlDataPoint],
361 path: &Path,
362 ) -> TsdbResult<()> {
363 use std::io::Write as IoWrite;
364 let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
365 let mut writer = std::io::BufWriter::new(file);
366
367 writeln!(writer, "{}", self.create_table_sql(schema))
368 .map_err(|e| TsdbError::Io(e.to_string()))?;
369
370 for batch in self.insert_sql(schema, points) {
371 writeln!(writer, "{batch}").map_err(|e| TsdbError::Io(e.to_string()))?;
372 }
373
374 Ok(())
375 }
376
377 pub fn select_range_sql(&self, schema: &MetricSchema, start_ms: i64, end_ms: i64) -> String {
383 let tag_cols = if schema.tag_keys.is_empty() {
384 String::new()
385 } else {
386 let cols: Vec<String> = schema
387 .tag_keys
388 .iter()
389 .map(|k| sanitize_sql_identifier(k))
390 .collect();
391 format!(", {}", cols.join(", "))
392 };
393 format!(
394 "SELECT timestamp_ms, value{tag_cols} \
395 FROM {table} \
396 WHERE timestamp_ms BETWEEN {start} AND {end} \
397 ORDER BY timestamp_ms ASC;",
398 table = schema.table_name(),
399 start = start_ms,
400 end = end_ms,
401 tag_cols = tag_cols,
402 )
403 }
404
405 pub fn count_sql(&self, schema: &MetricSchema) -> String {
407 format!(
408 "SELECT COUNT(*) AS row_count FROM {table};",
409 table = schema.table_name()
410 )
411 }
412
413 pub fn summary_sql(&self, schema: &MetricSchema) -> String {
415 format!(
416 "SELECT \
417 MIN(timestamp_ms) AS first_ts, \
418 MAX(timestamp_ms) AS last_ts, \
419 MIN(value) AS min_val, \
420 MAX(value) AS max_val, \
421 AVG(value) AS avg_val, \
422 COUNT(*) AS row_count \
423 FROM {table};",
424 table = schema.table_name()
425 )
426 }
427
428 pub fn delete_before_sql(&self, schema: &MetricSchema, before_ms: i64) -> String {
430 format!(
431 "DELETE FROM {table} WHERE timestamp_ms < {before_ms};",
432 table = schema.table_name()
433 )
434 }
435
436 pub fn create_index_sql(&self, schema: &MetricSchema) -> String {
438 format!(
439 "CREATE INDEX IF NOT EXISTS idx_{table}_ts ON {table} (timestamp_ms);",
440 table = schema.table_name()
441 )
442 }
443
444 pub fn add_tag_column_sql(&self, schema: &MetricSchema, tag_key: &str) -> String {
446 format!(
447 "ALTER TABLE {table} ADD COLUMN {col} VARCHAR;",
448 table = schema.table_name(),
449 col = sanitize_sql_identifier(tag_key)
450 )
451 }
452
453 pub fn infer_schema(metric_name: &str, points: &[SqlDataPoint]) -> MetricSchema {
455 let mut tag_keys: Vec<String> = points
456 .iter()
457 .flat_map(|p| p.tags.keys().cloned())
458 .collect::<std::collections::HashSet<_>>()
459 .into_iter()
460 .collect();
461 tag_keys.sort();
462 MetricSchema::builder(metric_name)
463 .with_value_type(DataValueType::Float64)
464 .build()
465 .with_sorted_tags(tag_keys)
466 }
467}
468
/// Turns an arbitrary name into a plain SQL identifier.
///
/// * An empty name becomes `"_"`.
/// * Every character other than an ASCII letter, ASCII digit or `_` is
///   replaced by `_` (one underscore per character).
/// * If the result starts with an ASCII digit, a `_` is prepended.
fn sanitize_sql_identifier(name: &str) -> String {
    if name.is_empty() {
        return String::from("_");
    }

    let cleaned: String = name
        .chars()
        .map(|ch| {
            if ch == '_' || ch.is_ascii_alphanumeric() {
                ch
            } else {
                '_'
            }
        })
        .collect();

    match cleaned.chars().next() {
        Some(first) if first.is_ascii_digit() => format!("_{cleaned}"),
        _ => cleaned,
    }
}
492
/// Renders an `f64` as a SQL value.
///
/// Finite numbers use Rust's `Display` formatting. The non-finite values
/// are emitted as the quoted literals `'NaN'`, `'Infinity'` and
/// `'-Infinity'`.
// NOTE(review): the quoted literals presumably target engines that cast
// such text to a floating-point value — confirm for the target dialect.
fn escape_float(v: f64) -> String {
    if v.is_finite() {
        v.to_string()
    } else if v.is_nan() {
        String::from("'NaN'")
    } else if v.is_sign_positive() {
        String::from("'Infinity'")
    } else {
        String::from("'-Infinity'")
    }
}
510
511impl MetricSchema {
513 fn with_sorted_tags(mut self, tags: Vec<String>) -> Self {
514 self.tag_keys = tags;
515 self
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema with two tags and a description, shared by several tests.
    fn sample_schema() -> MetricSchema {
        MetricSchema::builder("cpu_usage")
            .with_tag("host")
            .with_tag("region")
            .with_description("CPU utilisation percent")
            .build()
    }

    /// `n` points tagged with `host` and `region`, one second apart.
    fn sample_points(n: usize) -> Vec<SqlDataPoint> {
        (0..n)
            .map(|i| {
                let mut tags = HashMap::new();
                tags.insert("host".to_string(), format!("srv-{i:02}"));
                tags.insert("region".to_string(), "eu-west".to_string());
                SqlDataPoint::with_tags(i as i64 * 1_000, i as f64 * 1.5, tags)
            })
            .collect()
    }

    // ---- DataValueType -------------------------------------------------

    #[test]
    fn test_value_type_sql_types() {
        assert_eq!(DataValueType::Float64.sql_type(), "DOUBLE");
        assert_eq!(DataValueType::Int64.sql_type(), "BIGINT");
        assert_eq!(DataValueType::Float32.sql_type(), "FLOAT");
        assert_eq!(DataValueType::Text.sql_type(), "VARCHAR");
    }

    #[test]
    fn test_value_type_display() {
        assert_eq!(DataValueType::Float64.to_string(), "DOUBLE");
        assert_eq!(DataValueType::Text.to_string(), "VARCHAR");
    }

    #[test]
    fn test_value_type_default() {
        assert_eq!(DataValueType::default(), DataValueType::Float64);
    }

    // ---- MetricSchema --------------------------------------------------

    #[test]
    fn test_metric_schema_table_name_clean() {
        let schema = MetricSchema::builder("cpu_usage").build();
        assert_eq!(schema.table_name(), "cpu_usage");
    }

    #[test]
    fn test_metric_schema_table_name_special_chars() {
        let schema = MetricSchema::builder("my-metric.v2").build();
        // Pin the exact result rather than only the absence of characters.
        assert_eq!(schema.table_name(), "my_metric_v2");
    }

    #[test]
    fn test_metric_schema_has_tags() {
        let no_tags = MetricSchema::builder("x").build();
        assert!(!no_tags.has_tags());

        let with_tags = MetricSchema::builder("x").with_tag("host").build();
        assert!(with_tags.has_tags());
    }

    #[test]
    fn test_metric_schema_column_count() {
        let s = MetricSchema::builder("m")
            .with_tag("a")
            .with_tag("b")
            .build();
        // timestamp_ms + value + two tags
        assert_eq!(s.column_count(), 4);
    }

    #[test]
    fn test_metric_schema_builder_value_type() {
        let schema = MetricSchema::builder("count")
            .with_value_type(DataValueType::Int64)
            .build();
        assert_eq!(schema.value_type, DataValueType::Int64);
    }

    // ---- create_table_sql ----------------------------------------------

    #[test]
    fn test_create_table_sql_contains_table_name() {
        let sql = SqlExporter::new().create_table_sql(&sample_schema());
        assert!(sql.contains("cpu_usage"), "sql = {sql}");
    }

    #[test]
    fn test_create_table_sql_has_timestamp_and_value() {
        let sql = SqlExporter::new().create_table_sql(&sample_schema());
        assert!(sql.contains("timestamp_ms BIGINT NOT NULL"), "sql = {sql}");
        assert!(sql.contains("value DOUBLE NOT NULL"), "sql = {sql}");
    }

    #[test]
    fn test_create_table_sql_has_tag_columns() {
        let sql = SqlExporter::new().create_table_sql(&sample_schema());
        assert!(sql.contains("host VARCHAR"), "sql = {sql}");
        assert!(sql.contains("region VARCHAR"), "sql = {sql}");
    }

    #[test]
    fn test_create_table_sql_no_tags() {
        let schema = MetricSchema::builder("temp").build();
        let sql = SqlExporter::new().create_table_sql(&schema);
        assert!(sql.contains("timestamp_ms"));
        assert!(sql.contains("value DOUBLE"));
        assert!(!sql.contains("VARCHAR"));
    }

    #[test]
    fn test_create_table_sql_if_not_exists() {
        let exporter = SqlExporter::new();
        let schema = MetricSchema::builder("m").build();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("IF NOT EXISTS"));
    }

    #[test]
    fn test_create_table_sql_drop_existing() {
        let exporter = SqlExporter::new().with_drop_existing(true);
        let schema = MetricSchema::builder("m").build();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("DROP TABLE IF EXISTS"));
    }

    #[test]
    fn test_create_table_sql_description_comment() {
        let schema = MetricSchema::builder("cpu")
            .with_description("CPU usage")
            .build();
        let sql = SqlExporter::new().create_table_sql(&schema);
        assert!(sql.contains("-- CPU usage"));
    }

    // ---- insert_sql ----------------------------------------------------

    #[test]
    fn test_insert_sql_empty_returns_empty_vec() {
        let batches = SqlExporter::new().insert_sql(&sample_schema(), &[]);
        assert!(batches.is_empty());
    }

    #[test]
    fn test_insert_sql_contains_values() {
        let schema = MetricSchema::builder("temp").build();
        let points = vec![
            SqlDataPoint::new(1_000, 22.5),
            SqlDataPoint::new(2_000, 23.0),
        ];
        let batches = SqlExporter::new().insert_sql(&schema, &points);
        assert_eq!(batches.len(), 1);
        let sql = &batches[0];
        assert!(sql.contains("INSERT INTO"));
        assert!(sql.contains("1000"));
        assert!(sql.contains("2000"));
    }

    #[test]
    fn test_insert_sql_with_tags() {
        let batches = SqlExporter::new().insert_sql(&sample_schema(), &sample_points(3));
        assert_eq!(batches.len(), 1);
        let sql = &batches[0];
        assert!(sql.contains("eu-west"));
        assert!(sql.contains("host"));
    }

    #[test]
    fn test_insert_sql_batching() {
        let schema = MetricSchema::builder("m").build();
        let points: Vec<SqlDataPoint> = (0..10).map(|i| SqlDataPoint::new(i, i as f64)).collect();
        let exporter = SqlExporter::new().with_batch_size(3);
        let batches = exporter.insert_sql(&schema, &points);
        // 3 + 3 + 3 + 1 rows
        assert_eq!(batches.len(), 4);
        assert!(batches[3].contains("(9, 9)"), "last batch = {}", batches[3]);
    }

    #[test]
    fn test_insert_sql_zero_batch_size_is_single_statement() {
        let schema = MetricSchema::builder("m").build();
        let points: Vec<SqlDataPoint> = (0..10).map(|i| SqlDataPoint::new(i, i as f64)).collect();
        let exporter = SqlExporter::new().with_batch_size(0);
        assert_eq!(exporter.insert_sql(&schema, &points).len(), 1);
    }

    #[test]
    fn test_insert_sql_tag_null_when_missing() {
        let schema = MetricSchema::builder("m").with_tag("host").build();
        let points = vec![SqlDataPoint::new(1_000, 5.0)];
        let batches = SqlExporter::new().insert_sql(&schema, &points);
        assert_eq!(batches.len(), 1);
        assert!(batches[0].contains("NULL"));
    }

    #[test]
    fn test_insert_sql_escapes_single_quotes() {
        let schema = MetricSchema::builder("m").with_tag("owner").build();
        let mut tags = HashMap::new();
        tags.insert("owner".to_string(), "o'brien".to_string());
        let points = vec![SqlDataPoint::with_tags(1, 1.0, tags)];
        let batches = SqlExporter::new().insert_sql(&schema, &points);
        assert!(batches[0].contains("'o''brien'"), "sql = {}", batches[0]);
    }

    // ---- export_to_sql_file --------------------------------------------

    #[test]
    fn test_export_to_sql_file() {
        // The process id keeps concurrent test runs from sharing one file.
        let path = std::env::temp_dir().join(format!(
            "oxirs_tsdb_test_export_{}.sql",
            std::process::id()
        ));

        let schema = MetricSchema::builder("sensor").with_tag("device").build();
        let points = sample_points(5);
        let result = SqlExporter::new().export_to_sql_file(&schema, &points, &path);

        // Read and clean up before asserting so a failure leaves no file.
        let content = std::fs::read_to_string(&path);
        let _ = std::fs::remove_file(&path);

        result.expect("export should succeed");
        let content = content.expect("read sql file");
        assert!(content.contains("CREATE TABLE"));
        assert!(content.contains("INSERT INTO"));
        assert!(content.trim_end().ends_with(';'), "content = {content}");
    }

    // ---- query generators ----------------------------------------------

    #[test]
    fn test_select_range_sql() {
        let sql = SqlExporter::new().select_range_sql(&sample_schema(), 1_000, 5_000);
        assert!(sql.contains("timestamp_ms BETWEEN 1000 AND 5000"));
        assert!(sql.contains("ORDER BY timestamp_ms ASC"));
    }

    #[test]
    fn test_select_range_sql_includes_tag_columns() {
        let sql = SqlExporter::new().select_range_sql(&sample_schema(), 0, 9_999);
        assert!(sql.contains("host"));
        assert!(sql.contains("region"));
    }

    #[test]
    fn test_count_sql() {
        let sql = SqlExporter::new().count_sql(&sample_schema());
        assert!(sql.contains("COUNT(*)"));
        assert!(sql.contains("row_count"));
        assert!(sql.contains("cpu_usage"));
    }

    #[test]
    fn test_summary_sql() {
        let sql = SqlExporter::new().summary_sql(&sample_schema());
        assert!(sql.contains("MIN(value)"));
        assert!(sql.contains("MAX(value)"));
        assert!(sql.contains("AVG(value)"));
        assert!(sql.contains("row_count"));
    }

    #[test]
    fn test_delete_before_sql() {
        let sql = SqlExporter::new().delete_before_sql(&sample_schema(), 1_000_000);
        assert!(sql.contains("DELETE FROM"));
        assert!(sql.contains("1000000"));
        assert!(sql.contains("timestamp_ms <"));
    }

    #[test]
    fn test_create_index_sql() {
        let sql = SqlExporter::new().create_index_sql(&sample_schema());
        assert!(sql.contains("CREATE INDEX IF NOT EXISTS"));
        assert!(sql.contains("timestamp_ms"));
    }

    #[test]
    fn test_add_tag_column_sql() {
        let schema = MetricSchema::builder("m").build();
        let sql = SqlExporter::new().add_tag_column_sql(&schema, "datacenter");
        assert!(sql.contains("ALTER TABLE"));
        assert!(sql.contains("ADD COLUMN datacenter VARCHAR"));
    }

    // ---- escape_float --------------------------------------------------

    #[test]
    fn test_escape_float_normal() {
        assert_eq!(escape_float(42.0), "42");
        assert_eq!(escape_float(1.5), "1.5");
    }

    #[test]
    fn test_escape_float_special_values() {
        assert_eq!(escape_float(f64::NAN), "'NaN'");
        assert_eq!(escape_float(f64::INFINITY), "'Infinity'");
        assert_eq!(escape_float(f64::NEG_INFINITY), "'-Infinity'");
    }

    // ---- sanitize_sql_identifier ---------------------------------------

    #[test]
    fn test_sanitize_identifier_clean() {
        assert_eq!(sanitize_sql_identifier("cpu_usage"), "cpu_usage");
    }

    #[test]
    fn test_sanitize_identifier_hyphen() {
        assert_eq!(sanitize_sql_identifier("my-metric"), "my_metric");
    }

    #[test]
    fn test_sanitize_identifier_leading_digit() {
        assert_eq!(sanitize_sql_identifier("1bad"), "_1bad");
    }

    #[test]
    fn test_sanitize_identifier_empty() {
        assert_eq!(sanitize_sql_identifier(""), "_");
    }

    // ---- SqlDataPoint --------------------------------------------------

    #[test]
    fn test_sql_data_point_new() {
        let p = SqlDataPoint::new(1_000, 42.0);
        assert_eq!(p.timestamp_ms, 1_000);
        assert!((p.value - 42.0).abs() < f64::EPSILON);
        assert!(p.tags.is_empty());
    }

    #[test]
    fn test_sql_data_point_with_tags() {
        let mut tags = HashMap::new();
        tags.insert("host".to_string(), "srv-01".to_string());
        let p = SqlDataPoint::with_tags(2_000, 55.5, tags);
        assert_eq!(p.tags["host"], "srv-01");
    }

    // ---- infer_schema --------------------------------------------------

    #[test]
    fn test_infer_schema_from_points() {
        let points = sample_points(5);
        let schema = SqlExporter::infer_schema("cpu_usage", &points);
        assert_eq!(schema.metric_name, "cpu_usage");
        assert_eq!(schema.value_type, DataValueType::Float64);
        // The inferred keys are the sorted union of all tag keys.
        assert_eq!(schema.tag_keys, vec!["host", "region"]);
    }

    #[test]
    fn test_infer_schema_no_points() {
        let schema = SqlExporter::infer_schema("empty_metric", &[]);
        assert_eq!(schema.metric_name, "empty_metric");
        assert!(schema.tag_keys.is_empty());
    }
}