await_tree/
registry.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::fmt::{Debug, Display};
17use std::hash::Hash;
18use std::sync::{Arc, Weak};
19
20use derive_builder::Builder;
21use parking_lot::RwLock;
22use weak_table::WeakValueHashMap;
23
24use crate::context::{ContextId, Tree, TreeContext};
25use crate::obj_utils::{DynEq, DynHash};
26use crate::{span, Span, TreeRoot};
27
28/// Configuration for an await-tree registry, which affects the behavior of all await-trees in the
29/// registry.
30#[derive(Debug, Clone, Builder)]
31#[builder(default)]
32pub struct Config {
33    /// Whether to include the **verbose** span in the await-tree.
34    verbose: bool,
35}
36
37#[allow(clippy::derivable_impls)]
38impl Default for Config {
39    fn default() -> Self {
40        Self { verbose: false }
41    }
42}
43
/// Identifies a task, and thereby its await-tree, within a [`Registry`].
///
/// The trait has no methods and never needs to be implemented by hand: the blanket implementation
/// below covers every type that is hashable, comparable for equality, debuggable, thread-safe and
/// `'static`.
pub trait Key: Hash + Eq + Debug + Send + Sync + 'static {}
impl<T: Hash + Eq + Debug + Send + Sync + 'static> Key for T {}
50
51/// The object-safe version of [`Key`], automatically implemented.
52trait ObjKey: DynHash + DynEq + Debug + Send + Sync + 'static {}
53impl<T> ObjKey for T where T: DynHash + DynEq + Debug + Send + Sync + 'static {}
54
55/// A trait for types that can be converted to a [`Span`] that can be used as the root of an
56/// await-tree.
57pub trait ToRootSpan {
58    /// Convert the type to a [`Span`] that can be used as the root of an await-tree.
59    fn to_root_span(&self) -> Span;
60}
61
62impl<T: Display> ToRootSpan for T {
63    fn to_root_span(&self) -> Span {
64        span!("{self}")
65    }
66}
67
68/// Key type for anonymous await-trees.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
70struct AnonymousKey(ContextId);
71
72impl Display for AnonymousKey {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "Anonymous #{}", self.0 .0)
75    }
76}
77
78/// Type-erased key for the [`Registry`].
79#[derive(Clone)]
80pub struct AnyKey(Arc<dyn ObjKey>);
81
82impl PartialEq for AnyKey {
83    fn eq(&self, other: &Self) -> bool {
84        self.0.dyn_eq(other.0.as_dyn_eq())
85    }
86}
87
88impl Eq for AnyKey {}
89
90impl Hash for AnyKey {
91    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
92        self.0.dyn_hash(state);
93    }
94}
95
96impl Debug for AnyKey {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        self.0.fmt(f)
99    }
100}
101
102impl Display for AnyKey {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        // TODO: for all `impl Display`?
105        macro_rules! delegate_to_display {
106            ($($t:ty),* $(,)?) => {
107                $(
108                    if let Some(k) = self.as_any().downcast_ref::<$t>() {
109                        return write!(f, "{}", k);
110                    }
111                )*
112            };
113        }
114        delegate_to_display!(String, &str, AnonymousKey);
115
116        write!(f, "{:?}", self)
117    }
118}
119
120impl AnyKey {
121    fn new(key: impl ObjKey) -> Self {
122        Self(Arc::new(key))
123    }
124
125    /// Cast the key to `dyn Any`.
126    pub fn as_any(&self) -> &dyn Any {
127        self.0.as_ref().as_any()
128    }
129
130    /// Returns whether the key is of type `K`.
131    ///
132    /// Equivalent to `self.as_any().is::<K>()`.
133    pub fn is<K: Any>(&self) -> bool {
134        self.as_any().is::<K>()
135    }
136
137    /// Returns whether the key corresponds to an anonymous await-tree.
138    pub fn is_anonymous(&self) -> bool {
139        self.as_any().is::<AnonymousKey>()
140    }
141
142    /// Returns the key as a reference to type `K`, if it is of type `K`.
143    ///
144    /// Equivalent to `self.as_any().downcast_ref::<K>()`.
145    pub fn downcast_ref<K: Any>(&self) -> Option<&K> {
146        self.as_any().downcast_ref()
147    }
148}
149
150type Contexts = RwLock<WeakValueHashMap<AnyKey, Weak<TreeContext>>>;
151
152struct RegistryCore {
153    contexts: Contexts,
154    config: Config,
155}
156
157/// The registry of multiple await-trees.
158///
159/// Can be cheaply cloned to share the same registry.
160pub struct Registry(Arc<RegistryCore>);
161
162impl Debug for Registry {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("Registry")
165            .field("config", self.config())
166            .finish_non_exhaustive()
167    }
168}
169
170impl Clone for Registry {
171    fn clone(&self) -> Self {
172        Self(Arc::clone(&self.0))
173    }
174}
175
176impl Registry {
177    fn contexts(&self) -> &Contexts {
178        &self.0.contexts
179    }
180
181    fn config(&self) -> &Config {
182        &self.0.config
183    }
184}
185
186impl Registry {
187    /// Create a new registry with given `config`.
188    pub fn new(config: Config) -> Self {
189        Self(
190            RegistryCore {
191                contexts: Default::default(),
192                config,
193            }
194            .into(),
195        )
196    }
197
198    /// Returns the current registry, if exists.
199    ///
200    /// 1. If the current task is registered with a registry, returns the registry.
201    /// 2. If the global registry is initialized with
202    ///    [`init_global_registry`](crate::global::init_global_registry), returns the global
203    ///    registry.
204    /// 3. Otherwise, returns `None`.
205    pub fn try_current() -> Option<Self> {
206        crate::root::current_registry()
207    }
208
209    /// Returns the current registry, panics if not exists.
210    ///
211    /// See [`Registry::try_current`] for more information.
212    pub fn current() -> Self {
213        Self::try_current().expect("no current registry")
214    }
215
216    fn register_inner(&self, key: impl Key, context: Arc<TreeContext>) -> TreeRoot {
217        self.contexts()
218            .write()
219            .insert(AnyKey::new(key), Arc::clone(&context));
220
221        TreeRoot {
222            context,
223            registry: WeakRegistry(Arc::downgrade(&self.0)),
224        }
225    }
226
227    /// Register with given key and root span. Returns a [`TreeRoot`] that can be used to instrument
228    /// a future.
229    ///
230    /// If the key already exists, a new [`TreeRoot`] is returned and the reference to the old
231    /// [`TreeRoot`] is dropped.
232    pub fn register(&self, key: impl Key, root_span: impl Into<Span>) -> TreeRoot {
233        let context = Arc::new(TreeContext::new(root_span.into(), self.config().verbose));
234        self.register_inner(key, context)
235    }
236
237    /// Derive the root span from the given key and register with it.
238    ///
239    /// This is a convenience method for `self.register(key, key.to_root_span())`. See
240    /// [`Registry::register`] for more details.
241    pub fn register_derived_root(&self, key: impl Key + ToRootSpan) -> TreeRoot {
242        let root_span = key.to_root_span();
243        self.register(key, root_span)
244    }
245
246    /// Register an anonymous await-tree without specifying a key. Returns a [`TreeRoot`] that can
247    /// be used to instrument a future.
248    ///
249    /// Anonymous await-trees are not able to be retrieved through the [`Registry::get`] method. Use
250    /// [`Registry::collect_anonymous`] or [`Registry::collect_all`] to collect them.
251    // TODO: we have keyed and anonymous, should we also have a typed-anonymous (for classification
252    // only)?
253    pub fn register_anonymous(&self, root_span: impl Into<Span>) -> TreeRoot {
254        let context = Arc::new(TreeContext::new(root_span.into(), self.config().verbose));
255        self.register_inner(AnonymousKey(context.id()), context) // use the private id as the key
256    }
257
258    /// Get a clone of the await-tree with given key.
259    ///
260    /// Returns `None` if the key does not exist or the tree root has been dropped.
261    pub fn get(&self, key: impl Key) -> Option<Tree> {
262        self.contexts()
263            .read()
264            .get(&AnyKey::new(key)) // TODO: accept ref can?
265            .map(|v| v.tree().clone())
266    }
267
268    /// Remove all the registered await-trees.
269    pub fn clear(&self) {
270        self.contexts().write().clear();
271    }
272
273    /// Collect the snapshots of all await-trees with the key of type `K`.
274    pub fn collect<K: Key + Clone>(&self) -> Vec<(K, Tree)> {
275        self.contexts()
276            .read()
277            .iter()
278            .filter_map(|(k, v)| {
279                k.0.as_ref()
280                    .as_any()
281                    .downcast_ref::<K>()
282                    .map(|k| (k.clone(), v.tree().clone()))
283            })
284            .collect()
285    }
286
287    /// Collect the snapshots of all await-trees registered with [`Registry::register_anonymous`].
288    pub fn collect_anonymous(&self) -> Vec<Tree> {
289        self.contexts()
290            .read()
291            .iter()
292            .filter_map(|(k, v)| {
293                if k.is_anonymous() {
294                    Some(v.tree().clone())
295                } else {
296                    None
297                }
298            })
299            .collect()
300    }
301
302    /// Collect the snapshots of all await-trees regardless of the key type.
303    pub fn collect_all(&self) -> Vec<(AnyKey, Tree)> {
304        self.contexts()
305            .read()
306            .iter()
307            .map(|(k, v)| (k.clone(), v.tree().clone()))
308            .collect()
309    }
310}
311
312pub(crate) struct WeakRegistry(Weak<RegistryCore>);
313
314impl WeakRegistry {
315    pub fn upgrade(&self) -> Option<Registry> {
316        self.0.upgrade().map(Registry)
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Registers trees under several key types and checks how many each `collect*` method
    /// returns.
    #[test]
    fn test_registry() {
        let registry = Registry::new(Config::default());

        // The registry only holds weak references to the contexts, so every root is kept alive
        // until the end of the test.
        let _i32_roots = [
            registry.register(0_i32, "0"),
            registry.register(1_i32, "1"),
            registry.register(2_i32, "2"),
        ];
        let _str_roots = [registry.register("0", "0"), registry.register("1", "1")];

        // Registering the same key twice replaces the first entry.
        let _unit = registry.register((), "()");
        let _unit_replaced = registry.register((), "[]");

        let _anon_roots = [
            registry.register_anonymous("anon"),
            registry.register_anonymous("anon"),
        ];

        assert_eq!(registry.collect::<i32>().len(), 3);
        assert_eq!(registry.collect::<&'static str>().len(), 2);
        assert_eq!(registry.collect::<()>().len(), 1);
        assert_eq!(registry.collect_anonymous().len(), 2);

        // 3 `i32` keys + 2 `&str` keys + 1 unit key + 2 anonymous keys.
        assert_eq!(registry.collect_all().len(), 8);
    }
}
356}