use std::cmp;
use std::ops::AddAssign;
/// Returns the index at which `v` should be inserted into `vs`.
///
/// The search repeatedly halves the candidate window by comparing `v` with the
/// element in the middle of the window: a strictly smaller `v` keeps the left
/// half, anything else keeps the right half (middle element included). Once
/// the window holds ten elements or fewer, the remaining candidates are
/// scanned with [`find_insert_pos_linear`].
///
/// The halving only gives a meaningful answer when `vs` is ordered ascending;
/// this function does not check that.
pub fn find_insert_pos<T>(vs: &[T], v: &T) -> usize
where
    T: PartialOrd,
{
    // Half-open window `[lo, hi)` that still contains the answer.
    let mut lo = 0;
    let mut hi = vs.len();

    while hi - lo > 10 {
        let mid = lo + (hi - lo) / 2;
        if *v < vs[mid] {
            hi = mid;
        } else {
            lo = mid;
        }
    }

    // The linear scan reports a position relative to the window start.
    lo + find_insert_pos_linear(&vs[lo..hi], v)
}
/// Scans `vs` front to back and returns the index of the first element that
/// is strictly greater than `v`.
///
/// Returns `vs.len()` when no element compares greater than `v` (which
/// includes the empty slice).
pub fn find_insert_pos_linear<T>(vs: &[T], v: &T) -> usize
where
    T: PartialOrd,
{
    vs.iter()
        .position(|candidate| v < candidate)
        .unwrap_or(vs.len())
}
/// One entry of a [`Stream`] summary: a stored value plus two rank counters.
///
/// Equality and ordering look at `v` only; `g` and `delta` never take part in
/// comparisons.
#[cfg_attr(feature = "with_serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Eq, Debug)]
pub struct Tuple<T>
where
    T: PartialOrd,
{
    /// The stored value.
    pub v: T,
    /// Rank weight of this entry. `Stream::quantile` sums the `g` of all
    /// entries up to and including this one to get its minimum rank.
    pub g: usize,
    /// Rank slack of this entry. `Stream::quantile` adds it on top of the
    /// minimum rank to get the maximum rank.
    pub delta: usize,
}

impl<T: PartialOrd> Tuple<T> {
    /// Builds a tuple from its three parts, stored as given.
    pub fn new(v: T, g: usize, delta: usize) -> Self {
        Self { v, g, delta }
    }
}

impl<T: PartialOrd> PartialEq for Tuple<T> {
    /// Two tuples are equal when their values are equal.
    fn eq(&self, rhs: &Self) -> bool {
        let (mine, theirs) = (&self.v, &rhs.v);
        mine == theirs
    }
}

impl<T: PartialOrd> PartialOrd for Tuple<T> {
    /// Orders tuples exactly as their values are ordered.
    fn partial_cmp(&self, rhs: &Self) -> Option<cmp::Ordering> {
        T::partial_cmp(&self.v, &rhs.v)
    }
}
/// Quantile summary built incrementally from a stream of `T` values.
///
/// Values are added with `insert`, two summaries can be combined with `+=`,
/// and `quantile` answers rank queries from the retained tuples.
///
/// NOTE(review): the structure (`g`/`delta` tuples, banded compression, the
/// `test_gk` test name) looks like the Greenwald-Khanna summary - confirm
/// against the crate documentation.
#[cfg_attr(feature = "with_serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
pub struct Stream<T>
where
T: PartialOrd,
{
/// Retained tuples. `insert` places each new tuple at the index reported by
/// `find_insert_pos`; `compress` removes tuples and folds their `g` into the
/// following one.
summary: Vec<Tuple<T>>,
/// Error parameter passed to `new`. It scales the compression period, the
/// threshold `p()` and the rank tolerance used by `quantile`.
epsilon: f64,
/// Number of values observed: incremented by every `insert` and summed when
/// two streams are merged.
n: usize,
}
impl<T> Stream<T>
where
    T: PartialOrd,
{
    /// Creates an empty summary that uses `epsilon` as its error parameter.
    ///
    /// `epsilon` is stored as given; it is not validated here.
    pub fn new(epsilon: f64) -> Stream<T> {
        Stream {
            summary: vec![],
            epsilon,
            n: 0,
        }
    }

    /// Adds one observation to the summary.
    ///
    /// The value enters as a tuple with `g = 1` at the index chosen by
    /// `find_insert_pos`, the observation count is incremented, and the
    /// summary is compressed whenever the count hits the compression period.
    pub fn insert(&mut self, v: T) {
        let mut t = Tuple::new(v, 1, 0);
        let pos = find_insert_pos(&self.summary, &t);

        // A tuple inserted at either end keeps `delta = 0`. Anything landing
        // between existing tuples gets `floor(2 * epsilon * n)`, where `n` is
        // still the count from before this insertion.
        if pos != 0 && pos != self.summary.len() {
            t.delta = self.p();
        }

        self.summary.insert(pos, t);
        self.n += 1;

        if self.should_compress() {
            self.compress();
        }
    }

    /// Returns a reference to the retained value chosen for quantile `phi`.
    ///
    /// The target rank is `floor(phi * n)`. Tuples are walked in order while
    /// tracking the running minimum rank; the walk stops at the last tuple
    /// whose successor's maximum rank would exceed the target rank plus
    /// `epsilon * n` (truncated).
    ///
    /// # Panics
    ///
    /// Panics if nothing has been inserted yet, or if `phi` is not within
    /// `0.0..=1.0` (NaN included).
    pub fn quantile(&self, phi: f64) -> &T {
        assert!(!self.summary.is_empty());
        assert!(phi >= 0f64 && phi <= 1f64);

        let r = (phi * self.n as f64).floor() as usize;
        let en = (self.epsilon * self.n as f64) as usize;

        let first = &self.summary[0];
        let mut prev = &first.v;
        let mut prev_rmin = first.g;

        for t in self.summary.iter().skip(1) {
            // Maximum rank `t` could have: ranks accumulated so far, its own
            // weight, and its slack.
            let rmax = prev_rmin + t.g + t.delta;
            if rmax > r + en {
                return prev;
            }
            prev_rmin += t.g;
            prev = &t.v;
        }

        prev
    }

    /// Reports whether the observation count is a multiple of the
    /// compression period `floor(1 / (2 * epsilon))`.
    fn should_compress(&self) -> bool {
        // For `epsilon > 0.5` (and for negative or NaN values) the computed
        // period is 0, and `n % 0` would panic. Clamp to 1 so such streams
        // simply compress on every insert.
        let period = ((1f64 / (2f64 * self.epsilon)).floor() as usize).max(1);
        self.n % period == 0
    }

    /// Removes every interior tuple that `can_delete` allows, scanning from
    /// the back so that removals do not shift indices still to be visited.
    /// The first and last tuples are never removed.
    fn compress(&mut self) {
        // `saturating_sub` keeps an empty summary (reachable by merging two
        // empty streams) a no-op instead of underflowing `0 - 1`.
        let last = self.s().saturating_sub(1);
        for i in (1..last).rev() {
            if self.can_delete(i) {
                self.delete(i);
            }
        }
    }

    /// Decides whether tuple `i` may be folded into tuple `i + 1`.
    ///
    /// Both conditions must hold: the combined weight plus the successor's
    /// slack stays below `p()`, and the band of tuple `i` is not greater than
    /// the band of its successor.
    ///
    /// # Panics
    ///
    /// Panics if the summary has fewer than two tuples or `i` is the last
    /// index (or beyond).
    fn can_delete(&self, i: usize) -> bool {
        assert!(self.summary.len() >= 2);
        assert!(i < self.summary.len() - 1);

        let t = &self.summary[i];
        let tnext = &self.summary[i + 1];
        let p = self.p();

        let safety_property = t.g + tnext.g + tnext.delta < p;
        let optimal = Self::band(t.delta, p) <= Self::band(tnext.delta, p);

        safety_property && optimal
    }

    /// Removes tuple `i` and adds its `g` to the tuple that followed it.
    ///
    /// # Panics
    ///
    /// Panics if the summary has fewer than two tuples or `i` is the last
    /// index (or beyond).
    fn delete(&mut self, i: usize) {
        assert!(self.summary.len() >= 2);
        assert!(i < self.summary.len() - 1);

        let t = self.summary.remove(i);
        // After the removal the former successor sits at index `i`.
        let tnext = &mut self.summary[i];
        tnext.g += t.g;
    }

    /// Returns `floor(log2(p - delta + 1))`, the band used by `can_delete`.
    ///
    /// # Panics
    ///
    /// Panics if `delta > p`.
    fn band(delta: usize, p: usize) -> usize {
        assert!(p >= delta);

        let diff = p - delta + 1;
        (diff as f64).log(2f64).floor() as usize
    }

    /// Returns `floor(2 * epsilon * n)`: the threshold used during
    /// compression and the `delta` given to tuples inserted between others.
    pub fn p(&self) -> usize {
        (2f64 * self.epsilon * (self.n as f64)).floor() as usize
    }

    /// Returns the number of values observed so far.
    pub fn n(&self) -> usize {
        self.n
    }

    /// Returns the number of tuples currently retained in the summary.
    pub fn s(&self) -> usize {
        self.summary.len()
    }
}
impl<T: PartialOrd> AddAssign for Stream<T> {
fn add_assign(&mut self, rhs: Self) {
let mut merged_summary = Vec::with_capacity(self.summary.len() + rhs.summary.len());
let merged_epsilon = self.epsilon.max(rhs.epsilon);
let merged_n = self.n + rhs.n;
let additional_self_delta = (2. * rhs.epsilon * rhs.n as f64).floor() as usize;
let additional_rhs_delta = (2. * self.epsilon * self.n as f64).floor() as usize;
let mut self_samples = std::mem::replace(&mut self.summary, Vec::new())
.into_iter()
.peekable();
let mut rhs_samples = rhs.summary.into_iter().peekable();
let mut started_self = false;
let mut started_rhs = false;
while let (Some(self_sample), Some(rhs_sample)) = (self_samples.peek(), rhs_samples.peek())
{
let (next_sample, additional_delta) = if self_sample.v < rhs_sample.v {
started_self = true;
(
self_samples.next().unwrap(),
if started_rhs {
additional_self_delta
} else {
0
},
)
} else {
started_rhs = true;
(
rhs_samples.next().unwrap(),
if started_self {
additional_rhs_delta
} else {
0
},
)
};
let next_sample = Tuple {
v: next_sample.v,
g: next_sample.g,
delta: next_sample.delta + additional_delta,
};
merged_summary.push(next_sample);
}
merged_summary.extend(self_samples);
merged_summary.extend(rhs_samples);
self.summary = merged_summary;
self.epsilon = merged_epsilon;
self.n = merged_n;
self.compress();
}
}
#[cfg(test)]
mod test {
    use super::*;
    use std::ops::Range;

    /// On the sorted slice `0..10`, every present value `v` must be placed
    /// right after itself.
    #[test]
    fn test_find_insert_pos() {
        let mut vs = vec![];
        for v in 0..10 {
            vs.push(v);
        }
        for v in 0..10 {
            assert_eq!(find_insert_pos_linear(&vs, &v), v + 1);
        }
    }

    /// Exact `phi`-quantile of the integers in `r`.
    fn get_quantile_for_range(r: &Range<u32>, phi: f64) -> u32 {
        (phi * ((r.end - 1) - r.start) as f64).floor() as u32 + r.start
    }

    /// Exact quantiles at `phi - epsilon` (clamped at 0) and `phi + epsilon`.
    fn get_quantile_bounds_for_range(r: Range<u32>, phi: f64, epsilon: f64) -> (u32, u32) {
        let lower = get_quantile_for_range(&r, (phi - epsilon).max(0f64));
        let upper = get_quantile_for_range(&r, phi + epsilon);
        (lower, upper)
    }

    /// Checks that the stream's answer for `phi` lies within the exact
    /// bounds computed for range `r`.
    fn quantile_in_bounds(r: Range<u32>, s: &Stream<u32>, phi: f64, epsilon: f64) -> bool {
        let approx_quantile = *s.quantile(phi);
        let (lower, upper) = get_quantile_bounds_for_range(r, phi, epsilon);
        approx_quantile >= lower && approx_quantile <= upper
    }

    #[test]
    fn test_basics() {
        let epsilon = 0.01;
        let mut stream = Stream::new(epsilon);
        for i in 1..1001 {
            stream.insert(i);
        }
        for phi in 0..100 {
            assert!(quantile_in_bounds(
                1..1001,
                &stream,
                (phi as f64) / 100f64,
                epsilon
            ));
        }
    }

    /// Even values go to one stream, odd values to another; after merging,
    /// the result must answer quantiles over the whole range `0..2000`.
    #[test]
    fn test_add_assign() {
        let epsilon = 0.01;
        let mut stream = Stream::new(epsilon);
        let mut stream2 = Stream::new(epsilon);
        for i in 0..1000 {
            stream.insert(2 * i);
            stream2.insert(2 * i + 1);
        }

        // The merge is the operation under test; without it only the
        // even-valued stream would be checked.
        stream += stream2;

        for phi in 0..100 {
            assert!(quantile_in_bounds(
                0..2000,
                &stream,
                (phi as f64) / 100f64,
                epsilon
            ));
        }
    }

    quickcheck! {
        fn find_insert_pos_log_equals_find_insert_pos_linear(vs: Vec<i32>) -> bool {
            let mut vs = vs;
            vs.sort();
            for v in -100..100 {
                if find_insert_pos(&vs, &v) != find_insert_pos_linear(&vs, &v) {
                    return false;
                }
            }
            true
        }

        // Smoke test: inserting arbitrary values must not panic.
        fn test_gk(vs: Vec<u32>) -> bool {
            let mut s = Stream::new(0.25);
            for v in vs {
                s.insert(v);
            }
            true
        }
    }
}