use std::sync::Arc;
use arc_cell::ArcCell;
use parking_lot::RwLock;
use crate::summary::traits::{NonConcurrentSummaryProvider, SummaryProvider};
/// Batch size that [`BatchOpts::from_inner`] assigns when the caller does not
/// pick one with [`BatchOpts::with_batch_size`].
pub const DEFAULT_BATCH_SIZE: usize = 128;
/// Options for a batched summary: the batch size plus the options of the
/// wrapped provider.
///
/// `Debug` is derived alongside `Clone` so the options can be logged and used
/// in assertion messages; the derive only applies when `O: Debug`.
#[derive(Clone, Debug)]
pub struct BatchOpts<O> {
    /// Number of buffered measurements at which a batch is committed.
    pub batch_size: usize,
    /// Options forwarded to the wrapped provider when it is constructed.
    pub inner: O,
}
impl<O> BatchOpts<O> {
    /// Wraps `inner` options, using [`DEFAULT_BATCH_SIZE`] as the batch size.
    pub fn from_inner(inner: O) -> Self {
        let batch_size = DEFAULT_BATCH_SIZE;
        Self { inner, batch_size }
    }

    /// Returns the same options with `batch_size` replaced.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
/// Container type used for a batch of pending values: an
/// `orx_concurrent_vec::ConcurrentVec` backed by a `SplitVec` of
/// `ConcurrentElement<T>` parameterised with `Doubling`.
type Batch<T> = orx_concurrent_vec::ConcurrentVec<
T,
orx_concurrent_vec::SplitVec<
orx_concurrent_vec::ConcurrentElement<T>,
orx_concurrent_vec::Doubling,
>,
>;
/// Wraps a summary provider `P` and buffers observed values in a batch before
/// handing them to `P`.
///
/// `observe` pushes into `measurements`; `commit` swaps that batch for a fresh
/// one and feeds the retired values into `inner` under its write lock.
#[derive(Debug)]
pub struct BatchedSummary<P> {
/// Batch length at which `observe` triggers a commit; also the value passed
/// to `reserve_maximum_capacity` when a batch is created.
batch_size: usize,
/// Current batch of measurements that have not been committed yet. `commit`
/// replaces the whole batch through the `ArcCell`.
measurements: ArcCell<Batch<f64>>,
/// The wrapped provider. Written by `commit`, read by `snapshot` and `clone`.
inner: RwLock<P>,
}
impl<P: Clone> Clone for BatchedSummary<P> {
    /// Produces an independent copy holding both the committed state (`inner`)
    /// and the still-pending measurements.
    ///
    /// The handle to the current batch is kept alive while `inner` is copied.
    /// A concurrent `commit` that swaps this batch out has to wait until the
    /// handle is released before it can drain the batch into `inner` (it waits
    /// for sole ownership *before* taking the write lock). Without that, a
    /// commit landing between the two copies would move the pending values
    /// into `inner` and the clone would contain them twice.
    fn clone(&self) -> Self {
        // Pin the current batch first: while `pending` is alive its contents
        // cannot be forwarded to `inner`.
        let pending = self.measurements.get();
        let inner = self.inner.read().clone();
        let measurements = Batch::clone(&pending);
        Self {
            measurements: ArcCell::new(Arc::new(measurements)),
            batch_size: self.batch_size,
            inner: RwLock::new(inner),
        }
    }
}
impl<P: NonConcurrentSummaryProvider> BatchedSummary<P> {
    /// Creates a batched summary from `opts`.
    ///
    /// Inherent shortcut for [`SummaryProvider::new_provider`], so callers do
    /// not need the trait in scope.
    pub fn new(opts: &BatchOpts<P::Opts>) -> Self {
        <Self as SummaryProvider>::new_provider(opts)
    }

    /// Commits pending measurements and returns a snapshot of the wrapped
    /// provider.
    ///
    /// Inherent shortcut for [`SummaryProvider::snapshot`].
    pub fn snapshot(&self) -> P::Summary {
        SummaryProvider::snapshot(self)
    }

    /// Builds an empty batch with `batch_size` passed to
    /// `reserve_maximum_capacity`.
    fn new_batch(batch_size: usize) -> Arc<Batch<f64>> {
        let mut batch = Batch::new();
        batch.reserve_maximum_capacity(batch_size);
        Arc::new(batch)
    }

    /// Blocks until `arc` is the only strong reference, then returns the value
    /// it owns.
    ///
    /// Other handles are expected to be short-lived, so the wait starts as a
    /// busy spin. After a bounded number of attempts the thread yields to the
    /// scheduler instead: if a holder of another handle has been descheduled,
    /// spinning forever would burn a core and could keep that holder from
    /// running at all.
    fn wait_for_arc<T>(mut arc: Arc<T>) -> T {
        // Number of pure spin iterations before falling back to yielding.
        const SPINS_BEFORE_YIELD: u32 = 64;

        let mut spins = 0u32;
        loop {
            match Arc::try_unwrap(arc) {
                Ok(inner) => return inner,
                // Still shared: take the handle back and retry.
                Err(shared) => arc = shared,
            }
            if spins < SPINS_BEFORE_YIELD {
                spins += 1;
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
        }
    }

    /// Moves every pending measurement into the wrapped provider.
    ///
    /// The current batch is replaced by a fresh one, so new observations go to
    /// the new batch. The retired batch is drained once no other handle to it
    /// remains.
    pub fn commit(&self) {
        let retired = self.measurements.set(Self::new_batch(self.batch_size));
        // Wait for sole ownership *before* taking the write lock, so the lock
        // is never held while spinning.
        let retired = Self::wait_for_arc(retired);
        let mut inner = self.inner.write();
        for measure in retired.into_iter() {
            inner.observe(measure);
        }
    }

    /// Commits pending measurements and returns the wrapped provider.
    pub fn into_inner(self) -> P {
        self.commit();
        self.inner.into_inner()
    }
}
impl<P: NonConcurrentSummaryProvider> SummaryProvider for BatchedSummary<P> {
    type Opts = BatchOpts<P::Opts>;
    type Summary = P::Summary;

    /// Builds the wrapped provider from `opts.inner` and starts with an empty
    /// batch sized by `opts.batch_size`.
    fn new_provider(opts: &Self::Opts) -> Self {
        let provider = P::new_provider(&opts.inner);
        let inner = RwLock::new(provider);
        let measurements = ArcCell::new(Self::new_batch(opts.batch_size));
        Self {
            batch_size: opts.batch_size,
            measurements,
            inner,
        }
    }

    /// Appends `val` to the current batch and commits once the batch holds at
    /// least `batch_size` values.
    fn observe(&self, val: f64) {
        // The batch handle lives only inside this scope, so it is released
        // before `commit` waits for sole ownership of the retired batch.
        let batch_is_full = {
            let batch = self.measurements.get();
            batch.push(val);
            batch.len() >= self.batch_size
        };
        if batch_is_full {
            self.commit();
        }
    }

    /// Commits everything pending, then snapshots the wrapped provider under
    /// the read lock.
    fn snapshot(&self) -> Self::Summary {
        self.commit();
        let provider = self.inner.read();
        provider.snapshot()
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{
        simple::{SimpleSummary, SimpleSummaryOpts},
        traits::Summary,
    };

    use super::*;

    /// Several threads observe into one shared summary; no sample may be lost.
    #[test]
    fn concurrent_observe() {
        let tasks = 8;
        let measurements = 50_000;

        let opts = BatchOpts::from_inner(SimpleSummaryOpts::default())
            .with_batch_size(DEFAULT_BATCH_SIZE);
        let summary = Arc::new(BatchedSummary::<SimpleSummary>::new(&opts));

        let handles: Vec<_> = (0..tasks)
            .map(|_| {
                let summary = Arc::clone(&summary);
                std::thread::spawn(move || {
                    for i in 0..measurements {
                        summary.observe(i as f64)
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("no task panics");
        }

        let expected = tasks as u64 * measurements;
        let result = summary.snapshot();
        assert_eq!(
            result.sample_count(),
            expected,
            "Should have all measurements present in the collection"
        );
    }

    /// A single thread observes many values; every one must end up in the
    /// snapshot. The elapsed time of every 100th call is printed.
    #[test]
    fn single_threaded_observe() {
        let measurements = 50_000;

        let opts = BatchOpts::from_inner(SimpleSummaryOpts::default())
            .with_batch_size(DEFAULT_BATCH_SIZE);
        let summary = BatchedSummary::<SimpleSummary>::new(&opts);

        for i in 0..measurements {
            let started = std::time::Instant::now();
            summary.observe(i as f64);
            let report_timing = i % 100 == 0;
            if report_timing {
                println!("Time taken: {:?}", started.elapsed());
            }
        }

        let result = summary.snapshot();
        assert_eq!(
            result.sample_count(),
            measurements,
            "Should have all measurements present in the collection"
        );
    }
}