// oxirs_tsdb/analytics/sql_export.rs
1//! DuckDB-compatible SQL export for TSDB time-series data.
2//!
3//! Generates `CREATE TABLE` DDL and `INSERT` DML statements from TSDB metric
4//! data so that recordings can be loaded into any SQL engine — including
5//! DuckDB, SQLite, or PostgreSQL — without an external dependency.
6//!
7//! ## Usage
8//!
9//! ```rust
10//! use oxirs_tsdb::analytics::sql_export::{SqlExporter, MetricSchema, DataValueType};
11//!
12//! // Describe the schema of a metric.
13//! let schema = MetricSchema::builder("cpu_usage")
14//!     .with_tag("host")
15//!     .with_tag("region")
16//!     .build();
17//!
18//! // Generate SQL.
19//! let exporter = SqlExporter::new();
20//! let ddl = exporter.create_table_sql(&schema);
21//! assert!(ddl.contains("CREATE TABLE"));
22//! assert!(ddl.contains("cpu_usage"));
23//! ```
24
25use crate::error::{TsdbError, TsdbResult};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::fmt::Write as FmtWrite;
29use std::path::Path;
30
31// =============================================================================
32// DataValueType
33// =============================================================================
34
/// The SQL column type used to represent a metric's value.
///
/// The default is [`DataValueType::Float64`], matching the `f64` payload of
/// [`SqlDataPoint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum DataValueType {
    /// 64-bit IEEE 754 floating-point (default).
    #[default]
    Float64,
    /// 64-bit signed integer.
    Int64,
    /// 32-bit IEEE 754 floating-point.
    Float32,
    /// UTF-8 text (for string-valued metrics or enumerations).
    Text,
}
48
49impl DataValueType {
50    /// Return the SQL type keyword for this variant.
51    pub fn sql_type(&self) -> &'static str {
52        match self {
53            DataValueType::Float64 => "DOUBLE",
54            DataValueType::Int64 => "BIGINT",
55            DataValueType::Float32 => "FLOAT",
56            DataValueType::Text => "VARCHAR",
57        }
58    }
59}
60
61impl std::fmt::Display for DataValueType {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.sql_type())
64    }
65}
66
67// =============================================================================
68// MetricSchema
69// =============================================================================
70
/// Describes the SQL schema of a TSDB metric.
///
/// A metric has:
/// - A mandatory `timestamp_ms BIGINT NOT NULL` column.
/// - A mandatory `value <DataValueType> NOT NULL` column.
/// - Zero or more `VARCHAR` tag columns (one per tag key).
///
/// The resulting table name is `"<metric_name>"` (or a sanitised version if the
/// name contains special characters).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSchema {
    /// Name of the metric (used as the SQL table name after sanitisation).
    pub metric_name: String,
    /// Ordered list of tag key names.
    pub tag_keys: Vec<String>,
    /// SQL type for the `value` column.
    pub value_type: DataValueType,
    /// Optional human-readable description (emitted as a leading `--` SQL comment).
    pub description: Option<String>,
}
91
92impl MetricSchema {
93    /// Create a `MetricSchemaBuilder` for `metric_name`.
94    pub fn builder(metric_name: impl Into<String>) -> MetricSchemaBuilder {
95        MetricSchemaBuilder::new(metric_name)
96    }
97
98    /// Return a sanitised SQL table name derived from `metric_name`.
99    ///
100    /// Non-alphanumeric / non-underscore characters are replaced with `_`.
101    pub fn table_name(&self) -> String {
102        sanitize_sql_identifier(&self.metric_name)
103    }
104
105    /// Return `true` if this schema has at least one tag column.
106    pub fn has_tags(&self) -> bool {
107        !self.tag_keys.is_empty()
108    }
109
110    /// Return the total number of columns (2 fixed + tags).
111    pub fn column_count(&self) -> usize {
112        2 + self.tag_keys.len()
113    }
114}
115
116// =============================================================================
117// MetricSchemaBuilder
118// =============================================================================
119
120/// Fluent builder for [`MetricSchema`].
121pub struct MetricSchemaBuilder {
122    metric_name: String,
123    tag_keys: Vec<String>,
124    value_type: DataValueType,
125    description: Option<String>,
126}
127
128impl MetricSchemaBuilder {
129    /// Create a builder with default settings.
130    pub fn new(metric_name: impl Into<String>) -> Self {
131        Self {
132            metric_name: metric_name.into(),
133            tag_keys: Vec::new(),
134            value_type: DataValueType::Float64,
135            description: None,
136        }
137    }
138
139    /// Add a tag key column.
140    pub fn with_tag(mut self, key: impl Into<String>) -> Self {
141        self.tag_keys.push(key.into());
142        self
143    }
144
145    /// Set the value column type.
146    pub fn with_value_type(mut self, vt: DataValueType) -> Self {
147        self.value_type = vt;
148        self
149    }
150
151    /// Set an optional description comment.
152    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
153        self.description = Some(desc.into());
154        self
155    }
156
157    /// Consume the builder and return the finished [`MetricSchema`].
158    pub fn build(self) -> MetricSchema {
159        MetricSchema {
160            metric_name: self.metric_name,
161            tag_keys: self.tag_keys,
162            value_type: self.value_type,
163            description: self.description,
164        }
165    }
166}
167
168// =============================================================================
169// DataPoint (SQL export local type)
170// =============================================================================
171
/// A single time-series point ready for SQL export.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlDataPoint {
    /// Unix epoch milliseconds.
    pub timestamp_ms: i64,
    /// Observed value.
    pub value: f64,
    /// Tag key-value pairs.  Keys missing from the associated schema's tag
    /// list are ignored at export time; schema keys absent here render as
    /// SQL `NULL`.
    pub tags: HashMap<String, String>,
}
182
183impl SqlDataPoint {
184    /// Create a point without tags.
185    pub fn new(timestamp_ms: i64, value: f64) -> Self {
186        Self {
187            timestamp_ms,
188            value,
189            tags: HashMap::new(),
190        }
191    }
192
193    /// Create a point with a tag map.
194    pub fn with_tags(timestamp_ms: i64, value: f64, tags: HashMap<String, String>) -> Self {
195        Self {
196            timestamp_ms,
197            value,
198            tags,
199        }
200    }
201}
202
203// =============================================================================
204// SqlExporter
205// =============================================================================
206
/// Generates SQL DDL and DML statements for TSDB data.
///
/// `SqlExporter` is stateless: each method takes the relevant inputs and
/// returns a `String`.  No database connection is required.
///
/// Note: the derived `Default` zeroes every field (`batch_size == 0`,
/// both flags `false`); prefer [`SqlExporter::new`] for the documented
/// defaults (1000-row batches, `IF NOT EXISTS` enabled).
#[derive(Debug, Clone, Default)]
pub struct SqlExporter {
    /// Number of rows per batched `INSERT` statement (0 = all rows in one statement).
    pub batch_size: usize,
    /// When `true`, a `DROP TABLE IF EXISTS` is emitted before `CREATE TABLE`.
    pub drop_existing: bool,
    /// When `true`, `IF NOT EXISTS` is added to `CREATE TABLE`.
    pub if_not_exists: bool,
}
220
221impl SqlExporter {
222    /// Create an exporter with sensible defaults.
223    pub fn new() -> Self {
224        Self {
225            batch_size: 1000,
226            drop_existing: false,
227            if_not_exists: true,
228        }
229    }
230
231    /// Set batch size.
232    pub fn with_batch_size(mut self, size: usize) -> Self {
233        self.batch_size = size;
234        self
235    }
236
237    /// Include `DROP TABLE IF EXISTS` before `CREATE TABLE`.
238    pub fn with_drop_existing(mut self, drop: bool) -> Self {
239        self.drop_existing = drop;
240        self
241    }
242
243    // -------------------------------------------------------------------------
244    // DDL generation
245    // -------------------------------------------------------------------------
246
247    /// Generate a `CREATE TABLE` statement for the given schema.
248    pub fn create_table_sql(&self, schema: &MetricSchema) -> String {
249        let mut sql = String::new();
250
251        if let Some(desc) = &schema.description {
252            let _ = writeln!(sql, "-- {desc}");
253        }
254
255        if self.drop_existing {
256            let _ = writeln!(
257                sql,
258                "DROP TABLE IF EXISTS {table};",
259                table = schema.table_name()
260            );
261        }
262
263        let if_not_exists = if self.if_not_exists {
264            "IF NOT EXISTS "
265        } else {
266            ""
267        };
268
269        let _ = write!(
270            sql,
271            "CREATE TABLE {if_not_exists}{table} (\n    timestamp_ms BIGINT NOT NULL,\n    value {vtype} NOT NULL",
272            table = schema.table_name(),
273            vtype = schema.value_type.sql_type(),
274        );
275
276        for tag in &schema.tag_keys {
277            let _ = write!(sql, ",\n    {} VARCHAR", sanitize_sql_identifier(tag));
278        }
279
280        let _ = write!(sql, "\n);");
281        sql
282    }
283
284    /// Generate a batched `INSERT INTO` statement for a slice of points.
285    ///
286    /// Points are split into chunks of `batch_size` (or all at once if
287    /// `batch_size == 0`).  Returns a `Vec` of SQL strings, one per batch.
288    pub fn insert_sql(&self, schema: &MetricSchema, points: &[SqlDataPoint]) -> Vec<String> {
289        if points.is_empty() {
290            return vec![];
291        }
292
293        let chunk_size = if self.batch_size == 0 {
294            points.len()
295        } else {
296            self.batch_size
297        };
298
299        let table = schema.table_name();
300        let tag_cols: Vec<String> = schema
301            .tag_keys
302            .iter()
303            .map(|k| sanitize_sql_identifier(k))
304            .collect();
305
306        // Column list header
307        let col_list = if tag_cols.is_empty() {
308            "timestamp_ms, value".to_string()
309        } else {
310            format!("timestamp_ms, value, {}", tag_cols.join(", "))
311        };
312
313        points
314            .chunks(chunk_size)
315            .map(|chunk| {
316                let mut sql = format!("INSERT INTO {table} ({col_list}) VALUES\n");
317                let mut first = true;
318                for pt in chunk {
319                    if !first {
320                        sql.push_str(",\n");
321                    }
322                    first = false;
323                    if tag_cols.is_empty() {
324                        let _ =
325                            write!(sql, "    ({}, {})", pt.timestamp_ms, escape_float(pt.value));
326                    } else {
327                        let tag_vals: Vec<String> = schema
328                            .tag_keys
329                            .iter()
330                            .map(|k| {
331                                pt.tags
332                                    .get(k)
333                                    .map(|v| format!("'{}'", v.replace('\'', "''")))
334                                    .unwrap_or_else(|| "NULL".to_string())
335                            })
336                            .collect();
337                        let _ = write!(
338                            sql,
339                            "    ({}, {}, {})",
340                            pt.timestamp_ms,
341                            escape_float(pt.value),
342                            tag_vals.join(", ")
343                        );
344                    }
345                }
346                sql.push(';');
347                sql
348            })
349            .collect()
350    }
351
352    /// Generate a complete SQL script (DDL + DML) and write it to `path`.
353    ///
354    /// The script contains:
355    /// 1. `CREATE TABLE` (and optionally `DROP TABLE IF EXISTS`).
356    /// 2. All `INSERT INTO` batches.
357    pub fn export_to_sql_file(
358        &self,
359        schema: &MetricSchema,
360        points: &[SqlDataPoint],
361        path: &Path,
362    ) -> TsdbResult<()> {
363        use std::io::Write as IoWrite;
364        let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
365        let mut writer = std::io::BufWriter::new(file);
366
367        writeln!(writer, "{}", self.create_table_sql(schema))
368            .map_err(|e| TsdbError::Io(e.to_string()))?;
369
370        for batch in self.insert_sql(schema, points) {
371            writeln!(writer, "{batch}").map_err(|e| TsdbError::Io(e.to_string()))?;
372        }
373
374        Ok(())
375    }
376
377    // -------------------------------------------------------------------------
378    // Additional helpers
379    // -------------------------------------------------------------------------
380
381    /// Generate a `SELECT` query for a metric within a time range.
382    pub fn select_range_sql(&self, schema: &MetricSchema, start_ms: i64, end_ms: i64) -> String {
383        let tag_cols = if schema.tag_keys.is_empty() {
384            String::new()
385        } else {
386            let cols: Vec<String> = schema
387                .tag_keys
388                .iter()
389                .map(|k| sanitize_sql_identifier(k))
390                .collect();
391            format!(", {}", cols.join(", "))
392        };
393        format!(
394            "SELECT timestamp_ms, value{tag_cols} \
395             FROM {table} \
396             WHERE timestamp_ms BETWEEN {start} AND {end} \
397             ORDER BY timestamp_ms ASC;",
398            table = schema.table_name(),
399            start = start_ms,
400            end = end_ms,
401            tag_cols = tag_cols,
402        )
403    }
404
405    /// Generate a `SELECT COUNT(*)` query.
406    pub fn count_sql(&self, schema: &MetricSchema) -> String {
407        format!(
408            "SELECT COUNT(*) AS row_count FROM {table};",
409            table = schema.table_name()
410        )
411    }
412
413    /// Generate a `SELECT MIN/MAX/AVG(value)` summary query.
414    pub fn summary_sql(&self, schema: &MetricSchema) -> String {
415        format!(
416            "SELECT \
417               MIN(timestamp_ms) AS first_ts, \
418               MAX(timestamp_ms) AS last_ts, \
419               MIN(value) AS min_val, \
420               MAX(value) AS max_val, \
421               AVG(value) AS avg_val, \
422               COUNT(*) AS row_count \
423             FROM {table};",
424            table = schema.table_name()
425        )
426    }
427
428    /// Generate a `DELETE FROM` statement to purge data older than `before_ms`.
429    pub fn delete_before_sql(&self, schema: &MetricSchema, before_ms: i64) -> String {
430        format!(
431            "DELETE FROM {table} WHERE timestamp_ms < {before_ms};",
432            table = schema.table_name()
433        )
434    }
435
436    /// Generate a `CREATE INDEX` for fast time-range queries.
437    pub fn create_index_sql(&self, schema: &MetricSchema) -> String {
438        format!(
439            "CREATE INDEX IF NOT EXISTS idx_{table}_ts ON {table} (timestamp_ms);",
440            table = schema.table_name()
441        )
442    }
443
444    /// Generate an `ALTER TABLE ... ADD COLUMN` for a new tag.
445    pub fn add_tag_column_sql(&self, schema: &MetricSchema, tag_key: &str) -> String {
446        format!(
447            "ALTER TABLE {table} ADD COLUMN {col} VARCHAR;",
448            table = schema.table_name(),
449            col = sanitize_sql_identifier(tag_key)
450        )
451    }
452
453    /// Infer a [`MetricSchema`] from a slice of [`SqlDataPoint`]s and a metric name.
454    pub fn infer_schema(metric_name: &str, points: &[SqlDataPoint]) -> MetricSchema {
455        let mut tag_keys: Vec<String> = points
456            .iter()
457            .flat_map(|p| p.tags.keys().cloned())
458            .collect::<std::collections::HashSet<_>>()
459            .into_iter()
460            .collect();
461        tag_keys.sort();
462        MetricSchema::builder(metric_name)
463            .with_value_type(DataValueType::Float64)
464            .build()
465            .with_sorted_tags(tag_keys)
466    }
467}
468
469// =============================================================================
470// Private helpers
471// =============================================================================
472
/// Sanitise an identifier for use as a SQL table/column name.
///
/// Every character that is not ASCII alphanumeric or `_` is replaced with
/// `_`; a leading digit gains a `_` prefix; the empty string becomes `"_"`.
fn sanitize_sql_identifier(name: &str) -> String {
    if name.is_empty() {
        return "_".to_string();
    }
    let cleaned: String = name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect();
    // SQL identifiers may not begin with a digit; prefix one if needed.
    match cleaned.chars().next() {
        Some(c) if c.is_ascii_digit() => format!("_{cleaned}"),
        _ => cleaned,
    }
}
492
/// Format a floating-point value as a SQL literal.
///
/// - `f64::NAN` → `'NaN'`
/// - `f64::INFINITY` → `'Infinity'`
/// - `f64::NEG_INFINITY` → `'-Infinity'`
/// - Any finite value → its standard decimal representation.
fn escape_float(v: f64) -> String {
    if v.is_finite() {
        return v.to_string();
    }
    // Non-finite values are emitted as quoted strings so engines that
    // accept 'NaN'/'Infinity' casts can still load the row.
    if v.is_nan() {
        "'NaN'".to_string()
    } else if v > 0.0 {
        "'Infinity'".to_string()
    } else {
        "'-Infinity'".to_string()
    }
}
510
// Extension method on MetricSchema to apply a pre-sorted tag list.
impl MetricSchema {
    /// Replace the schema's tag keys wholesale with a pre-sorted list.
    ///
    /// Internal helper used by `SqlExporter::infer_schema`; callers are
    /// expected to pass keys that are already sorted.
    fn with_sorted_tags(mut self, tags: Vec<String>) -> Self {
        self.tag_keys = tags;
        self
    }
}
518
519// =============================================================================
520// Tests
521// =============================================================================
522
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema with two tags and a description, shared across tests.
    fn sample_schema() -> MetricSchema {
        MetricSchema::builder("cpu_usage")
            .with_tag("host")
            .with_tag("region")
            .with_description("CPU utilisation percent")
            .build()
    }

    /// `n` points 1 s apart, tagged with a per-point host and a fixed region.
    fn sample_points(n: usize) -> Vec<SqlDataPoint> {
        (0..n)
            .map(|i| {
                let mut tags = HashMap::new();
                tags.insert("host".to_string(), format!("srv-{i:02}"));
                tags.insert("region".to_string(), "eu-west".to_string());
                SqlDataPoint::with_tags(i as i64 * 1_000, i as f64 * 1.5, tags)
            })
            .collect()
    }

    // -- DataValueType --------------------------------------------------------

    #[test]
    fn test_value_type_sql_types() {
        assert_eq!(DataValueType::Float64.sql_type(), "DOUBLE");
        assert_eq!(DataValueType::Int64.sql_type(), "BIGINT");
        assert_eq!(DataValueType::Float32.sql_type(), "FLOAT");
        assert_eq!(DataValueType::Text.sql_type(), "VARCHAR");
    }

    #[test]
    fn test_value_type_display() {
        assert_eq!(format!("{}", DataValueType::Float64), "DOUBLE");
        assert_eq!(format!("{}", DataValueType::Text), "VARCHAR");
    }

    #[test]
    fn test_value_type_default() {
        assert_eq!(DataValueType::default(), DataValueType::Float64);
    }

    // -- MetricSchema / MetricSchemaBuilder -----------------------------------

    #[test]
    fn test_metric_schema_table_name_clean() {
        let schema = MetricSchema::builder("cpu_usage").build();
        assert_eq!(schema.table_name(), "cpu_usage");
    }

    #[test]
    fn test_metric_schema_table_name_special_chars() {
        let schema = MetricSchema::builder("my-metric.v2").build();
        let name = schema.table_name();
        // Hyphens and dots should be replaced.
        assert!(!name.contains('-'));
        assert!(!name.contains('.'));
    }

    #[test]
    fn test_metric_schema_has_tags() {
        let no_tags = MetricSchema::builder("x").build();
        assert!(!no_tags.has_tags());

        let with_tags = MetricSchema::builder("x").with_tag("host").build();
        assert!(with_tags.has_tags());
    }

    #[test]
    fn test_metric_schema_column_count() {
        let s = MetricSchema::builder("m")
            .with_tag("a")
            .with_tag("b")
            .build();
        // 2 fixed + 2 tags = 4
        assert_eq!(s.column_count(), 4);
    }

    #[test]
    fn test_metric_schema_builder_value_type() {
        let schema = MetricSchema::builder("count")
            .with_value_type(DataValueType::Int64)
            .build();
        assert_eq!(schema.value_type, DataValueType::Int64);
    }

    // -- SqlExporter DDL generation ------------------------------------------

    #[test]
    fn test_create_table_sql_contains_table_name() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("cpu_usage"), "sql = {sql}");
    }

    #[test]
    fn test_create_table_sql_has_timestamp_and_value() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("timestamp_ms BIGINT NOT NULL"));
        assert!(sql.contains("value DOUBLE NOT NULL"));
    }

    #[test]
    fn test_create_table_sql_has_tag_columns() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("host VARCHAR"));
        assert!(sql.contains("region VARCHAR"));
    }

    #[test]
    fn test_create_table_sql_no_tags() {
        let schema = MetricSchema::builder("temp").build();
        let exporter = SqlExporter::new();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("timestamp_ms"));
        assert!(sql.contains("value DOUBLE"));
        assert!(!sql.contains("VARCHAR"));
    }

    #[test]
    fn test_create_table_sql_if_not_exists() {
        let exporter = SqlExporter::new(); // if_not_exists = true by default
        let schema = MetricSchema::builder("m").build();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("IF NOT EXISTS"));
    }

    #[test]
    fn test_create_table_sql_drop_existing() {
        let exporter = SqlExporter::new().with_drop_existing(true);
        let schema = MetricSchema::builder("m").build();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("DROP TABLE IF EXISTS"));
    }

    #[test]
    fn test_create_table_sql_description_comment() {
        let schema = MetricSchema::builder("cpu")
            .with_description("CPU usage")
            .build();
        let exporter = SqlExporter::new();
        let sql = exporter.create_table_sql(&schema);
        assert!(sql.contains("-- CPU usage"));
    }

    // -- SqlExporter INSERT generation ----------------------------------------

    #[test]
    fn test_insert_sql_empty_returns_empty_vec() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let batches = exporter.insert_sql(&schema, &[]);
        assert!(batches.is_empty());
    }

    #[test]
    fn test_insert_sql_contains_values() {
        let schema = MetricSchema::builder("temp").build();
        let points = vec![
            SqlDataPoint::new(1_000, 22.5),
            SqlDataPoint::new(2_000, 23.0),
        ];
        let exporter = SqlExporter::new();
        let batches = exporter.insert_sql(&schema, &points);
        assert_eq!(batches.len(), 1);
        let sql = &batches[0];
        assert!(sql.contains("INSERT INTO"));
        assert!(sql.contains("1000"));
        assert!(sql.contains("2000"));
    }

    #[test]
    fn test_insert_sql_with_tags() {
        let schema = sample_schema();
        let points = sample_points(3);
        let exporter = SqlExporter::new();
        let batches = exporter.insert_sql(&schema, &points);
        assert_eq!(batches.len(), 1);
        let sql = &batches[0];
        assert!(sql.contains("eu-west"));
        assert!(sql.contains("host"));
    }

    #[test]
    fn test_insert_sql_batching() {
        let schema = MetricSchema::builder("m").build();
        let points: Vec<SqlDataPoint> = (0..10).map(|i| SqlDataPoint::new(i, i as f64)).collect();
        let exporter = SqlExporter::new().with_batch_size(3);
        let batches = exporter.insert_sql(&schema, &points);
        // 10 rows / 3 per batch = 4 batches (3, 3, 3, 1)
        assert_eq!(batches.len(), 4);
    }

    #[test]
    fn test_insert_sql_tag_null_when_missing() {
        let schema = MetricSchema::builder("m").with_tag("host").build();
        let points = vec![SqlDataPoint::new(1_000, 5.0)]; // no tags
        let exporter = SqlExporter::new();
        let batches = exporter.insert_sql(&schema, &points);
        assert_eq!(batches.len(), 1);
        assert!(batches[0].contains("NULL"));
    }

    // -- export_to_sql_file ---------------------------------------------------

    #[test]
    fn test_export_to_sql_file() {
        let dir = std::env::temp_dir();
        let path = dir.join("oxirs_tsdb_test_export.sql");

        let schema = MetricSchema::builder("sensor").with_tag("device").build();
        let points = sample_points(5);
        let exporter = SqlExporter::new();
        exporter
            .export_to_sql_file(&schema, &points, &path)
            .expect("export should succeed");

        let content = std::fs::read_to_string(&path).expect("read sql file");
        assert!(content.contains("CREATE TABLE"));
        assert!(content.contains("INSERT INTO"));
        let _ = std::fs::remove_file(&path);
    }

    // -- Helper queries -------------------------------------------------------

    #[test]
    fn test_select_range_sql() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.select_range_sql(&schema, 1_000, 5_000);
        assert!(sql.contains("timestamp_ms BETWEEN 1000 AND 5000"));
        assert!(sql.contains("ORDER BY timestamp_ms ASC"));
    }

    #[test]
    fn test_select_range_sql_includes_tag_columns() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.select_range_sql(&schema, 0, 9_999);
        assert!(sql.contains("host"));
        assert!(sql.contains("region"));
    }

    #[test]
    fn test_count_sql() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.count_sql(&schema);
        assert!(sql.contains("COUNT(*)"));
        assert!(sql.contains("row_count"));
        assert!(sql.contains("cpu_usage"));
    }

    #[test]
    fn test_summary_sql() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.summary_sql(&schema);
        assert!(sql.contains("MIN(value)"));
        assert!(sql.contains("MAX(value)"));
        assert!(sql.contains("AVG(value)"));
        assert!(sql.contains("row_count"));
    }

    #[test]
    fn test_delete_before_sql() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.delete_before_sql(&schema, 1_000_000);
        assert!(sql.contains("DELETE FROM"));
        assert!(sql.contains("1000000"));
        assert!(sql.contains("timestamp_ms <"));
    }

    #[test]
    fn test_create_index_sql() {
        let schema = sample_schema();
        let exporter = SqlExporter::new();
        let sql = exporter.create_index_sql(&schema);
        assert!(sql.contains("CREATE INDEX IF NOT EXISTS"));
        assert!(sql.contains("timestamp_ms"));
    }

    #[test]
    fn test_add_tag_column_sql() {
        let schema = MetricSchema::builder("m").build();
        let exporter = SqlExporter::new();
        let sql = exporter.add_tag_column_sql(&schema, "datacenter");
        assert!(sql.contains("ALTER TABLE"));
        assert!(sql.contains("ADD COLUMN datacenter VARCHAR"));
    }

    // -- escape_float ---------------------------------------------------------

    #[test]
    fn test_escape_float_normal() {
        let s = escape_float(42.0);
        assert!(s.contains("42"));
    }

    #[test]
    fn test_escape_float_special_values() {
        assert_eq!(escape_float(f64::NAN), "'NaN'");
        assert_eq!(escape_float(f64::INFINITY), "'Infinity'");
        assert_eq!(escape_float(f64::NEG_INFINITY), "'-Infinity'");
    }

    // -- sanitize_sql_identifier ----------------------------------------------

    #[test]
    fn test_sanitize_identifier_clean() {
        assert_eq!(sanitize_sql_identifier("cpu_usage"), "cpu_usage");
    }

    #[test]
    fn test_sanitize_identifier_hyphen() {
        let id = sanitize_sql_identifier("my-metric");
        assert!(!id.contains('-'));
    }

    #[test]
    fn test_sanitize_identifier_leading_digit() {
        let id = sanitize_sql_identifier("1bad");
        assert!(!id.starts_with(|c: char| c.is_ascii_digit()));
    }

    #[test]
    fn test_sanitize_identifier_empty() {
        assert_eq!(sanitize_sql_identifier(""), "_");
    }

    // -- SqlDataPoint ---------------------------------------------------------

    #[test]
    fn test_sql_data_point_new() {
        let p = SqlDataPoint::new(1_000, 42.0);
        assert_eq!(p.timestamp_ms, 1_000);
        assert!((p.value - 42.0).abs() < f64::EPSILON);
        assert!(p.tags.is_empty());
    }

    #[test]
    fn test_sql_data_point_with_tags() {
        let mut tags = HashMap::new();
        tags.insert("host".to_string(), "srv-01".to_string());
        let p = SqlDataPoint::with_tags(2_000, 55.5, tags);
        assert_eq!(p.tags["host"], "srv-01");
    }

    // -- SqlExporter::infer_schema -------------------------------------------

    #[test]
    fn test_infer_schema_from_points() {
        let points = sample_points(5);
        let schema = SqlExporter::infer_schema("cpu_usage", &points);
        assert_eq!(schema.metric_name, "cpu_usage");
        assert_eq!(schema.value_type, DataValueType::Float64);
    }

    #[test]
    fn test_infer_schema_no_points() {
        let schema = SqlExporter::infer_schema("empty_metric", &[]);
        assert_eq!(schema.metric_name, "empty_metric");
        assert!(schema.tag_keys.is_empty());
    }
895}