datafusion_common_runtime/trace_utils.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::future::BoxFuture;
19use futures::FutureExt;
20use std::any::Any;
21use std::error::Error;
22use std::fmt::{Display, Formatter, Result as FmtResult};
23use std::future::Future;
24use tokio::sync::OnceCell;
25
26/// A trait for injecting instrumentation into either asynchronous futures or
27/// blocking closures at runtime.
28pub trait JoinSetTracer: Send + Sync + 'static {
29 /// Function pointer type for tracing a future.
30 ///
31 /// This function takes a boxed future (with its output type erased)
32 /// and returns a boxed future (with its output still erased). The
33 /// tracer must apply instrumentation without altering the output.
34 fn trace_future(
35 &self,
36 fut: BoxFuture<'static, Box<dyn Any + Send>>,
37 ) -> BoxFuture<'static, Box<dyn Any + Send>>;
38
39 /// Function pointer type for tracing a blocking closure.
40 ///
41 /// This function takes a boxed closure (with its return type erased)
42 /// and returns a boxed closure (with its return type still erased). The
43 /// tracer must apply instrumentation without changing the return value.
44 fn trace_block(
45 &self,
46 f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
47 ) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>;
48}
49
50/// A no-op tracer that does not modify or instrument any futures or closures.
51/// This is used as a fallback if no custom tracer is set.
52struct NoopTracer;
53
54impl JoinSetTracer for NoopTracer {
55 fn trace_future(
56 &self,
57 fut: BoxFuture<'static, Box<dyn Any + Send>>,
58 ) -> BoxFuture<'static, Box<dyn Any + Send>> {
59 fut
60 }
61
62 fn trace_block(
63 &self,
64 f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
65 ) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
66 f
67 }
68}
69
/// Error returned by [`set_join_set_tracer`] when a tracer cannot be installed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSetTracerError {
    /// The global tracer has already been set.
    AlreadySet,
}

impl Display for JoinSetTracerError {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        // Each variant maps to a fixed message, so write it directly instead of
        // going through the formatting machinery.
        let message = match self {
            JoinSetTracerError::AlreadySet => "The global JoinSetTracer is already set",
        };
        f.write_str(message)
    }
}

impl Error for JoinSetTracerError {}
88
89/// Global storage for an injected tracer. If no tracer is injected, a no-op
90/// tracer is used instead. This ensures that calls to [`trace_future`] or
91/// [`trace_block`] never panic due to missing instrumentation.
92static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new();
93
94/// A no-op tracer singleton that is returned by [`get_tracer`] if no custom
95/// tracer has been registered.
96static NOOP_TRACER: NoopTracer = NoopTracer;
97
98/// Return the currently registered tracer, or the no-op tracer if none was
99/// registered.
100#[inline]
101fn get_tracer() -> &'static dyn JoinSetTracer {
102 GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER)
103}
104
105/// Set the custom tracer for both futures and blocking closures.
106///
107/// This should be called once at startup. If called more than once, an
108/// `Err(JoinSetTracerError)` is returned. If not called at all, a no-op tracer that does nothing
109/// is used.
110pub fn set_join_set_tracer(
111 tracer: &'static dyn JoinSetTracer,
112) -> Result<(), JoinSetTracerError> {
113 GLOBAL_TRACER
114 .set(tracer)
115 .map_err(|_set_err| JoinSetTracerError::AlreadySet)
116}
117
118/// Optionally instruments a future with custom tracing.
119///
120/// If a tracer has been injected via `set_tracer`, the future's output is
121/// boxed (erasing its type), passed to the tracer, and then downcast back
122/// to the expected type. If no tracer is set, the original future is returned.
123///
124/// # Type Parameters
125/// * `T` - The concrete output type of the future.
126/// * `F` - The future type.
127///
128/// # Parameters
129/// * `future` - The future to potentially instrument.
130pub fn trace_future<T, F>(future: F) -> BoxFuture<'static, T>
131where
132 F: Future<Output = T> + Send + 'static,
133 T: Send + 'static,
134{
135 // Erase the future’s output type first:
136 let erased_future = async move {
137 let result = future.await;
138 Box::new(result) as Box<dyn Any + Send>
139 }
140 .boxed();
141
142 // Forward through the global tracer:
143 get_tracer()
144 .trace_future(erased_future)
145 // Downcast from `Box<dyn Any + Send>` back to `T`:
146 .map(|any_box| {
147 *any_box
148 .downcast::<T>()
149 .expect("Tracer must preserve the future’s output type!")
150 })
151 .boxed()
152}
153
154/// Optionally instruments a blocking closure with custom tracing.
155///
156/// If a tracer has been injected via `set_tracer`, the closure is wrapped so that
157/// its return value is boxed (erasing its type), passed to the tracer, and then the
158/// result is downcast back to the original type. If no tracer is set, the closure is
159/// returned unmodified (except for being boxed).
160///
161/// # Type Parameters
162/// * `T` - The concrete return type of the closure.
163/// * `F` - The closure type.
164///
165/// # Parameters
166/// * `f` - The blocking closure to potentially instrument.
167pub fn trace_block<T, F>(f: F) -> Box<dyn FnOnce() -> T + Send>
168where
169 F: FnOnce() -> T + Send + 'static,
170 T: Send + 'static,
171{
172 // Erase the closure’s return type first:
173 let erased_closure = Box::new(|| {
174 let result = f();
175 Box::new(result) as Box<dyn Any + Send>
176 });
177
178 // Forward through the global tracer:
179 let traced_closure = get_tracer().trace_block(erased_closure);
180
181 // Downcast from `Box<dyn Any + Send>` back to `T`:
182 Box::new(move || {
183 let any_box = traced_closure();
184 *any_box
185 .downcast::<T>()
186 .expect("Tracer must preserve the closure’s return type!")
187 })
188}