structured/
stats.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::hash::Hash;
4use std::iter::Iterator;
5use std::net::{IpAddr, Ipv4Addr};
6
7use arrow::datatypes::{Float64Type, Int64Type, UInt32Type, UInt64Type};
8use jiff::civil::DateTime;
9use num_traits::ToPrimitive;
10use serde::{Deserialize, Serialize};
11use statistical::{mean, population_standard_deviation};
12
13use crate::table::{Column, ColumnType};
14
/// Upper bound, in seconds, that `convert_time_intervals` applies to its
/// `time_interval` argument.
const MAX_TIME_INTERVAL: u32 = 86_400; // one day in seconds
/// Lower bound, in seconds, that `convert_time_intervals` applies to its
/// `time_interval` argument.
const MIN_TIME_INTERVAL: u32 = 30; // seconds
17
/// The underlying data type of a column description.
///
/// Each variant wraps a single value. The `Display` implementation renders
/// the wrapped value as text.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Element {
    /// A signed 64-bit integer.
    Int(i64),
    /// An unsigned 64-bit integer.
    UInt(u64),
    /// An enum label. `n_largest_count_enum` converts `UInt` -> `Enum` using
    /// the reverse enum map; when the map is empty it uses `to_string()`.
    Enum(String),
    /// A 64-bit floating-point number.
    Float(f64),
    /// A pair of floating-point bounds (see `FloatRange`).
    FloatRange(FloatRange),
    /// A text value.
    Text(String),
    /// A raw byte string.
    Binary(Vec<u8>),
    /// An IP address.
    IpAddr(IpAddr),
    /// A civil (time-zone-less) date-time.
    DateTime(DateTime),
}
31
/// A value usable as a group key (it derives `Eq` and `Hash`); it is the
/// `value` type of `GroupElementCount`.
///
/// Ordering is only defined between two values of the same variant; see the
/// `PartialOrd` implementation.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
pub enum GroupElement {
    /// A signed 64-bit integer key.
    Int(i64),
    /// An unsigned 32-bit integer key.
    UInt(u32),
    /// An enum label key.
    Enum(String),
    /// A text key.
    Text(String),
    /// An IP address key.
    IpAddr(IpAddr),
    /// A civil date-time key.
    DateTime(DateTime),
}
41
/// A pair of floating-point bounds.
///
/// `Element::FloatRange` displays it as `smallest~largest` with three decimal
/// places, or as `0` when both bounds are zero.
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct FloatRange {
    /// The lower bound of the range.
    pub smallest: f64,
    /// The upper bound of the range.
    pub largest: f64,
}
47
48impl fmt::Display for Element {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            Self::Int(x) => write!(f, "{x}"),
52            Self::UInt(x) => write!(f, "{x}"),
53            Self::Enum(x) | Self::Text(x) => write!(f, "{x}"),
54            Self::Binary(x) => write!(f, "{x:#?}"),
55            Self::Float(x) => write!(f, "{x}"),
56            Self::FloatRange(x) => {
57                if x.smallest == 0.0_f64 && x.largest == 0.0_f64 {
58                    write!(f, "0")
59                } else {
60                    write!(f, "{:.3}~{:.3}", x.smallest, x.largest)
61                }
62            }
63            Self::IpAddr(x) => write!(f, "{x}"),
64            Self::DateTime(x) => write!(f, "{x}"),
65        }
66    }
67}
68
69impl PartialOrd for GroupElement {
70    fn partial_cmp(&self, other: &GroupElement) -> Option<std::cmp::Ordering> {
71        match (self, other) {
72            (Self::Int(s), Self::Int(o)) => Some(s.cmp(o)),
73            (Self::UInt(s), Self::UInt(o)) => Some(s.cmp(o)),
74            (Self::Enum(s), Self::Enum(o)) | (Self::Text(s), Self::Text(o)) => Some(s.cmp(o)),
75            (Self::IpAddr(s), Self::IpAddr(o)) => Some(s.cmp(o)),
76            (Self::DateTime(s), Self::DateTime(o)) => Some(s.cmp(o)),
77            _ => None,
78        }
79    }
80}
81
/// Statistical summary of data of the same type.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnStatistics {
    /// Count, mean, standard deviation, minimum and maximum of the data.
    pub description: Description,
    /// The most frequent values of the data together with their counts.
    pub n_largest_count: NLargestCount,
}
88
/// Basic descriptive statistics of a column.
///
/// `describe` always sets `count`; it fills the remaining fields only for
/// `Int64` and `Float64` columns.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct Description {
    /// Number of rows described (`describe` stores `rows.len()` here).
    count: usize,
    /// Arithmetic mean of the values, if computed.
    mean: Option<f64>,
    /// Population standard deviation of the values, if computed.
    s_deviation: Option<f64>,
    /// Smallest value, if computed.
    min: Option<Element>,
    /// Largest value, if computed.
    max: Option<Element>,
}
97
/// A value paired with the number of times it was counted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ElementCount {
    /// The counted value.
    pub value: Element,
    /// How many times `value` was counted.
    pub count: usize,
}
103
/// The most frequent values of a column.
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct NLargestCount {
    /// Number of distinct values found (the length of the full frequency
    /// table, before it is cut down to the top N).
    number_of_elements: usize,
    /// The most frequent values with their counts, most frequent first.
    top_n: Vec<ElementCount>,
    /// The most frequent value, i.e. the value of the first `top_n` entry.
    mode: Option<Element>,
}
110
/// A group key paired with its count.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupElementCount {
    /// The group key.
    pub value: GroupElement,
    /// The count recorded for `value`.
    pub count: usize,
}
116
/// A series of per-group counts.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupCount {
    /// If `None`, count just rows. If `Some`, count values of the column.
    pub count_index: Option<usize>,
    /// One entry per group.
    pub series: Vec<GroupElementCount>,
}
122
123impl fmt::Display for Description {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        writeln!(f, "Start of Description")?;
126        writeln!(f, "   count: {}", self.count)?;
127        if self.mean.is_some() {
128            writeln!(f, "   mean: {}", self.mean().unwrap())?;
129        }
130        if self.s_deviation.is_some() {
131            writeln!(f, "   s-deviation: {}", self.std_deviation().unwrap())?;
132        }
133        if self.min.is_some() {
134            writeln!(f, "   min: {}", self.min().unwrap())?;
135        }
136        if self.max.is_some() {
137            writeln!(f, "   max: {}", self.max().unwrap())?;
138        }
139        writeln!(f, "End of Description")
140    }
141}
142
143impl fmt::Display for NLargestCount {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        writeln!(f, "Start of NLargestCount")?;
146        writeln!(f, "   number of elements: {}", self.number_of_elements())?;
147        writeln!(f, "   Top N")?;
148        for elem in self.top_n() {
149            writeln!(f, "      data: {}      count: {}", elem.value, elem.count)?;
150        }
151        if self.mode.is_some() {
152            writeln!(f, "   mode: {}", self.mode().unwrap())?;
153        }
154        writeln!(f, "End of NLargestCount")
155    }
156}
157
158impl Description {
159    #[must_use]
160    pub fn new(
161        count: usize,
162        mean: Option<f64>,
163        s_deviation: Option<f64>,
164        min: Option<Element>,
165        max: Option<Element>,
166    ) -> Self {
167        Self {
168            count,
169            mean,
170            s_deviation,
171            min,
172            max,
173        }
174    }
175
176    #[must_use]
177    pub fn count(&self) -> usize {
178        self.count
179    }
180
181    #[must_use]
182    pub fn mean(&self) -> Option<f64> {
183        self.mean
184    }
185
186    #[must_use]
187    pub fn std_deviation(&self) -> Option<f64> {
188        self.s_deviation
189    }
190
191    #[must_use]
192    pub fn min(&self) -> Option<&Element> {
193        self.min.as_ref()
194    }
195
196    #[must_use]
197    pub fn max(&self) -> Option<&Element> {
198        self.max.as_ref()
199    }
200}
201
202impl NLargestCount {
203    #[must_use]
204    pub fn new(number_of_elements: usize, top_n: Vec<ElementCount>, mode: Option<Element>) -> Self {
205        Self {
206            number_of_elements,
207            top_n,
208            mode,
209        }
210    }
211
212    #[must_use]
213    pub fn number_of_elements(&self) -> usize {
214        self.number_of_elements
215    }
216
217    #[must_use]
218    pub fn top_n(&self) -> &Vec<ElementCount> {
219        &self.top_n
220    }
221
222    #[must_use]
223    pub fn mode(&self) -> Option<&Element> {
224        self.mode.as_ref()
225    }
226}
227
// Stores the minimum and maximum of `$iter` in `$d.min` / `$d.max`, wrapping
// each with the `$t2` constructor; both become `None` when `find_min_max`
// yields nothing.
macro_rules! min_max {
    ( $iter:expr, $d:expr, $t2:expr ) => {{
        match find_min_max($iter) {
            Some(bounds) => {
                $d.min = Some($t2(bounds.min));
                $d.max = Some($t2(bounds.max));
            }
            None => {
                $d.min = None;
                $d.max = None;
            }
        }
    }};
}
239
// Stores the mean of `$vf` in `$d.mean` and its population standard deviation
// (computed around that mean) in `$d.s_deviation`, using the `statistical`
// crate helpers imported at the top of the file.
//
// `$t1` is accepted for call-site compatibility but is not used in the
// expansion.
// NOTE(review): behavior for an empty `$vf` depends on the `statistical`
// crate — confirm before calling with no values.
macro_rules! mean_deviation {
    ( $vf:expr, $t1:ty, $d:expr ) => {
        let m = mean(&$vf);
        $d.mean = Some(m);
        $d.s_deviation = Some(population_standard_deviation(&$vf, Some(m)));
    };
}
247
// Fills `$d` (an `NLargestCount`) from the values produced by `$iter`.
//
// `count_sort` builds the frequency table, most frequent first. Its length
// becomes `number_of_elements`, its first `$num_of_top_n` entries become
// `top_n` (each key converted to an owned value and wrapped with `$t2`), and
// the first of those becomes `mode`. `$t1` is the key type of the table.
// `$len` is accepted for call-site compatibility but is not used.
macro_rules! top_n {
    ( $iter:expr, $len:expr, $d:expr, $t1:ty, $t2:expr, $num_of_top_n:expr ) => {
        let ranked: Vec<($t1, usize)> = count_sort($iter);
        $d.number_of_elements = ranked.len();
        let limit = $num_of_top_n.to_usize().expect("safe: u32 -> usize");
        // `take` stops early when fewer than `limit` distinct values exist.
        let head: Vec<ElementCount> = ranked
            .iter()
            .take(limit)
            .map(|(key, occurrences)| ElementCount {
                value: $t2((*key).to_owned()),
                count: *occurrences,
            })
            .collect();
        $d.mode = head.first().map(|entry| entry.value.clone());
        $d.top_n = head;
    };
}
269
270#[must_use]
271pub(crate) fn describe(column: &Column, rows: &[usize], column_type: ColumnType) -> Description {
272    let mut description = Description {
273        count: rows.len(),
274        ..Description::default()
275    };
276
277    match column_type {
278        ColumnType::Int64 => {
279            let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
280            min_max!(iter, description, Element::Int);
281            let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
282            #[allow(clippy::cast_precision_loss)] // 52-bit precision is good enough
283            let f_values: Vec<f64> = iter.map(|v: i64| v as f64).collect();
284            mean_deviation!(f_values, i64, description);
285        }
286        ColumnType::Float64 => {
287            let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
288            min_max!(iter, description, Element::Float);
289            let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
290            let values = iter.collect::<Vec<_>>();
291            mean_deviation!(values, f64, description);
292        }
293        _ => (),
294    }
295
296    description
297}
298
299#[must_use]
300pub(crate) fn n_largest_count(
301    column: &Column,
302    rows: &[usize],
303    column_type: ColumnType,
304    number_of_top_n: u32,
305) -> NLargestCount {
306    let mut n_largest_count = NLargestCount::default();
307
308    match column_type {
309        ColumnType::Int64 => {
310            let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
311            top_n!(
312                iter,
313                rows.len(),
314                n_largest_count,
315                i64,
316                Element::Int,
317                number_of_top_n
318            );
319        }
320        ColumnType::Enum => {
321            let iter = column.primitive_iter::<UInt64Type>(rows).unwrap();
322            top_n!(
323                iter,
324                rows.len(),
325                n_largest_count,
326                u64,
327                Element::UInt,
328                number_of_top_n
329            );
330        }
331        ColumnType::Utf8 => {
332            let iter = column.string_iter(rows).unwrap();
333            top_n!(
334                iter,
335                rows.len(),
336                n_largest_count,
337                &str,
338                Element::Text,
339                number_of_top_n
340            );
341        }
342        ColumnType::Binary => {
343            let iter = column.binary_iter(rows).unwrap();
344            top_n!(
345                iter,
346                rows.len(),
347                n_largest_count,
348                &[u8],
349                Element::Binary,
350                number_of_top_n
351            );
352        }
353        ColumnType::IpAddr => {
354            let values = column
355                .primitive_iter::<UInt32Type>(rows)
356                .unwrap()
357                .map(|v| IpAddr::from(Ipv4Addr::from(v)))
358                .collect::<Vec<_>>();
359            top_n!(
360                values.iter(),
361                rows.len(),
362                n_largest_count,
363                &IpAddr,
364                Element::IpAddr,
365                number_of_top_n
366            );
367        }
368        ColumnType::DateTime | ColumnType::Float64 => unreachable!(), // by implementation
369    }
370
371    n_largest_count
372}
373
374#[must_use]
375#[allow(clippy::too_many_lines)]
376pub(crate) fn n_largest_count_enum(
377    column: &Column,
378    rows: &[usize],
379    reverse_map: &HashMap<u64, Vec<String>>,
380    number_of_top_n: u32,
381) -> NLargestCount {
382    let n_largest_count = n_largest_count(column, rows, ColumnType::Enum, number_of_top_n);
383
384    let (top_n, mode) = {
385        if reverse_map.is_empty() {
386            (
387                n_largest_count
388                    .top_n()
389                    .iter()
390                    .map(|elem| {
391                        if let Element::UInt(value) = elem.value {
392                            ElementCount {
393                                value: Element::Enum(value.to_string()),
394                                count: elem.count,
395                            }
396                        } else {
397                            ElementCount {
398                                value: Element::Enum("_N/A_".to_string()),
399                                count: elem.count,
400                            }
401                        }
402                    })
403                    .collect(),
404                match n_largest_count.mode() {
405                    Some(mode) => {
406                        if let Element::UInt(value) = mode {
407                            Some(Element::Enum(value.to_string()))
408                        } else {
409                            None
410                        }
411                    }
412                    None => None,
413                },
414            )
415        } else {
416            (
417                n_largest_count
418                    .top_n()
419                    .iter()
420                    .map(|elem| {
421                        if let Element::UInt(value) = elem.value {
422                            ElementCount {
423                                value: Element::Enum(reverse_map.get(&value).map_or(
424                                    "_NO_MAP_".to_string(),
425                                    |v| {
426                                        let mut s = String::new();
427                                        for (i, e) in v.iter().enumerate() {
428                                            s.push_str(e);
429                                            if i < v.len() - 1 {
430                                                s.push('|');
431                                            }
432                                        }
433                                        s
434                                    },
435                                )),
436                                count: elem.count,
437                            }
438                        } else {
439                            ElementCount {
440                                value: Element::Enum("_N/A_".to_string()),
441                                count: elem.count,
442                            }
443                        }
444                    })
445                    .collect(),
446                match n_largest_count.mode() {
447                    Some(mode) => {
448                        if let Element::UInt(value) = mode {
449                            Some(Element::Enum(reverse_map.get(value).map_or(
450                                "_NO_MAP_".to_string(),
451                                |v| {
452                                    let mut s = String::new();
453                                    for (i, e) in v.iter().enumerate() {
454                                        s.push_str(e);
455                                        if i < v.len() - 1 {
456                                            s.push('|');
457                                        }
458                                    }
459                                    s
460                                },
461                            )))
462                        } else {
463                            None
464                        }
465                    }
466                    None => None,
467                },
468            )
469        }
470    };
471
472    NLargestCount {
473        number_of_elements: n_largest_count.number_of_elements(),
474        top_n,
475        mode,
476    }
477}
478
479#[must_use]
480pub(crate) fn n_largest_count_float64(
481    column: &Column,
482    rows: &[usize],
483    number_of_top_n: u32,
484    precision: i32,
485) -> NLargestCount {
486    let mut n_largest_count = NLargestCount::default();
487
488    let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
489    let (rc, rt) = top_n_f64(iter, 10.0_f64.powi(precision), number_of_top_n);
490    n_largest_count.number_of_elements = rc;
491    n_largest_count.mode = Some(rt[0].value.clone());
492    n_largest_count.top_n = rt;
493
494    n_largest_count
495}
496
497#[must_use]
498pub(crate) fn n_largest_count_datetime(
499    column: &Column,
500    rows: &[usize],
501    time_interval: u32,
502    number_of_top_n: u32,
503) -> NLargestCount {
504    let mut n_largest_count = NLargestCount::default();
505    let values = convert_time_intervals(column, rows, time_interval);
506
507    top_n!(
508        values.iter(),
509        rows.len(),
510        n_largest_count,
511        &DateTime,
512        Element::DateTime,
513        number_of_top_n
514    );
515
516    n_largest_count
517}
518
519/// # Panics
520///
521/// If `rows` contains an invalid timestamp in nanoseconds.
522#[must_use]
523pub(crate) fn convert_time_intervals(
524    column: &Column,
525    rows: &[usize],
526    time_interval: u32,
527) -> Vec<DateTime> {
528    const A_BILLION: i64 = 1_000_000_000;
529    let time_interval = if time_interval > MAX_TIME_INTERVAL {
530        MAX_TIME_INTERVAL
531    // Users want to see time series of the same order intervals within MAX_TIME_INTERVAL which is in date units.
532    // If the interval is larger than a day, it should be in date units.
533    } else {
534        time_interval
535    };
536    let time_interval = if time_interval < MIN_TIME_INTERVAL {
537        MIN_TIME_INTERVAL
538    } else {
539        time_interval
540    };
541    let time_interval = i64::from(time_interval);
542
543    column
544        .primitive_iter::<Int64Type>(rows)
545        .unwrap()
546        .map(|v| {
547            // The first interval of each day should start with 00:00:00.
548            let mut interval_idx = v / A_BILLION;
549            interval_idx = (interval_idx / time_interval) * time_interval;
550            jiff::Timestamp::from_second(interval_idx)
551                .unwrap_or_default()
552                .to_zoned(jiff::tz::TimeZone::UTC)
553                .datetime()
554        })
555        .collect::<Vec<_>>()
556}
557
/// Counts how often each item occurs in `iter`.
///
/// Returns one `(item, count)` pair per distinct item, sorted by descending
/// count. The relative order of items with equal counts is unspecified.
fn count_sort<I>(iter: I) -> Vec<(I::Item, usize)>
where
    I: Iterator,
    I::Item: Clone + Eq + Hash,
{
    let mut counts: HashMap<I::Item, usize> = HashMap::new();
    for item in iter {
        *counts.entry(item).or_insert(0) += 1;
    }
    // Consume the map so that keys are moved, not cloned, into the result.
    let mut sorted: Vec<(I::Item, usize)> = counts.into_iter().collect();
    sorted.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    sorted
}
575
576fn top_n_f64<I>(iter: I, precision: f64, number_of_top_n: u32) -> (usize, Vec<ElementCount>)
577where
578    I: Iterator<Item = f64>,
579{
580    use ordered_float::OrderedFloat;
581
582    let mut freqs: Vec<(_, usize)> = iter
583        .map(|v| OrderedFloat((v * precision).round() / precision))
584        .fold(HashMap::new(), |mut freqs, v| {
585            let e = freqs.entry(v).or_default();
586            *e += 1;
587            freqs
588        })
589        .into_iter()
590        .collect();
591
592    freqs.sort_unstable_by(|a, b| b.1.cmp(&a.1));
593
594    (
595        freqs.len(),
596        freqs
597            .into_iter()
598            .take(number_of_top_n.to_usize().expect("safe: u32 -> usize"))
599            .map(|(v, count)| ElementCount {
600                value: Element::Float(v.into_inner()),
601                count,
602            })
603            .collect(),
604    )
605}
606
/// The smallest and largest value found by `find_min_max`.
struct MinMax<T> {
    min: T,
    max: T,
}

/// Returns the minimum and maximum values, or `None` for an empty iterator.
///
/// Values that are not comparable with themselves (such as a floating-point
/// NaN) are ignored wherever they occur, as long as at least one comparable
/// value exists. If no value is comparable, the last one seen is returned as
/// both minimum and maximum.
fn find_min_max<I>(mut iter: I) -> Option<MinMax<I::Item>>
where
    I: Iterator,
    I::Item: Copy + PartialOrd,
{
    let first = iter.next()?;
    let mut min = first;
    let mut max = first;
    // A seed that is unordered with itself would make every later comparison
    // false, so keep re-seeding until a comparable value is found.
    let mut seeded = first.partial_cmp(&first).is_some();

    for v in iter {
        if !seeded {
            min = v;
            max = v;
            seeded = v.partial_cmp(&v).is_some();
        } else if min > v {
            min = v;
        } else if max < v {
            max = v;
        }
    }
    Some(MinMax { min, max })
}
630
#[cfg(test)]
mod tests {
    use arrow::datatypes::Int64Type;
    use jiff::civil::date;

    use super::*;
    use crate::Column;

    /// Returns the nanoseconds since the Unix epoch for the given UTC
    /// wall-clock time.
    fn utc_nanos(year: i16, month: i8, day: i8, hour: i8, minute: i8, second: i8) -> i64 {
        let nanos = date(year, month, day)
            .at(hour, minute, second, 0)
            .to_zoned(jiff::tz::TimeZone::UTC)
            .unwrap()
            .timestamp()
            .as_nanosecond();
        nanos.try_into().unwrap()
    }

    #[test]
    fn test_convert_time_intervals() {
        let timestamps: Vec<i64> = vec![
            utc_nanos(2019, 9, 22, 6, 10, 11),
            utc_nanos(2019, 9, 22, 6, 15, 11),
            utc_nanos(2019, 9, 21, 20, 10, 11),
            utc_nanos(2019, 9, 21, 20, 10, 11),
            utc_nanos(2019, 9, 22, 6, 45, 11),
            utc_nanos(2019, 9, 21, 8, 10, 11),
            utc_nanos(2019, 9, 22, 9, 10, 11),
        ];
        let column = Column::try_from_slice::<Int64Type>(&timestamps).unwrap();
        let rows = vec![0_usize, 3, 1, 4, 2, 6, 5];

        let buckets = convert_time_intervals(&column, &rows, 3600);

        assert_eq!(buckets.len(), 7);
        assert_eq!(buckets.first(), Some(&date(2019, 9, 22).at(6, 0, 0, 0)));
        assert_eq!(buckets.last(), Some(&date(2019, 9, 21).at(8, 0, 0, 0)));
    }

    #[test]
    fn test_the_first_interval_of_each_day() {
        let timestamps: Vec<i64> = vec![
            utc_nanos(2019, 9, 22, 0, 3, 20),
            utc_nanos(2019, 9, 22, 0, 9, 11),
            utc_nanos(2019, 9, 22, 0, 10, 11),
            utc_nanos(2019, 9, 22, 1, 15, 11),
        ];
        let column = Column::try_from_slice::<Int64Type>(&timestamps).unwrap();
        let rows = vec![0_usize, 1, 2, 3];

        // Hourly buckets.
        let buckets = convert_time_intervals(&column, &rows, 3600);
        let expected = [
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(1, 0, 0, 0),
        ];
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(buckets.get(idx), Some(want));
        }

        // Ten-minute buckets.
        let buckets = convert_time_intervals(&column, &rows, 600);
        let expected = [
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 0, 0, 0),
            date(2019, 9, 22).at(0, 10, 0, 0),
            date(2019, 9, 22).at(1, 10, 0, 0),
        ];
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(buckets.get(idx), Some(want));
        }
    }
}
770}