mod treap;
use crate::treap::Treap;
use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;
/// Two interchangeable ways of specifying the `delta` parameter of the estimator.
///
/// Either value must lie strictly between 0.0 and 1.0 to pass `validate`.
#[derive(Debug, Clone, Copy)]
pub enum ConfidenceSpec {
    /// A delta value that is used unchanged.
    Delta(f64),
    /// A confidence level; the corresponding delta is `1.0 - confidence`.
    Confidence(f64),
}

impl ConfidenceSpec {
    /// Returns `true` only when `value` lies strictly between 0.0 and 1.0.
    ///
    /// Phrased as a positive range test on purpose: every comparison against
    /// NaN is `false`, so NaN is reported as out of range instead of slipping
    /// through a pair of negated `<=` / `>=` checks.
    fn in_open_unit_interval(value: f64) -> bool {
        value > 0.0 && value < 1.0
    }

    /// Converts the specification into a delta value.
    ///
    /// `Delta` is returned as-is; `Confidence(c)` becomes `1.0 - c`.
    /// No range checking happens here; call `validate` first.
    fn to_delta(self) -> f64 {
        match self {
            ConfidenceSpec::Delta(delta) => delta,
            ConfidenceSpec::Confidence(confidence) => 1.0 - confidence,
        }
    }

    /// Checks that the carried value lies in the open interval (0.0, 1.0).
    ///
    /// # Errors
    ///
    /// Returns a message naming the offending variant when the value is
    /// `<= 0.0`, `>= 1.0`, or NaN.
    fn validate(self) -> Result<Self, String> {
        let (value, message) = match self {
            ConfidenceSpec::Delta(delta) => {
                (delta, "Delta must be between 0.0 and 1.0 (exclusive)")
            }
            ConfidenceSpec::Confidence(confidence) => (
                confidence,
                "Confidence must be between 0.0 and 1.0 (exclusive)",
            ),
        };

        if Self::in_open_unit_interval(value) {
            Ok(self)
        } else {
            Err(message.to_string())
        }
    }
}
/// Builder that collects the optional parameters of a [`CVM`] and validates
/// them in [`CVMBuilder::build`].
#[derive(Debug, Clone, Default)]
pub struct CVMBuilder {
    /// Epsilon handed to `CVM::new`; `build` falls back to 0.8 when unset.
    epsilon: Option<f64>,
    /// Delta/confidence choice; `build` falls back to a confidence of 0.9 when unset.
    confidence_spec: Option<ConfidenceSpec>,
    /// Stream size estimate handed to `CVM::new`; `build` falls back to 1000 when unset.
    stream_size: Option<usize>,
}

impl CVMBuilder {
    /// Epsilon used by `build` when none was supplied.
    const DEFAULT_EPSILON: f64 = 0.8;
    /// Confidence used by `build` when neither confidence nor delta was supplied.
    const DEFAULT_CONFIDENCE: f64 = 0.9;
    /// Stream size used by `build` when no estimate was supplied.
    const DEFAULT_STREAM_SIZE: usize = 1000;

    /// Returns `true` only when `value` lies strictly between 0.0 and 1.0.
    ///
    /// Written as a positive range test so that NaN, for which every
    /// comparison is `false`, counts as out of range.
    fn in_open_unit_interval(value: f64) -> bool {
        value > 0.0 && value < 1.0
    }

    /// Creates a builder with every parameter unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets epsilon; it is range-checked later by `build`.
    pub fn epsilon(mut self, epsilon: f64) -> Self {
        self.epsilon = Some(epsilon);
        self
    }

    /// Stores a confidence level, replacing any previously stored confidence or delta.
    pub fn confidence(mut self, confidence: f64) -> Self {
        self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence));
        self
    }

    /// Stores a delta, replacing any previously stored confidence or delta.
    pub fn delta(mut self, delta: f64) -> Self {
        self.confidence_spec = Some(ConfidenceSpec::Delta(delta));
        self
    }

    /// Sets the stream size estimate; `build` rejects zero.
    pub fn estimated_size(mut self, size: usize) -> Self {
        self.stream_size = Some(size);
        self
    }

    /// Validates the collected parameters and constructs the [`CVM`].
    ///
    /// Checks run in this order: epsilon, confidence/delta, stream size; the
    /// first failure is returned.
    ///
    /// # Errors
    ///
    /// * epsilon is not strictly between 0.0 and 1.0 (this includes NaN),
    /// * the confidence/delta value is rejected by `ConfidenceSpec::validate`,
    /// * the stream size is zero.
    pub fn build<T: Ord>(self) -> Result<CVM<T>, String> {
        let epsilon = self.epsilon.unwrap_or(Self::DEFAULT_EPSILON);
        if !Self::in_open_unit_interval(epsilon) {
            return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string());
        }

        let delta = self
            .confidence_spec
            .unwrap_or(ConfidenceSpec::Confidence(Self::DEFAULT_CONFIDENCE))
            .validate()?
            .to_delta();

        let stream_size = self.stream_size.unwrap_or(Self::DEFAULT_STREAM_SIZE);
        if stream_size == 0 {
            return Err("Stream size must be greater than 0".to_string());
        }

        Ok(CVM::new(epsilon, delta, stream_size))
    }
}
/// Streaming estimator for the number of distinct elements seen.
///
/// Elements are kept in a bounded buffer; every time the buffer reaches its
/// threshold roughly half of it is discarded and the sampling probability is
/// halved. The estimate is the buffer length divided by that probability.
pub struct CVM<T: Ord> {
    /// Buffer length at which a halving round is triggered.
    buf_size: usize,
    /// Elements currently retained.
    buf: Treap<T>,
    /// Probability with which a processed element is (re-)inserted; starts at 1.0.
    probability: f64,
    /// Randomness source for sampling, treap insertion and halving rounds.
    rng: StdRng,
}

impl<T: Ord> CVM<T> {
    /// Returns a [`CVMBuilder`] with no parameters set.
    pub fn builder() -> CVMBuilder {
        CVMBuilder::new()
    }

    /// Creates an estimator whose buffer threshold is derived from the three
    /// parameters via `buffer_size`. The arguments are not validated here.
    pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self {
        Self {
            buf_size: buffer_size(epsilon, delta, stream_size),
            buf: Treap::new(),
            probability: 1.0,
            rng: StdRng::from_os_rng(),
        }
    }

    /// Reports whether the buffer length equals the configured threshold.
    fn buffer_is_full(&self) -> bool {
        self.buf.len() == self.buf_size
    }

    /// Feeds one element into the estimator.
    pub fn process_element(&mut self, elem: T) {
        // Drop any earlier copy so the element's presence depends only on the
        // draw made below.
        let already_buffered = self.buf.contains(&elem);
        if already_buffered {
            self.buf.remove(&elem);
        }

        let keep = self.rng.random_bool(self.probability);
        if keep {
            self.buf.insert(elem, &mut self.rng);
        }

        // A halving round may leave the buffer at the threshold, so repeat
        // until it no longer is.
        while self.buffer_is_full() {
            self.clear_about_half();
            self.probability /= 2.0;
        }
    }

    /// Keeps each buffered element independently with probability 0.5.
    fn clear_about_half(&mut self) {
        // Borrow the two fields separately so the closure can use the RNG
        // while the buffer is being mutated.
        let Self { buf, rng, .. } = self;
        buf.retain(|_| rng.random_bool(0.5));
    }

    /// Processes every item of `iter` in order and returns the resulting estimate.
    pub fn process_stream<I>(&mut self, iter: I) -> f64
    where
        I: IntoIterator<Item = T>,
    {
        iter.into_iter()
            .for_each(|item| self.process_element(item));
        self.calculate_final_result()
    }

    /// Returns the current estimate: buffer length divided by the sampling probability.
    pub fn calculate_final_result(&self) -> f64 {
        let retained = self.buf.len() as f64;
        retained / self.probability
    }
}
/// Extension trait that lets an iterator of `Ord` items estimate how many
/// distinct items it yields by running it through a [`CVM`].
pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
    /// Consumes the iterator and returns the estimate from a [`CVM`] created
    /// with `CVM::new(epsilon, delta, estimated_size)`.
    ///
    /// The parameters are passed through without validation.
    fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
        CVM::<T>::new(epsilon, delta, estimated_size).process_stream(self)
    }

    /// Consumes the iterator and returns the estimate from the [`CVM`]
    /// produced by `builder`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `CVMBuilder::build`; the iterator is not
    /// advanced in that case.
    fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
        builder
            .build::<T>()
            .map(|mut estimator| estimator.process_stream(self))
    }
}

/// Every iterator over `Ord` items gets the estimation methods for free.
impl<T, I> EstimateDistinct<T> for I
where
    T: Ord,
    I: Iterator<Item = T>,
{
}
/// Computes the buffer threshold as
/// `ceil((12 / epsilon^2) * log2(8 * stream_size / delta))`, never less than 1.
///
/// Degenerate inputs are handled instead of silently collapsing to 0 in the
/// float-to-integer cast:
///
/// * a `stream_size` of 0 is treated as 1, because `log2(0)` is negative infinity;
/// * a NaN or non-positive result (for example from a negative `delta` or a
///   NaN `epsilon`) yields 1.
///
/// A positive-infinite result saturates to `usize::MAX` in the cast.
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
    let stream_len = stream_size.max(1) as f64;
    let raw = (12.0 / epsilon.powi(2)) * (8.0 * stream_len / delta).log2();

    // `f64::max` returns the non-NaN operand, so this floor also absorbs NaN.
    raw.ceil().max(1.0) as usize
}
#[cfg(test)]
mod tests {
    use std::{
        fs::File,
        io::{BufRead, BufReader},
        path::Path,
    };

    use super::{CVM, ConfidenceSpec, EstimateDistinct};
    use regex::Regex;
    use std::collections::HashSet;

    /// Opens `filename` for buffered reading; panics when the file cannot be opened.
    fn open_file<P>(filename: P) -> BufReader<File>
    where
        P: AsRef<Path>,
    {
        let f = File::open(filename).expect("Couldn't read from file");
        BufReader::new(f)
    }

    /// Splits `line` on single spaces, removes everything matched by `re` from
    /// each piece, lowercases it and adds it to `hs`.
    fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) {
        hs.extend(
            line.split(' ')
                .map(|word| re.replace_all(word, "").to_lowercase()),
        );
    }

    /// Exact distinct-word count of the benchmark text, computed with a `HashSet`.
    #[test]
    fn actual() {
        let input_file = "benches/kiy.txt";
        let re = Regex::new(r"[^\w\s]").unwrap();
        let br = open_file(input_file);
        let mut hs = HashSet::new();
        for line in br.lines() {
            line_to_word(&re, &mut hs, &line.unwrap());
        }
        assert_eq!(hs.len(), 9016);
    }

    #[test]
    fn test_builder_defaults() {
        let cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
        // Nothing has been processed yet, so the estimate is zero.
        assert_eq!(cvm.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_custom_params() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder()
            .epsilon(0.05)
            .confidence(0.99)
            .estimated_size(5000)
            .build()
            .unwrap();
        for i in 0..100 {
            cvm.process_element(i);
        }
        let result = cvm.calculate_final_result();
        assert!(result > 0.0);
    }

    #[test]
    fn test_builder_delta_vs_confidence() {
        let cvm1: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap();
        let cvm2: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap();
        assert_eq!(cvm1.calculate_final_result(), 0.0);
        assert_eq!(cvm2.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_last_wins() {
        // `delta` is called after `confidence`, so it replaces the stored spec.
        let cvm: CVM<i32> = CVM::<i32>::builder()
            .confidence(0.9)
            .delta(0.05)
            .build()
            .unwrap();
        assert_eq!(cvm.calculate_final_result(), 0.0);
    }

    #[test]
    fn test_builder_validation() {
        // Epsilon must be strictly inside (0, 1).
        let result = CVM::<i32>::builder().epsilon(0.0).build::<i32>();
        assert!(result.is_err());
        let result = CVM::<i32>::builder().epsilon(1.0).build::<i32>();
        assert!(result.is_err());
        let result = CVM::<i32>::builder().epsilon(-0.5).build::<i32>();
        assert!(result.is_err());

        // Confidence must be strictly inside (0, 1).
        let result = CVM::<i32>::builder().confidence(0.0).build::<i32>();
        assert!(result.is_err());
        let result = CVM::<i32>::builder().confidence(1.0).build::<i32>();
        assert!(result.is_err());

        // Delta must be strictly inside (0, 1).
        let result = CVM::<i32>::builder().delta(0.0).build::<i32>();
        assert!(result.is_err());
        let result = CVM::<i32>::builder().delta(1.0).build::<i32>();
        assert!(result.is_err());

        // The stream size estimate must be non-zero.
        let result = CVM::<i32>::builder().estimated_size(0).build::<i32>();
        assert!(result.is_err());
    }

    #[test]
    fn test_builder_method_chaining() {
        let result = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.95)
            .estimated_size(2000)
            .build::<String>();
        assert!(result.is_ok());
    }

    #[test]
    fn test_confidence_spec_conversion() {
        let confidence_spec = ConfidenceSpec::Confidence(0.9);
        assert!((confidence_spec.to_delta() - 0.1).abs() < f64::EPSILON);
        let delta_spec = ConfidenceSpec::Delta(0.05);
        assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON);
    }

    #[test]
    fn test_process_stream() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
        let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3];
        let estimate = cvm.process_stream(numbers);
        assert!(estimate > 0.0);

        let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap();
        let estimate2 = cvm2.process_stream(1..=100);
        assert!(estimate2 > 0.0);
    }

    #[test]
    fn test_process_stream_strings() {
        let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
        let words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
            "rust".to_string(),
        ];
        let estimate = cvm.process_stream(words);
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_process_stream_with_map() {
        let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
        let borrowed_words = ["hello", "world", "hello", "rust"];
        let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_estimate_distinct_trait() {
        let numbers = vec![1, 2, 3, 2, 1, 4, 5];
        let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
        assert!(estimate > 0.0);

        let words = vec![
            "hello".to_string(),
            "world".to_string(),
            "hello".to_string(),
        ];
        let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
        let estimate = words
            .into_iter()
            .estimate_distinct_with_builder(builder)
            .unwrap();
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_estimate_distinct_with_cloning() {
        let borrowed_numbers = [1, 2, 3, 2, 1, 4];
        let estimate = borrowed_numbers
            .iter()
            .cloned()
            .estimate_distinct_count(0.1, 0.1, 100);
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_streaming_integration_with_file_processing() {
        let lines = vec![
            "hello world".to_string(),
            "world peace".to_string(),
            "hello rust".to_string(),
        ];
        let mut cvm: CVM<String> = CVM::<String>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .build()
            .unwrap();
        let words: Vec<String> = lines
            .into_iter()
            .flat_map(|line| {
                // Collected eagerly because the split borrows `line`, which
                // is dropped when this closure returns.
                line.split_whitespace()
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>()
            })
            .collect();
        let estimate = cvm.process_stream(words);
        assert!(estimate > 0.0);
    }

    #[test]
    fn test_streaming_large_dataset() {
        let mut cvm: CVM<i32> = CVM::<i32>::builder()
            .epsilon(0.1)
            .confidence(0.9)
            .estimated_size(10_000)
            .build()
            .unwrap();
        // 10_000 items drawn from only 1_000 distinct values.
        let data: Vec<i32> = (0..1000).cycle().take(10_000).collect();
        let estimate = cvm.process_stream(data);
        assert!(estimate > 500.0 && estimate < 2000.0);
    }
}