datafusion_common_runtime/
trace_utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::future::BoxFuture;
19use futures::FutureExt;
20use std::any::Any;
21use std::error::Error;
22use std::fmt::{Display, Formatter, Result as FmtResult};
23use std::future::Future;
24use tokio::sync::OnceCell;
25
26/// A trait for injecting instrumentation into either asynchronous futures or
27/// blocking closures at runtime.
28pub trait JoinSetTracer: Send + Sync + 'static {
29    /// Function pointer type for tracing a future.
30    ///
31    /// This function takes a boxed future (with its output type erased)
32    /// and returns a boxed future (with its output still erased). The
33    /// tracer must apply instrumentation without altering the output.
34    fn trace_future(
35        &self,
36        fut: BoxFuture<'static, Box<dyn Any + Send>>,
37    ) -> BoxFuture<'static, Box<dyn Any + Send>>;
38
39    /// Function pointer type for tracing a blocking closure.
40    ///
41    /// This function takes a boxed closure (with its return type erased)
42    /// and returns a boxed closure (with its return type still erased). The
43    /// tracer must apply instrumentation without changing the return value.
44    fn trace_block(
45        &self,
46        f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
47    ) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>;
48}
49
50/// A no-op tracer that does not modify or instrument any futures or closures.
51/// This is used as a fallback if no custom tracer is set.
52struct NoopTracer;
53
54impl JoinSetTracer for NoopTracer {
55    fn trace_future(
56        &self,
57        fut: BoxFuture<'static, Box<dyn Any + Send>>,
58    ) -> BoxFuture<'static, Box<dyn Any + Send>> {
59        fut
60    }
61
62    fn trace_block(
63        &self,
64        f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
65    ) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
66        f
67    }
68}
69
70/// A custom error type for tracer injection failures.
71#[derive(Debug)]
72pub enum JoinSetTracerError {
73    /// The global tracer has already been set.
74    AlreadySet,
75}
76
77impl Display for JoinSetTracerError {
78    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79        match self {
80            JoinSetTracerError::AlreadySet => {
81                write!(f, "The global JoinSetTracer is already set")
82            }
83        }
84    }
85}
86
87impl Error for JoinSetTracerError {}
88
89/// Global storage for an injected tracer. If no tracer is injected, a no-op
90/// tracer is used instead. This ensures that calls to [`trace_future`] or
91/// [`trace_block`] never panic due to missing instrumentation.
92static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new();
93
94/// A no-op tracer singleton that is returned by [`get_tracer`] if no custom
95/// tracer has been registered.
96static NOOP_TRACER: NoopTracer = NoopTracer;
97
98/// Return the currently registered tracer, or the no-op tracer if none was
99/// registered.
100#[inline]
101fn get_tracer() -> &'static dyn JoinSetTracer {
102    GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER)
103}
104
105/// Set the custom tracer for both futures and blocking closures.
106///
107/// This should be called once at startup. If called more than once, an
108/// `Err(JoinSetTracerError)` is returned. If not called at all, a no-op tracer that does nothing
109/// is used.
110pub fn set_join_set_tracer(
111    tracer: &'static dyn JoinSetTracer,
112) -> Result<(), JoinSetTracerError> {
113    GLOBAL_TRACER
114        .set(tracer)
115        .map_err(|_set_err| JoinSetTracerError::AlreadySet)
116}
117
118/// Optionally instruments a future with custom tracing.
119///
120/// If a tracer has been injected via `set_tracer`, the future's output is
121/// boxed (erasing its type), passed to the tracer, and then downcast back
122/// to the expected type. If no tracer is set, the original future is returned.
123///
124/// # Type Parameters
125/// * `T` - The concrete output type of the future.
126/// * `F` - The future type.
127///
128/// # Parameters
129/// * `future` - The future to potentially instrument.
130pub fn trace_future<T, F>(future: F) -> BoxFuture<'static, T>
131where
132    F: Future<Output = T> + Send + 'static,
133    T: Send + 'static,
134{
135    // Erase the future’s output type first:
136    let erased_future = async move {
137        let result = future.await;
138        Box::new(result) as Box<dyn Any + Send>
139    }
140    .boxed();
141
142    // Forward through the global tracer:
143    get_tracer()
144        .trace_future(erased_future)
145        // Downcast from `Box<dyn Any + Send>` back to `T`:
146        .map(|any_box| {
147            *any_box
148                .downcast::<T>()
149                .expect("Tracer must preserve the future’s output type!")
150        })
151        .boxed()
152}
153
154/// Optionally instruments a blocking closure with custom tracing.
155///
156/// If a tracer has been injected via `set_tracer`, the closure is wrapped so that
157/// its return value is boxed (erasing its type), passed to the tracer, and then the
158/// result is downcast back to the original type. If no tracer is set, the closure is
159/// returned unmodified (except for being boxed).
160///
161/// # Type Parameters
162/// * `T` - The concrete return type of the closure.
163/// * `F` - The closure type.
164///
165/// # Parameters
166/// * `f` - The blocking closure to potentially instrument.
167pub fn trace_block<T, F>(f: F) -> Box<dyn FnOnce() -> T + Send>
168where
169    F: FnOnce() -> T + Send + 'static,
170    T: Send + 'static,
171{
172    // Erase the closure’s return type first:
173    let erased_closure = Box::new(|| {
174        let result = f();
175        Box::new(result) as Box<dyn Any + Send>
176    });
177
178    // Forward through the global tracer:
179    let traced_closure = get_tracer().trace_block(erased_closure);
180
181    // Downcast from `Box<dyn Any + Send>` back to `T`:
182    Box::new(move || {
183        let any_box = traced_closure();
184        *any_box
185            .downcast::<T>()
186            .expect("Tracer must preserve the closure’s return type!")
187    })
188}