1use crate::error::{TsdbError, TsdbResult};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22#[cfg(feature = "arrow-export")]
24use {
25 arrow::{
26 array::{Float64Array, Int64Array, StringArray},
27 datatypes::{DataType, Field, Schema},
28 record_batch::RecordBatch,
29 },
30 std::sync::Arc,
31};
32
33#[cfg(feature = "arrow-export")]
35use {
36 parquet::{
37 arrow::ArrowWriter, basic::Compression as PqCompression, file::properties::WriterProperties,
38 },
39 std::{fs::File, path::Path},
40};
41
/// A single time-series sample flattened for columnar export.
///
/// Tags are stored pre-serialized as a JSON object string (`tags_json`) so
/// the point maps directly onto a flat Arrow/Parquet column schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ExportedPoint {
    /// Sample time in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
    /// Metric (series) name.
    pub metric: String,
    /// Sample value.
    pub value: f64,
    /// Tags serialized as a JSON object, e.g. `{"host":"srv-01"}`;
    /// `"{}"` when the point has no tags.
    pub tags_json: String,
}
61
62impl ExportedPoint {
63 pub fn new(timestamp_ms: i64, metric: impl Into<String>, value: f64) -> Self {
65 Self {
66 timestamp_ms,
67 metric: metric.into(),
68 value,
69 tags_json: "{}".to_string(),
70 }
71 }
72
73 pub fn with_tags(
75 timestamp_ms: i64,
76 metric: impl Into<String>,
77 value: f64,
78 tags: &HashMap<String, String>,
79 ) -> TsdbResult<Self> {
80 let tags_json =
81 serde_json::to_string(tags).map_err(|e| TsdbError::Serialization(e.to_string()))?;
82 Ok(Self {
83 timestamp_ms,
84 metric: metric.into(),
85 value,
86 tags_json,
87 })
88 }
89
90 pub fn parse_tags(&self) -> TsdbResult<HashMap<String, String>> {
92 serde_json::from_str(&self.tags_json).map_err(|e| TsdbError::Serialization(e.to_string()))
93 }
94}
95
/// Compression codec applied to Parquet data pages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParquetCompression {
    /// No compression.
    None,
    /// Snappy: fast with moderate ratios (the default).
    #[default]
    Snappy,
    /// Zstandard: higher ratios at moderate CPU cost.
    Zstd,
    /// Gzip: widely compatible, slower to encode.
    Gzip,
}
113
114impl ParquetCompression {
115 pub fn label(&self) -> &'static str {
117 match self {
118 Self::None => "none",
119 Self::Snappy => "snappy",
120 Self::Zstd => "zstd",
121 Self::Gzip => "gzip",
122 }
123 }
124}
125
126impl std::fmt::Display for ParquetCompression {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "{}", self.label())
129 }
130}
131
/// Converts [`ExportedPoint`] slices into Arrow record batches (when the
/// `arrow-export` feature is enabled) and provides feature-independent
/// helpers for filtering, grouping, and statistics.
#[derive(Debug, Default)]
pub struct ArrowExporter {
    /// Maximum rows per emitted batch; 0 means "one batch for everything".
    max_rows_per_batch: usize,
}
142
143impl ArrowExporter {
144 pub fn new() -> Self {
146 Self {
147 max_rows_per_batch: 0,
148 }
149 }
150
151 pub fn with_max_rows(max_rows: usize) -> Self {
153 Self {
154 max_rows_per_batch: max_rows,
155 }
156 }
157
158 pub fn max_rows_per_batch(&self) -> usize {
160 self.max_rows_per_batch
161 }
162
163 #[cfg(feature = "arrow-export")]
165 pub fn export_batch(&self, points: &[ExportedPoint]) -> TsdbResult<RecordBatch> {
166 let schema = Self::schema();
167
168 let timestamps: Int64Array = points.iter().map(|p| p.timestamp_ms).collect();
169 let metrics: StringArray = points.iter().map(|p| Some(p.metric.as_str())).collect();
170 let values: Float64Array = points.iter().map(|p| p.value).collect();
171 let tags: StringArray = points.iter().map(|p| Some(p.tags_json.as_str())).collect();
172
173 RecordBatch::try_new(
174 Arc::new(schema),
175 vec![
176 Arc::new(timestamps),
177 Arc::new(metrics),
178 Arc::new(values),
179 Arc::new(tags),
180 ],
181 )
182 .map_err(|e| TsdbError::Arrow(e.to_string()))
183 }
184
185 #[cfg(feature = "arrow-export")]
187 pub fn export_batches(&self, points: &[ExportedPoint]) -> TsdbResult<Vec<RecordBatch>> {
188 if points.is_empty() {
189 return Ok(vec![]);
190 }
191 let chunk_size = if self.max_rows_per_batch == 0 {
192 points.len()
193 } else {
194 self.max_rows_per_batch
195 };
196 points
197 .chunks(chunk_size)
198 .map(|chunk| self.export_batch(chunk))
199 .collect()
200 }
201
202 #[cfg(not(feature = "arrow-export"))]
204 pub fn export_batch_count(&self, points: &[ExportedPoint]) -> usize {
205 points.len()
206 }
207
208 #[cfg(feature = "arrow-export")]
210 pub fn schema() -> Schema {
211 Schema::new(vec![
212 Field::new("timestamp", DataType::Int64, false),
213 Field::new("metric", DataType::Utf8, false),
214 Field::new("value", DataType::Float64, false),
215 Field::new("tags_json", DataType::Utf8, false),
216 ])
217 }
218
219 pub fn filter_by_metric<'a>(
221 points: &'a [ExportedPoint],
222 metric: &str,
223 ) -> Vec<&'a ExportedPoint> {
224 points.iter().filter(|p| p.metric == metric).collect()
225 }
226
227 pub fn filter_by_time_range(
229 points: &[ExportedPoint],
230 start_ms: i64,
231 end_ms: i64,
232 ) -> Vec<ExportedPoint> {
233 points
234 .iter()
235 .filter(|p| p.timestamp_ms >= start_ms && p.timestamp_ms <= end_ms)
236 .cloned()
237 .collect()
238 }
239
240 pub fn compute_stats(points: &[ExportedPoint]) -> ExportStats {
242 if points.is_empty() {
243 return ExportStats::default();
244 }
245 let count = points.len();
246 let sum: f64 = points.iter().map(|p| p.value).sum();
247 let mean = sum / count as f64;
248 let min = points.iter().map(|p| p.value).fold(f64::INFINITY, f64::min);
249 let max = points
250 .iter()
251 .map(|p| p.value)
252 .fold(f64::NEG_INFINITY, f64::max);
253 let variance = if count > 1 {
254 points.iter().map(|p| (p.value - mean).powi(2)).sum::<f64>() / (count - 1) as f64
255 } else {
256 0.0
257 };
258 let first_ts = points.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
259 let last_ts = points.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
260 let distinct_metrics: std::collections::HashSet<&str> =
261 points.iter().map(|p| p.metric.as_str()).collect();
262
263 ExportStats {
264 count,
265 sum,
266 mean,
267 min,
268 max,
269 variance,
270 stddev: variance.sqrt(),
271 first_timestamp_ms: first_ts,
272 last_timestamp_ms: last_ts,
273 distinct_metrics: distinct_metrics.len(),
274 }
275 }
276
277 pub fn group_by_metric(points: &[ExportedPoint]) -> HashMap<String, Vec<ExportedPoint>> {
279 let mut groups: HashMap<String, Vec<ExportedPoint>> = HashMap::new();
280 for p in points {
281 groups.entry(p.metric.clone()).or_default().push(p.clone());
282 }
283 groups
284 }
285
286 pub fn sort_by_timestamp(points: &mut [ExportedPoint]) {
288 points.sort_by_key(|p| p.timestamp_ms);
289 }
290}
291
/// Summary statistics over a slice of [`ExportedPoint`]s.
#[derive(Debug, Clone, Default)]
pub struct ExportStats {
    /// Number of points.
    pub count: usize,
    /// Sum of all values.
    pub sum: f64,
    /// Arithmetic mean of values (0.0 when empty).
    pub mean: f64,
    /// Minimum value.
    pub min: f64,
    /// Maximum value.
    pub max: f64,
    /// Sample variance (n - 1 denominator; 0.0 with fewer than 2 points).
    pub variance: f64,
    /// Sample standard deviation (square root of `variance`).
    pub stddev: f64,
    /// Earliest timestamp, ms since epoch (0 when empty).
    pub first_timestamp_ms: i64,
    /// Latest timestamp, ms since epoch (0 when empty).
    pub last_timestamp_ms: i64,
    /// Number of distinct metric names.
    pub distinct_metrics: usize,
}
316
/// Writes [`ExportedPoint`]s to Parquet files via Arrow record batches.
#[derive(Debug)]
pub struct ParquetExporter {
    /// Batch builder; only exercised by the `arrow-export` code path,
    /// hence the conditional dead-code allowance.
    #[cfg_attr(not(feature = "arrow-export"), allow(dead_code))]
    arrow: ArrowExporter,
    /// Compression codec applied when writing.
    compression: ParquetCompression,
    /// Row-group size budget in bytes (default 128 MiB; see `new`).
    row_group_size: usize,
}
330
331impl ParquetExporter {
332 pub fn new() -> Self {
335 Self {
336 arrow: ArrowExporter::new(),
337 compression: ParquetCompression::Snappy,
338 row_group_size: 134_217_728,
339 }
340 }
341
342 pub fn with_compression(mut self, codec: ParquetCompression) -> Self {
344 self.compression = codec;
345 self
346 }
347
348 pub fn with_row_group_size(mut self, bytes: usize) -> Self {
350 self.row_group_size = bytes;
351 self
352 }
353
354 pub fn compression(&self) -> ParquetCompression {
356 self.compression
357 }
358
359 pub fn row_group_size(&self) -> usize {
361 self.row_group_size
362 }
363
364 #[cfg(feature = "arrow-export")]
366 pub fn write_file(&self, points: &[ExportedPoint], path: &Path) -> TsdbResult<u64> {
367 let batch = self.arrow.export_batch(points)?;
368
369 let codec = match self.compression {
370 ParquetCompression::None => PqCompression::UNCOMPRESSED,
371 ParquetCompression::Snappy => PqCompression::SNAPPY,
372 ParquetCompression::Zstd => PqCompression::ZSTD(Default::default()),
373 ParquetCompression::Gzip => PqCompression::GZIP(Default::default()),
374 };
375
376 let props = WriterProperties::builder()
377 .set_compression(codec)
378 .set_max_row_group_row_count(Some(self.row_group_size / 8))
379 .build();
380
381 let file = File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
382
383 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
384 .map_err(|e| TsdbError::Arrow(e.to_string()))?;
385
386 writer
387 .write(&batch)
388 .map_err(|e| TsdbError::Arrow(e.to_string()))?;
389
390 let metadata = writer
391 .close()
392 .map_err(|e| TsdbError::Arrow(e.to_string()))?;
393
394 Ok(metadata.file_metadata().num_rows() as u64)
395 }
396
397 pub fn count_rows(&self, points: &[ExportedPoint]) -> usize {
399 points.len()
400 }
401
402 pub fn export_metadata(&self, points: &[ExportedPoint]) -> ExportMetadata {
404 let stats = ArrowExporter::compute_stats(points);
405 ExportMetadata {
406 row_count: points.len(),
407 compression: self.compression,
408 row_group_size: self.row_group_size,
409 distinct_metrics: stats.distinct_metrics,
410 time_span_ms: stats
411 .last_timestamp_ms
412 .saturating_sub(stats.first_timestamp_ms),
413 }
414 }
415}
416
impl Default for ParquetExporter {
    /// Same configuration as [`ParquetExporter::new`]: Snappy compression,
    /// 128 MiB row-group budget.
    fn default() -> Self {
        Self::new()
    }
}
422
/// Descriptive metadata for a planned or completed Parquet export.
#[derive(Debug, Clone)]
pub struct ExportMetadata {
    /// Rows in the export.
    pub row_count: usize,
    /// Codec configured on the exporter.
    pub compression: ParquetCompression,
    /// Row-group byte budget configured on the exporter.
    pub row_group_size: usize,
    /// Number of distinct metric names among the exported points.
    pub distinct_metrics: usize,
    /// `last - first` timestamp in milliseconds (saturating).
    pub time_span_ms: i64,
}
437
/// Builds DuckDB SQL strings that query an exported Parquet file via
/// `read_parquet(...)`. This type only generates SQL text; executing it
/// is the caller's responsibility.
#[derive(Debug, Clone)]
pub struct DuckDbQueryAdapter {
    /// Path interpolated into `read_parquet('...')` in every query.
    parquet_path: String,
}
452
453impl DuckDbQueryAdapter {
454 pub fn new(path: impl Into<String>) -> Self {
456 Self {
457 parquet_path: path.into(),
458 }
459 }
460
461 pub fn parquet_path(&self) -> &str {
463 &self.parquet_path
464 }
465
466 pub fn select_metric(&self, metric: &str, start_ms: i64, end_ms: i64) -> String {
468 format!(
469 "SELECT timestamp, metric, value, tags_json \
470 FROM read_parquet('{}') \
471 WHERE metric = '{}' AND timestamp BETWEEN {} AND {} \
472 ORDER BY timestamp ASC;",
473 self.parquet_path, metric, start_ms, end_ms
474 )
475 }
476
477 pub fn aggregate_metric(
479 &self,
480 metric: &str,
481 start_ms: i64,
482 end_ms: i64,
483 aggregation: AggregationFunction,
484 ) -> String {
485 let agg_expr = match aggregation {
486 AggregationFunction::Avg => "AVG(value)",
487 AggregationFunction::Min => "MIN(value)",
488 AggregationFunction::Max => "MAX(value)",
489 AggregationFunction::Sum => "SUM(value)",
490 AggregationFunction::Count => "COUNT(*)",
491 AggregationFunction::StdDev => "STDDEV(value)",
492 AggregationFunction::Percentile50 => {
493 "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value)"
494 }
495 AggregationFunction::Percentile95 => {
496 "PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value)"
497 }
498 AggregationFunction::Percentile99 => {
499 "PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY value)"
500 }
501 };
502 format!(
503 "SELECT metric, {agg} AS result \
504 FROM read_parquet('{path}') \
505 WHERE metric = '{metric}' AND timestamp BETWEEN {start} AND {end} \
506 GROUP BY metric;",
507 agg = agg_expr,
508 path = self.parquet_path,
509 metric = metric,
510 start = start_ms,
511 end = end_ms,
512 )
513 }
514
515 pub fn resample(&self, metric: &str, bucket_ms: i64) -> String {
517 format!(
518 "SELECT \
519 (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
520 metric, \
521 AVG(value) AS avg_value, \
522 MIN(value) AS min_value, \
523 MAX(value) AS max_value, \
524 COUNT(*) AS sample_count \
525 FROM read_parquet('{path}') \
526 WHERE metric = '{metric}' \
527 GROUP BY bucket_start_ms, metric \
528 ORDER BY bucket_start_ms ASC;",
529 bucket = bucket_ms,
530 path = self.parquet_path,
531 metric = metric,
532 )
533 }
534
535 pub fn export_query_to_parquet(&self, query: &str, output_path: &str) -> String {
537 format!(
538 "COPY ({query}) TO '{output}' (FORMAT PARQUET, COMPRESSION SNAPPY);",
539 query = query,
540 output = output_path,
541 )
542 }
543
544 pub fn list_metrics(&self) -> String {
546 format!(
547 "SELECT DISTINCT metric FROM read_parquet('{}') ORDER BY metric;",
548 self.parquet_path
549 )
550 }
551
552 pub fn time_range_summary(&self) -> String {
554 format!(
555 "SELECT metric, \
556 MIN(timestamp) AS first_ts_ms, \
557 MAX(timestamp) AS last_ts_ms, \
558 COUNT(*) AS total_points, \
559 AVG(value) AS mean_value \
560 FROM read_parquet('{}') \
561 GROUP BY metric \
562 ORDER BY metric;",
563 self.parquet_path
564 )
565 }
566
567 pub fn join_metrics(&self, metric_a: &str, metric_b: &str, bucket_ms: i64) -> String {
569 format!(
570 "SELECT \
571 a.bucket_start_ms, \
572 a.avg_value AS {metric_a}_avg, \
573 b.avg_value AS {metric_b}_avg \
574 FROM (\
575 SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
576 AVG(value) AS avg_value \
577 FROM read_parquet('{path}') \
578 WHERE metric = '{metric_a}' \
579 GROUP BY bucket_start_ms\
580 ) a \
581 INNER JOIN (\
582 SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
583 AVG(value) AS avg_value \
584 FROM read_parquet('{path}') \
585 WHERE metric = '{metric_b}' \
586 GROUP BY bucket_start_ms\
587 ) b ON a.bucket_start_ms = b.bucket_start_ms \
588 ORDER BY a.bucket_start_ms ASC;",
589 metric_a = metric_a,
590 metric_b = metric_b,
591 bucket = bucket_ms,
592 path = self.parquet_path,
593 )
594 }
595
596 pub fn rate_of_change(&self, metric: &str, bucket_ms: i64) -> String {
598 format!(
599 "SELECT \
600 bucket_start_ms, \
601 avg_value, \
602 avg_value - LAG(avg_value) OVER (ORDER BY bucket_start_ms) AS delta, \
603 (avg_value - LAG(avg_value) OVER (ORDER BY bucket_start_ms)) / \
604 NULLIF(({bucket}::DOUBLE / 1000.0), 0) AS rate_per_sec \
605 FROM (\
606 SELECT (timestamp / {bucket}) * {bucket} AS bucket_start_ms, \
607 AVG(value) AS avg_value \
608 FROM read_parquet('{path}') \
609 WHERE metric = '{metric}' \
610 GROUP BY bucket_start_ms\
611 ) \
612 ORDER BY bucket_start_ms ASC;",
613 bucket = bucket_ms,
614 path = self.parquet_path,
615 metric = metric,
616 )
617 }
618
619 pub fn create_view(&self, view_name: &str) -> String {
621 format!(
622 "CREATE OR REPLACE VIEW {view} AS \
623 SELECT * FROM read_parquet('{}');",
624 self.parquet_path,
625 view = view_name,
626 )
627 }
628
629 pub fn count_per_metric(&self) -> String {
631 format!(
632 "SELECT metric, COUNT(*) AS point_count \
633 FROM read_parquet('{}') \
634 GROUP BY metric \
635 ORDER BY point_count DESC;",
636 self.parquet_path,
637 )
638 }
639}
640
/// Aggregate functions understood by
/// [`DuckDbQueryAdapter::aggregate_metric`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregationFunction {
    /// Arithmetic mean of values.
    Avg,
    /// Minimum value.
    Min,
    /// Maximum value.
    Max,
    /// Sum of values.
    Sum,
    /// Row count.
    Count,
    /// Standard deviation of values.
    StdDev,
    /// Median (50th percentile, continuous interpolation).
    Percentile50,
    /// 95th percentile (continuous interpolation).
    Percentile95,
    /// 99th percentile (continuous interpolation).
    Percentile99,
}
663
/// Column-oriented (struct-of-arrays) form of a point series: index `i`
/// across the three parallel vectors is one row. Tags are not carried in
/// this representation.
#[derive(Debug, Clone, Default)]
pub struct ColumnarExport {
    /// Timestamps in milliseconds since the Unix epoch.
    pub timestamps: Vec<i64>,
    /// Metric names, parallel to `timestamps`.
    pub metrics: Vec<String>,
    /// Sample values, parallel to `timestamps`.
    pub values: Vec<f64>,
}
687
688impl ColumnarExport {
689 pub fn new() -> Self {
691 Self::default()
692 }
693
694 pub fn with_capacity(capacity: usize) -> Self {
696 Self {
697 timestamps: Vec::with_capacity(capacity),
698 metrics: Vec::with_capacity(capacity),
699 values: Vec::with_capacity(capacity),
700 }
701 }
702
703 pub fn from_points(points: &[ExportedPoint]) -> Self {
705 let mut out = Self::with_capacity(points.len());
706 for p in points {
707 out.timestamps.push(p.timestamp_ms);
708 out.metrics.push(p.metric.clone());
709 out.values.push(p.value);
710 }
711 out
712 }
713
714 pub fn len(&self) -> usize {
716 self.timestamps.len()
717 }
718
719 pub fn is_empty(&self) -> bool {
721 self.timestamps.is_empty()
722 }
723
724 pub fn push(&mut self, timestamp_ms: i64, metric: impl Into<String>, value: f64) {
726 self.timestamps.push(timestamp_ms);
727 self.metrics.push(metric.into());
728 self.values.push(value);
729 }
730
731 pub fn to_points(&self) -> Vec<ExportedPoint> {
733 self.timestamps
734 .iter()
735 .zip(self.metrics.iter())
736 .zip(self.values.iter())
737 .map(|((ts, m), v)| ExportedPoint::new(*ts, m.clone(), *v))
738 .collect()
739 }
740
741 pub fn to_csv(&self, path: &std::path::Path) -> TsdbResult<()> {
745 use std::io::Write as IoWrite;
746 let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
747 let mut writer = std::io::BufWriter::new(file);
748 writeln!(writer, "timestamp_ms,metric,value").map_err(|e| TsdbError::Io(e.to_string()))?;
749 for i in 0..self.len() {
750 let metric_escaped = if self.metrics[i].contains(',') || self.metrics[i].contains('"') {
752 format!("\"{}\"", self.metrics[i].replace('"', "\"\""))
753 } else {
754 self.metrics[i].clone()
755 };
756 writeln!(
757 writer,
758 "{},{},{}",
759 self.timestamps[i], metric_escaped, self.values[i]
760 )
761 .map_err(|e| TsdbError::Io(e.to_string()))?;
762 }
763 Ok(())
764 }
765
766 pub fn to_tsv(&self, path: &std::path::Path) -> TsdbResult<()> {
770 use std::io::Write as IoWrite;
771 let file = std::fs::File::create(path).map_err(|e| TsdbError::Io(e.to_string()))?;
772 let mut writer = std::io::BufWriter::new(file);
773 writeln!(writer, "timestamp_ms\tmetric\tvalue")
774 .map_err(|e| TsdbError::Io(e.to_string()))?;
775 for i in 0..self.len() {
776 writeln!(
777 writer,
778 "{}\t{}\t{}",
779 self.timestamps[i], self.metrics[i], self.values[i]
780 )
781 .map_err(|e| TsdbError::Io(e.to_string()))?;
782 }
783 Ok(())
784 }
785
786 pub fn to_json(&self) -> TsdbResult<serde_json::Value> {
790 let rows: Vec<serde_json::Value> = self
791 .timestamps
792 .iter()
793 .zip(self.metrics.iter())
794 .zip(self.values.iter())
795 .map(|((ts, m), v)| {
796 serde_json::json!({
797 "timestamp_ms": ts,
798 "metric": m,
799 "value": v,
800 })
801 })
802 .collect();
803 Ok(serde_json::Value::Array(rows))
804 }
805
806 pub fn to_json_string(&self) -> TsdbResult<String> {
808 let v = self.to_json()?;
809 serde_json::to_string(&v).map_err(|e| TsdbError::Serialization(e.to_string()))
810 }
811
812 pub fn filter_metric(&self, metric: &str) -> Self {
815 let mut out = Self::new();
816 for i in 0..self.len() {
817 if self.metrics[i] == metric {
818 out.push(self.timestamps[i], self.metrics[i].clone(), self.values[i]);
819 }
820 }
821 out
822 }
823
824 pub fn filter_time_range(&self, start_ms: i64, end_ms: i64) -> Self {
826 let mut out = Self::new();
827 for i in 0..self.len() {
828 let ts = self.timestamps[i];
829 if ts >= start_ms && ts <= end_ms {
830 out.push(ts, self.metrics[i].clone(), self.values[i]);
831 }
832 }
833 out
834 }
835
836 pub fn sort_by_timestamp(&mut self) {
838 let n = self.len();
840 let mut indices: Vec<usize> = (0..n).collect();
841 indices.sort_by_key(|&i| self.timestamps[i]);
842
843 let old_ts = self.timestamps.clone();
844 let old_m = self.metrics.clone();
845 let old_v = self.values.clone();
846 for (new_pos, &old_pos) in indices.iter().enumerate() {
847 self.timestamps[new_pos] = old_ts[old_pos];
848 self.metrics[new_pos] = old_m[old_pos].clone();
849 self.values[new_pos] = old_v[old_pos];
850 }
851 }
852
853 pub fn value_stats(&self) -> ColumnarStats {
855 if self.values.is_empty() {
856 return ColumnarStats::default();
857 }
858 let n = self.values.len();
859 let sum: f64 = self.values.iter().sum();
860 let mean = sum / n as f64;
861 let min = self.values.iter().cloned().fold(f64::INFINITY, f64::min);
862 let max = self
863 .values
864 .iter()
865 .cloned()
866 .fold(f64::NEG_INFINITY, f64::max);
867 let variance = if n > 1 {
868 self.values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1) as f64
869 } else {
870 0.0
871 };
872 ColumnarStats {
873 count: n,
874 sum,
875 mean,
876 min,
877 max,
878 variance,
879 stddev: variance.sqrt(),
880 }
881 }
882
883 pub fn from_csv(path: &std::path::Path) -> TsdbResult<Self> {
886 use std::io::{BufRead, BufReader};
887 let file = std::fs::File::open(path).map_err(|e| TsdbError::Io(e.to_string()))?;
888 let reader = BufReader::new(file);
889 let mut out = Self::new();
890 let mut first = true;
891 for line in reader.lines() {
892 let line = line.map_err(|e| TsdbError::Io(e.to_string()))?;
893 if first {
894 first = false;
895 continue; }
897 if line.is_empty() {
898 continue;
899 }
900 let parts = parse_csv_line(&line);
902 if parts.len() < 3 {
903 return Err(TsdbError::Serialization(format!(
904 "CSV line has fewer than 3 fields: {line}"
905 )));
906 }
907 let ts: i64 = parts[0].parse().map_err(|_| {
908 TsdbError::Serialization(format!("invalid timestamp: {}", parts[0]))
909 })?;
910 let metric = parts[1].clone();
911 let value: f64 = parts[2]
912 .parse()
913 .map_err(|_| TsdbError::Serialization(format!("invalid value: {}", parts[2])))?;
914 out.push(ts, metric, value);
915 }
916 Ok(out)
917 }
918}
919
/// Summary statistics over a [`ColumnarExport`] value column.
#[derive(Debug, Clone, Default)]
pub struct ColumnarStats {
    /// Number of values.
    pub count: usize,
    /// Sum of values.
    pub sum: f64,
    /// Arithmetic mean (0.0 when empty).
    pub mean: f64,
    /// Minimum value.
    pub min: f64,
    /// Maximum value.
    pub max: f64,
    /// Sample variance (n - 1 denominator; 0.0 with fewer than 2 values).
    pub variance: f64,
    /// Sample standard deviation (square root of `variance`).
    pub stddev: f64,
}
938
/// Splits one CSV record into fields, honoring double-quote quoting and
/// `""` escapes inside quoted fields (lenient RFC 4180 handling: quote
/// characters simply toggle quoted mode and are dropped from the output).
fn parse_csv_line(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut it = line.chars().peekable();
    while let Some(ch) = it.next() {
        if quoted {
            if ch == '"' {
                // `""` inside a quoted field is a literal quote; a lone `"`
                // closes the quoted section.
                if it.peek() == Some(&'"') {
                    it.next();
                    field.push('"');
                } else {
                    quoted = false;
                }
            } else {
                field.push(ch);
            }
        } else {
            match ch {
                '"' => quoted = true,
                ',' => fields.push(std::mem::take(&mut field)),
                other => field.push(other),
            }
        }
    }
    // The final field has no trailing comma; flush it unconditionally.
    fields.push(field);
    fields
}
971
972#[cfg(test)]
977mod tests {
978 use super::*;
979 use std::collections::HashMap;
980
981 #[test]
984 fn test_exported_point_new_no_tags() {
985 let p = ExportedPoint::new(1_700_000_000_000, "cpu", 88.5);
986 assert_eq!(p.metric, "cpu");
987 assert_eq!(p.value, 88.5);
988 assert_eq!(p.tags_json, "{}");
989 }
990
991 #[test]
992 fn test_exported_point_with_tags_roundtrip() {
993 let mut tags = HashMap::new();
994 tags.insert("host".to_string(), "srv-01".to_string());
995 tags.insert("region".to_string(), "eu-west".to_string());
996
997 let p = ExportedPoint::with_tags(1_700_000_000_000, "mem", 4096.0, &tags)
998 .expect("serialization should succeed");
999
1000 let parsed = p.parse_tags().expect("deserialization should succeed");
1001 assert_eq!(parsed["host"], "srv-01");
1002 assert_eq!(parsed["region"], "eu-west");
1003 }
1004
1005 #[test]
1006 fn test_exported_point_serialization() {
1007 let p = ExportedPoint::new(42_000, "latency_ms", 1.5);
1008 let json = serde_json::to_string(&p).expect("json serialize");
1009 let back: ExportedPoint = serde_json::from_str(&json).expect("json deserialize");
1010 assert_eq!(p, back);
1011 }
1012
1013 #[test]
1014 fn test_exported_point_clone() {
1015 let p = ExportedPoint::new(1, "x", 0.0);
1016 let q = p.clone();
1017 assert_eq!(p, q);
1018 }
1019
1020 #[test]
1023 fn test_arrow_exporter_default() {
1024 let e = ArrowExporter::default();
1025 assert_eq!(e.max_rows_per_batch(), 0);
1026 }
1027
1028 #[test]
1029 fn test_arrow_exporter_with_max_rows() {
1030 let e = ArrowExporter::with_max_rows(100);
1031 assert_eq!(e.max_rows_per_batch(), 100);
1032 }
1033
1034 #[test]
1035 fn test_arrow_filter_by_metric() {
1036 let pts = vec![
1037 ExportedPoint::new(1, "cpu", 10.0),
1038 ExportedPoint::new(2, "mem", 20.0),
1039 ExportedPoint::new(3, "cpu", 30.0),
1040 ];
1041 let filtered = ArrowExporter::filter_by_metric(&pts, "cpu");
1042 assert_eq!(filtered.len(), 2);
1043 assert!(filtered.iter().all(|p| p.metric == "cpu"));
1044 }
1045
1046 #[test]
1047 fn test_arrow_filter_by_time_range() {
1048 let pts = vec![
1049 ExportedPoint::new(100, "x", 1.0),
1050 ExportedPoint::new(200, "x", 2.0),
1051 ExportedPoint::new(300, "x", 3.0),
1052 ExportedPoint::new(400, "x", 4.0),
1053 ];
1054 let filtered = ArrowExporter::filter_by_time_range(&pts, 150, 350);
1055 assert_eq!(filtered.len(), 2);
1056 assert_eq!(filtered[0].timestamp_ms, 200);
1057 assert_eq!(filtered[1].timestamp_ms, 300);
1058 }
1059
1060 #[test]
1061 fn test_arrow_filter_by_time_range_empty() {
1062 let pts: Vec<ExportedPoint> = vec![];
1063 let filtered = ArrowExporter::filter_by_time_range(&pts, 0, 1000);
1064 assert!(filtered.is_empty());
1065 }
1066
1067 #[test]
1068 fn test_arrow_filter_by_time_range_no_match() {
1069 let pts = vec![ExportedPoint::new(1000, "x", 1.0)];
1070 let filtered = ArrowExporter::filter_by_time_range(&pts, 0, 500);
1071 assert!(filtered.is_empty());
1072 }
1073
1074 #[test]
1075 fn test_arrow_filter_by_time_range_inclusive_boundary() {
1076 let pts = vec![
1077 ExportedPoint::new(100, "x", 1.0),
1078 ExportedPoint::new(200, "x", 2.0),
1079 ];
1080 let filtered = ArrowExporter::filter_by_time_range(&pts, 100, 200);
1081 assert_eq!(filtered.len(), 2);
1082 }
1083
1084 #[cfg(feature = "arrow-export")]
1085 #[test]
1086 fn test_arrow_export_batch_schema() {
1087 use arrow::datatypes::DataType;
1088 let schema = ArrowExporter::schema();
1089 assert_eq!(schema.field(0).name(), "timestamp");
1090 assert_eq!(schema.field(0).data_type(), &DataType::Int64);
1091 assert_eq!(schema.field(1).name(), "metric");
1092 assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
1093 assert_eq!(schema.field(2).name(), "value");
1094 assert_eq!(schema.field(2).data_type(), &DataType::Float64);
1095 assert_eq!(schema.field(3).name(), "tags_json");
1096 assert_eq!(schema.field(3).data_type(), &DataType::Utf8);
1097 }
1098
1099 #[cfg(feature = "arrow-export")]
1100 #[test]
1101 fn test_arrow_export_batch_row_count() {
1102 let pts: Vec<_> = (0..50)
1103 .map(|i| ExportedPoint::new(i * 1000, "temp", i as f64))
1104 .collect();
1105 let exporter = ArrowExporter::new();
1106 let batch = exporter.export_batch(&pts).expect("export should succeed");
1107 assert_eq!(batch.num_rows(), 50);
1108 assert_eq!(batch.num_columns(), 4);
1109 }
1110
1111 #[cfg(feature = "arrow-export")]
1112 #[test]
1113 fn test_arrow_export_batch_values() {
1114 let pts = vec![
1115 ExportedPoint::new(1_000, "pressure", 101.325),
1116 ExportedPoint::new(2_000, "pressure", 102.0),
1117 ];
1118 let exporter = ArrowExporter::new();
1119 let batch = exporter.export_batch(&pts).expect("export");
1120
1121 use arrow::array::Float64Array;
1122 let values = batch
1123 .column(2)
1124 .as_any()
1125 .downcast_ref::<Float64Array>()
1126 .expect("Float64Array");
1127 assert!((values.value(0) - 101.325).abs() < 1e-9);
1128 assert!((values.value(1) - 102.0).abs() < 1e-9);
1129 }
1130
1131 #[cfg(feature = "arrow-export")]
1132 #[test]
1133 fn test_arrow_export_batches_chunking() {
1134 let pts: Vec<_> = (0..25)
1135 .map(|i| ExportedPoint::new(i, "x", i as f64))
1136 .collect();
1137 let exporter = ArrowExporter::with_max_rows(10);
1138 let batches = exporter.export_batches(&pts).expect("export batches");
1139 assert_eq!(batches.len(), 3); assert_eq!(batches[0].num_rows(), 10);
1141 assert_eq!(batches[2].num_rows(), 5);
1142 }
1143
1144 #[cfg(feature = "arrow-export")]
1145 #[test]
1146 fn test_arrow_export_batches_empty() {
1147 let pts: Vec<ExportedPoint> = vec![];
1148 let exporter = ArrowExporter::new();
1149 let batches = exporter.export_batches(&pts).expect("empty export");
1150 assert!(batches.is_empty());
1151 }
1152
1153 #[test]
1156 fn test_compute_stats_basic() {
1157 let pts = vec![
1158 ExportedPoint::new(100, "cpu", 10.0),
1159 ExportedPoint::new(200, "cpu", 20.0),
1160 ExportedPoint::new(300, "cpu", 30.0),
1161 ];
1162 let stats = ArrowExporter::compute_stats(&pts);
1163 assert_eq!(stats.count, 3);
1164 assert!((stats.mean - 20.0).abs() < f64::EPSILON);
1165 assert!((stats.min - 10.0).abs() < f64::EPSILON);
1166 assert!((stats.max - 30.0).abs() < f64::EPSILON);
1167 assert!((stats.sum - 60.0).abs() < f64::EPSILON);
1168 assert_eq!(stats.first_timestamp_ms, 100);
1169 assert_eq!(stats.last_timestamp_ms, 300);
1170 assert_eq!(stats.distinct_metrics, 1);
1171 }
1172
1173 #[test]
1174 fn test_compute_stats_empty() {
1175 let pts: Vec<ExportedPoint> = vec![];
1176 let stats = ArrowExporter::compute_stats(&pts);
1177 assert_eq!(stats.count, 0);
1178 assert!((stats.mean - 0.0).abs() < f64::EPSILON);
1179 }
1180
1181 #[test]
1182 fn test_compute_stats_single_point() {
1183 let pts = vec![ExportedPoint::new(1000, "mem", 42.0)];
1184 let stats = ArrowExporter::compute_stats(&pts);
1185 assert_eq!(stats.count, 1);
1186 assert!((stats.mean - 42.0).abs() < f64::EPSILON);
1187 assert!((stats.variance - 0.0).abs() < f64::EPSILON);
1188 assert_eq!(stats.distinct_metrics, 1);
1189 }
1190
1191 #[test]
1192 fn test_compute_stats_multiple_metrics() {
1193 let pts = vec![
1194 ExportedPoint::new(100, "cpu", 10.0),
1195 ExportedPoint::new(200, "mem", 20.0),
1196 ExportedPoint::new(300, "disk", 30.0),
1197 ];
1198 let stats = ArrowExporter::compute_stats(&pts);
1199 assert_eq!(stats.distinct_metrics, 3);
1200 }
1201
1202 #[test]
1203 fn test_compute_stats_stddev() {
1204 let pts = vec![
1205 ExportedPoint::new(1, "x", 2.0),
1206 ExportedPoint::new(2, "x", 4.0),
1207 ExportedPoint::new(3, "x", 6.0),
1208 ExportedPoint::new(4, "x", 8.0),
1209 ];
1210 let stats = ArrowExporter::compute_stats(&pts);
1211 assert!((stats.variance - 20.0 / 3.0).abs() < 1e-9);
1213 assert!((stats.stddev - (20.0_f64 / 3.0).sqrt()).abs() < 1e-9);
1214 }
1215
1216 #[test]
1219 fn test_group_by_metric() {
1220 let pts = vec![
1221 ExportedPoint::new(1, "cpu", 10.0),
1222 ExportedPoint::new(2, "mem", 20.0),
1223 ExportedPoint::new(3, "cpu", 30.0),
1224 ExportedPoint::new(4, "disk", 40.0),
1225 ];
1226 let groups = ArrowExporter::group_by_metric(&pts);
1227 assert_eq!(groups.len(), 3);
1228 assert_eq!(groups["cpu"].len(), 2);
1229 assert_eq!(groups["mem"].len(), 1);
1230 assert_eq!(groups["disk"].len(), 1);
1231 }
1232
1233 #[test]
1234 fn test_sort_by_timestamp() {
1235 let mut pts = vec![
1236 ExportedPoint::new(300, "x", 3.0),
1237 ExportedPoint::new(100, "x", 1.0),
1238 ExportedPoint::new(200, "x", 2.0),
1239 ];
1240 ArrowExporter::sort_by_timestamp(&mut pts);
1241 assert_eq!(pts[0].timestamp_ms, 100);
1242 assert_eq!(pts[1].timestamp_ms, 200);
1243 assert_eq!(pts[2].timestamp_ms, 300);
1244 }
1245
1246 #[test]
1249 fn test_parquet_exporter_default() {
1250 let e = ParquetExporter::default();
1251 assert_eq!(e.compression(), ParquetCompression::Snappy);
1252 assert_eq!(e.row_group_size(), 134_217_728);
1253 }
1254
1255 #[test]
1256 fn test_parquet_exporter_builder() {
1257 let e = ParquetExporter::new()
1258 .with_compression(ParquetCompression::Zstd)
1259 .with_row_group_size(1024 * 1024);
1260 assert_eq!(e.compression(), ParquetCompression::Zstd);
1261 assert_eq!(e.row_group_size(), 1024 * 1024);
1262 }
1263
1264 #[test]
1265 fn test_parquet_exporter_count_rows() {
1266 let pts: Vec<_> = (0..100)
1267 .map(|i| ExportedPoint::new(i, "x", i as f64))
1268 .collect();
1269 let e = ParquetExporter::new();
1270 assert_eq!(e.count_rows(&pts), 100);
1271 }
1272
1273 #[test]
1274 fn test_parquet_compression_label() {
1275 assert_eq!(ParquetCompression::None.label(), "none");
1276 assert_eq!(ParquetCompression::Snappy.label(), "snappy");
1277 assert_eq!(ParquetCompression::Zstd.label(), "zstd");
1278 assert_eq!(ParquetCompression::Gzip.label(), "gzip");
1279 }
1280
1281 #[test]
1282 fn test_parquet_compression_display() {
1283 assert_eq!(format!("{}", ParquetCompression::Snappy), "snappy");
1284 assert_eq!(format!("{}", ParquetCompression::Zstd), "zstd");
1285 }
1286
1287 #[test]
1288 fn test_export_metadata() {
1289 let pts = vec![
1290 ExportedPoint::new(1000, "cpu", 10.0),
1291 ExportedPoint::new(2000, "cpu", 20.0),
1292 ExportedPoint::new(3000, "mem", 30.0),
1293 ];
1294 let e = ParquetExporter::new().with_compression(ParquetCompression::Zstd);
1295 let meta = e.export_metadata(&pts);
1296 assert_eq!(meta.row_count, 3);
1297 assert_eq!(meta.compression, ParquetCompression::Zstd);
1298 assert_eq!(meta.distinct_metrics, 2);
1299 assert_eq!(meta.time_span_ms, 2000);
1300 }
1301
1302 #[cfg(feature = "arrow-export")]
1303 #[test]
1304 fn test_parquet_write_snappy() {
1305 let dir = std::env::temp_dir();
1306 let path = dir.join("oxirs_tsdb_test_snappy.parquet");
1307
1308 let pts: Vec<_> = (0..20)
1309 .map(|i| ExportedPoint::new(i * 1_000, "temp", 20.0 + i as f64))
1310 .collect();
1311
1312 let exporter = ParquetExporter::new().with_compression(ParquetCompression::Snappy);
1313 let rows = exporter.write_file(&pts, &path).expect("write parquet");
1314 assert_eq!(rows, 20);
1315 assert!(path.exists());
1316 let _ = std::fs::remove_file(&path);
1317 }
1318
1319 #[cfg(feature = "arrow-export")]
1320 #[test]
1321 fn test_parquet_write_zstd() {
1322 let path = std::env::temp_dir().join("oxirs_tsdb_test_zstd.parquet");
1323 let pts: Vec<_> = (0..10)
1324 .map(|i| ExportedPoint::new(i, "pressure", i as f64 * 1.1))
1325 .collect();
1326
1327 let exporter = ParquetExporter::new().with_compression(ParquetCompression::Zstd);
1328 let rows = exporter
1329 .write_file(&pts, &path)
1330 .expect("write parquet zstd");
1331 assert_eq!(rows, 10);
1332 let _ = std::fs::remove_file(path);
1333 }
1334
1335 #[cfg(feature = "arrow-export")]
1336 #[test]
1337 fn test_parquet_write_no_compression() {
1338 let path = std::env::temp_dir().join("oxirs_tsdb_test_none.parquet");
1339 let pts = vec![ExportedPoint::new(0, "v", 1.0)];
1340
1341 let exporter = ParquetExporter::new().with_compression(ParquetCompression::None);
1342 let rows = exporter.write_file(&pts, &path).expect("write");
1343 assert_eq!(rows, 1);
1344 let _ = std::fs::remove_file(path);
1345 }
1346
1347 #[test]
1350 fn test_duckdb_adapter_select_metric_sql() {
1351 let adapter = DuckDbQueryAdapter::new("/data/tsdb.parquet");
1352 let sql = adapter.select_metric("cpu", 1_000_000, 2_000_000);
1353 assert!(sql.contains("read_parquet('/data/tsdb.parquet')"));
1354 assert!(sql.contains("metric = 'cpu'"));
1355 assert!(sql.contains("1000000"));
1356 assert!(sql.contains("2000000"));
1357 assert!(sql.contains("ORDER BY timestamp ASC"));
1358 }
1359
1360 #[test]
1361 fn test_duckdb_adapter_aggregate_avg_sql() {
1362 let adapter = DuckDbQueryAdapter::new("/tmp/data.parquet");
1363 let sql = adapter.aggregate_metric("mem", 0, 9_999, AggregationFunction::Avg);
1364 assert!(sql.contains("AVG(value)"));
1365 assert!(sql.contains("GROUP BY metric"));
1366 }
1367
1368 #[test]
1369 fn test_duckdb_adapter_aggregate_all_functions() {
1370 let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1371 for (func, expected) in [
1372 (AggregationFunction::Min, "MIN(value)"),
1373 (AggregationFunction::Max, "MAX(value)"),
1374 (AggregationFunction::Sum, "SUM(value)"),
1375 (AggregationFunction::Count, "COUNT(*)"),
1376 (AggregationFunction::StdDev, "STDDEV(value)"),
1377 ] {
1378 let sql = adapter.aggregate_metric("x", 0, 1, func);
1379 assert!(sql.contains(expected), "expected {expected} for {func:?}");
1380 }
1381 }
1382
1383 #[test]
1384 fn test_duckdb_adapter_percentile_functions() {
1385 let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1386 let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile50);
1387 assert!(sql.contains("PERCENTILE_CONT(0.5)"));
1388
1389 let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile95);
1390 assert!(sql.contains("PERCENTILE_CONT(0.95)"));
1391
1392 let sql = adapter.aggregate_metric("cpu", 0, 1000, AggregationFunction::Percentile99);
1393 assert!(sql.contains("PERCENTILE_CONT(0.99)"));
1394 }
1395
1396 #[test]
1397 fn test_duckdb_adapter_resample_sql() {
1398 let adapter = DuckDbQueryAdapter::new("/tmp/t.parquet");
1399 let sql = adapter.resample("cpu", 60_000);
1400 assert!(sql.contains("60000"));
1401 assert!(sql.contains("AVG(value)"));
1402 assert!(sql.contains("GROUP BY bucket_start_ms, metric"));
1403 }
1404
1405 #[test]
1406 fn test_duckdb_adapter_list_metrics_sql() {
1407 let adapter = DuckDbQueryAdapter::new("/data/*.parquet");
1408 let sql = adapter.list_metrics();
1409 assert!(sql.contains("DISTINCT metric"));
1410 assert!(sql.contains("read_parquet('/data/*.parquet')"));
1411 }
1412
1413 #[test]
1414 fn test_duckdb_adapter_time_range_summary_sql() {
1415 let adapter = DuckDbQueryAdapter::new("data.parquet");
1416 let sql = adapter.time_range_summary();
1417 assert!(sql.contains("MIN(timestamp)"));
1418 assert!(sql.contains("MAX(timestamp)"));
1419 assert!(sql.contains("AVG(value)"));
1420 assert!(sql.contains("GROUP BY metric"));
1421 }
1422
1423 #[test]
1424 fn test_duckdb_adapter_export_query_to_parquet() {
1425 let adapter = DuckDbQueryAdapter::new("input.parquet");
1426 let inner = adapter.select_metric("cpu", 0, 1_000);
1427 let sql = adapter.export_query_to_parquet(&inner, "output.parquet");
1428 assert!(sql.starts_with("COPY ("));
1429 assert!(sql.contains("output.parquet"));
1430 assert!(sql.contains("FORMAT PARQUET"));
1431 }
1432
1433 #[test]
1434 fn test_duckdb_adapter_parquet_path() {
1435 let adapter = DuckDbQueryAdapter::new("/mnt/data/*.parquet");
1436 assert_eq!(adapter.parquet_path(), "/mnt/data/*.parquet");
1437 }
1438
1439 #[test]
1440 fn test_duckdb_adapter_join_metrics() {
1441 let adapter = DuckDbQueryAdapter::new("data.parquet");
1442 let sql = adapter.join_metrics("cpu", "mem", 60_000);
1443 assert!(sql.contains("cpu_avg"));
1444 assert!(sql.contains("mem_avg"));
1445 assert!(sql.contains("INNER JOIN"));
1446 assert!(sql.contains("60000"));
1447 }
1448
1449 #[test]
1450 fn test_duckdb_adapter_rate_of_change() {
1451 let adapter = DuckDbQueryAdapter::new("data.parquet");
1452 let sql = adapter.rate_of_change("cpu", 1000);
1453 assert!(sql.contains("LAG(avg_value)"));
1454 assert!(sql.contains("rate_per_sec"));
1455 assert!(sql.contains("1000"));
1456 }
1457
1458 #[test]
1459 fn test_duckdb_adapter_create_view() {
1460 let adapter = DuckDbQueryAdapter::new("data.parquet");
1461 let sql = adapter.create_view("tsdb_data");
1462 assert!(sql.contains("CREATE OR REPLACE VIEW tsdb_data"));
1463 assert!(sql.contains("read_parquet"));
1464 }
1465
1466 #[test]
1467 fn test_duckdb_adapter_count_per_metric() {
1468 let adapter = DuckDbQueryAdapter::new("data.parquet");
1469 let sql = adapter.count_per_metric();
1470 assert!(sql.contains("COUNT(*)"));
1471 assert!(sql.contains("GROUP BY metric"));
1472 assert!(sql.contains("ORDER BY point_count DESC"));
1473 }
1474
1475 #[test]
1478 fn test_columnar_export_new_is_empty() {
1479 let ce = ColumnarExport::new();
1480 assert!(ce.is_empty());
1481 assert_eq!(ce.len(), 0);
1482 }
1483
1484 #[test]
1485 fn test_columnar_export_push_and_len() {
1486 let mut ce = ColumnarExport::new();
1487 ce.push(1_000, "cpu", 55.0);
1488 ce.push(2_000, "mem", 70.0);
1489 assert_eq!(ce.len(), 2);
1490 assert!(!ce.is_empty());
1491 }
1492
1493 #[test]
1494 fn test_columnar_export_from_points() {
1495 let pts = vec![
1496 ExportedPoint::new(100, "temp", 22.0),
1497 ExportedPoint::new(200, "temp", 23.5),
1498 ExportedPoint::new(300, "pressure", 1013.0),
1499 ];
1500 let ce = ColumnarExport::from_points(&pts);
1501 assert_eq!(ce.len(), 3);
1502 assert_eq!(ce.metrics[0], "temp");
1503 assert_eq!(ce.metrics[2], "pressure");
1504 assert!((ce.values[1] - 23.5).abs() < f64::EPSILON);
1505 }
1506
1507 #[test]
1508 fn test_columnar_export_to_points_roundtrip() {
1509 let pts = vec![
1510 ExportedPoint::new(10, "x", 1.0),
1511 ExportedPoint::new(20, "y", 2.0),
1512 ];
1513 let ce = ColumnarExport::from_points(&pts);
1514 let back = ce.to_points();
1515 assert_eq!(back.len(), pts.len());
1516 for (a, b) in pts.iter().zip(back.iter()) {
1517 assert_eq!(a.timestamp_ms, b.timestamp_ms);
1518 assert_eq!(a.metric, b.metric);
1519 assert!((a.value - b.value).abs() < f64::EPSILON);
1520 }
1521 }
1522
1523 #[test]
1524 fn test_columnar_export_csv_roundtrip() {
1525 let dir = std::env::temp_dir();
1526 let path = dir.join("oxirs_tsdb_test_columnar.csv");
1527
1528 let mut ce = ColumnarExport::with_capacity(4);
1529 ce.push(1_000, "cpu", 10.0);
1530 ce.push(2_000, "mem", 20.5);
1531 ce.push(3_000, "disk", 30.1);
1532
1533 ce.to_csv(&path).expect("csv write should succeed");
1534 let loaded = ColumnarExport::from_csv(&path).expect("csv read should succeed");
1535
1536 assert_eq!(loaded.len(), 3);
1537 assert_eq!(loaded.metrics[1], "mem");
1538 assert!((loaded.values[2] - 30.1).abs() < 1e-9);
1539 let _ = std::fs::remove_file(&path);
1540 }
1541
1542 #[test]
1543 fn test_columnar_export_csv_with_comma_in_metric() {
1544 let dir = std::env::temp_dir();
1545 let path = dir.join("oxirs_tsdb_test_comma_metric.csv");
1546
1547 let mut ce = ColumnarExport::new();
1548 ce.push(1_000, "node,A", 5.0);
1549
1550 ce.to_csv(&path).expect("csv write");
1551 let loaded = ColumnarExport::from_csv(&path).expect("csv read");
1552 assert_eq!(loaded.len(), 1);
1553 assert_eq!(loaded.metrics[0], "node,A");
1554 let _ = std::fs::remove_file(&path);
1555 }
1556
1557 #[test]
1558 fn test_columnar_export_tsv_roundtrip() {
1559 let dir = std::env::temp_dir();
1560 let path = dir.join("oxirs_tsdb_test_columnar.tsv");
1561
1562 let mut ce = ColumnarExport::new();
1563 ce.push(100, "sensor_1", 99.9);
1564 ce.push(200, "sensor_2", 88.8);
1565
1566 ce.to_tsv(&path).expect("tsv write");
1567
1568 let content = std::fs::read_to_string(&path).expect("read tsv");
1570 let lines: Vec<&str> = content.lines().collect();
1571 assert_eq!(lines[0], "timestamp_ms\tmetric\tvalue");
1572 assert!(lines[1].contains("sensor_1"));
1573 assert!(lines[2].contains("sensor_2"));
1574 let _ = std::fs::remove_file(&path);
1575 }
1576
1577 #[test]
1578 fn test_columnar_export_to_json_structure() {
1579 let mut ce = ColumnarExport::new();
1580 ce.push(1_000, "cpu", 42.0);
1581 ce.push(2_000, "mem", 84.0);
1582
1583 let json = ce.to_json().expect("json conversion");
1584 let arr = json.as_array().expect("should be array");
1585 assert_eq!(arr.len(), 2);
1586 assert_eq!(arr[0]["metric"], "cpu");
1587 assert_eq!(arr[0]["timestamp_ms"], 1_000_i64);
1588 assert!((arr[0]["value"].as_f64().expect("should succeed") - 42.0).abs() < f64::EPSILON);
1589 assert_eq!(arr[1]["metric"], "mem");
1590 }
1591
1592 #[test]
1593 fn test_columnar_export_to_json_empty() {
1594 let ce = ColumnarExport::new();
1595 let json = ce.to_json().expect("json for empty");
1596 let arr = json.as_array().expect("array");
1597 assert!(arr.is_empty());
1598 }
1599
1600 #[test]
1601 fn test_columnar_export_to_json_string() {
1602 let mut ce = ColumnarExport::new();
1603 ce.push(5, "x", 1.0);
1604 let s = ce.to_json_string().expect("json string");
1605 assert!(s.contains("\"metric\""));
1606 assert!(s.contains("\"timestamp_ms\""));
1607 assert!(s.contains("\"value\""));
1608 }
1609
1610 #[test]
1611 fn test_columnar_export_filter_metric() {
1612 let mut ce = ColumnarExport::new();
1613 ce.push(1, "cpu", 1.0);
1614 ce.push(2, "mem", 2.0);
1615 ce.push(3, "cpu", 3.0);
1616
1617 let filtered = ce.filter_metric("cpu");
1618 assert_eq!(filtered.len(), 2);
1619 assert!(filtered.metrics.iter().all(|m| m == "cpu"));
1620 }
1621
1622 #[test]
1623 fn test_columnar_export_filter_time_range() {
1624 let mut ce = ColumnarExport::new();
1625 for i in 0..10_i64 {
1626 ce.push(i * 100, "x", i as f64);
1627 }
1628 let filtered = ce.filter_time_range(200, 500);
1629 assert_eq!(filtered.len(), 4); assert_eq!(filtered.timestamps[0], 200);
1631 assert_eq!(filtered.timestamps[3], 500);
1632 }
1633
1634 #[test]
1635 fn test_columnar_export_filter_time_range_empty() {
1636 let ce = ColumnarExport::new();
1637 let filtered = ce.filter_time_range(0, 1000);
1638 assert!(filtered.is_empty());
1639 }
1640
1641 #[test]
1642 fn test_columnar_export_sort_by_timestamp() {
1643 let mut ce = ColumnarExport::new();
1644 ce.push(300, "c", 3.0);
1645 ce.push(100, "a", 1.0);
1646 ce.push(200, "b", 2.0);
1647
1648 ce.sort_by_timestamp();
1649 assert_eq!(ce.timestamps[0], 100);
1650 assert_eq!(ce.metrics[0], "a");
1651 assert_eq!(ce.timestamps[1], 200);
1652 assert_eq!(ce.timestamps[2], 300);
1653 }
1654
1655 #[test]
1656 fn test_columnar_export_value_stats_basic() {
1657 let mut ce = ColumnarExport::new();
1658 ce.push(1, "x", 10.0);
1659 ce.push(2, "x", 20.0);
1660 ce.push(3, "x", 30.0);
1661
1662 let stats = ce.value_stats();
1663 assert_eq!(stats.count, 3);
1664 assert!((stats.mean - 20.0).abs() < f64::EPSILON);
1665 assert!((stats.min - 10.0).abs() < f64::EPSILON);
1666 assert!((stats.max - 30.0).abs() < f64::EPSILON);
1667 assert!((stats.sum - 60.0).abs() < f64::EPSILON);
1668 }
1669
1670 #[test]
1671 fn test_columnar_export_value_stats_empty() {
1672 let ce = ColumnarExport::new();
1673 let stats = ce.value_stats();
1674 assert_eq!(stats.count, 0);
1675 }
1676
1677 #[test]
1678 fn test_columnar_export_value_stats_single() {
1679 let mut ce = ColumnarExport::new();
1680 ce.push(1, "x", 7.0);
1681 let stats = ce.value_stats();
1682 assert_eq!(stats.count, 1);
1683 assert!((stats.mean - 7.0).abs() < f64::EPSILON);
1684 assert!((stats.variance - 0.0).abs() < f64::EPSILON);
1685 }
1686
1687 #[test]
1688 fn test_columnar_export_with_capacity() {
1689 let ce = ColumnarExport::with_capacity(100);
1690 assert!(ce.is_empty());
1691 assert_eq!(ce.len(), 0);
1692 }
1693
1694 #[test]
1695 fn test_columnar_export_csv_empty_file() {
1696 let dir = std::env::temp_dir();
1697 let path = dir.join("oxirs_tsdb_test_empty.csv");
1698
1699 let ce = ColumnarExport::new();
1700 ce.to_csv(&path).expect("csv write empty");
1701 let loaded = ColumnarExport::from_csv(&path).expect("csv read empty");
1702 assert!(loaded.is_empty());
1703 let _ = std::fs::remove_file(&path);
1704 }
1705
1706 #[test]
1707 fn test_columnar_export_tsv_multiple_metrics() {
1708 let dir = std::env::temp_dir();
1709 let path = dir.join("oxirs_tsdb_multi_metric.tsv");
1710
1711 let mut ce = ColumnarExport::new();
1712 for i in 0..5_i64 {
1713 ce.push(i * 1000, "cpu", i as f64 * 10.0);
1714 ce.push(i * 1000 + 1, "mem", i as f64 * 20.0);
1715 }
1716
1717 ce.to_tsv(&path).expect("tsv write");
1718 let content = std::fs::read_to_string(&path).expect("tsv read");
1719 let lines: Vec<&str> = content.lines().collect();
1720 assert_eq!(lines.len(), 11);
1722 let _ = std::fs::remove_file(&path);
1723 }
1724}