use arrow::datatypes::{Float64Type, Int64Type, UInt32Type, UInt64Type};
use chrono::NaiveDateTime;
use num_traits::ToPrimitive;
use serde::{Deserialize, Serialize};
use statistical::{mean, population_standard_deviation};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::iter::Iterator;
use std::net::{IpAddr, Ipv4Addr};
use std::vec;
use crate::table::{Column, ColumnType};
const NUM_OF_FLOAT_INTERVALS: usize = 100;
const MAX_TIME_INTERVAL: u32 = 86_400; const MIN_TIME_INTERVAL: u32 = 30;
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Element {
Int(i64),
UInt(u64),
Enum(String), Float(f64),
FloatRange(FloatRange),
Text(String),
Binary(Vec<u8>),
IpAddr(IpAddr),
DateTime(NaiveDateTime),
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Hash)]
pub enum GroupElement {
Int(i64),
UInt(u32),
Enum(String),
Text(String),
IpAddr(IpAddr),
DateTime(NaiveDateTime),
}
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct FloatRange {
pub smallest: f64,
pub largest: f64,
}
impl fmt::Display for Element {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Int(x) => write!(f, "{}", x),
Self::UInt(x) => write!(f, "{}", x),
Self::Enum(x) | Self::Text(x) => write!(f, "{}", x),
Self::Binary(x) => write!(f, "{:#?}", x),
Self::Float(x) => write!(f, "{}", x),
Self::FloatRange(x) => {
if x.smallest == 0.0_f64 && x.largest == 0.0_f64 {
write!(f, "0")
} else {
write!(f, "{:.3}~{:.3}", x.smallest, x.largest)
}
}
Self::IpAddr(x) => write!(f, "{}", x),
Self::DateTime(x) => write!(f, "{}", x),
}
}
}
impl PartialOrd for GroupElement {
fn partial_cmp(&self, other: &GroupElement) -> Option<std::cmp::Ordering> {
match (self, other) {
(Self::Int(s), Self::Int(o)) => Some(s.cmp(o)),
(Self::UInt(s), Self::UInt(o)) => Some(s.cmp(o)),
(Self::Enum(s), Self::Enum(o)) | (Self::Text(s), Self::Text(o)) => Some(s.cmp(o)),
(Self::IpAddr(s), Self::IpAddr(o)) => Some(s.cmp(o)),
(Self::DateTime(s), Self::DateTime(o)) => Some(s.cmp(o)),
_ => None,
}
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnStatistics {
pub description: Description,
pub n_largest_count: NLargestCount,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct Description {
count: usize,
mean: Option<f64>,
s_deviation: Option<f64>,
min: Option<Element>,
max: Option<Element>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ElementCount {
pub value: Element,
pub count: usize,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct NLargestCount {
number_of_elements: usize,
top_n: Vec<ElementCount>,
mode: Option<Element>,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupElementCount {
pub value: GroupElement,
pub count: usize,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct GroupCount {
pub count_index: Option<usize>, pub series: Vec<GroupElementCount>,
}
impl fmt::Display for Description {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Start of Description")?;
writeln!(f, " count: {}", self.count)?;
if self.mean.is_some() {
writeln!(f, " mean: {}", self.mean().unwrap())?;
}
if self.s_deviation.is_some() {
writeln!(f, " s-deviation: {}", self.std_deviation().unwrap())?;
}
if self.min.is_some() {
writeln!(f, " min: {}", self.min().unwrap())?;
}
if self.max.is_some() {
writeln!(f, " max: {}", self.max().unwrap())?;
}
writeln!(f, "End of Description")
}
}
impl fmt::Display for NLargestCount {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Start of NLargestCount")?;
writeln!(f, " number of elements: {}", self.number_of_elements())?;
writeln!(f, " Top N")?;
for elem in self.top_n() {
writeln!(f, " data: {} count: {}", elem.value, elem.count)?;
}
if self.mode.is_some() {
writeln!(f, " mode: {}", self.mode().unwrap())?;
}
writeln!(f, "End of NLargestCount")
}
}
impl Description {
#[must_use]
pub fn new(
count: usize,
mean: Option<f64>,
s_deviation: Option<f64>,
min: Option<Element>,
max: Option<Element>,
) -> Self {
Self {
count,
mean,
s_deviation,
min,
max,
}
}
#[must_use]
pub fn count(&self) -> usize {
self.count
}
#[must_use]
pub fn mean(&self) -> Option<f64> {
self.mean
}
#[must_use]
pub fn std_deviation(&self) -> Option<f64> {
self.s_deviation
}
#[must_use]
pub fn min(&self) -> Option<&Element> {
self.min.as_ref()
}
#[must_use]
pub fn max(&self) -> Option<&Element> {
self.max.as_ref()
}
}
impl NLargestCount {
#[must_use]
pub fn new(number_of_elements: usize, top_n: Vec<ElementCount>, mode: Option<Element>) -> Self {
Self {
number_of_elements,
top_n,
mode,
}
}
#[must_use]
pub fn number_of_elements(&self) -> usize {
self.number_of_elements
}
#[must_use]
pub fn top_n(&self) -> &Vec<ElementCount> {
&self.top_n
}
#[must_use]
pub fn mode(&self) -> Option<&Element> {
self.mode.as_ref()
}
}
macro_rules! min_max {
( $iter:expr, $d:expr, $t2:expr ) => {{
if let Some(minmax) = find_min_max($iter) {
$d.min = Some($t2(minmax.min));
$d.max = Some($t2(minmax.max));
} else {
$d.min = None;
$d.max = None;
}
}};
}
macro_rules! mean_deviation {
( $vf:expr, $t1:ty, $d:expr ) => {
let m = mean(&$vf);
$d.mean = Some(m);
$d.s_deviation = Some(population_standard_deviation(&$vf, Some(m)));
};
}
macro_rules! top_n {
( $iter:expr, $len:expr, $d:expr, $t1:ty, $t2:expr, $num_of_top_n:expr ) => {
let top_n_native: Vec<($t1, usize)> = count_sort($iter);
$d.number_of_elements = top_n_native.len();
let mut top_n: Vec<ElementCount> = Vec::new();
let num_of_top_n = $num_of_top_n.to_usize().expect("safe: u32 -> usize");
let top_n_num = if num_of_top_n > top_n_native.len() {
top_n_native.len()
} else {
num_of_top_n
};
for (x, y) in &top_n_native[0..top_n_num] {
top_n.push(ElementCount {
value: $t2((*x).to_owned()),
count: *y,
});
}
$d.mode = top_n.first().map(|v| v.value.clone());
$d.top_n = top_n;
};
}
#[must_use]
pub(crate) fn describe(column: &Column, rows: &[usize], column_type: ColumnType) -> Description {
let mut description = Description {
count: rows.len(),
..Description::default()
};
match column_type {
ColumnType::Int64 => {
let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
min_max!(iter, description, Element::Int);
let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
#[allow(clippy::cast_precision_loss)] let f_values: Vec<f64> = iter.map(|v: i64| v as f64).collect();
mean_deviation!(f_values, i64, description);
}
ColumnType::Float64 => {
let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
min_max!(iter, description, Element::Float);
let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
let values = iter.collect::<Vec<_>>();
mean_deviation!(values, f64, description);
}
_ => (),
}
description
}
#[must_use]
pub(crate) fn n_largest_count(
column: &Column,
rows: &[usize],
column_type: ColumnType,
number_of_top_n: u32,
) -> NLargestCount {
let mut n_largest_count = NLargestCount::default();
match column_type {
ColumnType::Int64 => {
let iter = column.primitive_iter::<Int64Type>(rows).unwrap();
top_n!(
iter,
rows.len(),
n_largest_count,
i64,
Element::Int,
number_of_top_n
);
}
ColumnType::Enum => {
let iter = column.primitive_iter::<UInt64Type>(rows).unwrap();
top_n!(
iter,
rows.len(),
n_largest_count,
u64,
Element::UInt,
number_of_top_n
);
}
ColumnType::Utf8 => {
let iter = column.string_iter(rows).unwrap();
top_n!(
iter,
rows.len(),
n_largest_count,
&str,
Element::Text,
number_of_top_n
);
}
ColumnType::Binary => {
let iter = column.binary_iter(rows).unwrap();
top_n!(
iter,
rows.len(),
n_largest_count,
&[u8],
Element::Binary,
number_of_top_n
);
}
ColumnType::IpAddr => {
let values = column
.primitive_iter::<UInt32Type>(rows)
.unwrap()
.map(|v| IpAddr::from(Ipv4Addr::from(v)))
.collect::<Vec<_>>();
top_n!(
values.iter(),
rows.len(),
n_largest_count,
&IpAddr,
Element::IpAddr,
number_of_top_n
);
}
ColumnType::DateTime | ColumnType::Float64 => unreachable!(), }
n_largest_count
}
#[must_use]
#[allow(clippy::too_many_lines)]
pub(crate) fn n_largest_count_enum(
column: &Column,
rows: &[usize],
reverse_map: &HashMap<u64, Vec<String>>,
number_of_top_n: u32,
) -> NLargestCount {
let n_largest_count = n_largest_count(column, rows, ColumnType::Enum, number_of_top_n);
let (top_n, mode) = {
if reverse_map.is_empty() {
(
n_largest_count
.top_n()
.iter()
.map(|elem| {
if let Element::UInt(value) = elem.value {
ElementCount {
value: Element::Enum(value.to_string()),
count: elem.count,
}
} else {
ElementCount {
value: Element::Enum("_N/A_".to_string()),
count: elem.count,
}
}
})
.collect(),
match n_largest_count.mode() {
Some(mode) => {
if let Element::UInt(value) = mode {
Some(Element::Enum(value.to_string()))
} else {
None
}
}
None => None,
},
)
} else {
(
n_largest_count
.top_n()
.iter()
.map(|elem| {
if let Element::UInt(value) = elem.value {
ElementCount {
value: Element::Enum(reverse_map.get(&value).map_or(
"_NO_MAP_".to_string(),
|v| {
let mut s = String::new();
for (i, e) in v.iter().enumerate() {
s.push_str(e);
if i < v.len() - 1 {
s.push('|');
}
}
s
},
)),
count: elem.count,
}
} else {
ElementCount {
value: Element::Enum("_N/A_".to_string()),
count: elem.count,
}
}
})
.collect(),
match n_largest_count.mode() {
Some(mode) => {
if let Element::UInt(value) = mode {
Some(Element::Enum(reverse_map.get(value).map_or(
"_NO_MAP_".to_string(),
|v| {
let mut s = String::new();
for (i, e) in v.iter().enumerate() {
s.push_str(e);
if i < v.len() - 1 {
s.push('|');
}
}
s
},
)))
} else {
None
}
}
None => None,
},
)
}
};
NLargestCount {
number_of_elements: n_largest_count.number_of_elements(),
top_n,
mode,
}
}
#[must_use]
pub(crate) fn n_largest_count_float64(
column: &Column,
rows: &[usize],
number_of_top_n: u32,
min: f64,
max: f64,
) -> NLargestCount {
let mut n_largest_count = NLargestCount::default();
let iter = column.primitive_iter::<Float64Type>(rows).unwrap();
let (rc, rt) = top_n_f64(iter, min, max, number_of_top_n);
n_largest_count.number_of_elements = rc;
n_largest_count.mode = Some(rt[0].value.clone());
n_largest_count.top_n = rt;
n_largest_count
}
#[must_use]
pub(crate) fn n_largest_count_datetime(
column: &Column,
rows: &[usize],
time_interval: u32,
number_of_top_n: u32,
) -> NLargestCount {
let mut n_largest_count = NLargestCount::default();
let values = convert_time_intervals(column, rows, time_interval);
top_n!(
values.iter(),
rows.len(),
n_largest_count,
&NaiveDateTime,
Element::DateTime,
number_of_top_n
);
n_largest_count
}
#[must_use]
pub(crate) fn convert_time_intervals(
column: &Column,
rows: &[usize],
time_interval: u32,
) -> Vec<NaiveDateTime> {
let time_interval = if time_interval > MAX_TIME_INTERVAL {
MAX_TIME_INTERVAL
} else {
time_interval
};
let time_interval = if time_interval < MIN_TIME_INTERVAL {
MIN_TIME_INTERVAL
} else {
time_interval
};
let time_interval = i64::from(time_interval);
column
.primitive_iter::<Int64Type>(rows)
.unwrap()
.map(|v| {
let mut ts = v / (24 * 60 * 60) * (24 * 60 * 60);
ts += (v - ts) / time_interval * time_interval;
NaiveDateTime::from_timestamp(ts, 0)
})
.collect::<Vec<_>>()
}
fn count_sort<I>(iter: I) -> Vec<(I::Item, usize)>
where
I: Iterator,
I::Item: Clone + Eq + Hash,
{
let mut count: HashMap<I::Item, usize> = HashMap::new();
for v in iter {
let c = count.entry(v).or_insert(0);
*c += 1;
}
let mut top_n: Vec<(I::Item, usize)> = Vec::new();
for (k, v) in &count {
top_n.push(((*k).clone(), *v));
}
top_n.sort_unstable_by(|a, b| b.1.cmp(&a.1));
top_n
}
fn top_n_f64<I>(iter: I, min: f64, max: f64, number_of_top_n: u32) -> (usize, Vec<ElementCount>)
where
I: Iterator<Item = f64>,
{
let interval: f64 = (max - min) / NUM_OF_FLOAT_INTERVALS.to_f64().expect("<= 100");
let mut count: Vec<(usize, usize)> = vec![(0, 0); NUM_OF_FLOAT_INTERVALS];
for (i, item) in count.iter_mut().enumerate().take(NUM_OF_FLOAT_INTERVALS) {
item.0 = i;
}
for v in iter {
let mut slot = if interval == 0.0_f64 {
0_usize
} else {
((v - min) / interval).floor().to_usize().expect("< 100")
};
if slot == NUM_OF_FLOAT_INTERVALS {
slot = NUM_OF_FLOAT_INTERVALS - 1;
}
count[slot].1 += 1;
}
count.retain(|&c| c.1 > 0);
count.sort_unstable_by(|a, b| b.1.cmp(&a.1));
let mut top_n: Vec<ElementCount> = Vec::new();
let number_of_top_n = number_of_top_n.to_usize().expect("safe: u32 -> usize");
let num_top_n = if number_of_top_n > count.len() {
count.len()
} else {
number_of_top_n
};
for item in count.iter().take(num_top_n) {
if item.1 == 0 {
break;
}
top_n.push(ElementCount {
value: Element::FloatRange(FloatRange {
smallest: min + (item.0).to_f64().expect("< 30") * interval,
largest: min + (item.0 + 1).to_f64().expect("<= 30") * interval,
}),
count: item.1,
});
}
(count.len(), top_n)
}
struct MinMax<T> {
min: T,
max: T,
}
fn find_min_max<I>(mut iter: I) -> Option<MinMax<I::Item>>
where
I: Iterator,
I::Item: Copy + PartialOrd,
{
let mut min = iter.next()?;
let mut max = min;
for v in iter {
if min > v {
min = v;
} else if max < v {
max = v;
}
}
Some(MinMax { min, max })
}