Skip to main content

oxirs_tsdb/analytics/
arrow_export.rs

1//! Apache Arrow and Parquet export for TSDB time-series data.
2//!
3//! Converts TSDB `DataPoint` collections to Arrow `RecordBatch` structures
4//! and optionally serializes them to Parquet files for interoperability with
5//! external analytics tools (DuckDB, Spark, Polars, etc.).
6//!
7//! ## Schema
8//!
9//! Every exported batch uses the following schema:
10//!
11//! | Column      | Arrow type              | Notes                            |
12//! |-------------|-------------------------|----------------------------------|
13//! | `timestamp` | `Int64`                 | Unix epoch milliseconds (UTC)    |
14//! | `metric`    | `Utf8`                  | Metric / series name             |
15//! | `value`     | `Float64`               | Observed measurement             |
16//! | `tags_json` | `Utf8`                  | JSON-encoded tag key-value pairs |
17
18use crate::error::{TsdbError, TsdbResult};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22// -- Feature-gated Arrow / Parquet imports ------------------------------------
23#[cfg(feature = "arrow-export")]
24use {
25    arrow::{
26        array::{Float64Array, Int64Array, StringArray},
27        datatypes::{DataType, Field, Schema},
28        record_batch::RecordBatch,
29    },
30    std::sync::Arc,
31};
32
33// -- Parquet-specific imports -------------------------------------------------
34#[cfg(feature = "arrow-export")]
35use {
36    parquet::{
37        arrow::ArrowWriter, basic::Compression as PqCompression, file::properties::WriterProperties,
38    },
39    std::{fs::File, path::Path},
40};
41
42// =============================================================================
43// Public data types (always compiled -- no feature gate)
44// =============================================================================
45
46/// A single time-series measurement ready for Arrow export.
47///
48/// This is a flat, owned representation that can be accumulated across
49/// multiple series before batch conversion.
50#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
51pub struct ExportedPoint {
52    /// Unix epoch milliseconds (UTC).
53    pub timestamp_ms: i64,
54    /// Series / metric name.
55    pub metric: String,
56    /// Observed value.
57    pub value: f64,
58    /// Tag key-value pairs serialised as a JSON object string.
59    pub tags_json: String,
60}
61
62impl ExportedPoint {
63    /// Construct a point with no tags.
64    pub fn new(timestamp_ms: i64, metric: impl Into<String>, value: f64) -> Self {
65        Self {
66            timestamp_ms,
67            metric: metric.into(),
68            value,
69            tags_json: "{}".to_string(),
70        }
71    }
72
73    /// Construct a point with a tag map.
74    pub fn with_tags(
75        timestamp_ms: i64,
76        metric: impl Into<String>,
77        value: f64,
78        tags: &HashMap<String, String>,
79    ) -> TsdbResult<Self> {
80        let tags_json =
81            serde_json::to_string(tags).map_err(|e| TsdbError::Serialization(e.to_string()))?;
82        Ok(Self {
83            timestamp_ms,
84            metric: metric.into(),
85            value,
86            tags_json,
87        })
88    }
89
90    /// Parse the embedded JSON tag string back to a map.
91    pub fn parse_tags(&self) -> TsdbResult<HashMap<String, String>> {
92        serde_json::from_str(&self.tags_json).map_err(|e| TsdbError::Serialization(e.to_string()))
93    }
94}
95
96// =============================================================================
97// Compression enum (always compiled)
98// =============================================================================
99
100/// Parquet compression codec selection.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
102pub enum ParquetCompression {
103    /// No compression (fastest writes, largest files).
104    None,
105    /// Snappy compression (balanced speed / size).
106    #[default]
107    Snappy,
108    /// Zstandard compression (best size, slower writes).
109    Zstd,
110    /// Gzip compression (maximum compatibility).
111    Gzip,
112}
113
114impl ParquetCompression {
115    /// Return a human-readable label for this codec.
116    pub fn label(&self) -> &'static str {
117        match self {
118            Self::None => "none",
119            Self::Snappy => "snappy",
120            Self::Zstd => "zstd",
121            Self::Gzip => "gzip",
122        }
123    }
124}
125
126impl std::fmt::Display for ParquetCompression {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "{}", self.label())
129    }
130}
131
132// =============================================================================
133// ArrowExporter
134// =============================================================================
135
136/// Converts TSDB data points to Apache Arrow `RecordBatch` objects.
137#[derive(Debug, Default)]
138pub struct ArrowExporter {
139    /// Maximum rows per batch (0 = unlimited).
140    max_rows_per_batch: usize,
141}
142
143impl ArrowExporter {
144    /// Create an exporter with no row-count limit.
145    pub fn new() -> Self {
146        Self {
147            max_rows_per_batch: 0,
148        }
149    }
150
151    /// Create an exporter that caps each batch at `max_rows` rows.
152    pub fn with_max_rows(max_rows: usize) -> Self {
153        Self {
154            max_rows_per_batch: max_rows,
155        }
156    }
157
158    /// Return the configured maximum rows per batch (0 = unlimited).
159    pub fn max_rows_per_batch(&self) -> usize {
160        self.max_rows_per_batch
161    }
162
163    /// Export a slice of points to an Arrow `RecordBatch`.
164    #[cfg(feature = "arrow-export")]
165    pub fn export_batch(&self, points: &[ExportedPoint]) -> TsdbResult<RecordBatch> {
166        let schema = Self::schema();
167
168        let timestamps: Int64Array = points.iter().map(|p| p.timestamp_ms).collect();
169        let metrics: StringArray = points.iter().map(|p| Some(p.metric.as_str())).collect();
170        let values: Float64Array = points.iter().map(|p| p.value).collect();
171        let tags: StringArray = points.iter().map(|p| Some(p.tags_json.as_str())).collect();
172
173        RecordBatch::try_new(
174            Arc::new(schema),
175            vec![
176                Arc::new(timestamps),
177                Arc::new(metrics),
178                Arc::new(values),
179                Arc::new(tags),
180            ],
181        )
182        .map_err(|e| TsdbError::Arrow(e.to_string()))
183    }
184
185    /// Split points into multiple batches capped at `max_rows_per_batch`.
186    #[cfg(feature = "arrow-export")]
187    pub fn export_batches(&self, points: &[ExportedPoint]) -> TsdbResult<Vec<RecordBatch>> {
188        if points.is_empty() {
189            return Ok(vec![]);
190        }
191        let chunk_size = if self.max_rows_per_batch == 0 {
192            points.len()
193        } else {
194            self.max_rows_per_batch
195        };
196        points
197            .chunks(chunk_size)
198            .map(|chunk| self.export_batch(chunk))
199            .collect()
200    }
201
202    /// Return a stub row count without the `arrow-export` feature.
203    #[cfg(not(feature = "arrow-export"))]
204    pub fn export_batch_count(&self, points: &[ExportedPoint]) -> usize {
205        points.len()
206    }
207
208    /// Arrow schema used for all TSDB exports.
209    #[cfg(feature = "arrow-export")]
210    pub fn schema() -> Schema {
211        Schema::new(vec![
212            Field::new("timestamp", DataType::Int64, false),
213            Field::new("metric", DataType::Utf8, false),
214            Field::new("value", DataType::Float64, false),
215            Field::new("tags_json", DataType::Utf8, false),
216        ])
217    }
218
219    /// Filter exported points to a specific metric name.
220    pub fn filter_by_metric<'a>(
221        points: &'a [ExportedPoint],
222        metric: &str,
223    ) -> Vec<&'a ExportedPoint> {
224        points.iter().filter(|p| p.metric == metric).collect()
225    }
226
227    /// Filter exported points to a time range (inclusive on both ends).
228    pub fn filter_by_time_range(
229        points: &[ExportedPoint],
230        start_ms: i64,
231        end_ms: i64,
232    ) -> Vec<ExportedPoint> {
233        points
234            .iter()
235            .filter(|p| p.timestamp_ms >= start_ms && p.timestamp_ms <= end_ms)
236            .cloned()
237            .collect()
238    }
239
240    /// Compute basic statistics over a collection of exported points.
241    pub fn compute_stats(points: &[ExportedPoint]) -> ExportStats {
242        if points.is_empty() {
243            return ExportStats::default();
244        }
245        let count = points.len();
246        let sum: f64 = points.iter().map(|p| p.value).sum();
247        let mean = sum / count as f64;
248        let min = points.iter().map(|p| p.value).fold(f64::INFINITY, f64::min);
249        let max = points
250            .iter()
251            .map(|p| p.value)
252            .fold(f64::NEG_INFINITY, f64::max);
253        let variance = if count > 1 {
254            points.iter().map(|p| (p.value - mean).powi(2)).sum::<f64>() / (count - 1) as f64
255        } else {
256            0.0
257        };
258        let first_ts = points.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
259        let last_ts = points.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
260        let distinct_metrics: std::collections::HashSet<&str> =
261            points.iter().map(|p| p.metric.as_str()).collect();
262
263        ExportStats {
264            count,
265            sum,
266            mean,
267            min,
268            max,
269            variance,
270            stddev: variance.sqrt(),
271            first_timestamp_ms: first_ts,
272            last_timestamp_ms: last_ts,
273            distinct_metrics: distinct_metrics.len(),
274        }
275    }
276
277    /// Group exported points by metric name.
278    pub fn group_by_metric(points: &[ExportedPoint]) -> HashMap<String, Vec<ExportedPoint>> {
279        let mut groups: HashMap<String, Vec<ExportedPoint>> = HashMap::new();
280        for p in points {
281            groups.entry(p.metric.clone()).or_default().push(p.clone());
282        }
283        groups
284    }
285
286    /// Sort exported points by timestamp in ascending order.
287    pub fn sort_by_timestamp(points: &mut [ExportedPoint]) {
288        points.sort_by_key(|p| p.timestamp_ms);
289    }
290}
291
292/// Statistics computed over a set of exported points.
293#[derive(Debug, Clone, Default)]
294pub struct ExportStats {
295    /// Number of points.
296    pub count: usize,
297    /// Sum of values.
298    pub sum: f64,
299    /// Arithmetic mean of values.
300    pub mean: f64,
301    /// Minimum value.
302    pub min: f64,
303    /// Maximum value.
304    pub max: f64,
305    /// Sample variance.
306    pub variance: f64,
307    /// Sample standard deviation.
308    pub stddev: f64,
309    /// Earliest timestamp in the set.
310    pub first_timestamp_ms: i64,
311    /// Latest timestamp in the set.
312    pub last_timestamp_ms: i64,
313    /// Number of distinct metric names.
314    pub distinct_metrics: usize,
315}
316
317// =============================================================================
318// ParquetExporter
319// =============================================================================
320
321/// Exports TSDB data to Parquet files via the Arrow columnar format.
322#[derive(Debug)]
323pub struct ParquetExporter {
324    #[cfg_attr(not(feature = "arrow-export"), allow(dead_code))]
325    arrow: ArrowExporter,
326    compression: ParquetCompression,
327    /// Target row-group size in bytes (Parquet default is 128 MiB).
328    row_group_size: usize,
329}
330
331impl ParquetExporter {
332    /// Create a Parquet exporter with Snappy compression and default row-group
333    /// size (134_217_728 bytes = 128 MiB).
334    pub fn new() -> Self {
335        Self {
336            arrow: ArrowExporter::new(),
337            compression: ParquetCompression::Snappy,
338            row_group_size: 134_217_728,
339        }
340    }
341
342    /// Set compression codec.
343    pub fn with_compression(mut self, codec: ParquetCompression) -> Self {
344        self.compression = codec;
345        self
346    }
347
348    /// Set row-group size in bytes.
349    pub fn with_row_group_size(mut self, bytes: usize) -> Self {
350        self.row_group_size = bytes;
351        self
352    }
353
354    /// Return the current compression codec.
355    pub fn compression(&self) -> ParquetCompression {
356        self.compression
357    }
358
359    /// Return the configured row-group size in bytes.
360    pub fn row_group_size(&self) -> usize {
361        self.row_group_size
362    }
363
364    /// Write points to a Parquet file at `path`.
365    #[cfg(feature = "arrow-export")]
366    pub fn write_file(&self, points: &[ExportedPoint], path: &Path) -> TsdbResult<u64> {
367        let batch = self.arrow.export_batch(points)?;
368
369        let codec = match self.compression {
370            ParquetCompression::None => PqCompression::UNCOMPRESSED,
371            ParquetCompression::Snappy => PqCompression::SNAPPY,
372            ParquetCompression::Zstd => PqCompression::ZSTD(Default::default()),
373            ParquetCompression::Gzip => PqCompression::GZIP(Default::default()),
374        };
375
376        let props = WriterProperties::builder()
377            .set_compression(codec)
378            .set_max_row_group_row_count(Some(self.row_group_size / 8))
379            .build();
380
381        let file = File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
382
383        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
384            .map_err(|e| TsdbError::Arrow(e.to_string()))?;
385
386        writer
387            .write(&batch)
388            .map_err(|e| TsdbError::Arrow(e.to_string()))?;
389
390        let metadata = writer
391            .close()
392            .map_err(|e| TsdbError::Arrow(e.to_string()))?;
393
394        Ok(metadata.file_metadata().num_rows() as u64)
395    }
396
397    /// Count rows that would be exported (works without `arrow-export`).
398    pub fn count_rows(&self, points: &[ExportedPoint]) -> usize {
399        points.len()
400    }
401
402    /// Return metadata about a planned export.
403    pub fn export_metadata(&self, points: &[ExportedPoint]) -> ExportMetadata {
404        let stats = ArrowExporter::compute_stats(points);
405        ExportMetadata {
406            row_count: points.len(),
407            compression: self.compression,
408            row_group_size: self.row_group_size,
409            distinct_metrics: stats.distinct_metrics,
410            time_span_ms: stats
411                .last_timestamp_ms
412                .saturating_sub(stats.first_timestamp_ms),
413        }
414    }
415}
416
417impl Default for ParquetExporter {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
423/// Metadata about a planned or completed Parquet export.
424#[derive(Debug, Clone)]
425pub struct ExportMetadata {
426    /// Total row count.
427    pub row_count: usize,
428    /// Compression codec used.
429    pub compression: ParquetCompression,
430    /// Configured row-group size (bytes).
431    pub row_group_size: usize,
432    /// Number of distinct metrics in the dataset.
433    pub distinct_metrics: usize,
434    /// Time span covered (last_ts - first_ts) in milliseconds.
435    pub time_span_ms: i64,
436}
437
438// =============================================================================
439// DuckDbQueryAdapter
440// =============================================================================
441
442/// Generates DuckDB-compatible SQL queries over exported Parquet files.
443///
444/// This adapter does **not** embed DuckDB as a dependency; instead it produces
445/// SQL strings that can be sent to an external DuckDB process or connection
446/// pool.
447#[derive(Debug, Clone)]
448pub struct DuckDbQueryAdapter {
449    /// Path to the Parquet file(s).  Glob patterns are supported by DuckDB.
450    parquet_path: String,
451}
452
453impl DuckDbQueryAdapter {
454    /// Create an adapter for a Parquet file or glob (e.g. `/data/*.parquet`).
455    pub fn new(path: impl Into<String>) -> Self {
456        Self {
457            parquet_path: path.into(),
458        }
459    }
460
461    /// Return the configured Parquet path / glob.
462    pub fn parquet_path(&self) -> &str {
463        &self.parquet_path
464    }
465
466    /// Generate SQL to select all rows for a specific metric within a time range.
467    pub fn select_metric(&self, metric: &str, start_ms: i64, end_ms: i64) -> String {
468        format!(
469            "SELECT timestamp, metric, value, tags_json \
470             FROM read_parquet('{}') \
471             WHERE metric = '{}' AND timestamp BETWEEN {} AND {} \
472             ORDER BY timestamp ASC;",
473            self.parquet_path, metric, start_ms, end_ms
474        )
475    }
476
477    /// Generate SQL to compute per-metric aggregates over a time range.
478    pub fn aggregate_metric(
479        &self,
480        metric: &str,
481        start_ms: i64,
482        end_ms: i64,
483        aggregation: AggregationFunction,
484    ) -> String {
485        let agg_expr = match aggregation {
486            AggregationFunction::Avg => "AVG(value)",
487            AggregationFunction::Min => "MIN(value)",
488            AggregationFunction::Max => "MAX(value)",
489            AggregationFunction::Sum => "SUM(value)",
490            AggregationFunction::Count => "COUNT(*)",
491            AggregationFunction::StdDev => "STDDEV(value)",
492            AggregationFunction::Percentile50 => {
493                "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value)"
494            }
495            AggregationFunction::Percentile95 => {
496                "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value)"
497            }
498            AggregationFunction::Percentile99 => {
499                "PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY value)"
500            }
501        };
502        format!(
503            "SELECT metric, {agg} AS result \
504             FROM read_parquet('{path}') \
505             WHERE metric = '{metric}' AND timestamp BETWEEN {start} AND {end} \
506             GROUP BY metric;",
507            agg = agg_expr,
508            path = self.parquet_path,
509            metric = metric,
510            start = start_ms,
511            end = end_ms,
512        )
513    }
514
515    /// Generate SQL to downsample data into fixed-width time buckets (ms).
516    pub fn resample(&self, metric: &str, bucket_ms: i64) -> String {
517        format!(
518            "SELECT \
519               (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
520               metric, \
521               AVG(value) AS avg_value, \
522               MIN(value) AS min_value, \
523               MAX(value) AS max_value, \
524               COUNT(*) AS sample_count \
525             FROM read_parquet('{path}') \
526             WHERE metric = '{metric}' \
527             GROUP BY bucket_start_ms, metric \
528             ORDER BY bucket_start_ms ASC;",
529            bucket = bucket_ms,
530            path = self.parquet_path,
531            metric = metric,
532        )
533    }
534
535    /// Generate SQL to export query results back to another Parquet file.
536    pub fn export_query_to_parquet(&self, query: &str, output_path: &str) -> String {
537        format!(
538            "COPY ({query}) TO '{output}' (FORMAT PARQUET, COMPRESSION SNAPPY);",
539            query = query,
540            output = output_path,
541        )
542    }
543
544    /// Generate SQL to list distinct metrics in the dataset.
545    pub fn list_metrics(&self) -> String {
546        format!(
547            "SELECT DISTINCT metric FROM read_parquet('{}') ORDER BY metric;",
548            self.parquet_path
549        )
550    }
551
552    /// Generate SQL to compute a time-range summary for all metrics.
553    pub fn time_range_summary(&self) -> String {
554        format!(
555            "SELECT metric, \
556               MIN(timestamp) AS first_ts_ms, \
557               MAX(timestamp) AS last_ts_ms, \
558               COUNT(*) AS total_points, \
559               AVG(value) AS mean_value \
560             FROM read_parquet('{}') \
561             GROUP BY metric \
562             ORDER BY metric;",
563            self.parquet_path
564        )
565    }
566
567    /// Generate SQL to join two metric time-series by timestamp bucket.
568    pub fn join_metrics(&self, metric_a: &str, metric_b: &str, bucket_ms: i64) -> String {
569        format!(
570            "SELECT \
571               a.bucket_start_ms, \
572               a.avg_value AS {metric_a}_avg, \
573               b.avg_value AS {metric_b}_avg \
574             FROM (\
575               SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
576                      AVG(value) AS avg_value \
577               FROM read_parquet('{path}') \
578               WHERE metric = '{metric_a}' \
579               GROUP BY bucket_start_ms\
580             ) a \
581             INNER JOIN (\
582               SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
583                      AVG(value) AS avg_value \
584               FROM read_parquet('{path}') \
585               WHERE metric = '{metric_b}' \
586               GROUP BY bucket_start_ms\
587             ) b ON a.bucket_start_ms = b.bucket_start_ms \
588             ORDER BY a.bucket_start_ms ASC;",
589            metric_a = metric_a,
590            metric_b = metric_b,
591            bucket = bucket_ms,
592            path = self.parquet_path,
593        )
594    }
595
596    /// Generate SQL to compute rate of change (derivative) per bucket.
597    pub fn rate_of_change(&self, metric: &str, bucket_ms: i64) -> String {
598        format!(
599            "SELECT \
600               bucket_start_ms, \
601               avg_value, \
602               avg_value - LAG(avg_value) OVER (ORDER BY bucket_start_ms) AS delta, \
603               (avg_value - LAG(avg_value) OVER (ORDER BY bucket_start_ms)) / \
604                 NULLIF(({bucket}::DOUBLE / 1000.0), 0) AS rate_per_sec \
605             FROM (\
606               SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
607                      AVG(value) AS avg_value \
608               FROM read_parquet('{path}') \
609               WHERE metric = '{metric}' \
610               GROUP BY bucket_start_ms\
611             ) \
612             ORDER BY bucket_start_ms ASC;",
613            bucket = bucket_ms,
614            path = self.parquet_path,
615            metric = metric,
616        )
617    }
618
619    /// Generate SQL to create a DuckDB view for repeated queries.
620    pub fn create_view(&self, view_name: &str) -> String {
621        format!(
622            "CREATE OR REPLACE VIEW {view} AS \
623             SELECT * FROM read_parquet('{}');",
624            self.parquet_path,
625            view = view_name,
626        )
627    }
628
629    /// Generate SQL to count total data points per metric.
630    pub fn count_per_metric(&self) -> String {
631        format!(
632            "SELECT metric, COUNT(*) AS point_count \
633             FROM read_parquet('{}') \
634             GROUP BY metric \
635             ORDER BY point_count DESC;",
636            self.parquet_path,
637        )
638    }
639}
640
641/// Aggregation function for DuckDB SQL generation.
642#[derive(Debug, Clone, Copy, PartialEq, Eq)]
643pub enum AggregationFunction {
644    /// Arithmetic mean.
645    Avg,
646    /// Minimum value.
647    Min,
648    /// Maximum value.
649    Max,
650    /// Sum of all values.
651    Sum,
652    /// Row count.
653    Count,
654    /// Population standard deviation.
655    StdDev,
656    /// 50th percentile (median).
657    Percentile50,
658    /// 95th percentile.
659    Percentile95,
660    /// 99th percentile.
661    Percentile99,
662}
663
664// =============================================================================
665// ColumnarExport — always-compiled flat export without Arrow/Parquet deps
666// =============================================================================
667
668/// A flat, owned columnar representation of exported time-series data.
669///
670/// `ColumnarExport` stores three parallel vectors:
671/// - `timestamps`: Unix epoch milliseconds.
672/// - `metrics`: series / metric names.
673/// - `values`: observed measurement values.
674///
675/// It is independent of the `arrow-export` feature and is always compiled.
676/// Use it as a lightweight bridge to CSV, TSV, or JSON exports for small to
677/// medium datasets, or as a staging area before Arrow conversion.
678#[derive(Debug, Clone, Default)]
679pub struct ColumnarExport {
680    /// Parallel array of Unix epoch milliseconds.
681    pub timestamps: Vec<i64>,
682    /// Parallel array of metric / series names.
683    pub metrics: Vec<String>,
684    /// Parallel array of observed values.
685    pub values: Vec<f64>,
686}
687
688impl ColumnarExport {
689    /// Create an empty `ColumnarExport`.
690    pub fn new() -> Self {
691        Self::default()
692    }
693
694    /// Create a `ColumnarExport` pre-allocated for `capacity` rows.
695    pub fn with_capacity(capacity: usize) -> Self {
696        Self {
697            timestamps: Vec::with_capacity(capacity),
698            metrics: Vec::with_capacity(capacity),
699            values: Vec::with_capacity(capacity),
700        }
701    }
702
703    /// Populate from a slice of [`ExportedPoint`]s.
704    pub fn from_points(points: &[ExportedPoint]) -> Self {
705        let mut out = Self::with_capacity(points.len());
706        for p in points {
707            out.timestamps.push(p.timestamp_ms);
708            out.metrics.push(p.metric.clone());
709            out.values.push(p.value);
710        }
711        out
712    }
713
714    /// Return the number of rows in this export.
715    pub fn len(&self) -> usize {
716        self.timestamps.len()
717    }
718
719    /// Return `true` if the export contains no rows.
720    pub fn is_empty(&self) -> bool {
721        self.timestamps.is_empty()
722    }
723
724    /// Append a single row.
725    pub fn push(&mut self, timestamp_ms: i64, metric: impl Into<String>, value: f64) {
726        self.timestamps.push(timestamp_ms);
727        self.metrics.push(metric.into());
728        self.values.push(value);
729    }
730
731    /// Convert to a `Vec<ExportedPoint>`.
732    pub fn to_points(&self) -> Vec<ExportedPoint> {
733        self.timestamps
734            .iter()
735            .zip(self.metrics.iter())
736            .zip(self.values.iter())
737            .map(|((ts, m), v)| ExportedPoint::new(*ts, m.clone(), *v))
738            .collect()
739    }
740
741    /// Write this export as CSV to the file at `path`.
742    ///
743    /// The header row is `timestamp_ms,metric,value`.
744    pub fn to_csv(&self, path: &std::path::Path) -> TsdbResult<()> {
745        use std::io::Write as IoWrite;
746        let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
747        let mut writer = std::io::BufWriter::new(file);
748        writeln!(writer, "timestamp_ms,metric,value").map_err(|e| TsdbError::Io(e.to_string()))?;
749        for i in 0..self.len() {
750            // Escape commas / quotes inside the metric name.
751            let metric_escaped = if self.metrics[i].contains(',') || self.metrics[i].contains('"') {
752                format!("\"{}\"", self.metrics[i].replace('"', "\"\""))
753            } else {
754                self.metrics[i].clone()
755            };
756            writeln!(
757                writer,
758                "{},{},{}",
759                self.timestamps[i], metric_escaped, self.values[i]
760            )
761            .map_err(|e| TsdbError::Io(e.to_string()))?;
762        }
763        Ok(())
764    }
765
766    /// Write this export as TSV (tab-separated values) to the file at `path`.
767    ///
768    /// The header row is `timestamp_ms\tmetric\tvalue`.
769    pub fn to_tsv(&self, path: &std::path::Path) -> TsdbResult<()> {
770        use std::io::Write as IoWrite;
771        let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
772        let mut writer = std::io::BufWriter::new(file);
773        writeln!(writer, "timestamp_ms\tmetric\tvalue")
774            .map_err(|e| TsdbError::Io(e.to_string()))?;
775        for i in 0..self.len() {
776            writeln!(
777                writer,
778                "{}\t{}\t{}",
779                self.timestamps[i], self.metrics[i], self.values[i]
780            )
781            .map_err(|e| TsdbError::Io(e.to_string()))?;
782        }
783        Ok(())
784    }
785
786    /// Serialize this export to a JSON array.
787    ///
788    /// Each element is `{"timestamp_ms": <i64>, "metric": <str>, "value": <f64>}`.
789    pub fn to_json(&self) -> TsdbResult<serde_json::Value> {
790        let rows: Vec<serde_json::Value> = self
791            .timestamps
792            .iter()
793            .zip(self.metrics.iter())
794            .zip(self.values.iter())
795            .map(|((ts, m), v)| {
796                serde_json::json!({
797                    "timestamp_ms": ts,
798                    "metric": m,
799                    "value": v,
800                })
801            })
802            .collect();
803        Ok(serde_json::Value::Array(rows))
804    }
805
806    /// Serialize this export to a JSON string.
807    pub fn to_json_string(&self) -> TsdbResult<String> {
808        let v = self.to_json()?;
809        serde_json::to_string(&v).map_err(|e| TsdbError::Serialization(e.to_string()))
810    }
811
812    /// Filter rows to only those matching the given metric name, returning a
813    /// new `ColumnarExport`.
814    pub fn filter_metric(&self, metric: &str) -> Self {
815        let mut out = Self::new();
816        for i in 0..self.len() {
817            if self.metrics[i] == metric {
818                out.push(self.timestamps[i], self.metrics[i].clone(), self.values[i]);
819            }
820        }
821        out
822    }
823
824    /// Filter rows to the given time range [start_ms, end_ms] inclusive.
825    pub fn filter_time_range(&self, start_ms: i64, end_ms: i64) -> Self {
826        let mut out = Self::new();
827        for i in 0..self.len() {
828            let ts = self.timestamps[i];
829            if ts >= start_ms && ts <= end_ms {
830                out.push(ts, self.metrics[i].clone(), self.values[i]);
831            }
832        }
833        out
834    }
835
836    /// Sort rows by timestamp in ascending order (in-place).
837    pub fn sort_by_timestamp(&mut self) {
838        // Collect indices sorted by timestamp.
839        let n = self.len();
840        let mut indices: Vec<usize> = (0..n).collect();
841        indices.sort_by_key(|&i| self.timestamps[i]);
842
843        let old_ts = self.timestamps.clone();
844        let old_m = self.metrics.clone();
845        let old_v = self.values.clone();
846        for (new_pos, &old_pos) in indices.iter().enumerate() {
847            self.timestamps[new_pos] = old_ts[old_pos];
848            self.metrics[new_pos] = old_m[old_pos].clone();
849            self.values[new_pos] = old_v[old_pos];
850        }
851    }
852
853    /// Compute basic descriptive statistics over the `value` column.
854    pub fn value_stats(&self) -> ColumnarStats {
855        if self.values.is_empty() {
856            return ColumnarStats::default();
857        }
858        let n = self.values.len();
859        let sum: f64 = self.values.iter().sum();
860        let mean = sum / n as f64;
861        let min = self.values.iter().cloned().fold(f64::INFINITY, f64::min);
862        let max = self
863            .values
864            .iter()
865            .cloned()
866            .fold(f64::NEG_INFINITY, f64::max);
867        let variance = if n > 1 {
868            self.values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1) as f64
869        } else {
870            0.0
871        };
872        ColumnarStats {
873            count: n,
874            sum,
875            mean,
876            min,
877            max,
878            variance,
879            stddev: variance.sqrt(),
880        }
881    }
882
883    /// Read a CSV file previously written by `to_csv` back into a
884    /// `ColumnarExport`.
885    pub fn from_csv(path: &std::path::Path) -> TsdbResult<Self> {
886        use std::io::{BufRead, BufReader};
887        let file = std::fs::File::open(path).map_err(|e| TsdbError::Io(e.to_string()))?;
888        let reader = BufReader::new(file);
889        let mut out = Self::new();
890        let mut first = true;
891        for line in reader.lines() {
892            let line = line.map_err(|e| TsdbError::Io(e.to_string()))?;
893            if first {
894                first = false;
895                continue; // skip header
896            }
897            if line.is_empty() {
898                continue;
899            }
900            // Parse: timestamp_ms,metric,value  (metric may be quoted)
901            let parts = parse_csv_line(&line);
902            if parts.len() < 3 {
903                return Err(TsdbError::Serialization(format!(
904                    "CSV line has fewer than 3 fields: {line}"
905                )));
906            }
907            let ts: i64 = parts[0].parse().map_err(|_| {
908                TsdbError::Serialization(format!("invalid timestamp: {}", parts[0]))
909            })?;
910            let metric = parts[1].clone();
911            let value: f64 = parts[2]
912                .parse()
913                .map_err(|_| TsdbError::Serialization(format!("invalid value: {}", parts[2])))?;
914            out.push(ts, metric, value);
915        }
916        Ok(out)
917    }
918}
919
920/// Descriptive statistics over a [`ColumnarExport`]'s value column.
921#[derive(Debug, Clone, Default)]
922pub struct ColumnarStats {
923    /// Number of data points.
924    pub count: usize,
925    /// Sum of values.
926    pub sum: f64,
927    /// Arithmetic mean.
928    pub mean: f64,
929    /// Minimum value.
930    pub min: f64,
931    /// Maximum value.
932    pub max: f64,
933    /// Sample variance.
934    pub variance: f64,
935    /// Sample standard deviation.
936    pub stddev: f64,
937}
938
939/// Minimal CSV line parser that handles optional double-quoted fields.
940fn parse_csv_line(line: &str) -> Vec<String> {
941    let mut fields = Vec::new();
942    let mut current = String::new();
943    let mut in_quotes = false;
944    let mut chars = line.chars().peekable();
945    while let Some(c) = chars.next() {
946        match c {
947            '"' if !in_quotes => {
948                in_quotes = true;
949            }
950            '"' if in_quotes => {
951                // Escaped quote "".
952                if chars.peek() == Some(&'"') {
953                    chars.next();
954                    current.push('"');
955                } else {
956                    in_quotes = false;
957                }
958            }
959            ',' if !in_quotes => {
960                fields.push(current.clone());
961                current.clear();
962            }
963            other => {
964                current.push(other);
965            }
966        }
967    }
968    fields.push(current);
969    fields
970}
971
972// =============================================================================
973// Tests
974// =============================================================================
975
976#[cfg(test)]
977mod tests {
978    use super::*;
979    use std::collections::HashMap;
980
981    // -- ExportedPoint --------------------------------------------------------
982
983    #[test]
984    fn test_exported_point_new_no_tags() {
985        let p = ExportedPoint::new(1_700_000_000_000, "cpu", 88.5);
986        assert_eq!(p.metric, "cpu");
987        assert_eq!(p.value, 88.5);
988        assert_eq!(p.tags_json, "{}");
989    }
990
991    #[test]
992    fn test_exported_point_with_tags_roundtrip() {
993        let mut tags = HashMap::new();
994        tags.insert("host".to_string(), "srv-01".to_string());
995        tags.insert("region".to_string(), "eu-west".to_string());
996
997        let p = ExportedPoint::with_tags(1_700_000_000_000, "mem", 4096.0, &tags)
998            .expect("serialization should succeed");
999
1000        let parsed = p.parse_tags().expect("deserialization should succeed");
1001        assert_eq!(parsed["host"], "srv-01");
1002        assert_eq!(parsed["region"], "eu-west");
1003    }
1004
1005    #[test]
1006    fn test_exported_point_serialization() {
1007        let p = ExportedPoint::new(42_000, "latency_ms", 1.5);
1008        let json = serde_json::to_string(&p).expect("json serialize");
1009        let back: ExportedPoint = serde_json::from_str(&json).expect("json deserialize");
1010        assert_eq!(p, back);
1011    }
1012
1013    #[test]
1014    fn test_exported_point_clone() {
1015        let p = ExportedPoint::new(1, "x", 0.0);
1016        let q = p.clone();
1017        assert_eq!(p, q);
1018    }
1019
1020    // -- ArrowExporter --------------------------------------------------------
1021
1022    #[test]
1023    fn test_arrow_exporter_default() {
1024        let e = ArrowExporter::default();
1025        assert_eq!(e.max_rows_per_batch(), 0);
1026    }
1027
1028    #[test]
1029    fn test_arrow_exporter_with_max_rows() {
1030        let e = ArrowExporter::with_max_rows(100);
1031        assert_eq!(e.max_rows_per_batch(), 100);
1032    }
1033
1034    #[test]
1035    fn test_arrow_filter_by_metric() {
1036        let pts = vec![
1037            ExportedPoint::new(1, "cpu", 10.0),
1038            ExportedPoint::new(2, "mem", 20.0),
1039            ExportedPoint::new(3, "cpu", 30.0),
1040        ];
1041        let filtered = ArrowExporter::filter_by_metric(&pts, "cpu");
1042        assert_eq!(filtered.len(), 2);
1043        assert!(filtered.iter().all(|p| p.metric == "cpu"));
1044    }
1045
1046    #[test]
1047    fn test_arrow_filter_by_time_range() {
1048        let pts = vec![
1049            ExportedPoint::new(100, "x", 1.0),
1050            ExportedPoint::new(200, "x", 2.0),
1051            ExportedPoint::new(300, "x", 3.0),
1052            ExportedPoint::new(400, "x", 4.0),
1053        ];
1054        let filtered = ArrowExporter::filter_by_time_range(&pts, 150, 350);
1055        assert_eq!(filtered.len(), 2);
1056        assert_eq!(filtered[0].timestamp_ms, 200);
1057        assert_eq!(filtered[1].timestamp_ms, 300);
1058    }
1059
1060    #[test]
1061    fn test_arrow_filter_by_time_range_empty() {
1062        let pts: Vec<ExportedPoint> = vec![];
1063        let filtered = ArrowExporter::filter_by_time_range(&pts, 0, 1000);
1064        assert!(filtered.is_empty());
1065    }
1066
1067    #[test]
1068    fn test_arrow_filter_by_time_range_no_match() {
1069        let pts = vec![ExportedPoint::new(1000, "x", 1.0)];
1070        let filtered = ArrowExporter::filter_by_time_range(&pts, 0, 500);
1071        assert!(filtered.is_empty());
1072    }
1073
1074    #[test]
1075    fn test_arrow_filter_by_time_range_inclusive_boundary() {
1076        let pts = vec![
1077            ExportedPoint::new(100, "x", 1.0),
1078            ExportedPoint::new(200, "x", 2.0),
1079        ];
1080        let filtered = ArrowExporter::filter_by_time_range(&pts, 100, 200);
1081        assert_eq!(filtered.len(), 2);
1082    }
1083
1084    #[cfg(feature = "arrow-export")]
1085    #[test]
1086    fn test_arrow_export_batch_schema() {
1087        use arrow::datatypes::DataType;
1088        let schema = ArrowExporter::schema();
1089        assert_eq!(schema.field(0).name(), "timestamp");
1090        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
1091        assert_eq!(schema.field(1).name(), "metric");
1092        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
1093        assert_eq!(schema.field(2).name(), "value");
1094        assert_eq!(schema.field(2).data_type(), &DataType::Float64);
1095        assert_eq!(schema.field(3).name(), "tags_json");
1096        assert_eq!(schema.field(3).data_type(), &DataType::Utf8);
1097    }
1098
1099    #[cfg(feature = "arrow-export")]
1100    #[test]
1101    fn test_arrow_export_batch_row_count() {
1102        let pts: Vec<_> = (0..50)
1103            .map(|i| ExportedPoint::new(i * 1000, "temp", i as f64))
1104            .collect();
1105        let exporter = ArrowExporter::new();
1106        let batch = exporter.export_batch(&pts).expect("export should succeed");
1107        assert_eq!(batch.num_rows(), 50);
1108        assert_eq!(batch.num_columns(), 4);
1109    }
1110
1111    #[cfg(feature = "arrow-export")]
1112    #[test]
1113    fn test_arrow_export_batch_values() {
1114        let pts = vec![
1115            ExportedPoint::new(1_000, "pressure", 101.325),
1116            ExportedPoint::new(2_000, "pressure", 102.0),
1117        ];
1118        let exporter = ArrowExporter::new();
1119        let batch = exporter.export_batch(&pts).expect("export");
1120
1121        use arrow::array::Float64Array;
1122        let values = batch
1123            .column(2)
1124            .as_any()
1125            .downcast_ref::<Float64Array>()
1126            .expect("Float64Array");
1127        assert!((values.value(0) - 101.325).abs() < 1e-9);
1128        assert!((values.value(1) - 102.0).abs() < 1e-9);
1129    }
1130
1131    #[cfg(feature = "arrow-export")]
1132    #[test]
1133    fn test_arrow_export_batches_chunking() {
1134        let pts: Vec<_> = (0..25)
1135            .map(|i| ExportedPoint::new(i, "x", i as f64))
1136            .collect();
1137        let exporter = ArrowExporter::with_max_rows(10);
1138        let batches = exporter.export_batches(&pts).expect("export batches");
1139        assert_eq!(batches.len(), 3); // 10 + 10 + 5
1140        assert_eq!(batches[0].num_rows(), 10);
1141        assert_eq!(batches[2].num_rows(), 5);
1142    }
1143
1144    #[cfg(feature = "arrow-export")]
1145    #[test]
1146    fn test_arrow_export_batches_empty() {
1147        let pts: Vec<ExportedPoint> = vec![];
1148        let exporter = ArrowExporter::new();
1149        let batches = exporter.export_batches(&pts).expect("empty export");
1150        assert!(batches.is_empty());
1151    }
1152
1153    // -- ExportStats ----------------------------------------------------------
1154
1155    #[test]
1156    fn test_compute_stats_basic() {
1157        let pts = vec![
1158            ExportedPoint::new(100, "cpu", 10.0),
1159            ExportedPoint::new(200, "cpu", 20.0),
1160            ExportedPoint::new(300, "cpu", 30.0),
1161        ];
1162        let stats = ArrowExporter::compute_stats(&pts);
1163        assert_eq!(stats.count, 3);
1164        assert!((stats.mean - 20.0).abs() < f64::EPSILON);
1165        assert!((stats.min - 10.0).abs() < f64::EPSILON);
1166        assert!((stats.max - 30.0).abs() < f64::EPSILON);
1167        assert!((stats.sum - 60.0).abs() < f64::EPSILON);
1168        assert_eq!(stats.first_timestamp_ms, 100);
1169        assert_eq!(stats.last_timestamp_ms, 300);
1170        assert_eq!(stats.distinct_metrics, 1);
1171    }
1172
1173    #[test]
1174    fn test_compute_stats_empty() {
1175        let pts: Vec<ExportedPoint> = vec![];
1176        let stats = ArrowExporter::compute_stats(&pts);
1177        assert_eq!(stats.count, 0);
1178        assert!((stats.mean - 0.0).abs() < f64::EPSILON);
1179    }
1180
1181    #[test]
1182    fn test_compute_stats_single_point() {
1183        let pts = vec![ExportedPoint::new(1000, "mem", 42.0)];
1184        let stats = ArrowExporter::compute_stats(&pts);
1185        assert_eq!(stats.count, 1);
1186        assert!((stats.mean - 42.0).abs() < f64::EPSILON);
1187        assert!((stats.variance - 0.0).abs() < f64::EPSILON);
1188        assert_eq!(stats.distinct_metrics, 1);
1189    }
1190
1191    #[test]
1192    fn test_compute_stats_multiple_metrics() {
1193        let pts = vec![
1194            ExportedPoint::new(100, "cpu", 10.0),
1195            ExportedPoint::new(200, "mem", 20.0),
1196            ExportedPoint::new(300, "disk", 30.0),
1197        ];
1198        let stats = ArrowExporter::compute_stats(&pts);
1199        assert_eq!(stats.distinct_metrics, 3);
1200    }
1201
1202    #[test]
1203    fn test_compute_stats_stddev() {
1204        let pts = vec![
1205            ExportedPoint::new(1, "x", 2.0),
1206            ExportedPoint::new(2, "x", 4.0),
1207            ExportedPoint::new(3, "x", 6.0),
1208            ExportedPoint::new(4, "x", 8.0),
1209        ];
1210        let stats = ArrowExporter::compute_stats(&pts);
1211        // variance of [2,4,6,8] = sum((x-5)^2)/3 = (9+1+1+9)/3 = 20/3
1212        assert!((stats.variance - 20.0 / 3.0).abs() < 1e-9);
1213        assert!((stats.stddev - (20.0_f64 / 3.0).sqrt()).abs() < 1e-9);
1214    }
1215
1216    // -- group_by_metric / sort -----------------------------------------------
1217
1218    #[test]
1219    fn test_group_by_metric() {
1220        let pts = vec![
1221            ExportedPoint::new(1, "cpu", 10.0),
1222            ExportedPoint::new(2, "mem", 20.0),
1223            ExportedPoint::new(3, "cpu", 30.0),
1224            ExportedPoint::new(4, "disk", 40.0),
1225        ];
1226        let groups = ArrowExporter::group_by_metric(&pts);
1227        assert_eq!(groups.len(), 3);
1228        assert_eq!(groups["cpu"].len(), 2);
1229        assert_eq!(groups["mem"].len(), 1);
1230        assert_eq!(groups["disk"].len(), 1);
1231    }
1232
1233    #[test]
1234    fn test_sort_by_timestamp() {
1235        let mut pts = vec![
1236            ExportedPoint::new(300, "x", 3.0),
1237            ExportedPoint::new(100, "x", 1.0),
1238            ExportedPoint::new(200, "x", 2.0),
1239        ];
1240        ArrowExporter::sort_by_timestamp(&mut pts);
1241        assert_eq!(pts[0].timestamp_ms, 100);
1242        assert_eq!(pts[1].timestamp_ms, 200);
1243        assert_eq!(pts[2].timestamp_ms, 300);
1244    }
1245
1246    // -- ParquetExporter ------------------------------------------------------
1247
1248    #[test]
1249    fn test_parquet_exporter_default() {
1250        let e = ParquetExporter::default();
1251        assert_eq!(e.compression(), ParquetCompression::Snappy);
1252        assert_eq!(e.row_group_size(), 134_217_728);
1253    }
1254
1255    #[test]
1256    fn test_parquet_exporter_builder() {
1257        let e = ParquetExporter::new()
1258            .with_compression(ParquetCompression::Zstd)
1259            .with_row_group_size(1024 * 1024);
1260        assert_eq!(e.compression(), ParquetCompression::Zstd);
1261        assert_eq!(e.row_group_size(), 1024 * 1024);
1262    }
1263
1264    #[test]
1265    fn test_parquet_exporter_count_rows() {
1266        let pts: Vec<_> = (0..100)
1267            .map(|i| ExportedPoint::new(i, "x", i as f64))
1268            .collect();
1269        let e = ParquetExporter::new();
1270        assert_eq!(e.count_rows(&pts), 100);
1271    }
1272
1273    #[test]
1274    fn test_parquet_compression_label() {
1275        assert_eq!(ParquetCompression::None.label(), "none");
1276        assert_eq!(ParquetCompression::Snappy.label(), "snappy");
1277        assert_eq!(ParquetCompression::Zstd.label(), "zstd");
1278        assert_eq!(ParquetCompression::Gzip.label(), "gzip");
1279    }
1280
1281    #[test]
1282    fn test_parquet_compression_display() {
1283        assert_eq!(format!("{}", ParquetCompression::Snappy), "snappy");
1284        assert_eq!(format!("{}", ParquetCompression::Zstd), "zstd");
1285    }
1286
1287    #[test]
1288    fn test_export_metadata() {
1289        let pts = vec![
1290            ExportedPoint::new(1000, "cpu", 10.0),
1291            ExportedPoint::new(2000, "cpu", 20.0),
1292            ExportedPoint::new(3000, "mem", 30.0),
1293        ];
1294        let e = ParquetExporter::new().with_compression(ParquetCompression::Zstd);
1295        let meta = e.export_metadata(&pts);
1296        assert_eq!(meta.row_count, 3);
1297        assert_eq!(meta.compression, ParquetCompression::Zstd);
1298        assert_eq!(meta.distinct_metrics, 2);
1299        assert_eq!(meta.time_span_ms, 2000);
1300    }
1301
1302    #[cfg(feature = "arrow-export")]
1303    #[test]
1304    fn test_parquet_write_snappy() {
1305        let dir = std::env::temp_dir();
1306        let path = dir.join("oxirs_tsdb_test_snappy.parquet");
1307
1308        let pts: Vec<_> = (0..20)
1309            .map(|i| ExportedPoint::new(i * 1_000, "temp", 20.0 + i as f64))
1310            .collect();
1311
1312        let exporter = ParquetExporter::new().with_compression(ParquetCompression::Snappy);
1313        let rows = exporter.write_file(&pts, &path).expect("write parquet");
1314        assert_eq!(rows, 20);
1315        assert!(path.exists());
1316        let _ = std::fs::remove_file(&path);
1317    }
1318
1319    #[cfg(feature = "arrow-export")]
1320    #[test]
1321    fn test_parquet_write_zstd() {
1322        let path = std::env::temp_dir().join("oxirs_tsdb_test_zstd.parquet");
1323        let pts: Vec<_> = (0..10)
1324            .map(|i| ExportedPoint::new(i, "pressure", i as f64 * 1.1))
1325            .collect();
1326
1327        let exporter = ParquetExporter::new().with_compression(ParquetCompression::Zstd);
1328        let rows = exporter
1329            .write_file(&pts, &path)
1330            .expect("write parquet zstd");
1331        assert_eq!(rows, 10);
1332        let _ = std::fs::remove_file(path);
1333    }
1334
1335    #[cfg(feature = "arrow-export")]
1336    #[test]
1337    fn test_parquet_write_no_compression() {
1338        let path = std::env::temp_dir().join("oxirs_tsdb_test_none.parquet");
1339        let pts = vec![ExportedPoint::new(0, "v", 1.0)];
1340
1341        let exporter = ParquetExporter::new().with_compression(ParquetCompression::None);
1342        let rows = exporter.write_file(&pts, &path).expect("write");
1343        assert_eq!(rows, 1);
1344        let _ = std::fs::remove_file(path);
1345    }
1346
1347    // -- DuckDbQueryAdapter ---------------------------------------------------
1348
1349    #[test]
1350    fn test_duckdb_adapter_select_metric_sql() {
1351        let adapter = DuckDbQueryAdapter::new("/data/tsdb.parquet");
1352        let sql = adapter.select_metric("cpu", 1_000_000, 2_000_000);
1353        assert!(sql.contains("read_parquet('/data/tsdb.parquet')"));
1354        assert!(sql.contains("metric = 'cpu'"));
1355        assert!(sql.contains("1000000"));
1356        assert!(sql.contains("2000000"));
1357        assert!(sql.contains("ORDER BY timestamp ASC"));
1358    }
1359
1360    #[test]
1361    fn test_duckdb_adapter_aggregate_avg_sql() {
1362        let adapter = DuckDbQueryAdapter::new("/tmp/data.parquet");
1363        let sql = adapter.aggregate_metric("mem", 0, 9_999, AggregationFunction::Avg);
1364        assert!(sql.contains("AVG(value)"));
1365        assert!(sql.contains("GROUP BY metric"));
1366    }
1367
1368    #[test]
1369    fn test_duckdb_adapter_aggregate_all_functions() {
1370        let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1371        for (func, expected) in [
1372            (AggregationFunction::Min, "MIN(value)"),
1373            (AggregationFunction::Max, "MAX(value)"),
1374            (AggregationFunction::Sum, "SUM(value)"),
1375            (AggregationFunction::Count, "COUNT(*)"),
1376            (AggregationFunction::StdDev, "STDDEV(value)"),
1377        ] {
1378            let sql = adapter.aggregate_metric("x", 0, 1, func);
1379            assert!(sql.contains(expected), "expected {expected} for {func:?}");
1380        }
1381    }
1382
1383    #[test]
1384    fn test_duckdb_adapter_percentile_functions() {
1385        let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1386        let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile50);
1387        assert!(sql.contains("PERCENTILE_CONT(0.5)"));
1388
1389        let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile95);
1390        assert!(sql.contains("PERCENTILE_CONT(0.95)"));
1391
1392        let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile99);
1393        assert!(sql.contains("PERCENTILE_CONT(0.99)"));
1394    }
1395
1396    #[test]
1397    fn test_duckdb_adapter_resample_sql() {
1398        let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1399        let sql = adapter.resample("cpu", 60_000);
1400        assert!(sql.contains("60000"));
1401        assert!(sql.contains("AVG(value)"));
1402        assert!(sql.contains("GROUP BY bucket_start_ms, metric"));
1403    }
1404
1405    #[test]
1406    fn test_duckdb_adapter_list_metrics_sql() {
1407        let adapter = DuckDbQueryAdapter::new("/data/*.parquet");
1408        let sql = adapter.list_metrics();
1409        assert!(sql.contains("DISTINCT metric"));
1410        assert!(sql.contains("read_parquet('/data/*.parquet')"));
1411    }
1412
1413    #[test]
1414    fn test_duckdb_adapter_time_range_summary_sql() {
1415        let adapter = DuckDbQueryAdapter::new("data.parquet");
1416        let sql = adapter.time_range_summary();
1417        assert!(sql.contains("MIN(timestamp)"));
1418        assert!(sql.contains("MAX(timestamp)"));
1419        assert!(sql.contains("AVG(value)"));
1420        assert!(sql.contains("GROUP BY metric"));
1421    }
1422
1423    #[test]
1424    fn test_duckdb_adapter_export_query_to_parquet() {
1425        let adapter = DuckDbQueryAdapter::new("input.parquet");
1426        let inner = adapter.select_metric("cpu", 0, 1_000);
1427        let sql = adapter.export_query_to_parquet(&inner, "output.parquet");
1428        assert!(sql.starts_with("COPY ("));
1429        assert!(sql.contains("output.parquet"));
1430        assert!(sql.contains("FORMAT PARQUET"));
1431    }
1432
1433    #[test]
1434    fn test_duckdb_adapter_parquet_path() {
1435        let adapter = DuckDbQueryAdapter::new("/mnt/data/*.parquet");
1436        assert_eq!(adapter.parquet_path(), "/mnt/data/*.parquet");
1437    }
1438
1439    #[test]
1440    fn test_duckdb_adapter_join_metrics() {
1441        let adapter = DuckDbQueryAdapter::new("data.parquet");
1442        let sql = adapter.join_metrics("cpu", "mem", 60_000);
1443        assert!(sql.contains("cpu_avg"));
1444        assert!(sql.contains("mem_avg"));
1445        assert!(sql.contains("INNER JOIN"));
1446        assert!(sql.contains("60000"));
1447    }
1448
1449    #[test]
1450    fn test_duckdb_adapter_rate_of_change() {
1451        let adapter = DuckDbQueryAdapter::new("data.parquet");
1452        let sql = adapter.rate_of_change("cpu", 1000);
1453        assert!(sql.contains("LAG(avg_value)"));
1454        assert!(sql.contains("rate_per_sec"));
1455        assert!(sql.contains("1000"));
1456    }
1457
1458    #[test]
1459    fn test_duckdb_adapter_create_view() {
1460        let adapter = DuckDbQueryAdapter::new("data.parquet");
1461        let sql = adapter.create_view("tsdb_data");
1462        assert!(sql.contains("CREATE OR REPLACE VIEW tsdb_data"));
1463        assert!(sql.contains("read_parquet"));
1464    }
1465
1466    #[test]
1467    fn test_duckdb_adapter_count_per_metric() {
1468        let adapter = DuckDbQueryAdapter::new("data.parquet");
1469        let sql = adapter.count_per_metric();
1470        assert!(sql.contains("COUNT(*)"));
1471        assert!(sql.contains("GROUP BY metric"));
1472        assert!(sql.contains("ORDER BY point_count DESC"));
1473    }
1474
1475    // -- ColumnarExport -------------------------------------------------------
1476
1477    #[test]
1478    fn test_columnar_export_new_is_empty() {
1479        let ce = ColumnarExport::new();
1480        assert!(ce.is_empty());
1481        assert_eq!(ce.len(), 0);
1482    }
1483
1484    #[test]
1485    fn test_columnar_export_push_and_len() {
1486        let mut ce = ColumnarExport::new();
1487        ce.push(1_000, "cpu", 55.0);
1488        ce.push(2_000, "mem", 70.0);
1489        assert_eq!(ce.len(), 2);
1490        assert!(!ce.is_empty());
1491    }
1492
1493    #[test]
1494    fn test_columnar_export_from_points() {
1495        let pts = vec![
1496            ExportedPoint::new(100, "temp", 22.0),
1497            ExportedPoint::new(200, "temp", 23.5),
1498            ExportedPoint::new(300, "pressure", 1013.0),
1499        ];
1500        let ce = ColumnarExport::from_points(&pts);
1501        assert_eq!(ce.len(), 3);
1502        assert_eq!(ce.metrics[0], "temp");
1503        assert_eq!(ce.metrics[2], "pressure");
1504        assert!((ce.values[1] - 23.5).abs() < f64::EPSILON);
1505    }
1506
1507    #[test]
1508    fn test_columnar_export_to_points_roundtrip() {
1509        let pts = vec![
1510            ExportedPoint::new(10, "x", 1.0),
1511            ExportedPoint::new(20, "y", 2.0),
1512        ];
1513        let ce = ColumnarExport::from_points(&pts);
1514        let back = ce.to_points();
1515        assert_eq!(back.len(), pts.len());
1516        for (a, b) in pts.iter().zip(back.iter()) {
1517            assert_eq!(a.timestamp_ms, b.timestamp_ms);
1518            assert_eq!(a.metric, b.metric);
1519            assert!((a.value - b.value).abs() < f64::EPSILON);
1520        }
1521    }
1522
1523    #[test]
1524    fn test_columnar_export_csv_roundtrip() {
1525        let dir = std::env::temp_dir();
1526        let path = dir.join("oxirs_tsdb_test_columnar.csv");
1527
1528        let mut ce = ColumnarExport::with_capacity(4);
1529        ce.push(1_000, "cpu", 10.0);
1530        ce.push(2_000, "mem", 20.5);
1531        ce.push(3_000, "disk", 30.1);
1532
1533        ce.to_csv(&path).expect("csv write should succeed");
1534        let loaded = ColumnarExport::from_csv(&path).expect("csv read should succeed");
1535
1536        assert_eq!(loaded.len(), 3);
1537        assert_eq!(loaded.metrics[1], "mem");
1538        assert!((loaded.values[2] - 30.1).abs() < 1e-9);
1539        let _ = std::fs::remove_file(&path);
1540    }
1541
1542    #[test]
1543    fn test_columnar_export_csv_with_comma_in_metric() {
1544        let dir = std::env::temp_dir();
1545        let path = dir.join("oxirs_tsdb_test_comma_metric.csv");
1546
1547        let mut ce = ColumnarExport::new();
1548        ce.push(1_000, "node,A", 5.0);
1549
1550        ce.to_csv(&path).expect("csv write");
1551        let loaded = ColumnarExport::from_csv(&path).expect("csv read");
1552        assert_eq!(loaded.len(), 1);
1553        assert_eq!(loaded.metrics[0], "node,A");
1554        let _ = std::fs::remove_file(&path);
1555    }
1556
1557    #[test]
1558    fn test_columnar_export_tsv_roundtrip() {
1559        let dir = std::env::temp_dir();
1560        let path = dir.join("oxirs_tsdb_test_columnar.tsv");
1561
1562        let mut ce = ColumnarExport::new();
1563        ce.push(100, "sensor_1", 99.9);
1564        ce.push(200, "sensor_2", 88.8);
1565
1566        ce.to_tsv(&path).expect("tsv write");
1567
1568        // Read back and verify manually (TSV is tab-separated).
1569        let content = std::fs::read_to_string(&path).expect("read tsv");
1570        let lines: Vec<&str> = content.lines().collect();
1571        assert_eq!(lines[0], "timestamp_ms\tmetric\tvalue");
1572        assert!(lines[1].contains("sensor_1"));
1573        assert!(lines[2].contains("sensor_2"));
1574        let _ = std::fs::remove_file(&path);
1575    }
1576
1577    #[test]
1578    fn test_columnar_export_to_json_structure() {
1579        let mut ce = ColumnarExport::new();
1580        ce.push(1_000, "cpu", 42.0);
1581        ce.push(2_000, "mem", 84.0);
1582
1583        let json = ce.to_json().expect("json conversion");
1584        let arr = json.as_array().expect("should be array");
1585        assert_eq!(arr.len(), 2);
1586        assert_eq!(arr[0]["metric"], "cpu");
1587        assert_eq!(arr[0]["timestamp_ms"], 1_000_i64);
1588        assert!((arr[0]["value"].as_f64().expect("should succeed") - 42.0).abs() < f64::EPSILON);
1589        assert_eq!(arr[1]["metric"], "mem");
1590    }
1591
1592    #[test]
1593    fn test_columnar_export_to_json_empty() {
1594        let ce = ColumnarExport::new();
1595        let json = ce.to_json().expect("json for empty");
1596        let arr = json.as_array().expect("array");
1597        assert!(arr.is_empty());
1598    }
1599
1600    #[test]
1601    fn test_columnar_export_to_json_string() {
1602        let mut ce = ColumnarExport::new();
1603        ce.push(5, "x", 1.0);
1604        let s = ce.to_json_string().expect("json string");
1605        assert!(s.contains("\"metric\""));
1606        assert!(s.contains("\"timestamp_ms\""));
1607        assert!(s.contains("\"value\""));
1608    }
1609
1610    #[test]
1611    fn test_columnar_export_filter_metric() {
1612        let mut ce = ColumnarExport::new();
1613        ce.push(1, "cpu", 1.0);
1614        ce.push(2, "mem", 2.0);
1615        ce.push(3, "cpu", 3.0);
1616
1617        let filtered = ce.filter_metric("cpu");
1618        assert_eq!(filtered.len(), 2);
1619        assert!(filtered.metrics.iter().all(|m| m == "cpu"));
1620    }
1621
1622    #[test]
1623    fn test_columnar_export_filter_time_range() {
1624        let mut ce = ColumnarExport::new();
1625        for i in 0..10_i64 {
1626            ce.push(i * 100, "x", i as f64);
1627        }
1628        let filtered = ce.filter_time_range(200, 500);
1629        assert_eq!(filtered.len(), 4); // 200, 300, 400, 500
1630        assert_eq!(filtered.timestamps[0], 200);
1631        assert_eq!(filtered.timestamps[3], 500);
1632    }
1633
1634    #[test]
1635    fn test_columnar_export_filter_time_range_empty() {
1636        let ce = ColumnarExport::new();
1637        let filtered = ce.filter_time_range(0, 1000);
1638        assert!(filtered.is_empty());
1639    }
1640
1641    #[test]
1642    fn test_columnar_export_sort_by_timestamp() {
1643        let mut ce = ColumnarExport::new();
1644        ce.push(300, "c", 3.0);
1645        ce.push(100, "a", 1.0);
1646        ce.push(200, "b", 2.0);
1647
1648        ce.sort_by_timestamp();
1649        assert_eq!(ce.timestamps[0], 100);
1650        assert_eq!(ce.metrics[0], "a");
1651        assert_eq!(ce.timestamps[1], 200);
1652        assert_eq!(ce.timestamps[2], 300);
1653    }
1654
1655    #[test]
1656    fn test_columnar_export_value_stats_basic() {
1657        let mut ce = ColumnarExport::new();
1658        ce.push(1, "x", 10.0);
1659        ce.push(2, "x", 20.0);
1660        ce.push(3, "x", 30.0);
1661
1662        let stats = ce.value_stats();
1663        assert_eq!(stats.count, 3);
1664        assert!((stats.mean - 20.0).abs() < f64::EPSILON);
1665        assert!((stats.min - 10.0).abs() < f64::EPSILON);
1666        assert!((stats.max - 30.0).abs() < f64::EPSILON);
1667        assert!((stats.sum - 60.0).abs() < f64::EPSILON);
1668    }
1669
1670    #[test]
1671    fn test_columnar_export_value_stats_empty() {
1672        let ce = ColumnarExport::new();
1673        let stats = ce.value_stats();
1674        assert_eq!(stats.count, 0);
1675    }
1676
1677    #[test]
1678    fn test_columnar_export_value_stats_single() {
1679        let mut ce = ColumnarExport::new();
1680        ce.push(1, "x", 7.0);
1681        let stats = ce.value_stats();
1682        assert_eq!(stats.count, 1);
1683        assert!((stats.mean - 7.0).abs() < f64::EPSILON);
1684        assert!((stats.variance - 0.0).abs() < f64::EPSILON);
1685    }
1686
1687    #[test]
1688    fn test_columnar_export_with_capacity() {
1689        let ce = ColumnarExport::with_capacity(100);
1690        assert!(ce.is_empty());
1691        assert_eq!(ce.len(), 0);
1692    }
1693
1694    #[test]
1695    fn test_columnar_export_csv_empty_file() {
1696        let dir = std::env::temp_dir();
1697        let path = dir.join("oxirs_tsdb_test_empty.csv");
1698
1699        let ce = ColumnarExport::new();
1700        ce.to_csv(&path).expect("csv write empty");
1701        let loaded = ColumnarExport::from_csv(&path).expect("csv read empty");
1702        assert!(loaded.is_empty());
1703        let _ = std::fs::remove_file(&path);
1704    }
1705
1706    #[test]
1707    fn test_columnar_export_tsv_multiple_metrics() {
1708        let dir = std::env::temp_dir();
1709        let path = dir.join("oxirs_tsdb_multi_metric.tsv");
1710
1711        let mut ce = ColumnarExport::new();
1712        for i in 0..5_i64 {
1713            ce.push(i * 1000, "cpu", i as f64 * 10.0);
1714            ce.push(i * 1000 + 1, "mem", i as f64 * 20.0);
1715        }
1716
1717        ce.to_tsv(&path).expect("tsv write");
1718        let content = std::fs::read_to_string(&path).expect("tsv read");
1719        let lines: Vec<&str> = content.lines().collect();
1720        // header + 10 data rows
1721        assert_eq!(lines.len(), 11);
1722        let _ = std::fs::remove_file(&path);
1723    }
1724}