1use arrow::datatypes::{Float64Type, Int64Type, UInt32Type, UInt64Type};
2use chrono::{DateTime, NaiveDateTime};
3use num_traits::ToPrimitive;
4use serde::{Deserialize, Serialize};
5use statistical::{mean, population_standard_deviation};
6use std::collections::HashMap;
7use std::fmt;
8use std::hash::Hash;
9use std::iter::Iterator;
10use std::net::{IpAddr, Ipv4Addr};
11
12use crate::table::{Column, ColumnType};
13
/// Upper bound, in seconds, for the bucket width accepted by
/// `convert_time_intervals` (86 400 s = 24 hours).
const MAX_TIME_INTERVAL: u32 = 86_400;
/// Lower bound, in seconds, for the bucket width accepted by
/// `convert_time_intervals`.
const MIN_TIME_INTERVAL: u32 = 30;

/// A single value reported in column statistics (min, max, mode, top-N entries).
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Element {
    /// Signed 64-bit integer value.
    Int(i64),
    /// Unsigned 64-bit integer value.
    UInt(u64),
    /// Enum value rendered as text.
    Enum(String),
    /// 64-bit floating-point value.
    Float(f64),
    /// A range of floating-point values.
    FloatRange(FloatRange),
    /// UTF-8 text value.
    Text(String),
    /// Raw byte string.
    Binary(Vec<u8>),
    /// IP address value.
    IpAddr(IpAddr),
    /// Date and time without a time zone.
    DateTime(NaiveDateTime),
}
30
/// A hashable value usable as a grouping key.
///
/// Unlike [`Element`], this type derives `Eq` and `Hash`, has no floating-point
/// or binary variants, and its `UInt` variant holds a `u32`.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
pub enum GroupElement {
    /// Signed 64-bit integer key.
    Int(i64),
    /// Unsigned 32-bit integer key.
    UInt(u32),
    /// Enum key rendered as text.
    Enum(String),
    /// UTF-8 text key.
    Text(String),
    /// IP address key.
    IpAddr(IpAddr),
    /// Date and time key without a time zone.
    DateTime(NaiveDateTime),
}
40
/// A pair of floating-point bounds, displayed by [`Element`] as
/// `smallest~largest` with three decimals (or `0` when both bounds are zero).
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct FloatRange {
    /// Lower bound of the range.
    pub smallest: f64,
    /// Upper bound of the range.
    pub largest: f64,
}
46
47impl fmt::Display for Element {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Self::Int(x) => write!(f, "{x}"),
51 Self::UInt(x) => write!(f, "{x}"),
52 Self::Enum(x) | Self::Text(x) => write!(f, "{x}"),
53 Self::Binary(x) => write!(f, "{x:#?}"),
54 Self::Float(x) => write!(f, "{x}"),
55 Self::FloatRange(x) => {
56 if x.smallest == 0.0_f64 && x.largest == 0.0_f64 {
57 write!(f, "0")
58 } else {
59 write!(f, "{:.3}~{:.3}", x.smallest, x.largest)
60 }
61 }
62 Self::IpAddr(x) => write!(f, "{x}"),
63 Self::DateTime(x) => write!(f, "{x}"),
64 }
65 }
66}
67
68impl PartialOrd for GroupElement {
69 fn partial_cmp(&self, other: &GroupElement) -> Option<std::cmp::Ordering> {
70 match (self, other) {
71 (Self::Int(s), Self::Int(o)) => Some(s.cmp(o)),
72 (Self::UInt(s), Self::UInt(o)) => Some(s.cmp(o)),
73 (Self::Enum(s), Self::Enum(o)) | (Self::Text(s), Self::Text(o)) => Some(s.cmp(o)),
74 (Self::IpAddr(s), Self::IpAddr(o)) => Some(s.cmp(o)),
75 (Self::DateTime(s), Self::DateTime(o)) => Some(s.cmp(o)),
76 _ => None,
77 }
78 }
79}
80
/// Statistics for a single column: a summary description plus frequency counts.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnStatistics {
    /// Count, mean, standard deviation, minimum, and maximum.
    pub description: Description,
    /// Distinct-value count, most frequent values, and mode.
    pub n_largest_count: NLargestCount,
}
87
/// Summary statistics of a column over a set of rows.
///
/// `describe` only fills the optional fields for `Int64` and `Float64`
/// columns; for other column types they stay `None`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct Description {
    // Number of rows described.
    count: usize,
    // Arithmetic mean of the values.
    mean: Option<f64>,
    // Population standard deviation of the values.
    s_deviation: Option<f64>,
    // Smallest value.
    min: Option<Element>,
    // Largest value.
    max: Option<Element>,
}
96
/// A value paired with the number of times it occurs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ElementCount {
    /// The counted value.
    pub value: Element,
    /// Number of occurrences of `value`.
    pub count: usize,
}
102
/// The most frequent values of a column.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct NLargestCount {
    // Number of distinct values found (before truncating to the top N).
    number_of_elements: usize,
    // Up to N values, ordered by descending occurrence count.
    top_n: Vec<ElementCount>,
    // The most frequent value, i.e. the first entry of `top_n`, if any.
    mode: Option<Element>,
}
109
/// A grouping key paired with an occurrence count.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupElementCount {
    /// The grouping key.
    pub value: GroupElement,
    /// Count associated with `value`.
    pub count: usize,
}
115
/// A series of per-group counts.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupCount {
    /// NOTE(review): presumably the index of the column that was counted;
    /// the producer is not visible in this file, so confirm against callers.
    pub count_index: Option<usize>,
    /// One entry per group key with its count.
    pub series: Vec<GroupElementCount>,
}
121
122impl fmt::Display for Description {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 writeln!(f, "Start of Description")?;
125 writeln!(f, " count: {}", self.count)?;
126 if self.mean.is_some() {
127 writeln!(f, " mean: {}", self.mean().unwrap())?;
128 }
129 if self.s_deviation.is_some() {
130 writeln!(f, " s-deviation: {}", self.std_deviation().unwrap())?;
131 }
132 if self.min.is_some() {
133 writeln!(f, " min: {}", self.min().unwrap())?;
134 }
135 if self.max.is_some() {
136 writeln!(f, " max: {}", self.max().unwrap())?;
137 }
138 writeln!(f, "End of Description")
139 }
140}
141
142impl fmt::Display for NLargestCount {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 writeln!(f, "Start of NLargestCount")?;
145 writeln!(f, " number of elements: {}", self.number_of_elements())?;
146 writeln!(f, " Top N")?;
147 for elem in self.top_n() {
148 writeln!(f, " data: {} count: {}", elem.value, elem.count)?;
149 }
150 if self.mode.is_some() {
151 writeln!(f, " mode: {}", self.mode().unwrap())?;
152 }
153 writeln!(f, "End of NLargestCount")
154 }
155}
156
impl Description {
    /// Creates a description from already-computed statistics.
    #[must_use]
    pub fn new(
        count: usize,
        mean: Option<f64>,
        s_deviation: Option<f64>,
        min: Option<Element>,
        max: Option<Element>,
    ) -> Self {
        Self {
            count,
            mean,
            s_deviation,
            min,
            max,
        }
    }

    /// Returns the number of rows described.
    #[must_use]
    pub fn count(&self) -> usize {
        self.count
    }

    /// Returns the mean, if one was computed.
    #[must_use]
    pub fn mean(&self) -> Option<f64> {
        self.mean
    }

    /// Returns the standard deviation, if one was computed.
    #[must_use]
    pub fn std_deviation(&self) -> Option<f64> {
        self.s_deviation
    }

    /// Returns the smallest value, if one was computed.
    #[must_use]
    pub fn min(&self) -> Option<&Element> {
        self.min.as_ref()
    }

    /// Returns the largest value, if one was computed.
    #[must_use]
    pub fn max(&self) -> Option<&Element> {
        self.max.as_ref()
    }
}
200
impl NLargestCount {
    /// Creates a value from already-computed frequency statistics.
    #[must_use]
    pub fn new(number_of_elements: usize, top_n: Vec<ElementCount>, mode: Option<Element>) -> Self {
        Self {
            number_of_elements,
            top_n,
            mode,
        }
    }

    /// Returns the stored number of elements.
    #[must_use]
    pub fn number_of_elements(&self) -> usize {
        self.number_of_elements
    }

    /// Returns the stored top-N entries.
    #[must_use]
    pub fn top_n(&self) -> &Vec<ElementCount> {
        &self.top_n
    }

    /// Returns the mode, if any.
    #[must_use]
    pub fn mode(&self) -> Option<&Element> {
        self.mode.as_ref()
    }
}
226
// Stores the minimum and maximum of `$iter` into `$d.min` / `$d.max`, wrapping
// each with the constructor `$t2`. Both fields become `None` when the iterator
// yields nothing.
macro_rules! min_max {
    ( $iter:expr, $d:expr, $t2:expr ) => {{
        match find_min_max($iter) {
            Some(extremes) => {
                $d.min = Some($t2(extremes.min));
                $d.max = Some($t2(extremes.max));
            }
            None => {
                $d.min = None;
                $d.max = None;
            }
        }
    }};
}
238
// Stores the mean and population standard deviation of the values `$vf` into
// `$d.mean` / `$d.s_deviation`. The `$t1` type argument is accepted but unused.
macro_rules! mean_deviation {
    ( $vf:expr, $t1:ty, $d:expr ) => {
        let average = mean(&$vf);
        $d.mean = Some(average);
        // Reuse the mean computed above.
        let spread = population_standard_deviation(&$vf, Some(average));
        $d.s_deviation = Some(spread);
    };
}
246
// Fills `$d` (an `NLargestCount`) from the values produced by `$iter`:
// - `number_of_elements` is the number of distinct values,
// - `top_n` holds at most `$num_of_top_n` entries by descending count, each
//   converted to an owned value and wrapped with the constructor `$t2`,
// - `mode` is a copy of the first `top_n` entry's value, if any.
// `$t1` is the iterator's item type; `$len` is accepted but unused.
macro_rules! top_n {
    ( $iter:expr, $len:expr, $d:expr, $t1:ty, $t2:expr, $num_of_top_n:expr ) => {
        let ranked: Vec<($t1, usize)> = count_sort($iter);
        $d.number_of_elements = ranked.len();
        let requested = $num_of_top_n.to_usize().expect("safe: u32 -> usize");
        // Never take more entries than there are distinct values.
        let kept = requested.min(ranked.len());
        let top_n: Vec<ElementCount> = ranked[..kept]
            .iter()
            .map(|(x, y)| ElementCount {
                value: $t2((*x).to_owned()),
                count: *y,
            })
            .collect();
        $d.mode = top_n.first().map(|v| v.value.clone());
        $d.top_n = top_n;
    };
}
268
269#[must_use]
270pub(crate) fn describe(column: &Column, rows: &[usize], column_type: ColumnType) -> Description {
271 let mut description = Description {
272 count: rows.len(),
273 ..Description::default()
274 };
275
276 match column_type {
277 ColumnType::Int64 => {
278 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
279 min_max!(iter, description, Element::Int);
280 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
281 #[allow(clippy::cast_precision_loss)] let f_values: Vec<f64> = iter.map(|v: i64| v as f64).collect();
283 mean_deviation!(f_values, i64, description);
284 }
285 ColumnType::Float64 => {
286 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
287 min_max!(iter, description, Element::Float);
288 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
289 let values = iter.collect::<Vec<_>>();
290 mean_deviation!(values, f64, description);
291 }
292 _ => (),
293 }
294
295 description
296}
297
298#[must_use]
299pub(crate) fn n_largest_count(
300 column: &Column,
301 rows: &[usize],
302 column_type: ColumnType,
303 number_of_top_n: u32,
304) -> NLargestCount {
305 let mut n_largest_count = NLargestCount::default();
306
307 match column_type {
308 ColumnType::Int64 => {
309 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
310 top_n!(
311 iter,
312 rows.len(),
313 n_largest_count,
314 i64,
315 Element::Int,
316 number_of_top_n
317 );
318 }
319 ColumnType::Enum => {
320 let iter = column.primitive_iter::<UInt64Type>(rows).unwrap();
321 top_n!(
322 iter,
323 rows.len(),
324 n_largest_count,
325 u64,
326 Element::UInt,
327 number_of_top_n
328 );
329 }
330 ColumnType::Utf8 => {
331 let iter = column.string_iter(rows).unwrap();
332 top_n!(
333 iter,
334 rows.len(),
335 n_largest_count,
336 &str,
337 Element::Text,
338 number_of_top_n
339 );
340 }
341 ColumnType::Binary => {
342 let iter = column.binary_iter(rows).unwrap();
343 top_n!(
344 iter,
345 rows.len(),
346 n_largest_count,
347 &[u8],
348 Element::Binary,
349 number_of_top_n
350 );
351 }
352 ColumnType::IpAddr => {
353 let values = column
354 .primitive_iter::<UInt32Type>(rows)
355 .unwrap()
356 .map(|v| IpAddr::from(Ipv4Addr::from(v)))
357 .collect::<Vec<_>>();
358 top_n!(
359 values.iter(),
360 rows.len(),
361 n_largest_count,
362 &IpAddr,
363 Element::IpAddr,
364 number_of_top_n
365 );
366 }
367 ColumnType::DateTime | ColumnType::Float64 => unreachable!(), }
369
370 n_largest_count
371}
372
373#[must_use]
374#[allow(clippy::too_many_lines)]
375pub(crate) fn n_largest_count_enum(
376 column: &Column,
377 rows: &[usize],
378 reverse_map: &HashMap<u64, Vec<String>>,
379 number_of_top_n: u32,
380) -> NLargestCount {
381 let n_largest_count = n_largest_count(column, rows, ColumnType::Enum, number_of_top_n);
382
383 let (top_n, mode) = {
384 if reverse_map.is_empty() {
385 (
386 n_largest_count
387 .top_n()
388 .iter()
389 .map(|elem| {
390 if let Element::UInt(value) = elem.value {
391 ElementCount {
392 value: Element::Enum(value.to_string()),
393 count: elem.count,
394 }
395 } else {
396 ElementCount {
397 value: Element::Enum("_N/A_".to_string()),
398 count: elem.count,
399 }
400 }
401 })
402 .collect(),
403 match n_largest_count.mode() {
404 Some(mode) => {
405 if let Element::UInt(value) = mode {
406 Some(Element::Enum(value.to_string()))
407 } else {
408 None
409 }
410 }
411 None => None,
412 },
413 )
414 } else {
415 (
416 n_largest_count
417 .top_n()
418 .iter()
419 .map(|elem| {
420 if let Element::UInt(value) = elem.value {
421 ElementCount {
422 value: Element::Enum(reverse_map.get(&value).map_or(
423 "_NO_MAP_".to_string(),
424 |v| {
425 let mut s = String::new();
426 for (i, e) in v.iter().enumerate() {
427 s.push_str(e);
428 if i < v.len() - 1 {
429 s.push('|');
430 }
431 }
432 s
433 },
434 )),
435 count: elem.count,
436 }
437 } else {
438 ElementCount {
439 value: Element::Enum("_N/A_".to_string()),
440 count: elem.count,
441 }
442 }
443 })
444 .collect(),
445 match n_largest_count.mode() {
446 Some(mode) => {
447 if let Element::UInt(value) = mode {
448 Some(Element::Enum(reverse_map.get(value).map_or(
449 "_NO_MAP_".to_string(),
450 |v| {
451 let mut s = String::new();
452 for (i, e) in v.iter().enumerate() {
453 s.push_str(e);
454 if i < v.len() - 1 {
455 s.push('|');
456 }
457 }
458 s
459 },
460 )))
461 } else {
462 None
463 }
464 }
465 None => None,
466 },
467 )
468 }
469 };
470
471 NLargestCount {
472 number_of_elements: n_largest_count.number_of_elements(),
473 top_n,
474 mode,
475 }
476}
477
478#[must_use]
479pub(crate) fn n_largest_count_float64(
480 column: &Column,
481 rows: &[usize],
482 number_of_top_n: u32,
483 precision: i32,
484) -> NLargestCount {
485 let mut n_largest_count = NLargestCount::default();
486
487 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
488 let (rc, rt) = top_n_f64(iter, 10.0_f64.powi(precision), number_of_top_n);
489 n_largest_count.number_of_elements = rc;
490 n_largest_count.mode = Some(rt[0].value.clone());
491 n_largest_count.top_n = rt;
492
493 n_largest_count
494}
495
496#[must_use]
497pub(crate) fn n_largest_count_datetime(
498 column: &Column,
499 rows: &[usize],
500 time_interval: u32,
501 number_of_top_n: u32,
502) -> NLargestCount {
503 let mut n_largest_count = NLargestCount::default();
504 let values = convert_time_intervals(column, rows, time_interval);
505
506 top_n!(
507 values.iter(),
508 rows.len(),
509 n_largest_count,
510 &NaiveDateTime,
511 Element::DateTime,
512 number_of_top_n
513 );
514
515 n_largest_count
516}
517
518#[must_use]
522pub(crate) fn convert_time_intervals(
523 column: &Column,
524 rows: &[usize],
525 time_interval: u32,
526) -> Vec<NaiveDateTime> {
527 const A_BILLION: i64 = 1_000_000_000;
528 let time_interval = if time_interval > MAX_TIME_INTERVAL {
529 MAX_TIME_INTERVAL
530 } else {
533 time_interval
534 };
535 let time_interval = if time_interval < MIN_TIME_INTERVAL {
536 MIN_TIME_INTERVAL
537 } else {
538 time_interval
539 };
540 let time_interval = i64::from(time_interval);
541
542 column
543 .primitive_iter::<Int64Type>(rows)
544 .unwrap()
545 .map(|v| {
546 let mut interval_idx = v / A_BILLION;
548 interval_idx = (interval_idx / time_interval) * time_interval;
549 DateTime::from_timestamp(interval_idx, 0)
550 .unwrap_or_default()
551 .naive_utc()
552 })
553 .collect::<Vec<_>>()
554}
555
/// Counts how often each distinct item occurs and returns `(item, count)`
/// pairs ordered by descending count.
///
/// The relative order of items with equal counts is unspecified. An empty
/// iterator yields an empty vector.
fn count_sort<I>(iter: I) -> Vec<(I::Item, usize)>
where
    I: Iterator,
    I::Item: Clone + Eq + Hash,
{
    let mut counts: HashMap<I::Item, usize> = HashMap::new();
    for item in iter {
        *counts.entry(item).or_default() += 1;
    }

    // Consume the map so the keys are moved out instead of cloned.
    let mut ranked: Vec<(I::Item, usize)> = counts.into_iter().collect();
    ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    ranked
}
573
574fn top_n_f64<I>(iter: I, precision: f64, number_of_top_n: u32) -> (usize, Vec<ElementCount>)
575where
576 I: Iterator<Item = f64>,
577{
578 use ordered_float::OrderedFloat;
579
580 let mut freqs: Vec<(_, usize)> = iter
581 .map(|v| OrderedFloat((v * precision).round() / precision))
582 .fold(HashMap::new(), |mut freqs, v| {
583 let e = freqs.entry(v).or_default();
584 *e += 1;
585 freqs
586 })
587 .into_iter()
588 .collect();
589
590 freqs.sort_unstable_by(|a, b| b.1.cmp(&a.1));
591
592 (
593 freqs.len(),
594 freqs
595 .into_iter()
596 .take(number_of_top_n.to_usize().expect("safe: u32 -> usize"))
597 .map(|(v, count)| ElementCount {
598 value: Element::Float(v.into_inner()),
599 count,
600 })
601 .collect(),
602 )
603}
604
/// The smallest and largest item found by `find_min_max`.
struct MinMax<T> {
    // Smallest item seen.
    min: T,
    // Largest item seen.
    max: T,
}
609
610fn find_min_max<I>(mut iter: I) -> Option<MinMax<I::Item>>
612where
613 I: Iterator,
614 I::Item: Copy + PartialOrd,
615{
616 let mut min = iter.next()?;
617 let mut max = min;
618
619 for v in iter {
620 if min > v {
621 min = v;
622 } else if max < v {
623 max = v;
624 }
625 }
626 Some(MinMax { min, max })
627}
628
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Column;
    use arrow::datatypes::Int64Type;
    use chrono::NaiveDate;

    /// Builds a date-time in September 2019 on the given day.
    fn at(day: u32, hour: u32, minute: u32, second: u32) -> chrono::NaiveDateTime {
        NaiveDate::from_ymd_opt(2019, 9, day)
            .unwrap()
            .and_hms_opt(hour, minute, second)
            .unwrap()
    }

    /// Same instant as `at`, expressed as UTC nanoseconds since the epoch.
    fn nanos(day: u32, hour: u32, minute: u32, second: u32) -> i64 {
        at(day, hour, minute, second)
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap()
    }

    /// Checks that the first `expected.len()` converted rows match `expected`.
    fn assert_buckets(
        column: &Column,
        rows: &[usize],
        time_interval: u32,
        expected: &[chrono::NaiveDateTime],
    ) {
        let rst = convert_time_intervals(column, rows, time_interval);
        for (seq, c) in expected.iter().enumerate() {
            assert_eq!(rst.get(seq), Some(c));
        }
    }

    #[test]
    fn test_convert_time_intervals() {
        let c4_v: Vec<i64> = vec![
            nanos(22, 6, 10, 11),
            nanos(22, 6, 15, 11),
            nanos(21, 20, 10, 11),
            nanos(21, 20, 10, 11),
            nanos(22, 6, 45, 11),
            nanos(21, 8, 10, 11),
            nanos(22, 9, 10, 11),
        ];
        let c4 = Column::try_from_slice::<Int64Type>(&c4_v).unwrap();
        let rows = vec![0_usize, 3, 1, 4, 2, 6, 5];

        let rst = convert_time_intervals(&c4, &rows, 3600);

        assert_eq!(rst.len(), 7);
        assert_eq!(rst.first(), Some(&at(22, 6, 0, 0)));
        assert_eq!(rst.last(), Some(&at(21, 8, 0, 0)));
    }

    #[test]
    fn test_the_first_interval_of_each_day() {
        let c4_v: Vec<i64> = vec![
            nanos(22, 0, 3, 20),
            nanos(22, 0, 9, 11),
            nanos(22, 0, 10, 11),
            nanos(22, 1, 15, 11),
        ];
        let c4 = Column::try_from_slice::<Int64Type>(&c4_v).unwrap();
        let rows = vec![0_usize, 1, 2, 3];

        // One-hour buckets.
        let hourly = [
            at(22, 0, 0, 0),
            at(22, 0, 0, 0),
            at(22, 0, 0, 0),
            at(22, 1, 0, 0),
        ];
        assert_buckets(&c4, &rows, 3600, &hourly);

        // Ten-minute buckets.
        let ten_minutes = [
            at(22, 0, 0, 0),
            at(22, 0, 0, 0),
            at(22, 0, 10, 0),
            at(22, 1, 10, 0),
        ];
        assert_buckets(&c4, &rows, 600, &ten_minutes);
    }
}