use heavykeeper::{BucketedTopK, CuckooTopK, TopK};
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::time::{Duration, Instant};
/// Size in bytes of one fixed-width record (flow key) read from a trace file.
const KEY_SIZE: usize = 13;
/// Number of heaviest flows to track: used as the first constructor argument of
/// every sketch in `main` and as `k` for the ground-truth ranking.
const TOP_K: usize = 1000;
/// Fourth constructor argument of every sketch in `main`.
/// NOTE(review): presumably the HeavyKeeper counter decay factor — confirm
/// against the `heavykeeper` crate documentation.
const DECAY: f64 = 0.95;
/// Loads the single trace file `{trace_prefix}0.dat`.
///
/// The file is read as a sequence of fixed-width `KEY_SIZE`-byte records.
/// Returns every record in file order together with a map from each distinct
/// record to the number of times it occurred. A trailing partial record (fewer
/// than `KEY_SIZE` bytes) is ignored.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or a read
/// fails for any reason other than reaching the end of the file.
///
/// # Panics
///
/// Panics if the file holds more than `max_item_num` records.
// Not called by `main` in this file (which uses `read_in_traces`); kept as the
// single-file variant, hence the lint suppression.
#[allow(dead_code)]
fn read_in_trace(
    trace_prefix: &str,
    max_item_num: usize,
) -> io::Result<(Vec<Vec<u8>>, HashMap<Vec<u8>, u32>)> {
    let mut count = 0usize;
    let mut keys = Vec::new();
    let mut actual_flow_sizes: HashMap<Vec<u8>, u32> = HashMap::new();

    let trace_file_path = format!("{}{}.dat", trace_prefix, 0);
    println!("Start reading {}", trace_file_path);
    let mut reader = BufReader::new(File::open(&trace_file_path)?);

    // Reusable record buffer; each successful read overwrites it completely.
    let mut temp = vec![0u8; KEY_SIZE];
    loop {
        match reader.read_exact(&mut temp) {
            Ok(()) => {}
            // `read_exact` reports end of data (clean or mid-record) this way.
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            // Any other failure is a real I/O error and must not be mistaken
            // for the end of the file.
            Err(err) => return Err(err),
        }
        keys.push(temp.clone());
        *actual_flow_sizes.entry(temp.clone()).or_insert(0) += 1;
        count += 1;
        // Strictly greater: a dataset of exactly `max_item_num` items is allowed,
        // matching the message below and `read_in_traces`.
        if count > max_item_num {
            panic!(
                "The dataset has more than {} items, set a larger value for max_item_num",
                max_item_num
            );
        }
    }

    println!(
        "Finished reading {} ({} items), the dataset now has {} items",
        trace_file_path,
        count,
        keys.len()
    );
    Ok((keys, actual_flow_sizes))
}
/// Loads the trace files `{trace_prefix}0.dat` through `{trace_prefix}10.dat`.
///
/// Each file is read as a sequence of fixed-width `KEY_SIZE`-byte records.
/// Returns every record in read order (file 0 first) together with a map from
/// each distinct record to its total number of occurrences across all files.
/// A trailing partial record at the end of a file is ignored.
///
/// # Errors
///
/// Returns the underlying I/O error if any of the eleven files cannot be
/// opened, or if a read fails for any reason other than reaching the end of
/// the file.
///
/// # Panics
///
/// Panics if the files together hold more than `max_item_num` records.
fn read_in_traces(
    trace_prefix: &str,
    max_item_num: usize,
) -> io::Result<(Vec<Vec<u8>>, HashMap<Vec<u8>, u32>)> {
    let mut count = 0usize;
    let mut keys = Vec::new();
    let mut actual_flow_sizes: HashMap<Vec<u8>, u32> = HashMap::new();
    // Reusable record buffer; each successful read overwrites it completely.
    let mut temp = vec![0u8; KEY_SIZE];

    for datafile_cnt in 0..=10 {
        let trace_file_path = format!("{}{}.dat", trace_prefix, datafile_cnt);
        println!("Start reading {}", trace_file_path);
        let mut reader = BufReader::new(File::open(&trace_file_path)?);
        let items_before = keys.len();

        loop {
            match reader.read_exact(&mut temp) {
                Ok(()) => {}
                // `read_exact` reports end of data (clean or mid-record) this way.
                Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
                // Any other failure is a real I/O error and must not be
                // mistaken for the end of the file.
                Err(err) => return Err(err),
            }
            keys.push(temp.clone());
            *actual_flow_sizes.entry(temp.clone()).or_insert(0) += 1;
            count += 1;
            if count > max_item_num {
                panic!(
                    "The dataset has more than {} items, set a larger value for max_item_num",
                    max_item_num
                );
            }
        }

        // Report this file's own contribution next to the running total.
        println!(
            "Finished reading {} ({} items), the dataset now has {} items",
            trace_file_path,
            keys.len() - items_before,
            keys.len()
        );
    }
    Ok((keys, actual_flow_sizes))
}
/// Renders a 13-byte flow key as `"<protocol> <src_ip>:<src_port> -> <dst_ip>:<dst_port>"`.
///
/// Byte layout: source IPv4 address (bytes 0-3), source port (bytes 4-5,
/// big-endian), destination IPv4 address (bytes 6-9), destination port
/// (bytes 10-11, big-endian), protocol number (byte 12). Bytes past index 12
/// are ignored.
///
/// # Panics
///
/// Panics if `item` is shorter than 13 bytes.
fn format_flow(item: &[u8]) -> String {
    // Formats one 6-byte endpoint (4 address bytes + big-endian port) as `a.b.c.d:port`.
    let endpoint = |bytes: &[u8]| {
        let port = u16::from_be_bytes([bytes[4], bytes[5]]);
        format!(
            "{}.{}.{}.{}:{}",
            bytes[0], bytes[1], bytes[2], bytes[3], port
        )
    };

    let source = endpoint(&item[..6]);
    let destination = endpoint(&item[6..12]);
    format!("{} {} -> {}", item[12], source, destination)
}
/// Returns the `k` flows with the largest exact counts, highest count first.
///
/// Ties on count are broken by ascending key (byte-wise lexicographic order),
/// so the result is deterministic. If `truth` holds fewer than `k` flows, all
/// of them are returned.
fn true_top_k(truth: &HashMap<Vec<u8>, u32>, k: usize) -> Vec<(Vec<u8>, u32)> {
    // Rank borrowed keys so that only the `k` survivors are cloned, instead of
    // copying every flow key in the trace.
    let mut ranked: Vec<(&Vec<u8>, u32)> = truth.iter().map(|(key, &count)| (key, count)).collect();
    // Map keys are unique, so the comparator never reports equality and an
    // unstable sort yields the same order as a stable one.
    ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
    ranked
        .into_iter()
        .take(k)
        .map(|(key, count)| (key.clone(), count))
        .collect()
}
/// Accuracy figures for one sketch, as produced by `score_results`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct AccuracyMetrics {
    /// Fraction of reported items that belong to the ground-truth top set
    /// (0.0 when nothing was reported).
    hit_ratio: f64,
    /// Mean relative error of the reported counts, over reported items that
    /// have a non-zero ground-truth count (0.0 when there are none).
    are_reported: f64,
    /// Mean relative error of the sketch's point estimates, over ground-truth
    /// top-set items that have a non-zero ground-truth count (0.0 when there
    /// are none).
    are_true_top_k: f64,
}

/// Averages `|estimate - true| / true` over the given `(item, estimate)` pairs.
///
/// Pairs whose item is missing from `truth` or has a true count of zero are
/// skipped (the relative error is undefined for them) and do not count toward
/// the average. Returns 0.0 when no pair qualifies.
fn mean_relative_error<'a>(
    estimates: impl Iterator<Item = (&'a [u8], u64)>,
    truth: &HashMap<Vec<u8>, u32>,
) -> f64 {
    let mut sum = 0.0;
    let mut n = 0usize;
    for (item, est) in estimates {
        match truth.get(item) {
            Some(&true_c) if true_c > 0 => {
                let true_c = true_c as f64;
                sum += (est as f64 - true_c).abs() / true_c;
                n += 1;
            }
            _ => {}
        }
    }
    if n == 0 {
        0.0
    } else {
        sum / n as f64
    }
}

/// Scores a sketch's output against exact ground truth.
///
/// * `results` - the `(item, estimated count)` pairs the sketch reported.
/// * `truth` - exact count of every item.
/// * `true_top_set` - the items of the exact top-K.
/// * `sketch_count` - point query returning the sketch's estimate for an item;
///   called once for every item of `true_top_set`.
///
/// Both relative-error averages skip items that are absent from `truth` or
/// have a true count of zero, so inconsistent inputs can neither panic nor
/// turn the metrics into NaN/infinity.
fn score_results(
    results: &[(Vec<u8>, u64)],
    truth: &HashMap<Vec<u8>, u32>,
    true_top_set: &HashSet<Vec<u8>>,
    sketch_count: impl Fn(&[u8]) -> u64,
) -> AccuracyMetrics {
    let hits = results
        .iter()
        .filter(|(item, _)| true_top_set.contains(item))
        .count();
    let hit_ratio = if results.is_empty() {
        0.0
    } else {
        hits as f64 / results.len() as f64
    };

    let are_reported = mean_relative_error(
        results.iter().map(|(item, est)| (item.as_slice(), *est)),
        truth,
    );
    let are_true_top_k = mean_relative_error(
        true_top_set
            .iter()
            .map(|item| (item.as_slice(), sketch_count(item.as_slice()))),
        truth,
    );

    AccuracyMetrics {
        hit_ratio,
        are_reported,
        are_true_top_k,
    }
}
/// Prints the benchmark summary for one sketch to stdout.
///
/// Emits a header with `name`, the insert count and elapsed time, throughput
/// in million inserts per second (with the equivalent nanoseconds per insert),
/// the three accuracy metrics, and up to the first ten entries of `results`
/// rendered with `format_flow`.
fn report(
    name: &str,
    num_keys: usize,
    duration: Duration,
    results: &[(Vec<u8>, u64)],
    metrics: &AccuracyMetrics,
) {
    let elapsed_secs = duration.as_secs_f64();
    let mpps = (num_keys as f64 / 1_000_000.0) / elapsed_secs;
    // 1 Mpps corresponds to 1000 ns per operation.
    let ns_per_op = 1_000.0 / mpps;
    // Only the leading entries are listed.
    let preview = &results[..results.len().min(10)];

    println!("\n=== {} ===", name);
    println!("inserts: {} in {:.3}s", num_keys, elapsed_secs);
    println!("throughput: {:.2} Mpps, {:.1} ns/op", mpps, ns_per_op);
    println!(
        "accuracy: hit_ratio={:.4} ARE_reported={:.6} ARE_true_top_k={:.6}",
        metrics.hit_ratio, metrics.are_reported, metrics.are_true_top_k
    );
    println!("top {} flows:", preview.len());
    for entry in preview {
        println!(" {} count={}", format_flow(&entry.0), entry.1);
    }
}
/// Benchmark driver: loads the traces under `data/`, derives the exact top-K
/// as ground truth, then feeds every record into each of the three sketch
/// variants in turn, timing the insert loop and printing throughput and
/// accuracy for each.
///
/// NOTE(review): the second and third constructor arguments look like the
/// sketch dimensions (width/depth or buckets/slots) — confirm against the
/// `heavykeeper` crate documentation.
fn main() -> io::Result<()> {
    // Upper bound on the number of records the loader accepts before panicking.
    let max_item_num = 40 * 1_000_000;
    let (keys, actual_flow_sizes) = read_in_traces("data/", max_item_num)?;
    let total_items = keys.len();
    println!("number of items: {}", total_items);
    println!("number of flows: {}", actual_flow_sizes.len());

    // Exact ground truth: the ranked top-K and the same keys as a set for
    // membership tests.
    let truth_top_k = true_top_k(&actual_flow_sizes, TOP_K);
    let true_top_set: HashSet<Vec<u8>> = truth_top_k
        .iter()
        .map(|entry| entry.0.clone())
        .collect();
    let threshold = truth_top_k.last().map_or(0, |entry| entry.1);
    println!(
        "ground-truth top-{} threshold count: {} (smallest count in true top-K)",
        TOP_K, threshold
    );

    // Each variant runs in its own scope so the sketch is dropped before the
    // next one is built. Only the insert loop is timed.
    {
        let mut sketch = TopK::<Vec<u8>>::new(TOP_K, 16384, 2, DECAY);
        let started = Instant::now();
        for key in &keys {
            sketch.add(key.as_slice(), 1);
        }
        let elapsed = started.elapsed();
        let reported: Vec<(Vec<u8>, u64)> = sketch
            .list()
            .into_iter()
            .map(|node| (node.item, node.count))
            .collect();
        let accuracy = score_results(&reported, &actual_flow_sizes, &true_top_set, |item| {
            sketch.count(item)
        });
        report(
            "TopK (HeavyKeeper)",
            total_items,
            elapsed,
            &reported,
            &accuracy,
        );
    }

    {
        let mut sketch = BucketedTopK::<Vec<u8>>::new(TOP_K, 8192, 4, DECAY);
        let started = Instant::now();
        for key in &keys {
            sketch.add(key.as_slice(), 1);
        }
        let elapsed = started.elapsed();
        let reported: Vec<(Vec<u8>, u64)> = sketch
            .list()
            .into_iter()
            .map(|node| (node.item, node.count))
            .collect();
        let accuracy = score_results(&reported, &actual_flow_sizes, &true_top_set, |item| {
            sketch.count(item)
        });
        report("BucketedTopK", total_items, elapsed, &reported, &accuracy);
    }

    {
        let mut sketch = CuckooTopK::<Vec<u8>>::new(TOP_K, 8192, 4, DECAY);
        let started = Instant::now();
        for key in &keys {
            sketch.add(key.as_slice(), 1);
        }
        let elapsed = started.elapsed();
        let reported: Vec<(Vec<u8>, u64)> = sketch
            .list()
            .into_iter()
            .map(|node| (node.item, node.count))
            .collect();
        let accuracy = score_results(&reported, &actual_flow_sizes, &true_top_set, |item| {
            sketch.count(item)
        });
        report("CuckooTopK", total_items, elapsed, &reported, &accuracy);
    }

    Ok(())
}