stats/
thread_local_aggregator.rs1use std::fmt;
25use std::future::Future as NewFuture;
26use std::sync::atomic;
27use std::sync::Arc;
28use std::sync::Mutex;
29use std::time::Duration;
30
31use futures::future::ready;
32use futures::FutureExt as _;
33use futures::Stream as NewStream;
34use futures::StreamExt as _;
35use once_cell::sync::Lazy;
36use perthread::ThreadMap;
37use stats_traits::stats_manager::BoxStatsManager;
38use stats_traits::stats_manager::StatsManager;
39
40static STATS_SCHEDULED: atomic::AtomicBool = atomic::AtomicBool::new(false);
41static STATS_AGGREGATOR: Lazy<StatsAggregator> =
42 Lazy::new(|| StatsAggregator(Mutex::new(Vec::new())));
43
/// Boxed, pinned, sendable future with no output; this is the shape in which
/// the aggregation scheduler is handed back to callers.
type SchedulerPreview = std::pin::Pin<Box<dyn NewFuture<Output = ()> + Send + 'static>>;
45
46pub struct StatsScheduledErrorPreview(pub SchedulerPreview);
50
51impl fmt::Debug for StatsScheduledErrorPreview {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 write!(f, "Stats aggregation was already scheduled")
54 }
55}
56
57impl fmt::Display for StatsScheduledErrorPreview {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 write!(f, "Stats aggregation was already scheduled")
60 }
61}
62
63impl ::std::error::Error for StatsScheduledErrorPreview {}
64
65struct StatsAggregator(Mutex<Vec<Arc<ThreadMap<BoxStatsManager>>>>);
66
67impl StatsAggregator {
68 fn aggregate(&self) {
69 let thread_maps = self.0.lock().expect("poisoned mutex");
70 for thread_map in &*thread_maps {
71 thread_map.for_each(|stats| stats.aggregate());
72 }
73 }
74}
75
76pub fn create_map() -> Arc<ThreadMap<BoxStatsManager>> {
78 let map = ThreadMap::default();
79 let map = Arc::new(map);
80 let mut vec = STATS_AGGREGATOR.0.lock().expect("poisoned lock");
81 vec.push(map.clone());
82 map
83}
84
85pub fn schedule_stats_aggregation_preview() -> Result<SchedulerPreview, StatsScheduledErrorPreview>
101{
102 let stream =
103 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)));
104 let scheduler = schedule_stats_on_stream_preview(stream);
105
106 if STATS_SCHEDULED.swap(true, atomic::Ordering::Relaxed) {
107 Err(StatsScheduledErrorPreview(scheduler))
108 } else {
109 Ok(scheduler)
110 }
111}
112
113#[doc(hidden)]
116pub fn schedule_stats_on_stream_preview<S>(stream: S) -> SchedulerPreview
117where
118 S: NewStream + Send + 'static,
119{
120 stream
121 .for_each(|_| {
122 STATS_AGGREGATOR.aggregate();
123 ready(())
124 })
125 .boxed()
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    // Held for the duration of the test below; presumably meant to serialize
    // tests that touch the process-wide `STATS_SCHEDULED` flag.
    static TEST_MUTEX: Mutex<()> = Mutex::new(());

    #[tokio::test]
    async fn test_schedule_stats_aggregation_preview() {
        let _guard = TEST_MUTEX.lock().expect("poisoned lock");

        // The first call is expected to claim the flag and succeed.
        match schedule_stats_aggregation_preview() {
            Ok(_scheduler) => {}
            Err(err) => panic!("Scheduler is not Ok. Reason: {err:?}"),
        }

        // Any further call must report that scheduling already happened.
        assert!(
            schedule_stats_aggregation_preview().is_err(),
            "Scheduler should already be initialized"
        );

        // Clear the global flag again so the state set by this test does not persist.
        let _was_scheduled = STATS_SCHEDULED.swap(false, atomic::Ordering::AcqRel);
    }
}