hds_console_subscriber/
lib.rs

1#![doc = include_str!("../README.md")]
2use console_api as proto;
3use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
4use serde::Serialize;
5use std::{
6    cell::RefCell,
7    fmt,
8    net::{IpAddr, Ipv4Addr},
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        Arc,
12    },
13    time::{Duration, Instant},
14};
15use thread_local::ThreadLocal;
16#[cfg(unix)]
17use tokio::net::UnixListener;
18use tokio::{
19    sync::{mpsc, oneshot},
20    task::JoinHandle,
21};
22#[cfg(unix)]
23use tokio_stream::wrappers::UnixListenerStream;
24use tracing_core::{
25    span::{self, Id},
26    subscriber::{self, Subscriber},
27    Metadata,
28};
29use tracing_subscriber::{
30    layer::Context,
31    registry::{Extensions, LookupSpan},
32    Layer,
33};
34
35mod aggregator;
36mod attribute;
37mod builder;
38mod callsites;
39mod record;
40mod stack;
41mod stats;
42pub(crate) mod sync;
43mod visitors;
44
45pub use aggregator::Aggregator;
46pub use builder::{Builder, ServerAddr};
47use callsites::Callsites;
48use record::Recorder;
49use stack::SpanStack;
50use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};
51
52pub use builder::{init, spawn};
53
54use crate::visitors::{PollOpVisitor, StateUpdateVisitor};
55
56/// A [`ConsoleLayer`] is a [`tracing_subscriber::Layer`] that records [`tracing`]
57/// spans and events emitted by the async runtime.
58///
59/// Runtimes emit [`tracing`] spans and events that represent specific operations
60/// that occur in asynchronous Rust programs, such as spawning tasks and waker
61/// operations. The `ConsoleLayer` collects and aggregates these events, and the
62/// resulting diagnostic data is exported to clients by the corresponding gRPC
63/// [`Server`] instance.
64///
65/// [`tracing`]: https://docs.rs/tracing
66pub struct ConsoleLayer {
67    current_spans: ThreadLocal<RefCell<SpanStack>>,
68    tx: mpsc::Sender<Event>,
69    shared: Arc<Shared>,
70    /// When the channel capacity goes under this number, a flush in the aggregator
71    /// will be triggered.
72    flush_under_capacity: usize,
73
74    /// Set of callsites for spans representing spawned tasks.
75    ///
76    /// For task spans, each runtime these will have like, 1-5 callsites in it, max, so
77    /// 8 should be plenty. If several runtimes are in use, we may have to spill
78    /// over into the backup hashmap, but it's unlikely.
79    spawn_callsites: Callsites<8>,
80
81    /// Set of callsites for events representing waker operations.
82    ///
83    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
84    /// there's only one async runtime library in use, but if there are multiple,
85    /// they might all have their own sets of waker ops.
86    waker_callsites: Callsites<16>,
87
88    /// Set of callsites for spans representing resources
89    ///
90    /// TODO: Take some time to determine more reasonable numbers
91    resource_callsites: Callsites<32>,
92
93    /// Set of callsites for spans representing async operations on resources
94    ///
95    /// TODO: Take some time to determine more reasonable numbers
96    async_op_callsites: Callsites<32>,
97
98    /// Set of callsites for spans representing async op poll operations
99    ///
100    /// TODO: Take some time to determine more reasonable numbers
101    async_op_poll_callsites: Callsites<32>,
102
103    /// Set of callsites for events representing poll operation invocations on resources
104    ///
105    /// TODO: Take some time to determine more reasonable numbers
106    poll_op_callsites: Callsites<32>,
107
108    /// Set of callsites for events representing state attribute state updates on resources
109    ///
110    /// TODO: Take some time to determine more reasonable numbers
111    resource_state_update_callsites: Callsites<32>,
112
113    /// Set of callsites for events representing state attribute state updates on async resource ops
114    ///
115    /// TODO: Take some time to determine more reasonable numbers
116    async_op_state_update_callsites: Callsites<32>,
117
118    /// A sink to record all events to a file.
119    recorder: Option<Recorder>,
120
121    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
122    /// timestamp that can be sent over the wire or recorded to JSON.
123    base_time: stats::TimeAnchor,
124
125    /// Maximum value for the poll time histogram.
126    ///
127    /// By default, this is one second.
128    max_poll_duration_nanos: u64,
129
130    /// Maximum value for the scheduled time histogram.
131    ///
132    /// By default, this is one second.
133    max_scheduled_duration_nanos: u64,
134}
135
136/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
137///
138/// Client applications, such as the [`tokio-console` CLI][cli] connect to the gRPC
139/// server, and stream data about the runtime's history (such as a list of the
140/// currently active tasks, or statistics summarizing polling times). A [`Server`] also
141/// interprets commands from a client application, such a request to focus in on
142/// a specific task, and translates that into a stream of details specific to
143/// that task.
144///
145/// [wire]: https://docs.rs/console-api
146/// [cli]: https://crates.io/crates/tokio-console
147pub struct Server {
148    subscribe: mpsc::Sender<Command>,
149    addr: ServerAddr,
150    aggregator: Option<Aggregator>,
151    client_buffer: usize,
152}
153
154pub(crate) trait ToProto {
155    type Output;
156    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
157}
158
159/// State shared between the `ConsoleLayer` and the `Aggregator` task.
160#[derive(Debug, Default)]
161struct Shared {
162    /// Used to notify the aggregator task when the event buffer should be
163    /// flushed.
164    flush: aggregator::Flush,
165
166    /// A counter of how many task events were dropped because the event buffer
167    /// was at capacity.
168    dropped_tasks: AtomicUsize,
169
170    /// A counter of how many async op events were dropped because the event buffer
171    /// was at capacity.
172    dropped_async_ops: AtomicUsize,
173
174    /// A counter of how many resource events were dropped because the event buffer
175    /// was at capacity.
176    dropped_resources: AtomicUsize,
177}
178
179struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);
180
181enum Command {
182    Instrument(Watch<proto::instrument::Update>),
183    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
184    Pause,
185    Resume,
186}
187
188struct WatchRequest<T> {
189    id: Id,
190    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
191    buffer: usize,
192}
193
194#[derive(Debug)]
195enum Event {
196    Metadata(&'static Metadata<'static>),
197    Spawn {
198        id: span::Id,
199        metadata: &'static Metadata<'static>,
200        stats: Arc<stats::TaskStats>,
201        fields: Vec<proto::Field>,
202        location: Option<proto::Location>,
203    },
204    Resource {
205        id: span::Id,
206        parent_id: Option<span::Id>,
207        metadata: &'static Metadata<'static>,
208        concrete_type: String,
209        kind: resource::Kind,
210        location: Option<proto::Location>,
211        is_internal: bool,
212        stats: Arc<stats::ResourceStats>,
213    },
214    PollOp {
215        metadata: &'static Metadata<'static>,
216        resource_id: span::Id,
217        op_name: String,
218        async_op_id: span::Id,
219        task_id: span::Id,
220        is_ready: bool,
221    },
222    AsyncResourceOp {
223        id: span::Id,
224        parent_id: Option<span::Id>,
225        resource_id: span::Id,
226        metadata: &'static Metadata<'static>,
227        source: String,
228
229        stats: Arc<stats::AsyncOpStats>,
230    },
231}
232
233#[derive(Clone, Debug, Copy, Serialize)]
234enum WakeOp {
235    Wake { self_wake: bool },
236    WakeByRef { self_wake: bool },
237    Clone,
238    Drop,
239}
240
241/// Marker type used to indicate that a span is actually tracked by the console.
242#[derive(Debug)]
243struct Tracked {}
244
245impl ConsoleLayer {
246    /// Returns a `ConsoleLayer` built with the default settings.
247    ///
248    /// Note: these defaults do *not* include values provided via the
249    /// environment variables specified in [`Builder::with_default_env`].
250    ///
251    /// See also [`Builder::build`].
252    pub fn new() -> (Self, Server) {
253        Self::builder().build()
254    }
255
256    /// Returns a [`Builder`] for configuring a `ConsoleLayer`.
257    ///
258    /// Note that the returned builder does *not* include values provided via
259    /// the environment variables specified in [`Builder::with_default_env`].
260    /// To extract those, you can call that method on the returned builder.
261    pub fn builder() -> Builder {
262        Builder::default()
263    }
264
265    fn build(config: Builder) -> (Self, Server) {
266        // The `cfg` value *appears* to be a constant to clippy, but it changes
267        // depending on the build-time configuration...
268        #![allow(clippy::assertions_on_constants)]
269        assert!(
270            cfg!(any(tokio_unstable, console_without_tokio_unstable)),
271            "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
272        );
273
274        let base_time = stats::TimeAnchor::new();
275        tracing::debug!(
276            config.event_buffer_capacity,
277            config.client_buffer_capacity,
278            ?config.publish_interval,
279            ?config.retention,
280            ?config.server_addr,
281            ?config.recording_path,
282            ?config.filter_env_var,
283            ?config.poll_duration_max,
284            ?config.scheduled_duration_max,
285            ?base_time,
286            "configured console subscriber"
287        );
288        tracing::debug!("red");
289
290        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
291        let (subscribe, rpcs) = mpsc::channel(256);
292        let shared = Arc::new(Shared::default());
293        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
294        // Conservatively, start to trigger a flush when half the channel is full.
295        // This tries to reduce the chance of losing events to a full channel.
296        let flush_under_capacity = config.event_buffer_capacity / 2;
297        let recorder = config
298            .recording_path
299            .as_ref()
300            .map(|path| Recorder::new(path).expect("creating recorder"));
301        let server = Server {
302            aggregator: Some(aggregator),
303            addr: config.server_addr,
304            subscribe,
305            client_buffer: config.client_buffer_capacity,
306        };
307        let layer = Self {
308            current_spans: ThreadLocal::new(),
309            tx,
310            shared,
311            flush_under_capacity,
312            spawn_callsites: Callsites::default(),
313            waker_callsites: Callsites::default(),
314            resource_callsites: Callsites::default(),
315            async_op_callsites: Callsites::default(),
316            async_op_poll_callsites: Callsites::default(),
317            poll_op_callsites: Callsites::default(),
318            resource_state_update_callsites: Callsites::default(),
319            async_op_state_update_callsites: Callsites::default(),
320            recorder,
321            base_time,
322            max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
323            max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
324        };
325        (layer, server)
326    }
327}
328
329impl ConsoleLayer {
330    /// Default maximum capacity for the channel of events sent from a
331    /// [`ConsoleLayer`] to a [`Server`].
332    ///
333    /// When this capacity is exhausted, additional events will be dropped.
334    /// Decreasing this value will reduce memory usage, but may result in
335    /// events being dropped more frequently.
336    ///
337    /// See also [`Builder::event_buffer_capacity`].
338    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;
339    /// Default maximum capacity for th echannel of events sent from a
340    /// [`Server`] to each subscribed client.
341    ///
342    /// When this capacity is exhausted, the client is assumed to be inactive,
343    /// and may be disconnected.
344    ///
345    /// See also [`Builder::client_buffer_capacity`].
346    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
347
348    /// Default frequency for publishing events to clients.
349    ///
350    /// Note that methods like [`init`][`crate::init`] and [`spawn`][`crate::spawn`] will take the value
351    /// from the `TOKIO_CONSOLE_PUBLISH_INTERVAL` [environment variable] before falling
352    /// back on this default.
353    ///
354    /// See also [`Builder::publish_interval`].
355    ///
356    /// [environment variable]: `Builder::with_default_env`
357    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);
358
359    /// By default, completed spans are retained for one hour.
360    ///
361    /// Note that methods like [`init`][`crate::init`] and
362    /// [`spawn`][`crate::spawn`] will take the value from the
363    /// `TOKIO_CONSOLE_RETENTION` [environment variable] before falling back on
364    /// this default.
365    ///
366    /// See also [`Builder::retention`].
367    ///
368    /// [environment variable]: `Builder::with_default_env`
369    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);
370
371    /// The default maximum value for task poll duration histograms.
372    ///
373    /// Any poll duration exceeding this will be clamped to this value. By
374    /// default, the maximum poll duration is one second.
375    ///
376    /// See also [`Builder::poll_duration_histogram_max`].
377    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);
378
379    /// The default maximum value for the task scheduled duration histogram.
380    ///
381    /// Any scheduled duration (the time from a task being woken until it is next
382    /// polled) exceeding this will be clamped to this value. By default, the
383    /// maximum scheduled duration is one second.
384    ///
385    /// See also [`Builder::scheduled_duration_histogram_max`].
386    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);
387
388    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
389        self.spawn_callsites.contains(meta)
390    }
391
392    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
393        self.resource_callsites.contains(meta)
394    }
395
396    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
397        self.async_op_callsites.contains(meta)
398    }
399
400    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
401    where
402        S: Subscriber + for<'a> LookupSpan<'a>,
403    {
404        cx.span(id)
405            .map(|span| self.is_spawn(span.metadata()))
406            .unwrap_or(false)
407    }
408
409    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
410    where
411        S: Subscriber + for<'a> LookupSpan<'a>,
412    {
413        cx.span(id)
414            .map(|span| self.is_resource(span.metadata()))
415            .unwrap_or(false)
416    }
417
418    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
419    where
420        S: Subscriber + for<'a> LookupSpan<'a>,
421    {
422        cx.span(id)
423            .map(|span| self.is_async_op(span.metadata()))
424            .unwrap_or(false)
425    }
426
427    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
428    where
429        P: Fn(&span::Id) -> bool,
430    {
431        stack
432            .stack()
433            .iter()
434            .rev()
435            .find(|id| p(id.id()))
436            .map(|id| id.id())
437            .cloned()
438    }
439
440    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
441        self.send_stats(dropped, move || (event, ())).is_some()
442    }
443
444    fn send_stats<S>(
445        &self,
446        dropped: &AtomicUsize,
447        mk_event: impl FnOnce() -> (Event, S),
448    ) -> Option<S> {
449        use mpsc::error::TrySendError;
450
451        // Return whether or not we actually sent the event.
452        let sent = match self.tx.try_reserve() {
453            Ok(permit) => {
454                let (event, stats) = mk_event();
455                permit.send(event);
456                Some(stats)
457            }
458            Err(TrySendError::Closed(_)) => {
459                // we should warn here eventually, but nop for now because we
460                // can't trigger tracing events...
461                None
462            }
463            Err(TrySendError::Full(_)) => {
464                // this shouldn't happen, since we trigger a flush when
465                // approaching the high water line...but if the executor wait
466                // time is very high, maybe the aggregator task hasn't been
467                // polled yet. so... eek?!
468                dropped.fetch_add(1, Ordering::Release);
469                None
470            }
471        };
472
473        let capacity = self.tx.capacity();
474        if capacity <= self.flush_under_capacity {
475            self.shared.flush.trigger();
476        }
477
478        sent
479    }
480
481    fn record(&self, event: impl FnOnce() -> record::Event) {
482        if let Some(ref recorder) = self.recorder {
483            recorder.record(event());
484        }
485    }
486
487    fn state_update<S>(
488        &self,
489        id: &Id,
490        event: &tracing::Event<'_>,
491        ctx: &Context<'_, S>,
492        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
493    ) where
494        S: Subscriber + for<'a> LookupSpan<'a>,
495    {
496        let meta_id = event.metadata().into();
497        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
498        event.record(&mut state_update_visitor);
499
500        let update = match state_update_visitor.result() {
501            Some(update) => update,
502            None => return,
503        };
504
505        let span = match ctx.span(id) {
506            Some(span) => span,
507            // XXX(eliza): no span exists for a resource ID, we should maybe
508            // record an error here...
509            None => return,
510        };
511
512        let exts = span.extensions();
513        let stats = match get_stats(&exts) {
514            Some(stats) => stats,
515            // XXX(eliza): a resource span was not a resource??? this is a bug
516            None => return,
517        };
518
519        stats.update_attribute(id, &update);
520
521        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
522            let exts = parent.extensions();
523            if let Some(stats) = get_stats(&exts) {
524                if stats.inherit_child_attributes {
525                    stats.update_attribute(id, &update);
526                }
527            }
528        }
529    }
530}
531
532impl<S> Layer<S> for ConsoleLayer
533where
534    S: Subscriber + for<'a> LookupSpan<'a>,
535{
536    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
537        let dropped = match (meta.name(), meta.target()) {
538            ("runtime.spawn", _) | ("task", "tokio::task") => {
539                self.spawn_callsites.insert(meta);
540                &self.shared.dropped_tasks
541            }
542            (_, "runtime::waker") | (_, "tokio::task::waker") => {
543                self.waker_callsites.insert(meta);
544                &self.shared.dropped_tasks
545            }
546            (ResourceVisitor::RES_SPAN_NAME, _) => {
547                self.resource_callsites.insert(meta);
548                &self.shared.dropped_resources
549            }
550            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
551                self.async_op_callsites.insert(meta);
552                &self.shared.dropped_async_ops
553            }
554            ("runtime.resource.async_op.poll", _) => {
555                self.async_op_poll_callsites.insert(meta);
556                &self.shared.dropped_async_ops
557            }
558            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
559                self.poll_op_callsites.insert(meta);
560                &self.shared.dropped_async_ops
561            }
562            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
563                self.resource_state_update_callsites.insert(meta);
564                &self.shared.dropped_resources
565            }
566            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
567                self.async_op_state_update_callsites.insert(meta);
568                &self.shared.dropped_async_ops
569            }
570            (_, _) => &self.shared.dropped_tasks,
571        };
572
573        self.send_metadata(dropped, Event::Metadata(meta));
574        subscriber::Interest::always()
575    }
576
577    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
578        let metadata = attrs.metadata();
579        if self.is_spawn(metadata) {
580            let at = Instant::now();
581            let mut task_visitor = TaskVisitor::new(metadata.into());
582            attrs.record(&mut task_visitor);
583            let (fields, location) = task_visitor.result();
584            self.record(|| record::Event::Spawn {
585                id: id.into_u64(),
586                at: self.base_time.to_system_time(at),
587                fields: record::SerializeFields(fields.clone()),
588            });
589            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
590                let stats = Arc::new(stats::TaskStats::new(
591                    self.max_poll_duration_nanos,
592                    self.max_scheduled_duration_nanos,
593                    at,
594                ));
595                let event = Event::Spawn {
596                    id: id.clone(),
597                    stats: stats.clone(),
598                    metadata,
599                    fields,
600                    location,
601                };
602                (event, stats)
603            }) {
604                ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
605            }
606            return;
607        }
608
609        if self.is_resource(metadata) {
610            let at = Instant::now();
611            let mut resource_visitor = ResourceVisitor::default();
612            attrs.record(&mut resource_visitor);
613            if let Some(result) = resource_visitor.result() {
614                let ResourceVisitorResult {
615                    concrete_type,
616                    kind,
617                    location,
618                    is_internal,
619                    inherit_child_attrs,
620                } = result;
621                let parent_id = self.current_spans.get().and_then(|stack| {
622                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
623                });
624                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
625                    let stats = Arc::new(stats::ResourceStats::new(
626                        at,
627                        inherit_child_attrs,
628                        parent_id.clone(),
629                    ));
630                    let event = Event::Resource {
631                        id: id.clone(),
632                        parent_id,
633                        metadata,
634                        concrete_type,
635                        kind,
636                        location,
637                        is_internal,
638                        stats: stats.clone(),
639                    };
640                    (event, stats)
641                }) {
642                    ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
643                }
644            }
645            return;
646        }
647
648        if self.is_async_op(metadata) {
649            let at = Instant::now();
650            let mut async_op_visitor = AsyncOpVisitor::default();
651            attrs.record(&mut async_op_visitor);
652            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
653                let resource_id = self.current_spans.get().and_then(|stack| {
654                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
655                });
656
657                let parent_id = self.current_spans.get().and_then(|stack| {
658                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
659                });
660
661                if let Some(resource_id) = resource_id {
662                    if let Some(stats) =
663                        self.send_stats(&self.shared.dropped_async_ops, move || {
664                            let stats = Arc::new(stats::AsyncOpStats::new(
665                                at,
666                                inherit_child_attrs,
667                                parent_id.clone(),
668                            ));
669                            let event = Event::AsyncResourceOp {
670                                id: id.clone(),
671                                parent_id,
672                                resource_id,
673                                metadata,
674                                source,
675                                stats: stats.clone(),
676                            };
677                            (event, stats)
678                        })
679                    {
680                        ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
681                    }
682                }
683            }
684        }
685    }
686
687    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
688        let metadata = event.metadata();
689        if self.waker_callsites.contains(metadata) {
690            let at = Instant::now();
691            let mut visitor = WakerVisitor::default();
692            event.record(&mut visitor);
693            // XXX (eliza): ew...
694            if let Some((id, mut op)) = visitor.result() {
695                if let Some(span) = ctx.span(&id) {
696                    let exts = span.extensions();
697                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
698                        if op.is_wake() {
699                            // Are we currently inside the task's span? If so, the task
700                            // has woken itself.
701
702                            let self_wake = self
703                                .current_spans
704                                .get()
705                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
706                                .unwrap_or(false);
707                            op = op.self_wake(self_wake);
708                        }
709
710                        stats.record_wake_op(op, at);
711                        self.record(|| record::Event::Waker {
712                            id: id.into_u64(),
713                            at: self.base_time.to_system_time(at),
714                            op,
715                        });
716                    }
717                }
718            }
719            return;
720        }
721
722        if self.poll_op_callsites.contains(metadata) {
723            let resource_id = self.current_spans.get().and_then(|stack| {
724                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
725            });
726            // poll op event should have a resource span parent
727            if let Some(resource_id) = resource_id {
728                let mut poll_op_visitor = PollOpVisitor::default();
729                event.record(&mut poll_op_visitor);
730                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
731                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
732                        let stack = stack.borrow();
733                        let task_id =
734                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
735                        let async_op_id =
736                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
737                        Some((task_id, async_op_id))
738                    });
739                    // poll op event should be emitted in the context of an async op and task spans
740                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
741                        if let Some(span) = ctx.span(&async_op_id) {
742                            let exts = span.extensions();
743                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
744                                stats.set_task_id(&task_id);
745                            }
746                        }
747
748                        self.send_stats(&self.shared.dropped_async_ops, || {
749                            let event = Event::PollOp {
750                                metadata,
751                                op_name,
752                                resource_id,
753                                async_op_id,
754                                task_id,
755                                is_ready,
756                            };
757                            (event, ())
758                        });
759
760                        // TODO: JSON recorder doesn't care about poll ops.
761                    }
762                }
763            }
764            return;
765        }
766
767        if self.resource_state_update_callsites.contains(metadata) {
768            // state update event should have a resource span parent
769            let resource_id = self.current_spans.get().and_then(|stack| {
770                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
771            });
772            if let Some(id) = resource_id {
773                self.state_update(&id, event, &ctx, |exts| {
774                    exts.get::<Arc<stats::ResourceStats>>()
775                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
776                });
777            }
778
779            return;
780        }
781
782        if self.async_op_state_update_callsites.contains(metadata) {
783            let async_op_id = self.current_spans.get().and_then(|stack| {
784                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
785            });
786            if let Some(id) = async_op_id {
787                self.state_update(&id, event, &ctx, |exts| {
788                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
789                    Some(&async_op.stats)
790                });
791            }
792        }
793    }
794
795    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
796        if let Some(span) = cx.span(id) {
797            let now = Instant::now();
798            let exts = span.extensions();
799            // if the span we are entering is a task or async op, record the
800            // poll stats.
801            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
802                stats.start_poll(now);
803            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
804                stats.start_poll(now);
805            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
806                // otherwise, is the span a resource? in that case, we also want
807                // to enter it, although we don't care about recording poll
808                // stats.
809            } else {
810                return;
811            };
812
813            self.current_spans
814                .get_or_default()
815                .borrow_mut()
816                .push(id.clone());
817
818            self.record(|| record::Event::Enter {
819                id: id.into_u64(),
820                at: self.base_time.to_system_time(now),
821            });
822        }
823    }
824
825    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
826        if let Some(span) = cx.span(id) {
827            let exts = span.extensions();
828            let now = Instant::now();
829            // if the span we are entering is a task or async op, record the
830            // poll stats.
831            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
832                stats.end_poll(now);
833            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
834                stats.end_poll(now);
835            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
836                // otherwise, is the span a resource? in that case, we also want
837                // to enter it, although we don't care about recording poll
838                // stats.
839            } else {
840                return;
841            };
842
843            self.current_spans.get_or_default().borrow_mut().pop(id);
844
845            self.record(|| record::Event::Exit {
846                id: id.into_u64(),
847                at: self.base_time.to_system_time(now),
848            });
849        }
850    }
851
852    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
853        if let Some(span) = cx.span(&id) {
854            let now = Instant::now();
855            let exts = span.extensions();
856            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
857                stats.drop_task(now);
858            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
859                stats.drop_async_op(now);
860            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
861                stats.drop_resource(now);
862            }
863            self.record(|| record::Event::Close {
864                id: id.into_u64(),
865                at: self.base_time.to_system_time(now),
866            });
867        }
868    }
869}
870
871impl fmt::Debug for ConsoleLayer {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        f.debug_struct("ConsoleLayer")
874            // mpsc::Sender debug impl is not very useful
875            .field("tx", &format_args!("<...>"))
876            .field("tx.capacity", &self.tx.capacity())
877            .field("shared", &self.shared)
878            .field("spawn_callsites", &self.spawn_callsites)
879            .field("waker_callsites", &self.waker_callsites)
880            .finish()
881    }
882}
883
884impl Server {
885    // XXX(eliza): why is `SocketAddr::new` not `const`???
886    /// A [`Server`] by default binds socket address 127.0.0.1 to service remote
887    /// procedure calls.
888    ///
889    /// Note that methods like [`init`][`crate::init`] and
890    /// [`spawn`][`crate::spawn`] will parse the socket address from the
891    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
892    /// constructing a socket address from this default.
893    ///
894    /// See also [`Builder::server_addr`].
895    ///
896    /// [environment variable]: `Builder::with_default_env`
897    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
898
899    /// A [`Server`] by default binds port 6669 to service remote procedure
900    /// calls.
901    ///
902    /// Note that methods like [`init`][`crate::init`] and
903    /// [`spawn`][`crate::spawn`] will parse the socket address from the
904    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
905    /// constructing a socket address from this default.
906    ///
907    /// See also [`Builder::server_addr`].
908    ///
909    /// [environment variable]: `Builder::with_default_env`
910    pub const DEFAULT_PORT: u16 = 6669;
911
912    /// Starts the gRPC service with the default gRPC settings.
913    ///
914    /// To configure gRPC server settings before starting the server, use
915    /// [`serve_with`] instead. This method is equivalent to calling [`serve_with`]
916    /// and providing the default gRPC server settings:
917    ///
918    /// ```rust
919    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
920    /// # let (_, server) = hds_console_subscriber::ConsoleLayer::new();
921    /// server.serve_with(tonic::transport::Server::default()).await
922    /// # }
923    /// ```
924    /// [`serve_with`]: Server::serve_with
925    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
926        self.serve_with(tonic::transport::Server::default()).await
927    }
928
929    /// Starts the gRPC service with the given [`tonic`] gRPC transport server
930    /// `builder`.
931    ///
932    /// The `builder` parameter may be used to configure gRPC-specific settings
933    /// prior to starting the server.
934    ///
935    /// This spawns both the server task and the event aggregation worker
936    /// task on the current async runtime.
937    ///
938    /// [`tonic`]: https://docs.rs/tonic/
939    pub async fn serve_with(
940        self,
941        mut builder: tonic::transport::Server,
942    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
943        let addr = self.addr.clone();
944        let ServerParts {
945            instrument_server,
946            aggregator,
947        } = self.into_parts();
948        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
949        let router = builder.add_service(instrument_server);
950        let res = match addr {
951            ServerAddr::Tcp(addr) => {
952                let serve = router.serve(addr);
953                spawn_named(serve, "console::serve").await
954            }
955            #[cfg(unix)]
956            ServerAddr::Unix(path) => {
957                let incoming = UnixListener::bind(path)?;
958                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
959                spawn_named(serve, "console::serve").await
960            }
961        };
962        aggregate.abort();
963        res?.map_err(Into::into)
964    }
965
966    /// Starts the gRPC service with the default gRPC settings and gRPC-Web
967    /// support.
968    ///
969    /// # Examples
970    ///
971    /// To serve the instrument server with gRPC-Web support with the default
972    /// settings:
973    ///
974    /// ```rust
975    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
976    /// # let (_, server) = hds_console_subscriber::ConsoleLayer::new();
977    /// server.serve_with_grpc_web(tonic::transport::Server::default()).await
978    /// # }
979    /// ```
980    ///
981    /// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the
982    /// following code:
983    ///
984    /// ```rust
985    /// # use std::{thread, time::Duration};
986    /// #
987    /// use hds_console_subscriber::{ConsoleLayer, ServerParts};
988    /// use tonic_web::GrpcWebLayer;
989    /// use tower_web::cors::{CorsLayer, AllowOrigin};
990    /// use http::header::HeaderName;
991    /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
992    /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
993    /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
994    /// #    ["grpc-status", "grpc-message", "grpc-status-details-bin"];
995    /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
996    /// #    "x-grpc-web",
997    /// #    "content-type",
998    /// #    "x-user-agent",
999    /// #    "grpc-timeout",
1000    /// #    "user-agent",
1001    /// # ];
1002    ///
1003    /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
1004    /// # thread::Builder::new()
1005    /// #    .name("subscriber".into())
1006    /// #    .spawn(move || {
1007    /// // Customize the CORS configuration.
1008    /// let cors = CorsLayer::new()
1009    ///     .allow_origin(AllowOrigin::mirror_request())
1010    ///     .allow_credentials(true)
1011    ///     .max_age(DEFAULT_MAX_AGE)
1012    ///     .expose_headers(
1013    ///         DEFAULT_EXPOSED_HEADERS
1014    ///             .iter()
1015    ///             .cloned()
1016    ///             .map(HeaderName::from_static)
1017    ///             .collect::<Vec<HeaderName>>(),
1018    ///     )
1019    ///     .allow_headers(
1020    ///         DEFAULT_ALLOW_HEADERS
1021    ///             .iter()
1022    ///             .cloned()
1023    ///             .map(HeaderName::from_static)
1024    ///             .collect::<Vec<HeaderName>>(),
1025    ///     );
1026    /// #       let runtime = tokio::runtime::Builder::new_current_thread()
1027    /// #           .enable_all()
1028    /// #           .build()
1029    /// #           .expect("console subscriber runtime initialization failed");
1030    /// #       runtime.block_on(async move {
1031    ///
1032    /// let ServerParts {
1033    ///     instrument_server,
1034    ///     aggregator,
1035    ///     ..
1036    /// } = server.into_parts();
1037    /// tokio::spawn(aggregator.run());
1038    ///
1039    /// // Serve the instrument server with gRPC-Web support and the CORS configuration.
1040    /// let router = tonic::transport::Server::builder()
1041    ///     .accept_http1(true)
1042    ///     .layer(cors)
1043    ///     .layer(GrpcWebLayer::new())
1044    ///     .add_service(instrument_server);
1045    /// let serve = router.serve(std::net::SocketAddr::new(
1046    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1047    ///     // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
1048    ///     9999,
1049    /// ));
1050    ///
1051    /// // Finally, spawn the server.
1052    /// serve.await.expect("console subscriber server failed");
1053    /// #       });
1054    /// #   })
1055    /// #   .expect("console subscriber could not spawn thread");
1056    /// # tracing_subscriber::registry().with(console_layer).init();
1057    /// ```
1058    ///
1059    /// For a comprehensive understanding and complete code example,
1060    /// please refer to the `grpc-web` example in the examples directory.
1061    ///
1062    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1063    #[cfg(feature = "grpc-web")]
1064    pub async fn serve_with_grpc_web(
1065        self,
1066        builder: tonic::transport::Server,
1067    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
1068        let addr = self.addr.clone();
1069        let ServerParts {
1070            instrument_server,
1071            aggregator,
1072        } = self.into_parts();
1073        let router = builder
1074            .accept_http1(true)
1075            .add_service(tonic_web::enable(instrument_server));
1076        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
1077        let res = match addr {
1078            ServerAddr::Tcp(addr) => {
1079                let serve = router.serve(addr);
1080                spawn_named(serve, "console::serve").await
1081            }
1082            #[cfg(unix)]
1083            ServerAddr::Unix(path) => {
1084                let incoming = UnixListener::bind(path)?;
1085                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
1086                spawn_named(serve, "console::serve").await
1087            }
1088        };
1089        aggregate.abort();
1090        res?.map_err(Into::into)
1091    }
1092
1093    /// Returns the parts needed to spawn a gRPC server and the aggregator that
1094    /// supplies it.
1095    ///
1096    /// Note that a server spawned in this way will disregard any value set by
1097    /// [`Builder::server_addr`], as the user becomes responsible for defining
1098    /// the address when calling [`Router::serve`].
1099    ///
1100    /// Additionally, the user of this API must ensure that the [`Aggregator`]
1101    /// is running for as long as the gRPC server is. If the server stops
1102    /// running, the aggregator task can be aborted.
1103    ///
1104    /// # Examples
1105    ///
1106    /// The parts can be used to serve the instrument server together with
1107    /// other endpoints from the same gRPC server.
1108    ///
1109    /// ```
1110    /// use hds_console_subscriber::{ConsoleLayer, ServerParts};
1111    ///
1112    /// # let runtime = tokio::runtime::Builder::new_current_thread()
1113    /// #     .enable_all()
1114    /// #     .build()
1115    /// #     .unwrap();
1116    /// # runtime.block_on(async {
1117    /// let (console_layer, server) = ConsoleLayer::builder().build();
1118    /// let ServerParts {
1119    ///     instrument_server,
1120    ///     aggregator,
1121    ///     ..
1122    /// } = server.into_parts();
1123    ///
1124    /// let aggregator_handle = tokio::spawn(aggregator.run());
1125    /// let router = tonic::transport::Server::builder()
1126    ///     //.add_service(some_other_service)
1127    ///     .add_service(instrument_server);
1128    /// let serve = router.serve(std::net::SocketAddr::new(
1129    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1130    ///     6669,
1131    /// ));
1132    ///
1133    /// // Finally, spawn the server.
1134    /// tokio::spawn(serve);
1135    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
1136    /// # drop(console_layer);
1137    /// # let mut aggregator_handle = aggregator_handle;
1138    /// # aggregator_handle.abort();
1139    /// # });
1140    /// ```
1141    ///
1142    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1143    pub fn into_parts(mut self) -> ServerParts {
1144        let aggregator = self
1145            .aggregator
1146            .take()
1147            .expect("cannot start server multiple times");
1148
1149        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);
1150
1151        ServerParts {
1152            instrument_server,
1153            aggregator,
1154        }
1155    }
1156}
1157
1158/// Server Parts
1159///
1160/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
1161/// further parts in the future, an as such is marked as `non_exhaustive`.
1162///
1163/// The `InstrumentServer<Server>` can be used to construct a router which
1164/// can be added to a [`tonic`] gRPC server.
1165///
1166/// The `aggregator` is a future which should be running as long as the server is.
1167/// Generally, this future should be spawned onto an appropriate runtime and then
1168/// aborted if the server gets shut down.
1169///
1170/// See the [`Server::into_parts`] documentation for usage.
1171#[non_exhaustive]
1172pub struct ServerParts {
1173    /// The instrument server.
1174    ///
1175    /// See the documentation for [`InstrumentServer`] for details.
1176    pub instrument_server: InstrumentServer<Server>,
1177
1178    /// The aggregator.
1179    ///
1180    /// Responsible for collecting and preparing traces for the instrument server
1181    /// to send its clients.
1182    ///
1183    /// The aggregator should be [`run`] when the instrument server is started.
1184    /// If the server stops running for any reason, the aggregator task can be
1185    /// aborted.
1186    ///
1187    /// [`run`]: fn@crate::Aggregator::run
1188    pub aggregator: Aggregator,
1189}
1190
1191/// Aggregator handle.
1192///
1193/// This object is returned from [`Server::into_parts`]. It can be
1194/// used to abort the aggregator task.
1195///
1196/// The aggregator collects the traces that implement the async runtime
1197/// being observed and prepares them to be served by the gRPC server.
1198///
1199/// Normally, if the server, started with [`Server::serve`] or
1200/// [`Server::serve_with`] stops for any reason, the aggregator is aborted,
1201/// hoewver, if the server was started with the [`InstrumentServer`] returned
1202/// from [`Server::into_parts`], then it is the responsibility of the user
1203/// of the API to stop the aggregator task by calling [`abort`] on this
1204/// object.
1205///
1206/// [`abort`]: fn@crate::AggregatorHandle::abort
1207pub struct AggregatorHandle {
1208    join_handle: JoinHandle<()>,
1209}
1210
1211impl AggregatorHandle {
1212    /// Aborts the task running this aggregator.
1213    ///
1214    /// To avoid having a disconnected aggregator running forever, this
1215    /// method should be called when the [`tonic::transport::Server`] started
1216    /// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
1217    /// stops running.
1218    pub fn abort(&mut self) {
1219        self.join_handle.abort();
1220    }
1221}
1222
1223#[tonic::async_trait]
1224impl proto::instrument::instrument_server::Instrument for Server {
1225    type WatchUpdatesStream =
1226        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
1227    type WatchTaskDetailsStream =
1228        tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
1229    async fn watch_updates(
1230        &self,
1231        req: tonic::Request<proto::instrument::InstrumentRequest>,
1232    ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
1233        match req.remote_addr() {
1234            Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
1235            None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
1236        }
1237        let permit = self.subscribe.reserve().await.map_err(|_| {
1238            tonic::Status::internal("cannot start new watch, aggregation task is not running")
1239        })?;
1240        let (tx, rx) = mpsc::channel(self.client_buffer);
1241        permit.send(Command::Instrument(Watch(tx)));
1242        tracing::debug!("watch started");
1243        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1244        Ok(tonic::Response::new(stream))
1245    }
1246
1247    async fn watch_task_details(
1248        &self,
1249        req: tonic::Request<proto::instrument::TaskDetailsRequest>,
1250    ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
1251        let task_id = req
1252            .into_inner()
1253            .id
1254            .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
1255            .id;
1256
1257        // `tracing` reserves span ID 0 for niche optimization for `Option<Id>`.
1258        let id = std::num::NonZeroU64::new(task_id)
1259            .map(Id::from_non_zero_u64)
1260            .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;
1261
1262        let permit = self.subscribe.reserve().await.map_err(|_| {
1263            tonic::Status::internal("cannot start new watch, aggregation task is not running")
1264        })?;
1265
1266        // Check with the aggregator task to request a stream if the task exists.
1267        let (stream_sender, stream_recv) = oneshot::channel();
1268        permit.send(Command::WatchTaskDetail(WatchRequest {
1269            id,
1270            stream_sender,
1271            buffer: self.client_buffer,
1272        }));
1273        // If the aggregator drops the sender, the task doesn't exist.
1274        let rx = stream_recv.await.map_err(|_| {
1275            tracing::warn!(id = ?task_id, "requested task not found");
1276            tonic::Status::not_found("task not found")
1277        })?;
1278
1279        tracing::debug!(id = ?task_id, "task details watch started");
1280        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1281        Ok(tonic::Response::new(stream))
1282    }
1283
1284    async fn pause(
1285        &self,
1286        _req: tonic::Request<proto::instrument::PauseRequest>,
1287    ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
1288        self.subscribe.send(Command::Pause).await.map_err(|_| {
1289            tonic::Status::internal("cannot pause, aggregation task is not running")
1290        })?;
1291        Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
1292    }
1293
1294    async fn resume(
1295        &self,
1296        _req: tonic::Request<proto::instrument::ResumeRequest>,
1297    ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
1298        self.subscribe.send(Command::Resume).await.map_err(|_| {
1299            tonic::Status::internal("cannot resume, aggregation task is not running")
1300        })?;
1301        Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
1302    }
1303}
1304
1305impl WakeOp {
1306    /// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
1307    fn is_wake(self) -> bool {
1308        matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
1309    }
1310
1311    fn self_wake(self, self_wake: bool) -> Self {
1312        match self {
1313            Self::Wake { .. } => Self::Wake { self_wake },
1314            Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
1315            x => x,
1316        }
1317    }
1318}
1319
1320#[track_caller]
1321pub(crate) fn spawn_named<T>(
1322    task: impl std::future::Future<Output = T> + Send + 'static,
1323    _name: &str,
1324) -> tokio::task::JoinHandle<T>
1325where
1326    T: Send + 'static,
1327{
1328    #[cfg(tokio_unstable)]
1329    return tokio::task::Builder::new().name(_name).spawn(task).unwrap();
1330
1331    #[cfg(not(tokio_unstable))]
1332    tokio::spawn(task)
1333}