use std::fmt;
use std::future::Future as NewFuture;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use futures::future::ready;
use futures::FutureExt as _;
use futures::Stream as NewStream;
use futures::StreamExt as _;
use once_cell::sync::Lazy;
use perthread::ThreadMap;
use stats_traits::stats_manager::BoxStatsManager;
use stats_traits::stats_manager::StatsManager;
/// Process-wide flag recording whether stats aggregation has already been
/// scheduled. `schedule_stats_aggregation_preview` swaps it to `true` and
/// returns an error when the previous value was already `true`.
static STATS_SCHEDULED: atomic::AtomicBool = atomic::AtomicBool::new(false);
/// Lazily-initialised global registry of the thread maps handed out by
/// `create_map`. It starts out empty and is walked by
/// `StatsAggregator::aggregate` on every scheduler tick.
static STATS_AGGREGATOR: Lazy<StatsAggregator> =
    Lazy::new(|| StatsAggregator(Mutex::default()));
/// Boxed, pinned, `Send` future with unit output; this is the shape in which
/// the aggregation loop is handed back to callers.
type SchedulerPreview = std::pin::Pin<Box<dyn NewFuture<Output = ()> + Send>>;

/// Error returned when stats aggregation has already been scheduled.
///
/// Field `0` carries the scheduler future that was built by the rejected
/// call, so the caller still owns it and may decide what to do with it.
pub struct StatsScheduledErrorPreview(pub SchedulerPreview);

impl fmt::Display for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Stats aggregation was already scheduled")
    }
}

// `Debug` is written by hand: the wrapped future is a trait object without a
// `Debug` bound, so it cannot be derived. It prints the same text as `Display`.
impl fmt::Debug for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl ::std::error::Error for StatsScheduledErrorPreview {}
/// Mutex-protected list of every thread map registered through `create_map`.
struct StatsAggregator(Mutex<Vec<Arc<ThreadMap<BoxStatsManager>>>>);

impl StatsAggregator {
    /// Calls `aggregate` on every stats manager of every registered thread map.
    ///
    /// The registry lock is held for the whole pass, so a concurrent
    /// `create_map` call waits until the pass has finished.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    fn aggregate(&self) {
        let registry = self.0.lock().expect("poisoned mutex");
        registry
            .iter()
            .for_each(|per_thread| per_thread.for_each(|manager| manager.aggregate()));
    }
}
/// Creates a new default `ThreadMap`, registers it with the global
/// `STATS_AGGREGATOR`, and returns a shared handle to it.
///
/// The aggregator keeps its own `Arc` clone of the map, so the map is
/// included in every subsequent aggregation pass.
///
/// # Panics
///
/// Panics if the aggregator's mutex is poisoned.
pub fn create_map() -> Arc<ThreadMap<BoxStatsManager>> {
    let handle = Arc::new(ThreadMap::default());
    // The guard is a temporary, so the lock is released at the end of this
    // statement, before the handle is returned.
    STATS_AGGREGATOR
        .0
        .lock()
        .expect("poisoned lock")
        .push(Arc::clone(&handle));
    handle
}
/// Builds the future that aggregates all registered stats on a one-second
/// interval and marks aggregation as scheduled.
///
/// The future is only constructed here, not spawned; the caller is
/// responsible for running it.
///
/// # Errors
///
/// Returns [`StatsScheduledErrorPreview`] when aggregation was already marked
/// as scheduled by an earlier call. The error carries the scheduler future
/// built by this call.
///
/// NOTE(review): `tokio::time::interval` is expected to need a Tokio runtime
/// context at call time — confirm against the tokio documentation.
pub fn schedule_stats_aggregation_preview() -> Result<SchedulerPreview, StatsScheduledErrorPreview>
{
    let period = Duration::from_secs(1);
    let ticks = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period));
    let scheduler = schedule_stats_on_stream_preview(ticks);

    // `swap` returns the previous value: `true` means an earlier call already
    // claimed the scheduling slot.
    let already_scheduled = STATS_SCHEDULED.swap(true, atomic::Ordering::Relaxed);
    if !already_scheduled {
        return Ok(scheduler);
    }
    Err(StatsScheduledErrorPreview(scheduler))
}
/// Builds a future that runs one global aggregation pass for every item the
/// given stream yields; the items themselves are ignored.
///
/// The future finishes when the stream ends. Presumably this takes an
/// arbitrary stream so aggregation can be driven by something other than a
/// timer, e.g. in tests — confirm against callers.
#[doc(hidden)]
pub fn schedule_stats_on_stream_preview<S>(stream: S) -> SchedulerPreview
where
    S: NewStream + Send + 'static,
{
    let aggregation_loop = stream.for_each(|_tick| {
        STATS_AGGREGATOR.aggregate();
        ready(())
    });
    aggregation_loop.boxed()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes tests that touch the process-wide `STATS_SCHEDULED` flag.
    static TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Clears `STATS_SCHEDULED` when dropped, so the flag is restored even if
    /// an assertion in the test panics.
    struct ResetScheduledFlag;

    impl Drop for ResetScheduledFlag {
        fn drop(&mut self) {
            STATS_SCHEDULED.store(false, atomic::Ordering::SeqCst);
        }
    }

    /// The first call must succeed and every later call must be rejected
    /// until the flag is cleared again.
    #[tokio::test]
    async fn test_schedule_stats_aggregation_preview() {
        // The mutex guards no data, so a poisoned lock (left behind by another
        // failed test) carries no broken invariant and is safe to recover.
        let _lock = TEST_MUTEX
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Declared after `_lock`, hence dropped before it: the flag is reset
        // while the test lock is still held.
        let _reset = ResetScheduledFlag;

        if let Err(err) = schedule_stats_aggregation_preview() {
            panic!("Scheduler is not Ok. Reason: {err:?}");
        }
        assert!(
            schedule_stats_aggregation_preview().is_err(),
            "Scheduler should already be initialized"
        );
    }
}