stats/thread_local_aggregator.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under both the MIT license found in the
 * LICENSE-MIT file in the root directory of this source tree and the Apache
 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
 * of this source tree.
 */

//! This module provides the means to define stats that are thread local and to
//! schedule a periodic aggregation process for those stats.
//!
//! The assumption behind this library is that writes to the stats vastly
//! outnumber reads. Because of that it is vital that writes are quick, while
//! reads may be a little relaxed: recent writes might not be visible to a read
//! until an aggregation pass has run.
//!
//! Thread local stats help with the speed goal for writes - apart from
//! infrequent reads, no one races with the current thread to access those
//! values. Periodic aggregation is achieved via a future that fires every
//! second, using the tokio timer, and aggregates every thread local stat.
//! The future must be spawned on tokio for the aggregation to run.
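//!
//! A minimal sketch of wiring this up (it mirrors the example on
//! `schedule_stats_aggregation_preview` below and assumes a tokio runtime is
//! available to spawn onto):
//!
//! ```no_run
//! use stats::schedule_stats_aggregation_preview;
//!
//! // Build the once-per-second aggregation future and hand it to tokio;
//! // until it is spawned, thread local stats are never aggregated.
//! let aggregation = schedule_stats_aggregation_preview().expect("already scheduled");
//! tokio::spawn(aggregation);
//! ```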

use std::fmt;
use std::future::Future as NewFuture;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use futures::future::ready;
use futures::FutureExt as _;
use futures::Stream as NewStream;
use futures::StreamExt as _;
use once_cell::sync::Lazy;
use perthread::ThreadMap;
use stats_traits::stats_manager::BoxStatsManager;
use stats_traits::stats_manager::StatsManager;

static STATS_SCHEDULED: atomic::AtomicBool = atomic::AtomicBool::new(false);
static STATS_AGGREGATOR: Lazy<StatsAggregator> =
    Lazy::new(|| StatsAggregator(Mutex::new(Vec::new())));

type SchedulerPreview = std::pin::Pin<Box<dyn NewFuture<Output = ()> + Send>>;

/// This error is returned to indicate that the stats scheduler was already
/// retrieved before, and is potentially already running. The scheduler future
/// can still be recovered from this error if needed.
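///
/// A minimal sketch of recovering the scheduler from this error (field `0`
/// holds the future):
///
/// ```no_run
/// use stats::schedule_stats_aggregation_preview;
///
/// let scheduler = match schedule_stats_aggregation_preview() {
///     Ok(fut) => fut,
///     // Aggregation was already scheduled elsewhere; the future can still
///     // be taken out of the error and spawned.
///     Err(already_scheduled) => already_scheduled.0,
/// };
/// tokio::spawn(scheduler);
/// ```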
pub struct StatsScheduledErrorPreview(pub SchedulerPreview);

impl fmt::Debug for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Stats aggregation was already scheduled")
    }
}

impl fmt::Display for StatsScheduledErrorPreview {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Stats aggregation was already scheduled")
    }
}

impl ::std::error::Error for StatsScheduledErrorPreview {}

/// Registry of every ThreadMap created via `create_map`; aggregation walks all of them.
struct StatsAggregator(Mutex<Vec<Arc<ThreadMap<BoxStatsManager>>>>);

impl StatsAggregator {
    fn aggregate(&self) {
        // Aggregate the stats of every thread in every registered map.
        let thread_maps = self.0.lock().expect("poisoned mutex");
        for thread_map in &*thread_maps {
            thread_map.for_each(|stats| stats.aggregate());
        }
    }
}

/// Creates a ThreadMap and registers it with the global aggregator for periodic stats aggregation
pub fn create_map() -> Arc<ThreadMap<BoxStatsManager>> {
    let map = ThreadMap::default();
    let map = Arc::new(map);
    let mut vec = STATS_AGGREGATOR.0.lock().expect("poisoned lock");
    vec.push(map.clone());
    map
}

/// On the first call this function returns a future that periodically
/// aggregates the stats.
/// On subsequent calls it returns a `StatsScheduledErrorPreview` that contains
/// the future, so the caller may still use it while knowing that this was not
/// the first time the function was called.
///
/// # Examples
///
/// ```no_run
/// use stats::schedule_stats_aggregation_preview;
/// use tokio::spawn;
///
/// let s = schedule_stats_aggregation_preview().unwrap();
/// spawn(s);
/// ```
pub fn schedule_stats_aggregation_preview() -> Result<SchedulerPreview, StatsScheduledErrorPreview>
{
    let stream =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)));
    let scheduler = schedule_stats_on_stream_preview(stream);

    if STATS_SCHEDULED.swap(true, atomic::Ordering::Relaxed) {
        Err(StatsScheduledErrorPreview(scheduler))
    } else {
        Ok(scheduler)
    }
}

/// Schedules aggregation of stats on the provided stream. This function should
/// not be used directly; it exists for testing purposes.
#[doc(hidden)]
pub fn schedule_stats_on_stream_preview<S>(stream: S) -> SchedulerPreview
where
    S: NewStream + Send + 'static,
{
    stream
        .for_each(|_| {
            STATS_AGGREGATOR.aggregate();
            ready(())
        })
        .boxed()
}

#[cfg(test)]
mod tests {
    use super::*;

    // These tests operate on global state, so they cannot run in parallel
    static TEST_MUTEX: Mutex<()> = Mutex::new(());

    #[tokio::test]
    async fn test_schedule_stats_aggregation_preview() {
        let _lock = TEST_MUTEX.lock().expect("poisoned lock");

        if let Err(err) = schedule_stats_aggregation_preview() {
            panic!("Scheduler is not Ok. Reason: {err:?}");
        }

        if schedule_stats_aggregation_preview().is_ok() {
            panic!("Scheduler should already be initialized");
        }

        // Reset the flag so aggregation can be scheduled again by other tests.
        STATS_SCHEDULED.swap(false, atomic::Ordering::AcqRel);
    }
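
    // A minimal sketch of driving aggregation from a test: the helper accepts
    // any stream and runs one aggregation pass per item, so a finite stream
    // makes the returned future complete after a known number of passes. It
    // does not touch STATS_SCHEDULED, so it needs no TEST_MUTEX serialization.
    #[tokio::test]
    async fn test_schedule_stats_on_stream_preview() {
        // Two items -> two aggregation passes, then the future resolves.
        let stream = futures::stream::iter(vec![(), ()]);
        schedule_stats_on_stream_preview(stream).await;
    }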
}