1use std::collections::HashMap;
2use std::fmt;
3use std::hash::Hash;
4use std::iter::Iterator;
5use std::net::{IpAddr, Ipv4Addr};
6
7use arrow::datatypes::{Float64Type, Int64Type, UInt32Type, UInt64Type};
8use jiff::civil::DateTime;
9use num_traits::ToPrimitive;
10use serde::{Deserialize, Serialize};
11use statistical::{mean, population_standard_deviation};
12
13use crate::table::{Column, ColumnType};
14
/// Largest bucket width, in seconds, accepted by `convert_time_intervals`
/// (86 400 s = one day); larger requests are clamped down to this value.
const MAX_TIME_INTERVAL: u32 = 86_400;
/// Smallest bucket width, in seconds, accepted by `convert_time_intervals`;
/// smaller requests are clamped up to this value.
const MIN_TIME_INTERVAL: u32 = 30;

/// A single column value as reported in statistics: the `min`/`max` of a
/// [`Description`] and the entries and mode of an [`NLargestCount`].
///
/// The variant order is part of the derived `Serialize`/`Deserialize`
/// representation for index-based serde formats, so do not reorder variants.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Element {
    /// Value from an `Int64` column.
    Int(i64),
    /// Raw (unmapped) code read from an `Enum` column.
    UInt(u64),
    /// Label of an `Enum` value: its decimal code or its reverse-mapped name(s).
    Enum(String),
    /// Value from a `Float64` column (rounded when produced by the
    /// frequency counting in `n_largest_count_float64`).
    Float(f64),
    /// A float range; displayed as `smallest~largest` with three decimals,
    /// or as `0` when both bounds are zero.
    FloatRange(FloatRange),
    /// Value from a `Utf8` column.
    Text(String),
    /// Value from a `Binary` column.
    Binary(Vec<u8>),
    /// IP address (this module builds it from a `u32` IPv4 column).
    IpAddr(IpAddr),
    /// Date-time value; this module uses it for the start of a time bucket.
    DateTime(DateTime),
}
31
/// A hashable value used as a grouping key (see [`GroupElementCount`]).
///
/// Unlike [`Element`] it derives `Eq` and `Hash` and has no float variants.
/// Its `PartialOrd` impl only orders values of the same variant; values of
/// different variants are incomparable (`partial_cmp` returns `None`).
///
/// NOTE(review): `UInt` holds a `u32`, whereas `Element::UInt` holds a `u64`.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
pub enum GroupElement {
    Int(i64),
    UInt(u32),
    Enum(String),
    Text(String),
    IpAddr(IpAddr),
    DateTime(DateTime),
}
41
/// A pair of float bounds carried by [`Element::FloatRange`].
///
/// `Element`'s `Display` impl prints it as `smallest~largest` with three
/// decimals, or as `0` when both bounds are exactly zero.
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct FloatRange {
    /// Smallest value of the range.
    pub smallest: f64,
    /// Largest value of the range.
    pub largest: f64,
}
47
48impl fmt::Display for Element {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 Self::Int(x) => write!(f, "{x}"),
52 Self::UInt(x) => write!(f, "{x}"),
53 Self::Enum(x) | Self::Text(x) => write!(f, "{x}"),
54 Self::Binary(x) => write!(f, "{x:#?}"),
55 Self::Float(x) => write!(f, "{x}"),
56 Self::FloatRange(x) => {
57 if x.smallest == 0.0_f64 && x.largest == 0.0_f64 {
58 write!(f, "0")
59 } else {
60 write!(f, "{:.3}~{:.3}", x.smallest, x.largest)
61 }
62 }
63 Self::IpAddr(x) => write!(f, "{x}"),
64 Self::DateTime(x) => write!(f, "{x}"),
65 }
66 }
67}
68
69impl PartialOrd for GroupElement {
70 fn partial_cmp(&self, other: &GroupElement) -> Option<std::cmp::Ordering> {
71 match (self, other) {
72 (Self::Int(s), Self::Int(o)) => Some(s.cmp(o)),
73 (Self::UInt(s), Self::UInt(o)) => Some(s.cmp(o)),
74 (Self::Enum(s), Self::Enum(o)) | (Self::Text(s), Self::Text(o)) => Some(s.cmp(o)),
75 (Self::IpAddr(s), Self::IpAddr(o)) => Some(s.cmp(o)),
76 (Self::DateTime(s), Self::DateTime(o)) => Some(s.cmp(o)),
77 _ => None,
78 }
79 }
80}
81
/// Summary statistics and value-frequency information for one column.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnStatistics {
    /// Count, mean, standard deviation, min and max (see [`Description`]).
    pub description: Description,
    /// Most frequent values and mode (see [`NLargestCount`]).
    pub n_largest_count: NLargestCount,
}
88
/// Descriptive statistics of a column over a set of rows.
///
/// Optional fields are `None` when the statistic was not computed; within
/// this module, `describe` fills them only for `Int64` and `Float64` columns.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct Description {
    /// Number of rows described.
    count: usize,
    /// Arithmetic mean of the values.
    mean: Option<f64>,
    /// Population standard deviation of the values.
    s_deviation: Option<f64>,
    /// Smallest value.
    min: Option<Element>,
    /// Largest value.
    max: Option<Element>,
}
97
/// A value together with the number of times it occurs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ElementCount {
    /// The counted value.
    pub value: Element,
    /// Number of occurrences of `value`.
    pub count: usize,
}
103
/// The most frequent values of a column.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct NLargestCount {
    /// Number of distinct values observed (not only those kept in `top_n`).
    number_of_elements: usize,
    /// The most frequent values, in descending order of count.
    top_n: Vec<ElementCount>,
    /// The most frequent value; when built by this module, the value of the
    /// first `top_n` entry (or `None` if `top_n` is empty).
    mode: Option<Element>,
}
110
/// A [`GroupElement`] key together with its count.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupElementCount {
    /// The grouping key.
    pub value: GroupElement,
    /// Count associated with `value`.
    pub count: usize,
}
116
/// A series of grouped counts.
///
/// NOTE(review): `count_index` is not used in this module; its meaning
/// (presumably the index of the column being counted) should be confirmed.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupCount {
    pub count_index: Option<usize>,
    /// One entry per group key.
    pub series: Vec<GroupElementCount>,
}
122
123impl fmt::Display for Description {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 writeln!(f, "Start of Description")?;
126 writeln!(f, " count: {}", self.count)?;
127 if self.mean.is_some() {
128 writeln!(f, " mean: {}", self.mean().unwrap())?;
129 }
130 if self.s_deviation.is_some() {
131 writeln!(f, " s-deviation: {}", self.std_deviation().unwrap())?;
132 }
133 if self.min.is_some() {
134 writeln!(f, " min: {}", self.min().unwrap())?;
135 }
136 if self.max.is_some() {
137 writeln!(f, " max: {}", self.max().unwrap())?;
138 }
139 writeln!(f, "End of Description")
140 }
141}
142
143impl fmt::Display for NLargestCount {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 writeln!(f, "Start of NLargestCount")?;
146 writeln!(f, " number of elements: {}", self.number_of_elements())?;
147 writeln!(f, " Top N")?;
148 for elem in self.top_n() {
149 writeln!(f, " data: {} count: {}", elem.value, elem.count)?;
150 }
151 if self.mode.is_some() {
152 writeln!(f, " mode: {}", self.mode().unwrap())?;
153 }
154 writeln!(f, "End of NLargestCount")
155 }
156}
157
158impl Description {
159 #[must_use]
160 pub fn new(
161 count: usize,
162 mean: Option<f64>,
163 s_deviation: Option<f64>,
164 min: Option<Element>,
165 max: Option<Element>,
166 ) -> Self {
167 Self {
168 count,
169 mean,
170 s_deviation,
171 min,
172 max,
173 }
174 }
175
176 #[must_use]
177 pub fn count(&self) -> usize {
178 self.count
179 }
180
181 #[must_use]
182 pub fn mean(&self) -> Option<f64> {
183 self.mean
184 }
185
186 #[must_use]
187 pub fn std_deviation(&self) -> Option<f64> {
188 self.s_deviation
189 }
190
191 #[must_use]
192 pub fn min(&self) -> Option<&Element> {
193 self.min.as_ref()
194 }
195
196 #[must_use]
197 pub fn max(&self) -> Option<&Element> {
198 self.max.as_ref()
199 }
200}
201
202impl NLargestCount {
203 #[must_use]
204 pub fn new(number_of_elements: usize, top_n: Vec<ElementCount>, mode: Option<Element>) -> Self {
205 Self {
206 number_of_elements,
207 top_n,
208 mode,
209 }
210 }
211
212 #[must_use]
213 pub fn number_of_elements(&self) -> usize {
214 self.number_of_elements
215 }
216
217 #[must_use]
218 pub fn top_n(&self) -> &Vec<ElementCount> {
219 &self.top_n
220 }
221
222 #[must_use]
223 pub fn mode(&self) -> Option<&Element> {
224 self.mode.as_ref()
225 }
226}
227
/// Stores the minimum and maximum of `$iter` into `$d.min` / `$d.max`,
/// wrapping each in the `Element` constructor `$t2`. Both fields become
/// `None` when `find_min_max` yields nothing.
macro_rules! min_max {
    ( $iter:expr, $d:expr, $t2:expr ) => {{
        match find_min_max($iter) {
            Some(MinMax { min, max }) => {
                $d.min = Some($t2(min));
                $d.max = Some($t2(max));
            }
            None => {
                $d.min = None;
                $d.max = None;
            }
        }
    }};
}
239
/// Fills `$d.mean` and `$d.s_deviation` (population standard deviation)
/// from the `f64` values collected in `$vf`.
///
/// When `$vf` is empty both fields are left untouched (`None` for a fresh
/// `Description`). The mean of no values is undefined, so reporting `None`
/// matches how `min_max!` treats an empty column instead of emitting NaN or
/// asking `population_standard_deviation` for the spread of no data.
///
/// `$t1` is accepted for call-site compatibility but is not used.
macro_rules! mean_deviation {
    ( $vf:expr, $t1:ty, $d:expr ) => {
        if !$vf.is_empty() {
            let m = mean(&$vf);
            $d.mean = Some(m);
            $d.s_deviation = Some(population_standard_deviation(&$vf, Some(m)));
        }
    };
}
247
/// Ranks the values of `$iter` by frequency and stores the result in `$d`.
///
/// `$d.number_of_elements` receives the number of distinct values, `$d.top_n`
/// the (at most) `$num_of_top_n` most frequent ones converted with `$t2`, and
/// `$d.mode` the value of the first retained entry. `$t1` is the item type
/// yielded by `$iter`; `$len` is accepted for call-site compatibility but is
/// not used.
macro_rules! top_n {
    ( $iter:expr, $len:expr, $d:expr, $t1:ty, $t2:expr, $num_of_top_n:expr ) => {
        let ranked: Vec<($t1, usize)> = count_sort($iter);
        $d.number_of_elements = ranked.len();
        let limit = $num_of_top_n.to_usize().expect("safe: u32 -> usize");
        let kept: Vec<ElementCount> = ranked
            .iter()
            .take(limit)
            .map(|(value, count)| ElementCount {
                value: $t2((*value).to_owned()),
                count: *count,
            })
            .collect();
        $d.mode = kept.first().map(|v| v.value.clone());
        $d.top_n = kept;
    };
}
269
270#[must_use]
271pub(crate) fn describe(column: &Column, rows: &[usize], column_type: ColumnType) -> Description {
272 let mut description = Description {
273 count: rows.len(),
274 ..Description::default()
275 };
276
277 match column_type {
278 ColumnType::Int64 => {
279 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
280 min_max!(iter, description, Element::Int);
281 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
282 #[allow(clippy::cast_precision_loss)] let f_values: Vec<f64> = iter.map(|v: i64| v as f64).collect();
284 mean_deviation!(f_values, i64, description);
285 }
286 ColumnType::Float64 => {
287 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
288 min_max!(iter, description, Element::Float);
289 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
290 let values = iter.collect::<Vec<_>>();
291 mean_deviation!(values, f64, description);
292 }
293 _ => (),
294 }
295
296 description
297}
298
299#[must_use]
300pub(crate) fn n_largest_count(
301 column: &Column,
302 rows: &[usize],
303 column_type: ColumnType,
304 number_of_top_n: u32,
305) -> NLargestCount {
306 let mut n_largest_count = NLargestCount::default();
307
308 match column_type {
309 ColumnType::Int64 => {
310 let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
311 top_n!(
312 iter,
313 rows.len(),
314 n_largest_count,
315 i64,
316 Element::Int,
317 number_of_top_n
318 );
319 }
320 ColumnType::Enum => {
321 let iter = column.primitive_iter::<UInt64Type>(rows).unwrap();
322 top_n!(
323 iter,
324 rows.len(),
325 n_largest_count,
326 u64,
327 Element::UInt,
328 number_of_top_n
329 );
330 }
331 ColumnType::Utf8 => {
332 let iter = column.string_iter(rows).unwrap();
333 top_n!(
334 iter,
335 rows.len(),
336 n_largest_count,
337 &str,
338 Element::Text,
339 number_of_top_n
340 );
341 }
342 ColumnType::Binary => {
343 let iter = column.binary_iter(rows).unwrap();
344 top_n!(
345 iter,
346 rows.len(),
347 n_largest_count,
348 &[u8],
349 Element::Binary,
350 number_of_top_n
351 );
352 }
353 ColumnType::IpAddr => {
354 let values = column
355 .primitive_iter::<UInt32Type>(rows)
356 .unwrap()
357 .map(|v| IpAddr::from(Ipv4Addr::from(v)))
358 .collect::<Vec<_>>();
359 top_n!(
360 values.iter(),
361 rows.len(),
362 n_largest_count,
363 &IpAddr,
364 Element::IpAddr,
365 number_of_top_n
366 );
367 }
368 ColumnType::DateTime | ColumnType::Float64 => unreachable!(), }
370
371 n_largest_count
372}
373
374#[must_use]
375#[allow(clippy::too_many_lines)]
376pub(crate) fn n_largest_count_enum(
377 column: &Column,
378 rows: &[usize],
379 reverse_map: &HashMap<u64, Vec<String>>,
380 number_of_top_n: u32,
381) -> NLargestCount {
382 let n_largest_count = n_largest_count(column, rows, ColumnType::Enum, number_of_top_n);
383
384 let (top_n, mode) = {
385 if reverse_map.is_empty() {
386 (
387 n_largest_count
388 .top_n()
389 .iter()
390 .map(|elem| {
391 if let Element::UInt(value) = elem.value {
392 ElementCount {
393 value: Element::Enum(value.to_string()),
394 count: elem.count,
395 }
396 } else {
397 ElementCount {
398 value: Element::Enum("_N/A_".to_string()),
399 count: elem.count,
400 }
401 }
402 })
403 .collect(),
404 match n_largest_count.mode() {
405 Some(mode) => {
406 if let Element::UInt(value) = mode {
407 Some(Element::Enum(value.to_string()))
408 } else {
409 None
410 }
411 }
412 None => None,
413 },
414 )
415 } else {
416 (
417 n_largest_count
418 .top_n()
419 .iter()
420 .map(|elem| {
421 if let Element::UInt(value) = elem.value {
422 ElementCount {
423 value: Element::Enum(reverse_map.get(&value).map_or(
424 "_NO_MAP_".to_string(),
425 |v| {
426 let mut s = String::new();
427 for (i, e) in v.iter().enumerate() {
428 s.push_str(e);
429 if i < v.len() - 1 {
430 s.push('|');
431 }
432 }
433 s
434 },
435 )),
436 count: elem.count,
437 }
438 } else {
439 ElementCount {
440 value: Element::Enum("_N/A_".to_string()),
441 count: elem.count,
442 }
443 }
444 })
445 .collect(),
446 match n_largest_count.mode() {
447 Some(mode) => {
448 if let Element::UInt(value) = mode {
449 Some(Element::Enum(reverse_map.get(value).map_or(
450 "_NO_MAP_".to_string(),
451 |v| {
452 let mut s = String::new();
453 for (i, e) in v.iter().enumerate() {
454 s.push_str(e);
455 if i < v.len() - 1 {
456 s.push('|');
457 }
458 }
459 s
460 },
461 )))
462 } else {
463 None
464 }
465 }
466 None => None,
467 },
468 )
469 }
470 };
471
472 NLargestCount {
473 number_of_elements: n_largest_count.number_of_elements(),
474 top_n,
475 mode,
476 }
477}
478
479#[must_use]
480pub(crate) fn n_largest_count_float64(
481 column: &Column,
482 rows: &[usize],
483 number_of_top_n: u32,
484 precision: i32,
485) -> NLargestCount {
486 let mut n_largest_count = NLargestCount::default();
487
488 let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
489 let (rc, rt) = top_n_f64(iter, 10.0_f64.powi(precision), number_of_top_n);
490 n_largest_count.number_of_elements = rc;
491 n_largest_count.mode = Some(rt[0].value.clone());
492 n_largest_count.top_n = rt;
493
494 n_largest_count
495}
496
497#[must_use]
498pub(crate) fn n_largest_count_datetime(
499 column: &Column,
500 rows: &[usize],
501 time_interval: u32,
502 number_of_top_n: u32,
503) -> NLargestCount {
504 let mut n_largest_count = NLargestCount::default();
505 let values = convert_time_intervals(column, rows, time_interval);
506
507 top_n!(
508 values.iter(),
509 rows.len(),
510 n_largest_count,
511 &DateTime,
512 Element::DateTime,
513 number_of_top_n
514 );
515
516 n_largest_count
517}
518
519#[must_use]
523pub(crate) fn convert_time_intervals(
524 column: &Column,
525 rows: &[usize],
526 time_interval: u32,
527) -> Vec<DateTime> {
528 const A_BILLION: i64 = 1_000_000_000;
529 let time_interval = if time_interval > MAX_TIME_INTERVAL {
530 MAX_TIME_INTERVAL
531 } else {
534 time_interval
535 };
536 let time_interval = if time_interval < MIN_TIME_INTERVAL {
537 MIN_TIME_INTERVAL
538 } else {
539 time_interval
540 };
541 let time_interval = i64::from(time_interval);
542
543 column
544 .primitive_iter::<Int64Type>(rows)
545 .unwrap()
546 .map(|v| {
547 let mut interval_idx = v / A_BILLION;
549 interval_idx = (interval_idx / time_interval) * time_interval;
550 jiff::Timestamp::from_second(interval_idx)
551 .unwrap_or_default()
552 .to_zoned(jiff::tz::TimeZone::UTC)
553 .datetime()
554 })
555 .collect::<Vec<_>>()
556}
557
/// Counts how often each distinct item of `iter` occurs and returns the
/// `(item, count)` pairs sorted by descending count.
///
/// Ties are broken by ascending item. The counts live in a `HashMap`, whose
/// iteration order is randomized per process, so without a tie-breaker
/// equally frequent items could be reported in a different order, and with
/// a different mode, from run to run.
fn count_sort<I>(iter: I) -> Vec<(I::Item, usize)>
where
    I: Iterator,
    I::Item: Ord + Hash,
{
    let mut counts: HashMap<I::Item, usize> = HashMap::new();
    for item in iter {
        *counts.entry(item).or_insert(0) += 1;
    }
    // Move the entries out instead of cloning every key.
    let mut ranked: Vec<(I::Item, usize)> = counts.into_iter().collect();
    ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked
}
575
576fn top_n_f64<I>(iter: I, precision: f64, number_of_top_n: u32) -> (usize, Vec<ElementCount>)
577where
578 I: Iterator<Item = f64>,
579{
580 use ordered_float::OrderedFloat;
581
582 let mut freqs: Vec<(_, usize)> = iter
583 .map(|v| OrderedFloat((v * precision).round() / precision))
584 .fold(HashMap::new(), |mut freqs, v| {
585 let e = freqs.entry(v).or_default();
586 *e += 1;
587 freqs
588 })
589 .into_iter()
590 .collect();
591
592 freqs.sort_unstable_by(|a, b| b.1.cmp(&a.1));
593
594 (
595 freqs.len(),
596 freqs
597 .into_iter()
598 .take(number_of_top_n.to_usize().expect("safe: u32 -> usize"))
599 .map(|(v, count)| ElementCount {
600 value: Element::Float(v.into_inner()),
601 count,
602 })
603 .collect(),
604 )
605}
606
/// The smallest and largest item found by `find_min_max`.
struct MinMax<T> {
    min: T,
    max: T,
}

/// Returns the minimum and maximum of `iter` in a single pass, or `None`
/// when it yields no comparable items.
///
/// Items that are not comparable even with themselves (e.g. `f64::NAN`) are
/// skipped wherever they occur. Every comparison with such a value is
/// `false`, so if one were kept as the seed it would become both min and max
/// and hide all other values, while later ones were silently ignored.
/// Ties keep the first item seen.
fn find_min_max<I>(iter: I) -> Option<MinMax<I::Item>>
where
    I: Iterator,
    I::Item: Copy + PartialOrd,
{
    // `x.partial_cmp(&x)` is `None` only for values unordered with themselves.
    let mut comparable = iter.filter(|v| v.partial_cmp(v).is_some());
    let first = comparable.next()?;

    let (min, max) = comparable.fold((first, first), |(min, max), v| {
        if min > v {
            (v, max)
        } else if max < v {
            (min, v)
        } else {
            (min, max)
        }
    });
    Some(MinMax { min, max })
}
630
#[cfg(test)]
mod tests {
    use arrow::datatypes::Int64Type;
    use jiff::civil::date;

    use super::*;
    use crate::Column;

    /// Interprets `dt` as UTC and returns nanoseconds since the Unix epoch,
    /// the representation `convert_time_intervals` reads from the column.
    fn utc_nanos(dt: jiff::civil::DateTime) -> i64 {
        dt.to_zoned(jiff::tz::TimeZone::UTC)
            .unwrap()
            .timestamp()
            .as_nanosecond()
            .try_into()
            .unwrap()
    }

    #[test]
    fn test_convert_time_intervals() {
        let c4_v: Vec<i64> = [
            date(2019, 9, 22).at(6, 10, 11, 0),
            date(2019, 9, 22).at(6, 15, 11, 0),
            date(2019, 9, 21).at(20, 10, 11, 0),
            date(2019, 9, 21).at(20, 10, 11, 0),
            date(2019, 9, 22).at(6, 45, 11, 0),
            date(2019, 9, 21).at(8, 10, 11, 0),
            date(2019, 9, 22).at(9, 10, 11, 0),
        ]
        .into_iter()
        .map(utc_nanos)
        .collect();
        let c4 = Column::try_from_slice::<Int64Type>(&c4_v).unwrap();
        let rows = vec![0_usize, 3, 1, 4, 2, 6, 5];

        let rst = convert_time_intervals(&c4, &rows, 3600);

        assert_eq!(rst.len(), 7);
        assert_eq!(rst.first(), Some(&date(2019, 9, 22).at(6, 0, 0, 0)));
        assert_eq!(rst.last(), Some(&date(2019, 9, 21).at(8, 0, 0, 0)));
    }

    #[test]
    fn test_the_first_interval_of_each_day() {
        let c4_v: Vec<i64> = [
            date(2019, 9, 22).at(0, 3, 20, 0),
            date(2019, 9, 22).at(0, 9, 11, 0),
            date(2019, 9, 22).at(0, 10, 11, 0),
            date(2019, 9, 22).at(1, 15, 11, 0),
        ]
        .into_iter()
        .map(utc_nanos)
        .collect();
        let c4 = Column::try_from_slice::<Int64Type>(&c4_v).unwrap();
        let rows = vec![0_usize, 1, 2, 3];

        let hourly = convert_time_intervals(&c4, &rows, 3600);
        let expected_hourly = [
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(1, 0, 0, 0),
        ];
        for (idx, want) in expected_hourly.iter().enumerate() {
            assert_eq!(hourly.get(idx), Some(want));
        }

        let ten_minutes = convert_time_intervals(&c4, &rows, 600);
        let expected_ten_minutes = [
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 10, 0, 0),
            date(2019, 9, 22).at(1, 10, 0, 0),
        ];
        for (idx, want) in expected_ten_minutes.iter().enumerate() {
            assert_eq!(ten_minutes.get(idx), Some(want));
        }
    }
}