// hds_console_subscriber/lib.rs
1#![doc = include_str!("../README.md")]
2use console_api as proto;
3use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
4use serde::Serialize;
5use std::{
6 cell::RefCell,
7 fmt,
8 net::{IpAddr, Ipv4Addr},
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12 },
13 time::{Duration, Instant},
14};
15use thread_local::ThreadLocal;
16#[cfg(unix)]
17use tokio::net::UnixListener;
18use tokio::{
19 sync::{mpsc, oneshot},
20 task::JoinHandle,
21};
22#[cfg(unix)]
23use tokio_stream::wrappers::UnixListenerStream;
24use tracing_core::{
25 span::{self, Id},
26 subscriber::{self, Subscriber},
27 Metadata,
28};
29use tracing_subscriber::{
30 layer::Context,
31 registry::{Extensions, LookupSpan},
32 Layer,
33};
34
35mod aggregator;
36mod attribute;
37mod builder;
38mod callsites;
39mod record;
40mod stack;
41mod stats;
42pub(crate) mod sync;
43mod visitors;
44
45pub use aggregator::Aggregator;
46pub use builder::{Builder, ServerAddr};
47use callsites::Callsites;
48use record::Recorder;
49use stack::SpanStack;
50use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};
51
52pub use builder::{init, spawn};
53
54use crate::visitors::{PollOpVisitor, StateUpdateVisitor};
55
/// A [`ConsoleLayer`] is a [`tracing_subscriber::Layer`] that records [`tracing`]
/// spans and events emitted by the async runtime.
///
/// Runtimes emit [`tracing`] spans and events that represent specific operations
/// that occur in asynchronous Rust programs, such as spawning tasks and waker
/// operations. The `ConsoleLayer` collects and aggregates these events, and the
/// resulting diagnostic data is exported to clients by the corresponding gRPC
/// [`Server`] instance.
///
/// [`tracing`]: https://docs.rs/tracing
pub struct ConsoleLayer {
    /// Per-thread stack of currently entered spans, used to attribute events
    /// (waker ops, poll ops, state updates) to the enclosing task/resource.
    current_spans: ThreadLocal<RefCell<SpanStack>>,
    /// Sends [`Event`]s to the aggregator task.
    tx: mpsc::Sender<Event>,
    /// State shared with the aggregator task (flush notifications and
    /// dropped-event counters).
    shared: Arc<Shared>,
    /// When the channel capacity goes under this number, a flush in the aggregator
    /// will be triggered.
    flush_under_capacity: usize,

    /// Set of callsites for spans representing spawned tasks.
    ///
    /// For task spans, each runtime will have only a handful (roughly 1-5) of
    /// callsites, so 8 should be plenty. If several runtimes are in use, we may
    /// have to spill over into the backup hashmap, but it's unlikely.
    spawn_callsites: Callsites<8>,

    /// Set of callsites for events representing waker operations.
    ///
    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
    /// there's only one async runtime library in use, but if there are multiple,
    /// they might all have their own sets of waker ops.
    waker_callsites: Callsites<16>,

    /// Set of callsites for spans representing resources
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_callsites: Callsites<32>,

    /// Set of callsites for spans representing async operations on resources
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_callsites: Callsites<32>,

    /// Set of callsites for spans representing async op poll operations
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_poll_callsites: Callsites<32>,

    /// Set of callsites for events representing poll operation invocations on resources
    ///
    /// TODO: Take some time to determine more reasonable numbers
    poll_op_callsites: Callsites<32>,

    /// Set of callsites for events representing state attribute state updates on resources
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_state_update_callsites: Callsites<32>,

    /// Set of callsites for events representing state attribute state updates on async resource ops
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_state_update_callsites: Callsites<32>,

    /// A sink to record all events to a file.
    recorder: Option<Recorder>,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire or recorded to JSON.
    base_time: stats::TimeAnchor,

    /// Maximum value for the poll time histogram.
    ///
    /// By default, this is one second.
    max_poll_duration_nanos: u64,

    /// Maximum value for the scheduled time histogram.
    ///
    /// By default, this is one second.
    max_scheduled_duration_nanos: u64,
}
135
/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
///
/// Client applications, such as the [`tokio-console` CLI][cli] connect to the gRPC
/// server, and stream data about the runtime's history (such as a list of the
/// currently active tasks, or statistics summarizing polling times). A [`Server`] also
/// interprets commands from a client application, such a request to focus in on
/// a specific task, and translates that into a stream of details specific to
/// that task.
///
/// [wire]: https://docs.rs/console-api
/// [cli]: https://crates.io/crates/tokio-console
pub struct Server {
    /// Sends client [`Command`]s (subscriptions, pause/resume) to the aggregator.
    subscribe: mpsc::Sender<Command>,
    /// Address (TCP or Unix socket) the gRPC server will bind to.
    addr: ServerAddr,
    /// The aggregator task, held until the server is started (taken by
    /// `into_parts`/`serve_with`).
    aggregator: Option<Aggregator>,
    /// Capacity of each subscribed client's update channel.
    client_buffer: usize,
}
153
/// Conversion into a protobuf wire-format representation.
///
/// The `base_time` anchor translates monotonic [`Instant`]s into wall-clock
/// timestamps that can be sent over the wire.
pub(crate) trait ToProto {
    /// The protobuf type produced by this conversion.
    type Output;
    /// Converts `self` into its protobuf representation.
    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
}
158
/// State shared between the `ConsoleLayer` and the `Aggregator` task.
#[derive(Debug, Default)]
struct Shared {
    /// Used to notify the aggregator task when the event buffer should be
    /// flushed.
    flush: aggregator::Flush,

    /// A counter of how many task events were dropped because the event buffer
    /// was at capacity.
    dropped_tasks: AtomicUsize,

    /// A counter of how many async op events were dropped because the event buffer
    /// was at capacity.
    dropped_async_ops: AtomicUsize,

    /// A counter of how many resource events were dropped because the event buffer
    /// was at capacity.
    dropped_resources: AtomicUsize,
}
178
/// A sink for streaming `Result<T, tonic::Status>` updates to one subscribed client.
struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);
180
/// Commands sent from the gRPC [`Server`] to the aggregator task.
enum Command {
    /// Subscribe to the stream of aggregated instrument updates.
    Instrument(Watch<proto::instrument::Update>),
    /// Subscribe to a detail stream for a single task (identified in the request).
    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
    /// Client-requested pause (handled by the aggregator).
    Pause,
    /// Client-requested resume (handled by the aggregator).
    Resume,
}
187
/// A request to stream details about the span with the given `id`.
struct WatchRequest<T> {
    /// The span ID to watch.
    id: Id,
    /// One-shot channel used to hand the receiving end of the new detail
    /// stream back to the requesting server task.
    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
    /// Capacity for the detail stream's channel.
    buffer: usize,
}
193
/// Events sent from the [`ConsoleLayer`] to the aggregator task.
#[derive(Debug)]
enum Event {
    /// A new callsite's metadata was registered.
    Metadata(&'static Metadata<'static>),
    /// A new task span was created.
    Spawn {
        id: span::Id,
        metadata: &'static Metadata<'static>,
        /// Shared stats handle; the same `Arc` is also stored in the span's
        /// extensions so the layer can update it on enter/exit/close.
        stats: Arc<stats::TaskStats>,
        fields: Vec<proto::Field>,
        location: Option<proto::Location>,
    },
    /// A new resource span was created.
    Resource {
        id: span::Id,
        /// The enclosing resource span, if this resource was created inside one.
        parent_id: Option<span::Id>,
        metadata: &'static Metadata<'static>,
        concrete_type: String,
        kind: resource::Kind,
        location: Option<proto::Location>,
        is_internal: bool,
        stats: Arc<stats::ResourceStats>,
    },
    /// A poll-op event was emitted for an async op on a resource.
    PollOp {
        metadata: &'static Metadata<'static>,
        resource_id: span::Id,
        op_name: String,
        async_op_id: span::Id,
        task_id: span::Id,
        is_ready: bool,
    },
    /// A new async-op span was created on a resource.
    AsyncResourceOp {
        id: span::Id,
        /// The enclosing async-op span, if nested in one.
        parent_id: Option<span::Id>,
        resource_id: span::Id,
        metadata: &'static Metadata<'static>,
        source: String,

        stats: Arc<stats::AsyncOpStats>,
    },
}
232
/// A waker operation recorded for a task's waker.
#[derive(Clone, Debug, Copy, Serialize)]
enum WakeOp {
    /// A wake that consumed the waker; `self_wake` is set when the task was
    /// woken from within its own span (it woke itself).
    Wake { self_wake: bool },
    /// A wake by reference; `self_wake` as above.
    WakeByRef { self_wake: bool },
    /// The waker was cloned.
    Clone,
    /// The waker was dropped without waking.
    Drop,
}
240
/// Marker type used to indicate that a span is actually tracked by the console.
///
/// A unit struct (rather than an empty braced struct) since it carries no
/// data; `Tracked {}` construction remains valid for any existing callers.
#[derive(Debug)]
struct Tracked;
244
245impl ConsoleLayer {
246 /// Returns a `ConsoleLayer` built with the default settings.
247 ///
248 /// Note: these defaults do *not* include values provided via the
249 /// environment variables specified in [`Builder::with_default_env`].
250 ///
251 /// See also [`Builder::build`].
252 pub fn new() -> (Self, Server) {
253 Self::builder().build()
254 }
255
256 /// Returns a [`Builder`] for configuring a `ConsoleLayer`.
257 ///
258 /// Note that the returned builder does *not* include values provided via
259 /// the environment variables specified in [`Builder::with_default_env`].
260 /// To extract those, you can call that method on the returned builder.
261 pub fn builder() -> Builder {
262 Builder::default()
263 }
264
265 fn build(config: Builder) -> (Self, Server) {
266 // The `cfg` value *appears* to be a constant to clippy, but it changes
267 // depending on the build-time configuration...
268 #![allow(clippy::assertions_on_constants)]
269 assert!(
270 cfg!(any(tokio_unstable, console_without_tokio_unstable)),
271 "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
272 );
273
274 let base_time = stats::TimeAnchor::new();
275 tracing::debug!(
276 config.event_buffer_capacity,
277 config.client_buffer_capacity,
278 ?config.publish_interval,
279 ?config.retention,
280 ?config.server_addr,
281 ?config.recording_path,
282 ?config.filter_env_var,
283 ?config.poll_duration_max,
284 ?config.scheduled_duration_max,
285 ?base_time,
286 "configured console subscriber"
287 );
288 tracing::debug!("red");
289
290 let (tx, events) = mpsc::channel(config.event_buffer_capacity);
291 let (subscribe, rpcs) = mpsc::channel(256);
292 let shared = Arc::new(Shared::default());
293 let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
294 // Conservatively, start to trigger a flush when half the channel is full.
295 // This tries to reduce the chance of losing events to a full channel.
296 let flush_under_capacity = config.event_buffer_capacity / 2;
297 let recorder = config
298 .recording_path
299 .as_ref()
300 .map(|path| Recorder::new(path).expect("creating recorder"));
301 let server = Server {
302 aggregator: Some(aggregator),
303 addr: config.server_addr,
304 subscribe,
305 client_buffer: config.client_buffer_capacity,
306 };
307 let layer = Self {
308 current_spans: ThreadLocal::new(),
309 tx,
310 shared,
311 flush_under_capacity,
312 spawn_callsites: Callsites::default(),
313 waker_callsites: Callsites::default(),
314 resource_callsites: Callsites::default(),
315 async_op_callsites: Callsites::default(),
316 async_op_poll_callsites: Callsites::default(),
317 poll_op_callsites: Callsites::default(),
318 resource_state_update_callsites: Callsites::default(),
319 async_op_state_update_callsites: Callsites::default(),
320 recorder,
321 base_time,
322 max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
323 max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
324 };
325 (layer, server)
326 }
327}
328
impl ConsoleLayer {
    /// Default maximum capacity for the channel of events sent from a
    /// [`ConsoleLayer`] to a [`Server`].
    ///
    /// When this capacity is exhausted, additional events will be dropped.
    /// Decreasing this value will reduce memory usage, but may result in
    /// events being dropped more frequently.
    ///
    /// See also [`Builder::event_buffer_capacity`].
    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;
    /// Default maximum capacity for the channel of events sent from a
    /// [`Server`] to each subscribed client.
    ///
    /// When this capacity is exhausted, the client is assumed to be inactive,
    /// and may be disconnected.
    ///
    /// See also [`Builder::client_buffer_capacity`].
    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;

    /// Default frequency for publishing events to clients.
    ///
    /// Note that methods like [`init`][`crate::init`] and [`spawn`][`crate::spawn`] will take the value
    /// from the `TOKIO_CONSOLE_PUBLISH_INTERVAL` [environment variable] before falling
    /// back on this default.
    ///
    /// See also [`Builder::publish_interval`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);

    /// By default, completed spans are retained for one hour.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will take the value from the
    /// `TOKIO_CONSOLE_RETENTION` [environment variable] before falling back on
    /// this default.
    ///
    /// See also [`Builder::retention`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);

    /// The default maximum value for task poll duration histograms.
    ///
    /// Any poll duration exceeding this will be clamped to this value. By
    /// default, the maximum poll duration is one second.
    ///
    /// See also [`Builder::poll_duration_histogram_max`].
    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

    /// The default maximum value for the task scheduled duration histogram.
    ///
    /// Any scheduled duration (the time from a task being woken until it is next
    /// polled) exceeding this will be clamped to this value. By default, the
    /// maximum scheduled duration is one second.
    ///
    /// See also [`Builder::scheduled_duration_histogram_max`].
    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

    /// Returns `true` if `meta` was registered as a spawned-task callsite.
    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
        self.spawn_callsites.contains(meta)
    }

    /// Returns `true` if `meta` was registered as a resource-span callsite.
    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
        self.resource_callsites.contains(meta)
    }

    /// Returns `true` if `meta` was registered as an async-op-span callsite.
    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
        self.async_op_callsites.contains(meta)
    }

    /// Returns `true` if the span with `id` is a spawned-task span (`false` if
    /// the span no longer exists).
    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_spawn(span.metadata()))
            .unwrap_or(false)
    }

    /// Returns `true` if the span with `id` is a resource span (`false` if
    /// the span no longer exists).
    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_resource(span.metadata()))
            .unwrap_or(false)
    }

    /// Returns `true` if the span with `id` is an async-op span (`false` if
    /// the span no longer exists).
    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_async_op(span.metadata()))
            .unwrap_or(false)
    }

    /// Returns the ID of the most recently entered span on `stack` matching
    /// predicate `p` (searching from the top of the stack downwards).
    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
    where
        P: Fn(&span::Id) -> bool,
    {
        stack
            .stack()
            .iter()
            .rev()
            .find(|id| p(id.id()))
            .map(|id| id.id())
            .cloned()
    }

    /// Sends a metadata event to the aggregator, returning whether it was
    /// actually sent (`false` if it was dropped or the channel is closed).
    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
        self.send_stats(dropped, move || (event, ())).is_some()
    }

    /// Attempts to send an event (built lazily by `mk_event`) to the
    /// aggregator, returning the associated stats handle on success.
    ///
    /// On a full channel the event is dropped and `dropped` is incremented;
    /// either way, a flush is triggered once remaining capacity falls to the
    /// high-water mark.
    fn send_stats<S>(
        &self,
        dropped: &AtomicUsize,
        mk_event: impl FnOnce() -> (Event, S),
    ) -> Option<S> {
        use mpsc::error::TrySendError;

        // Return whether or not we actually sent the event.
        let sent = match self.tx.try_reserve() {
            Ok(permit) => {
                // Only construct the event once we know we can send it.
                let (event, stats) = mk_event();
                permit.send(event);
                Some(stats)
            }
            Err(TrySendError::Closed(_)) => {
                // we should warn here eventually, but nop for now because we
                // can't trigger tracing events...
                None
            }
            Err(TrySendError::Full(_)) => {
                // this shouldn't happen, since we trigger a flush when
                // approaching the high water line...but if the executor wait
                // time is very high, maybe the aggregator task hasn't been
                // polled yet. so... eek?!
                dropped.fetch_add(1, Ordering::Release);
                None
            }
        };

        let capacity = self.tx.capacity();
        if capacity <= self.flush_under_capacity {
            self.shared.flush.trigger();
        }

        sent
    }

    /// Records an event (built lazily) to the JSON recorder, if one is configured.
    fn record(&self, event: impl FnOnce() -> record::Event) {
        if let Some(ref recorder) = self.recorder {
            recorder.record(event());
        }
    }

    /// Applies a state-update event to the stats of the span with `id`,
    /// and also to its parent's stats when the parent opts into inheriting
    /// child attributes.
    ///
    /// `get_stats` extracts the appropriate stats type from a span's
    /// extensions (resource stats, or the resource stats nested in async-op
    /// stats).
    fn state_update<S>(
        &self,
        id: &Id,
        event: &tracing::Event<'_>,
        ctx: &Context<'_, S>,
        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
    ) where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        let meta_id = event.metadata().into();
        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
        event.record(&mut state_update_visitor);

        let update = match state_update_visitor.result() {
            Some(update) => update,
            None => return,
        };

        let span = match ctx.span(id) {
            Some(span) => span,
            // XXX(eliza): no span exists for a resource ID, we should maybe
            // record an error here...
            None => return,
        };

        let exts = span.extensions();
        let stats = match get_stats(&exts) {
            Some(stats) => stats,
            // XXX(eliza): a resource span was not a resource??? this is a bug
            None => return,
        };

        stats.update_attribute(id, &update);

        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
            let exts = parent.extensions();
            if let Some(stats) = get_stats(&exts) {
                if stats.inherit_child_attributes {
                    stats.update_attribute(id, &update);
                }
            }
        }
    }
}
531
impl<S> Layer<S> for ConsoleLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    // Classifies each new callsite by (name, target), remembering it in the
    // matching callsite set so later span/event hooks can dispatch cheaply,
    // then forwards the metadata to the aggregator.
    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
        // Each arm also selects which drop counter to charge if the metadata
        // event cannot be sent because the event channel is full.
        let dropped = match (meta.name(), meta.target()) {
            ("runtime.spawn", _) | ("task", "tokio::task") => {
                self.spawn_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (_, "runtime::waker") | (_, "tokio::task::waker") => {
                self.waker_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (ResourceVisitor::RES_SPAN_NAME, _) => {
                self.resource_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
                self.async_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            ("runtime.resource.async_op.poll", _) => {
                self.async_op_poll_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
                self.poll_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
                self.resource_state_update_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
                self.async_op_state_update_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, _) => &self.shared.dropped_tasks,
        };

        self.send_metadata(dropped, Event::Metadata(meta));
        subscriber::Interest::always()
    }

    // When a new span is created: if it is a task, resource, or async-op span,
    // build the corresponding stats and event, send them to the aggregator,
    // and stash the stats handle in the span's extensions so the
    // enter/exit/close hooks can update it later. (The stats are only inserted
    // when the event was actually sent, so dropped spans are never tracked.)
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let metadata = attrs.metadata();
        if self.is_spawn(metadata) {
            let at = Instant::now();
            let mut task_visitor = TaskVisitor::new(metadata.into());
            attrs.record(&mut task_visitor);
            let (fields, location) = task_visitor.result();
            // Record the spawn to the JSON recorder (if configured).
            self.record(|| record::Event::Spawn {
                id: id.into_u64(),
                at: self.base_time.to_system_time(at),
                fields: record::SerializeFields(fields.clone()),
            });
            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
                let stats = Arc::new(stats::TaskStats::new(
                    self.max_poll_duration_nanos,
                    self.max_scheduled_duration_nanos,
                    at,
                ));
                let event = Event::Spawn {
                    id: id.clone(),
                    stats: stats.clone(),
                    metadata,
                    fields,
                    location,
                };
                (event, stats)
            }) {
                ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
            }
            return;
        }

        if self.is_resource(metadata) {
            let at = Instant::now();
            let mut resource_visitor = ResourceVisitor::default();
            attrs.record(&mut resource_visitor);
            if let Some(result) = resource_visitor.result() {
                let ResourceVisitorResult {
                    concrete_type,
                    kind,
                    location,
                    is_internal,
                    inherit_child_attrs,
                } = result;
                // A resource created inside another resource span becomes its child.
                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });
                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
                    let stats = Arc::new(stats::ResourceStats::new(
                        at,
                        inherit_child_attrs,
                        parent_id.clone(),
                    ));
                    let event = Event::Resource {
                        id: id.clone(),
                        parent_id,
                        metadata,
                        concrete_type,
                        kind,
                        location,
                        is_internal,
                        stats: stats.clone(),
                    };
                    (event, stats)
                }) {
                    ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                }
            }
            return;
        }

        if self.is_async_op(metadata) {
            let at = Instant::now();
            let mut async_op_visitor = AsyncOpVisitor::default();
            attrs.record(&mut async_op_visitor);
            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
                let resource_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });

                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
                });

                // An async op is only tracked when created inside a resource span.
                if let Some(resource_id) = resource_id {
                    if let Some(stats) =
                        self.send_stats(&self.shared.dropped_async_ops, move || {
                            let stats = Arc::new(stats::AsyncOpStats::new(
                                at,
                                inherit_child_attrs,
                                parent_id.clone(),
                            ));
                            let event = Event::AsyncResourceOp {
                                id: id.clone(),
                                parent_id,
                                resource_id,
                                metadata,
                                source,
                                stats: stats.clone(),
                            };
                            (event, stats)
                        })
                    {
                        ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                    }
                }
            }
        }
    }

    // Dispatches events by callsite set: waker operations update task stats;
    // poll-op events are forwarded to the aggregator; state-update events are
    // applied to resource or async-op stats via `state_update`.
    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        let metadata = event.metadata();
        if self.waker_callsites.contains(metadata) {
            let at = Instant::now();
            let mut visitor = WakerVisitor::default();
            event.record(&mut visitor);
            // XXX (eliza): ew...
            if let Some((id, mut op)) = visitor.result() {
                if let Some(span) = ctx.span(&id) {
                    let exts = span.extensions();
                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                        if op.is_wake() {
                            // Are we currently inside the task's span? If so, the task
                            // has woken itself.

                            let self_wake = self
                                .current_spans
                                .get()
                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
                                .unwrap_or(false);
                            op = op.self_wake(self_wake);
                        }

                        stats.record_wake_op(op, at);
                        self.record(|| record::Event::Waker {
                            id: id.into_u64(),
                            at: self.base_time.to_system_time(at),
                            op,
                        });
                    }
                }
            }
            return;
        }

        if self.poll_op_callsites.contains(metadata) {
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            // poll op event should have a resource span parent
            if let Some(resource_id) = resource_id {
                let mut poll_op_visitor = PollOpVisitor::default();
                event.record(&mut poll_op_visitor);
                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
                        let stack = stack.borrow();
                        let task_id =
                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
                        let async_op_id =
                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
                        Some((task_id, async_op_id))
                    });
                    // poll op event should be emitted in the context of an async op and task spans
                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
                        // Remember which task is driving this async op.
                        if let Some(span) = ctx.span(&async_op_id) {
                            let exts = span.extensions();
                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                                stats.set_task_id(&task_id);
                            }
                        }

                        self.send_stats(&self.shared.dropped_async_ops, || {
                            let event = Event::PollOp {
                                metadata,
                                op_name,
                                resource_id,
                                async_op_id,
                                task_id,
                                is_ready,
                            };
                            (event, ())
                        });

                        // TODO: JSON recorder doesn't care about poll ops.
                    }
                }
            }
            return;
        }

        if self.resource_state_update_callsites.contains(metadata) {
            // state update event should have a resource span parent
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            if let Some(id) = resource_id {
                self.state_update(&id, event, &ctx, |exts| {
                    exts.get::<Arc<stats::ResourceStats>>()
                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
                });
            }

            return;
        }

        if self.async_op_state_update_callsites.contains(metadata) {
            let async_op_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
            });
            if let Some(id) = async_op_id {
                self.state_update(&id, event, &ctx, |exts| {
                    // Async-op state updates are applied to the resource stats
                    // nested inside the async-op stats.
                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
                    Some(&async_op.stats)
                });
            }
        }
    }

    // Tracks span entry for tasks, async ops, and resources: starts poll
    // timing (where applicable), pushes the span onto the thread-local stack,
    // and records the entry to the JSON recorder. Other spans are ignored.
    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let now = Instant::now();
            let exts = span.extensions();
            // if the span we are entering is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.start_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.start_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to enter it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans
                .get_or_default()
                .borrow_mut()
                .push(id.clone());

            self.record(|| record::Event::Enter {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    // Mirror of `on_enter`: ends poll timing for tasks/async ops, pops the
    // span from the thread-local stack, and records the exit.
    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let exts = span.extensions();
            let now = Instant::now();
            // if the span we are entering is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.end_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.end_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to enter it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans.get_or_default().borrow_mut().pop(id);

            self.record(|| record::Event::Exit {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    // When a tracked span closes, mark its stats as dropped (task, async op,
    // or resource) and record the close to the JSON recorder.
    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(&id) {
            let now = Instant::now();
            let exts = span.extensions();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.drop_task(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.drop_async_op(now);
            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
                stats.drop_resource(now);
            }
            self.record(|| record::Event::Close {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }
}
870
871impl fmt::Debug for ConsoleLayer {
872 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873 f.debug_struct("ConsoleLayer")
874 // mpsc::Sender debug impl is not very useful
875 .field("tx", &format_args!("<...>"))
876 .field("tx.capacity", &self.tx.capacity())
877 .field("shared", &self.shared)
878 .field("spawn_callsites", &self.spawn_callsites)
879 .field("waker_callsites", &self.waker_callsites)
880 .finish()
881 }
882}
883
884impl Server {
    // XXX(eliza): why is `SocketAddr::new` not `const`???
    /// A [`Server`] by default binds socket address 127.0.0.1 to service remote
    /// procedure calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    /// A [`Server`] by default binds port 6669 to service remote procedure
    /// calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PORT: u16 = 6669;
911
912 /// Starts the gRPC service with the default gRPC settings.
913 ///
914 /// To configure gRPC server settings before starting the server, use
915 /// [`serve_with`] instead. This method is equivalent to calling [`serve_with`]
916 /// and providing the default gRPC server settings:
917 ///
918 /// ```rust
919 /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
920 /// # let (_, server) = hds_console_subscriber::ConsoleLayer::new();
921 /// server.serve_with(tonic::transport::Server::default()).await
922 /// # }
923 /// ```
924 /// [`serve_with`]: Server::serve_with
925 pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
926 self.serve_with(tonic::transport::Server::default()).await
927 }
928
    /// Starts the gRPC service with the given [`tonic`] gRPC transport server
    /// `builder`.
    ///
    /// The `builder` parameter may be used to configure gRPC-specific settings
    /// prior to starting the server.
    ///
    /// This spawns both the server task and the event aggregation worker
    /// task on the current async runtime.
    ///
    /// [`tonic`]: https://docs.rs/tonic/
    pub async fn serve_with(
        self,
        mut builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        // Clone the address before `self` is consumed by `into_parts`.
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        // The aggregator runs as its own task for the lifetime of the server.
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let router = builder.add_service(instrument_server);
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            // Unix-domain-socket serving is only available on Unix platforms.
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
        };
        // Once the server task has finished (or failed), stop aggregating.
        aggregate.abort();
        // Outer `?` propagates the spawned task's JoinError; the inner result
        // is the transport error from serving, boxed into the return type.
        res?.map_err(Into::into)
    }
965
966 /// Starts the gRPC service with the default gRPC settings and gRPC-Web
967 /// support.
968 ///
969 /// # Examples
970 ///
971 /// To serve the instrument server with gRPC-Web support with the default
972 /// settings:
973 ///
974 /// ```rust
975 /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
976 /// # let (_, server) = hds_console_subscriber::ConsoleLayer::new();
977 /// server.serve_with_grpc_web(tonic::transport::Server::default()).await
978 /// # }
979 /// ```
980 ///
981 /// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the
982 /// following code:
983 ///
984 /// ```rust
985 /// # use std::{thread, time::Duration};
986 /// #
987 /// use hds_console_subscriber::{ConsoleLayer, ServerParts};
988 /// use tonic_web::GrpcWebLayer;
989 /// use tower_web::cors::{CorsLayer, AllowOrigin};
990 /// use http::header::HeaderName;
991 /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
992 /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
993 /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
994 /// # ["grpc-status", "grpc-message", "grpc-status-details-bin"];
995 /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
996 /// # "x-grpc-web",
997 /// # "content-type",
998 /// # "x-user-agent",
999 /// # "grpc-timeout",
1000 /// # "user-agent",
1001 /// # ];
1002 ///
1003 /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
1004 /// # thread::Builder::new()
1005 /// # .name("subscriber".into())
1006 /// # .spawn(move || {
1007 /// // Customize the CORS configuration.
1008 /// let cors = CorsLayer::new()
1009 /// .allow_origin(AllowOrigin::mirror_request())
1010 /// .allow_credentials(true)
1011 /// .max_age(DEFAULT_MAX_AGE)
1012 /// .expose_headers(
1013 /// DEFAULT_EXPOSED_HEADERS
1014 /// .iter()
1015 /// .cloned()
1016 /// .map(HeaderName::from_static)
1017 /// .collect::<Vec<HeaderName>>(),
1018 /// )
1019 /// .allow_headers(
1020 /// DEFAULT_ALLOW_HEADERS
1021 /// .iter()
1022 /// .cloned()
1023 /// .map(HeaderName::from_static)
1024 /// .collect::<Vec<HeaderName>>(),
1025 /// );
1026 /// # let runtime = tokio::runtime::Builder::new_current_thread()
1027 /// # .enable_all()
1028 /// # .build()
1029 /// # .expect("console subscriber runtime initialization failed");
1030 /// # runtime.block_on(async move {
1031 ///
1032 /// let ServerParts {
1033 /// instrument_server,
1034 /// aggregator,
1035 /// ..
1036 /// } = server.into_parts();
1037 /// tokio::spawn(aggregator.run());
1038 ///
1039 /// // Serve the instrument server with gRPC-Web support and the CORS configuration.
1040 /// let router = tonic::transport::Server::builder()
1041 /// .accept_http1(true)
1042 /// .layer(cors)
1043 /// .layer(GrpcWebLayer::new())
1044 /// .add_service(instrument_server);
1045 /// let serve = router.serve(std::net::SocketAddr::new(
1046 /// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1047 /// // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
1048 /// 9999,
1049 /// ));
1050 ///
1051 /// // Finally, spawn the server.
1052 /// serve.await.expect("console subscriber server failed");
1053 /// # });
1054 /// # })
1055 /// # .expect("console subscriber could not spawn thread");
1056 /// # tracing_subscriber::registry().with(console_layer).init();
1057 /// ```
1058 ///
1059 /// For a comprehensive understanding and complete code example,
1060 /// please refer to the `grpc-web` example in the examples directory.
1061 ///
1062 /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1063 #[cfg(feature = "grpc-web")]
1064 pub async fn serve_with_grpc_web(
1065 self,
1066 builder: tonic::transport::Server,
1067 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
1068 let addr = self.addr.clone();
1069 let ServerParts {
1070 instrument_server,
1071 aggregator,
1072 } = self.into_parts();
1073 let router = builder
1074 .accept_http1(true)
1075 .add_service(tonic_web::enable(instrument_server));
1076 let aggregate = spawn_named(aggregator.run(), "console::aggregate");
1077 let res = match addr {
1078 ServerAddr::Tcp(addr) => {
1079 let serve = router.serve(addr);
1080 spawn_named(serve, "console::serve").await
1081 }
1082 #[cfg(unix)]
1083 ServerAddr::Unix(path) => {
1084 let incoming = UnixListener::bind(path)?;
1085 let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
1086 spawn_named(serve, "console::serve").await
1087 }
1088 };
1089 aggregate.abort();
1090 res?.map_err(Into::into)
1091 }
1092
    /// Returns the parts needed to spawn a gRPC server and the aggregator that
    /// supplies it.
    ///
    /// Note that a server spawned in this way will disregard any value set by
    /// [`Builder::server_addr`], as the user becomes responsible for defining
    /// the address when calling [`Router::serve`].
    ///
    /// Additionally, the user of this API must ensure that the [`Aggregator`]
    /// is running for as long as the gRPC server is. If the server stops
    /// running, the aggregator task can be aborted.
    ///
    /// # Panics
    ///
    /// Panics if the aggregator has already been taken out of this server —
    /// i.e. if `into_parts` (or one of the `serve*` methods, which call it)
    /// has already been invoked.
    ///
    /// # Examples
    ///
    /// The parts can be used to serve the instrument server together with
    /// other endpoints from the same gRPC server.
    ///
    /// ```
    /// use hds_console_subscriber::{ConsoleLayer, ServerParts};
    ///
    /// # let runtime = tokio::runtime::Builder::new_current_thread()
    /// #     .enable_all()
    /// #     .build()
    /// #     .unwrap();
    /// # runtime.block_on(async {
    /// let (console_layer, server) = ConsoleLayer::builder().build();
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    ///
    /// let aggregator_handle = tokio::spawn(aggregator.run());
    /// let router = tonic::transport::Server::builder()
    ///     //.add_service(some_other_service)
    ///     .add_service(instrument_server);
    /// let serve = router.serve(std::net::SocketAddr::new(
    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    ///     6669,
    /// ));
    ///
    /// // Finally, spawn the server.
    /// tokio::spawn(serve);
    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
    /// # drop(console_layer);
    /// # let mut aggregator_handle = aggregator_handle;
    /// # aggregator_handle.abort();
    /// # });
    /// ```
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    pub fn into_parts(mut self) -> ServerParts {
        // `aggregator` is an `Option` precisely so it can be taken exactly
        // once; a second call hits this `expect`.
        let aggregator = self
            .aggregator
            .take()
            .expect("cannot start server multiple times");

        // The server itself becomes the gRPC service implementation.
        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

        ServerParts {
            instrument_server,
            aggregator,
        }
    }
1156}
1157
/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, and as such is marked as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The `aggregator` is a future which should be running as long as the server is.
/// Generally, this future should be spawned onto an appropriate runtime and then
/// aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
    /// The instrument server.
    ///
    /// See the documentation for [`InstrumentServer`] for details.
    pub instrument_server: InstrumentServer<Server>,

    /// The aggregator.
    ///
    /// Responsible for collecting and preparing traces for the instrument server
    /// to send its clients.
    ///
    /// The aggregator should be [`run`] when the instrument server is started.
    /// If the server stops running for any reason, the aggregator task can be
    /// aborted.
    ///
    /// [`run`]: fn@crate::Aggregator::run
    pub aggregator: Aggregator,
}
1190
/// Aggregator handle.
///
/// This object is returned from [`Server::into_parts`]. It can be
/// used to abort the aggregator task.
///
/// The aggregator collects the traces that implement the async runtime
/// being observed and prepares them to be served by the gRPC server.
///
/// Normally, if the server, started with [`Server::serve`] or
/// [`Server::serve_with`] stops for any reason, the aggregator is aborted,
/// however, if the server was started with the [`InstrumentServer`] returned
/// from [`Server::into_parts`], then it is the responsibility of the user
/// of the API to stop the aggregator task by calling [`abort`] on this
/// object.
///
/// [`abort`]: fn@crate::AggregatorHandle::abort
// NOTE(review): `Server::into_parts` visibly returns `ServerParts` (which
// carries the `Aggregator` itself, not this handle); this type appears to be
// produced elsewhere (e.g. by the spawning APIs) — confirm and update the doc.
pub struct AggregatorHandle {
    // Handle for the tokio task running the aggregator; `abort` cancels it.
    join_handle: JoinHandle<()>,
}
1210
1211impl AggregatorHandle {
1212 /// Aborts the task running this aggregator.
1213 ///
1214 /// To avoid having a disconnected aggregator running forever, this
1215 /// method should be called when the [`tonic::transport::Server`] started
1216 /// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
1217 /// stops running.
1218 pub fn abort(&mut self) {
1219 self.join_handle.abort();
1220 }
1221}
1222
1223#[tonic::async_trait]
1224impl proto::instrument::instrument_server::Instrument for Server {
1225 type WatchUpdatesStream =
1226 tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
1227 type WatchTaskDetailsStream =
1228 tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
1229 async fn watch_updates(
1230 &self,
1231 req: tonic::Request<proto::instrument::InstrumentRequest>,
1232 ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
1233 match req.remote_addr() {
1234 Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
1235 None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
1236 }
1237 let permit = self.subscribe.reserve().await.map_err(|_| {
1238 tonic::Status::internal("cannot start new watch, aggregation task is not running")
1239 })?;
1240 let (tx, rx) = mpsc::channel(self.client_buffer);
1241 permit.send(Command::Instrument(Watch(tx)));
1242 tracing::debug!("watch started");
1243 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1244 Ok(tonic::Response::new(stream))
1245 }
1246
1247 async fn watch_task_details(
1248 &self,
1249 req: tonic::Request<proto::instrument::TaskDetailsRequest>,
1250 ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
1251 let task_id = req
1252 .into_inner()
1253 .id
1254 .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
1255 .id;
1256
1257 // `tracing` reserves span ID 0 for niche optimization for `Option<Id>`.
1258 let id = std::num::NonZeroU64::new(task_id)
1259 .map(Id::from_non_zero_u64)
1260 .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;
1261
1262 let permit = self.subscribe.reserve().await.map_err(|_| {
1263 tonic::Status::internal("cannot start new watch, aggregation task is not running")
1264 })?;
1265
1266 // Check with the aggregator task to request a stream if the task exists.
1267 let (stream_sender, stream_recv) = oneshot::channel();
1268 permit.send(Command::WatchTaskDetail(WatchRequest {
1269 id,
1270 stream_sender,
1271 buffer: self.client_buffer,
1272 }));
1273 // If the aggregator drops the sender, the task doesn't exist.
1274 let rx = stream_recv.await.map_err(|_| {
1275 tracing::warn!(id = ?task_id, "requested task not found");
1276 tonic::Status::not_found("task not found")
1277 })?;
1278
1279 tracing::debug!(id = ?task_id, "task details watch started");
1280 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1281 Ok(tonic::Response::new(stream))
1282 }
1283
1284 async fn pause(
1285 &self,
1286 _req: tonic::Request<proto::instrument::PauseRequest>,
1287 ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
1288 self.subscribe.send(Command::Pause).await.map_err(|_| {
1289 tonic::Status::internal("cannot pause, aggregation task is not running")
1290 })?;
1291 Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
1292 }
1293
1294 async fn resume(
1295 &self,
1296 _req: tonic::Request<proto::instrument::ResumeRequest>,
1297 ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
1298 self.subscribe.send(Command::Resume).await.map_err(|_| {
1299 tonic::Status::internal("cannot resume, aggregation task is not running")
1300 })?;
1301 Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
1302 }
1303}
1304
1305impl WakeOp {
1306 /// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
1307 fn is_wake(self) -> bool {
1308 matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
1309 }
1310
1311 fn self_wake(self, self_wake: bool) -> Self {
1312 match self {
1313 Self::Wake { .. } => Self::Wake { self_wake },
1314 Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
1315 x => x,
1316 }
1317 }
1318}
1319
1320#[track_caller]
1321pub(crate) fn spawn_named<T>(
1322 task: impl std::future::Future<Output = T> + Send + 'static,
1323 _name: &str,
1324) -> tokio::task::JoinHandle<T>
1325where
1326 T: Send + 'static,
1327{
1328 #[cfg(tokio_unstable)]
1329 return tokio::task::Builder::new().name(_name).spawn(task).unwrap();
1330
1331 #[cfg(not(tokio_unstable))]
1332 tokio::spawn(task)
1333}