use std::fmt;
use std::future::Future as NewFuture;
use std::sync::{atomic, Arc, Mutex};
use std::time::{Duration, Instant};
use futures::{future::ready, FutureExt as _, Stream as NewStream, StreamExt as _};
use futures_ext::{BoxFuture, FutureExt};
use futures_old::Stream;
use lazy_static::lazy_static;
use perthread::ThreadMap;
use stats_traits::stats_manager::{BoxStatsManager, StatsManager};
/// Process-wide flag recording whether stats aggregation has been scheduled.
///
/// `schedule_stats_aggregation` and `schedule_stats_aggregation_preview` swap
/// it to `true` and report an error when it was already set.
///
/// `AtomicBool::new` is a `const fn`, so a plain `static` is sufficient here;
/// no lazy initialisation is required.
static STATS_SCHEDULED: atomic::AtomicBool = atomic::AtomicBool::new(false);

lazy_static! {
    /// Registry of every thread map handed out by `create_map`; walked on each
    /// aggregation tick.
    static ref STATS_AGGREGATOR: StatsAggregator = StatsAggregator(Mutex::new(Vec::new()));
}
/// Boxed future returned by [`schedule_stats_aggregation`].
///
/// Its item type is `()` and its error type is `tokio_old::timer::Error`.
pub type Scheduler = BoxFuture<(), tokio_old::timer::Error>;
/// Error returned by [`schedule_stats_aggregation`] when the "already
/// scheduled" flag was set before the call.
///
/// The freshly built [`Scheduler`] is carried in field `0`, so the caller
/// still receives it even on the error path.
pub struct StatsScheduledError(pub Scheduler);
/// `Debug` prints a fixed message; the wrapped scheduler is not inspected.
impl fmt::Debug for StatsScheduledError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Stats aggregation was already scheduled")
    }
}
/// `Display` prints the same fixed message as `Debug`.
impl fmt::Display for StatsScheduledError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Stats aggregation was already scheduled")
    }
}
// Empty impl: every `Error` method keeps its default implementation; the
// required `Debug` and `Display` impls are provided separately in this file.
impl ::std::error::Error for StatsScheduledError {}
/// Pinned, boxed, `Send` `std::future::Future` with output `()`; the return
/// type of `schedule_stats_aggregation_preview` and
/// `schedule_stats_on_stream_preview`.
type SchedulerPreview = std::pin::Pin<Box<dyn NewFuture<Output = ()> + Send>>;
/// Error returned by [`schedule_stats_aggregation_preview`] when the "already
/// scheduled" flag was set before the call.
///
/// The freshly built scheduler future is carried in field `0`, so the caller
/// still receives it even on the error path.
pub struct StatsScheduledErrorPreview(pub SchedulerPreview);
/// `Debug` prints a fixed message; the wrapped scheduler is not inspected.
impl fmt::Debug for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Stats aggregation was already scheduled")
    }
}
/// `Display` prints the same fixed message as `Debug`.
impl fmt::Display for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Stats aggregation was already scheduled")
    }
}
// Empty impl: every `Error` method keeps its default implementation; the
// required `Debug` and `Display` impls are provided separately in this file.
impl ::std::error::Error for StatsScheduledErrorPreview {}
/// Mutex-guarded list of shared thread maps of stats managers.
///
/// `create_map` appends to the list; `StatsAggregator::aggregate` walks it.
struct StatsAggregator(Mutex<Vec<Arc<ThreadMap<BoxStatsManager>>>>);
impl StatsAggregator {
    /// Calls `aggregate()` on every stats manager in every registered thread map.
    ///
    /// The registry lock is held for the whole pass.
    ///
    /// # Panics
    ///
    /// Panics with "poisoned mutex" if the registry mutex is poisoned.
    fn aggregate(&self) {
        let registered = self.0.lock().expect("poisoned mutex");
        registered
            .iter()
            .for_each(|map| map.for_each(|manager| manager.aggregate()));
    }
}
/// Creates a default (empty) thread map, registers it with the global
/// aggregator and returns a shared handle to it.
///
/// The aggregator keeps its own `Arc` clone, so the map is included in every
/// later aggregation pass.
///
/// # Panics
///
/// Panics with "poisoned lock" if the aggregator's mutex is poisoned.
pub fn create_map() -> Arc<ThreadMap<BoxStatsManager>> {
    let shared = Arc::new(ThreadMap::default());
    STATS_AGGREGATOR
        .0
        .lock()
        .expect("poisoned lock")
        .push(Arc::clone(&shared));
    shared
}
/// Builds a future that runs stats aggregation on a one-second
/// `tokio_old` timer interval whose start instant is one second from now.
///
/// This function does not run the returned future itself; the caller is
/// responsible for driving it.
///
/// # Errors
///
/// Returns [`StatsScheduledError`] (wrapping the newly built scheduler) if the
/// global "already scheduled" flag was set before this call. The flag is set
/// by this call in either case.
pub fn schedule_stats_aggregation() -> Result<Scheduler, StatsScheduledError> {
    let period = Duration::from_secs(1);
    let first_tick = Instant::now() + Duration::from_secs(1);
    let ticks = tokio_old::timer::Interval::new(first_tick, period);
    let scheduler = schedule_stats_on_stream(ticks);

    // `swap` returns the previous value: `false` means this call won the race.
    if !STATS_SCHEDULED.swap(true, atomic::Ordering::Relaxed) {
        return Ok(scheduler);
    }
    Err(StatsScheduledError(scheduler))
}
/// Builds a `std::future`-based future that runs stats aggregation on a
/// one-second `tokio::time` interval whose start instant is one second from now.
///
/// This function does not run the returned future itself; the caller is
/// responsible for driving it.
///
/// NOTE(review): `tokio::time::interval_at` presumably needs an active Tokio
/// runtime at call time (the test below uses `#[tokio::test]`) — confirm.
///
/// # Errors
///
/// Returns [`StatsScheduledErrorPreview`] (wrapping the newly built scheduler)
/// if the global "already scheduled" flag was set before this call. The flag
/// is set by this call in either case.
pub fn schedule_stats_aggregation_preview() -> Result<SchedulerPreview, StatsScheduledErrorPreview>
{
    let interval = Duration::from_secs(1);
    let first_tick = tokio::time::Instant::now() + Duration::from_secs(1);
    let ticks = tokio::time::interval_at(first_tick, interval);
    let scheduler = schedule_stats_on_stream_preview(ticks);

    // `swap` returns the previous value: `false` means this call won the race.
    if !STATS_SCHEDULED.swap(true, atomic::Ordering::Relaxed) {
        return Ok(scheduler);
    }
    Err(StatsScheduledErrorPreview(scheduler))
}
/// Turns `stream` into a boxed future that triggers a global stats
/// aggregation pass for every item the stream yields.
///
/// Item values are ignored. The per-item callback always returns `Ok(())`, so
/// it never introduces an error of its own; the future's error type is the
/// stream's own error type.
#[doc(hidden)]
pub fn schedule_stats_on_stream<S>(stream: S) -> BoxFuture<(), S::Error>
where
    S: Stream + Send + 'static,
    <S as Stream>::Error: Send,
{
    let aggregation = stream.for_each(|_tick| {
        STATS_AGGREGATOR.aggregate();
        Ok(())
    });
    aggregation.boxify()
}
/// Turns `stream` into a boxed `std::future` future that triggers a global
/// stats aggregation pass for every item the stream yields.
///
/// Item values are ignored; the future's output is `()`.
#[doc(hidden)]
pub fn schedule_stats_on_stream_preview<S>(stream: S) -> SchedulerPreview
where
    S: NewStream + Send + 'static,
{
    let aggregation = stream.for_each(|_tick| {
        STATS_AGGREGATOR.aggregate();
        ready(())
    });
    aggregation.boxed()
}
#[cfg(test)]
mod tests {
    use super::*;

    lazy_static! {
        /// Both tests flip the shared `STATS_SCHEDULED` flag, so they take this
        /// lock to avoid running concurrently.
        static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
    }

    /// The first scheduling call succeeds, the second one is rejected.
    #[test]
    fn test_schedule_stats_aggregation() {
        let _guard = TEST_MUTEX.lock().expect("poisoned lock");

        if let Err(err) = schedule_stats_aggregation() {
            panic!("Scheduler is not Ok. Reason: {:?}", err);
        }
        if schedule_stats_aggregation().is_ok() {
            panic!("Scheduler should already be initialized");
        }

        // Reset the global flag so the other test starts from a clean state.
        STATS_SCHEDULED.swap(false, atomic::Ordering::AcqRel);
    }

    /// Same check for the `std::future` based entry point.
    #[tokio::test]
    async fn test_schedule_stats_aggregation_preview() {
        let _guard = TEST_MUTEX.lock().expect("poisoned lock");

        if let Err(err) = schedule_stats_aggregation_preview() {
            panic!("Scheduler is not Ok. Reason: {:?}", err);
        }
        if schedule_stats_aggregation_preview().is_ok() {
            panic!("Scheduler should already be initialized");
        }

        // Reset the global flag so the other test starts from a clean state.
        STATS_SCHEDULED.swap(false, atomic::Ordering::AcqRel);
    }
}