cucumber/
tracing.rs

1//! [`tracing`] integration layer.
2
3use std::{collections::HashMap, fmt, io, iter, sync::Arc};
4
5use futures::channel::{mpsc, oneshot};
6use itertools::Either;
7use tracing::{
8    field::{Field, Visit},
9    span, Dispatch, Event, Span, Subscriber,
10};
11use tracing_subscriber::{
12    field::RecordFields,
13    filter::LevelFilter,
14    fmt::{
15        format::{self, Format},
16        FmtContext, FormatEvent, FormatFields, MakeWriter,
17    },
18    layer::{self, Layer, Layered, SubscriberExt as _},
19    registry::LookupSpan,
20    util::SubscriberInitExt as _,
21};
22
23use crate::{
24    event::{self, HookType},
25    runner::{
26        self,
27        basic::{RetryOptions, ScenarioId},
28    },
29    Cucumber, Parser, Runner, ScenarioType, World, Writer,
30};
31
32impl<W, P, I, Wr, Cli, WhichSc, Before, After>
33    Cucumber<W, P, I, runner::Basic<W, WhichSc, Before, After>, Wr, Cli>
34where
35    W: World,
36    P: Parser<I>,
37    runner::Basic<W, WhichSc, Before, After>: Runner<W>,
38    Wr: Writer<W>,
39    Cli: clap::Args,
40{
41    /// Initializes a global [`tracing::Subscriber`] with a default
42    /// [`fmt::Layer`] and [`LevelFilter::INFO`].
43    ///
44    /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer
45    #[must_use]
46    pub fn init_tracing(self) -> Self {
47        self.configure_and_init_tracing(
48            format::DefaultFields::new(),
49            Format::default(),
50            |layer| {
51                tracing_subscriber::registry()
52                    .with(LevelFilter::INFO.and_then(layer))
53            },
54        )
55    }
56
57    /// Configures a [`fmt::Layer`], additionally wraps it (for example, into a
58    /// [`LevelFilter`]), and initializes as a global [`tracing::Subscriber`].
59    ///
60    /// # Example
61    ///
62    /// ```rust
63    /// # use cucumber::{Cucumber, World as _};
64    /// # use tracing_subscriber::{
65    /// #     filter::LevelFilter,
66    /// #     fmt::format::{self, Format},
67    /// #     layer::SubscriberExt,
68    /// #     Layer,
69    /// # };
70    /// #
71    /// # #[derive(Debug, Default, cucumber::World)]
72    /// # struct World;
73    /// #
74    /// # let _ = async {
75    /// World::cucumber()
76    ///     .configure_and_init_tracing(
77    ///         format::DefaultFields::new(),
78    ///         Format::default(),
79    ///         |fmt_layer| {
80    ///             tracing_subscriber::registry()
81    ///                 .with(LevelFilter::INFO.and_then(fmt_layer))
82    ///         },
83    ///     )
84    ///     .run_and_exit("./tests/features/doctests.feature")
85    ///     .await
86    /// # };
87    /// ```
88    ///
89    /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer
90    #[must_use]
91    pub fn configure_and_init_tracing<Event, Fields, Sub, Conf, Out>(
92        self,
93        fmt_fields: Fields,
94        event_format: Event,
95        configure: Conf,
96    ) -> Self
97    where
98        Fields: for<'a> FormatFields<'a> + 'static,
99        Event: FormatEvent<Sub, SkipScenarioIdSpan<Fields>> + 'static,
100        Sub: Subscriber + for<'a> LookupSpan<'a>,
101        Out: Subscriber + Send + Sync,
102        // TODO: Replace the inner type with TAIT, once stabilized:
103        //       https://github.com/rust-lang/rust/issues/63063
104        Conf: FnOnce(
105            Layered<
106                tracing_subscriber::fmt::Layer<
107                    Sub,
108                    SkipScenarioIdSpan<Fields>,
109                    AppendScenarioMsg<Event>,
110                    CollectorWriter,
111                >,
112                RecordScenarioId,
113                Sub,
114            >,
115        ) -> Out,
116    {
117        let (logs_sender, logs_receiver) = mpsc::unbounded();
118        let (span_close_sender, span_close_receiver) = mpsc::unbounded();
119
120        let layer = RecordScenarioId::new(span_close_sender).and_then(
121            tracing_subscriber::fmt::layer()
122                .fmt_fields(SkipScenarioIdSpan(fmt_fields))
123                .event_format(AppendScenarioMsg(event_format))
124                .with_writer(CollectorWriter::new(logs_sender)),
125        );
126        Dispatch::new(configure(layer)).init();
127
128        drop(
129            self.runner
130                .logs_collector
131                .swap(Box::new(Some(Collector::new(
132                    logs_receiver,
133                    span_close_receiver,
134                )))),
135        );
136
137        self
138    }
139}
140
141/// [`HashMap`] from a [`ScenarioId`] to its [`Scenario`] and full path.
142///
143/// [`Scenario`]: gherkin::Scenario
144type Scenarios = HashMap<
145    ScenarioId,
146    (
147        Arc<gherkin::Feature>,
148        Option<Arc<gherkin::Rule>>,
149        Arc<gherkin::Scenario>,
150        Option<RetryOptions>,
151    ),
152>;
153
154/// All [`Callback`]s for [`Span`]s closing events with their completion status.
155type SpanEventsCallbacks =
156    HashMap<span::Id, (Option<Vec<Callback>>, IsReceived)>;
157
158/// Indication whether a [`Span`] closing event was received.
159type IsReceived = bool;
160
161/// Callback for notifying a [`Runner`] about a [`Span`] being closed.
162type Callback = oneshot::Sender<()>;
163
164/// Collector of [`tracing::Event`]s.
165#[derive(Debug)]
166pub(crate) struct Collector {
167    /// [`Scenarios`] with their IDs.
168    scenarios: Scenarios,
169
170    /// Receiver of [`tracing::Event`]s messages with optional corresponding
171    /// [`ScenarioId`].
172    logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
173
174    /// All [`Callback`]s for [`Span`]s closing events with their completion
175    /// status.
176    span_events: SpanEventsCallbacks,
177
178    /// Receiver of a [`Span`] closing event.
179    span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
180
181    /// Sender for subscribing to a [`Span`] closing event.
182    wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
183
184    /// Receiver for subscribing to a [`Span`] closing event.
185    wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>,
186}
187
188impl Collector {
189    /// Creates a new [`tracing::Event`]s [`Collector`].
190    pub(crate) fn new(
191        logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
192        span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
193    ) -> Self {
194        let (sender, receiver) = mpsc::unbounded();
195        Self {
196            scenarios: HashMap::new(),
197            logs_receiver,
198            span_events: HashMap::new(),
199            span_close_receiver,
200            wait_span_event_sender: sender,
201            wait_span_event_receiver: receiver,
202        }
203    }
204
205    /// Creates a new [`SpanCloseWaiter`].
206    pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter {
207        SpanCloseWaiter {
208            wait_span_event_sender: self.wait_span_event_sender.clone(),
209        }
210    }
211
212    /// Starts [`Scenario`]s from the provided `runnable`.
213    ///
214    /// [`Scenario`]: gherkin::Scenario
215    pub(crate) fn start_scenarios(
216        &mut self,
217        runnable: impl AsRef<
218            [(
219                ScenarioId,
220                Arc<gherkin::Feature>,
221                Option<Arc<gherkin::Rule>>,
222                Arc<gherkin::Scenario>,
223                ScenarioType,
224                Option<RetryOptions>,
225            )],
226        >,
227    ) {
228        for (id, f, r, s, _, ret) in runnable.as_ref() {
229            drop(
230                self.scenarios.insert(
231                    *id,
232                    (Arc::clone(f), r.clone(), Arc::clone(s), *ret),
233                ),
234            );
235        }
236    }
237
238    /// Marks a [`Scenario`] as finished, by its ID.
239    ///
240    /// [`Scenario`]: gherkin::Scenario
241    pub(crate) fn finish_scenario(&mut self, id: ScenarioId) {
242        drop(self.scenarios.remove(&id));
243    }
244
245    /// Returns all the emitted [`event::Scenario::Log`]s since this method was
246    /// last called.
247    ///
248    /// In case a received [`tracing::Event`] doesn't contain a [`Scenario`]'s
249    /// [`Span`], such [`tracing::Event`] will be forwarded to all active
250    /// [`Scenario`]s.
251    ///
252    /// [`Scenario`]: gherkin::Scenario
253    pub(crate) fn emitted_logs<W>(
254        &mut self,
255    ) -> Option<Vec<event::Cucumber<W>>> {
256        self.notify_about_closing_spans();
257
258        self.logs_receiver
259            .try_next()
260            .ok()
261            .flatten()
262            .map(|(id, msg)| {
263                id.and_then(|k| self.scenarios.get(&k))
264                    .map_or_else(
265                        || Either::Left(self.scenarios.values()),
266                        |p| Either::Right(iter::once(p)),
267                    )
268                    .map(|(f, r, s, opt)| {
269                        event::Cucumber::scenario(
270                            Arc::clone(f),
271                            r.clone(),
272                            Arc::clone(s),
273                            event::RetryableScenario {
274                                event: event::Scenario::Log(msg.clone()),
275                                retries: opt.map(|o| o.retries),
276                            },
277                        )
278                    })
279                    .collect()
280            })
281    }
282
283    /// Notifies all its subscribers about closing [`Span`]s via [`Callback`]s.
284    fn notify_about_closing_spans(&mut self) {
285        if let Some(id) = self.span_close_receiver.try_next().ok().flatten() {
286            self.span_events.entry(id).or_default().1 = true;
287        }
288        while let Some((id, callback)) =
289            self.wait_span_event_receiver.try_next().ok().flatten()
290        {
291            self.span_events
292                .entry(id)
293                .or_default()
294                .0
295                .get_or_insert(Vec::new())
296                .push(callback);
297        }
298        self.span_events.retain(|_, (callbacks, is_received)| {
299            if callbacks.is_some() && *is_received {
300                for callback in callbacks
301                    .take()
302                    .unwrap_or_else(|| unreachable!("`callbacks.is_some()`"))
303                {
304                    _ = callback.send(()).ok();
305                }
306                false
307            } else {
308                true
309            }
310        });
311    }
312}
313
314// We better keep this here, as it's related to `tracing` capabilities only.
315#[allow(clippy::multiple_inherent_impl)] // intentional
316impl ScenarioId {
317    /// Name of the [`ScenarioId`] [`Span`] field.
318    const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id";
319
320    /// Creates a new [`Span`] for running a [`Scenario`] with this
321    /// [`ScenarioId`].
322    ///
323    /// [`Scenario`]: gherkin::Scenario
324    pub(crate) fn scenario_span(self) -> Span {
325        // `Level::ERROR` is used to minimize the chance of the user-provided
326        // filter to skip it.
327        tracing::error_span!("scenario", __cucumber_scenario_id = self.0)
328    }
329
330    /// Creates a new [`Span`] for a running [`Step`].
331    ///
332    /// [`Step`]: gherkin::Step
333    #[allow(clippy::unused_self)]
334    pub(crate) fn step_span(self, is_background: bool) -> Span {
335        // `Level::ERROR` is used to minimize the chance of the user-provided
336        // filter to skip it.
337        if is_background {
338            tracing::error_span!("background step")
339        } else {
340            tracing::error_span!("step")
341        }
342    }
343
344    /// Creates a new [`Span`] for running a [`Hook`].
345    ///
346    /// [`Hook`]: event::Hook
347    #[allow(clippy::unused_self)]
348    pub(crate) fn hook_span(self, hook_ty: HookType) -> Span {
349        // `Level::ERROR` is used to minimize the chance of the user-provided
350        // filter to skip it.
351        match hook_ty {
352            HookType::Before => tracing::error_span!("before hook"),
353            HookType::After => tracing::error_span!("after hook"),
354        }
355    }
356}
357
358/// Waiter for a particular [`Span`] to be closed, wich is required because a
359/// [`CollectorWriter`] can notify about an [`event::Scenario::Log`] after a
360/// [`Scenario`]/[`Step`] is considered [`Finished`] already, due to
361/// implementation details of a [`Subscriber`].
362///
363/// [`Finished`]: event::Scenario::Finished
364/// [`Scenario`]: gherkin::Scenario
365/// [`Step`]: gherkin::Step
366#[derive(Clone, Debug)]
367pub(crate) struct SpanCloseWaiter {
368    /// Sender for subscribing to the [`Span`] closing.
369    wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
370}
371
372impl SpanCloseWaiter {
373    /// Waits for the [`Span`] being closed.
374    pub(crate) async fn wait_for_span_close(&self, id: span::Id) {
375        let (sender, receiver) = oneshot::channel();
376        _ = self
377            .wait_span_event_sender
378            .unbounded_send((id, sender))
379            .ok();
380        _ = receiver.await.ok();
381    }
382}
383
384/// [`Layer`] recording a [`ScenarioId`] into [`Span`]'s [`Extensions`].
385///
386/// [`Extensions`]: tracing_subscriber::registry::Extensions
387#[derive(Debug)]
388pub struct RecordScenarioId {
389    /// Sender for [`Span`] closing events.
390    span_close_sender: mpsc::UnboundedSender<span::Id>,
391}
392
393impl RecordScenarioId {
394    /// Creates a new [`RecordScenarioId`] [`Layer`].
395    const fn new(span_close_sender: mpsc::UnboundedSender<span::Id>) -> Self {
396        Self { span_close_sender }
397    }
398}
399
400impl<S> Layer<S> for RecordScenarioId
401where
402    S: for<'a> LookupSpan<'a> + Subscriber,
403{
404    fn on_new_span(
405        &self,
406        attr: &span::Attributes<'_>,
407        id: &span::Id,
408        ctx: layer::Context<'_, S>,
409    ) {
410        if let Some(span) = ctx.span(id) {
411            let mut visitor = GetScenarioId(None);
412            attr.values().record(&mut visitor);
413
414            if let Some(scenario_id) = visitor.0 {
415                let mut ext = span.extensions_mut();
416                _ = ext.replace(scenario_id);
417            }
418        }
419    }
420
421    fn on_record(
422        &self,
423        id: &span::Id,
424        values: &span::Record<'_>,
425        ctx: layer::Context<'_, S>,
426    ) {
427        if let Some(span) = ctx.span(id) {
428            let mut visitor = GetScenarioId(None);
429            values.record(&mut visitor);
430
431            if let Some(scenario_id) = visitor.0 {
432                let mut ext = span.extensions_mut();
433                _ = ext.replace(scenario_id);
434            }
435        }
436    }
437
438    fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) {
439        _ = self.span_close_sender.unbounded_send(id).ok();
440    }
441}
442
443/// [`Visit`]or extracting a [`ScenarioId`] from a
444/// [`ScenarioId::SPAN_FIELD_NAME`]d [`Field`], in case it's present.
445#[derive(Debug)]
446struct GetScenarioId(Option<ScenarioId>);
447
448impl Visit for GetScenarioId {
449    fn record_u64(&mut self, field: &Field, value: u64) {
450        if field.name() == ScenarioId::SPAN_FIELD_NAME {
451            self.0 = Some(ScenarioId(value));
452        }
453    }
454
455    fn record_debug(&mut self, _: &Field, _: &dyn fmt::Debug) {}
456}
457
458/// [`FormatFields`] wrapper skipping [`Span`]s with a [`ScenarioId`].
459#[derive(Debug)]
460pub struct SkipScenarioIdSpan<F>(pub F);
461
462impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan<F> {
463    fn format_fields<R: RecordFields>(
464        &self,
465        writer: format::Writer<'w>,
466        fields: R,
467    ) -> fmt::Result {
468        let mut is_scenario_span = IsScenarioIdSpan(false);
469        fields.record(&mut is_scenario_span);
470        if !is_scenario_span.0 {
471            self.0.format_fields(writer, fields)?;
472        }
473        Ok(())
474    }
475}
476
477/// [`Visit`]or checking whether a [`Span`] has a [`Field`] with the
478/// [`ScenarioId::SPAN_FIELD_NAME`].
479#[derive(Debug)]
480struct IsScenarioIdSpan(bool);
481
482impl Visit for IsScenarioIdSpan {
483    fn record_debug(&mut self, field: &Field, _: &dyn fmt::Debug) {
484        if field.name() == ScenarioId::SPAN_FIELD_NAME {
485            self.0 = true;
486        }
487    }
488}
489
490/// [`FormatEvent`] wrapper, appending [`tracing::Event`]s with some markers,
491/// to parse them later and retrieve optional [`ScenarioId`].
492///
493/// [`Scenario`]: gherkin::Scenario
494#[derive(Debug)]
495pub struct AppendScenarioMsg<F>(pub F);
496
497impl<S, N, F> FormatEvent<S, N> for AppendScenarioMsg<F>
498where
499    S: Subscriber + for<'a> LookupSpan<'a>,
500    N: for<'a> FormatFields<'a> + 'static,
501    F: FormatEvent<S, N>,
502{
503    fn format_event(
504        &self,
505        ctx: &FmtContext<'_, S, N>,
506        mut writer: format::Writer<'_>,
507        event: &Event<'_>,
508    ) -> fmt::Result {
509        self.0.format_event(ctx, writer.by_ref(), event)?;
510
511        if let Some(scenario_id) = ctx.event_scope().and_then(|scope| {
512            scope
513                .from_root()
514                .find_map(|span| span.extensions().get::<ScenarioId>().copied())
515        }) {
516            writer.write_fmt(format_args!(
517                "{}{scenario_id}",
518                suffix::BEFORE_SCENARIO_ID,
519            ))?;
520        } else {
521            writer.write_fmt(format_args!("{}", suffix::NO_SCENARIO_ID))?;
522        }
523        writer.write_fmt(format_args!("{}", suffix::END))
524    }
525}
526
527mod suffix {
528    //! [`str`]ings appending [`tracing::Event`]s to separate them later.
529    //!
530    //! Every [`tracing::Event`] ends with:
531    //!
532    //! ([`BEFORE_SCENARIO_ID`][`ScenarioId`][`END`]|[`NO_SCENARIO_ID`][`END`])
533    //!
534    //! [`ScenarioId`]: super::ScenarioId
535
536    /// End of a [`tracing::Event`] message.
537    pub(crate) const END: &str = "__cucumber__scenario";
538
539    /// Separator before a [`ScenarioId`].
540    ///
541    /// [`ScenarioId`]: super::ScenarioId
542    pub(crate) const BEFORE_SCENARIO_ID: &str = "__";
543
544    /// Separator in case there is no [`ScenarioId`].
545    ///
546    /// [`ScenarioId`]: super::ScenarioId
547    pub(crate) const NO_SCENARIO_ID: &str = "__unknown";
548}
549
550/// [`io::Write`]r sending [`tracing::Event`]s to a `Collector`.
551#[derive(Clone, Debug)]
552pub struct CollectorWriter {
553    /// Sender for notifying the [`Collector`] about [`tracing::Event`]s via.
554    sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
555}
556
557impl CollectorWriter {
558    /// Creates a new [`CollectorWriter`].
559    const fn new(
560        sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
561    ) -> Self {
562        Self { sender }
563    }
564}
565
566impl<'a> MakeWriter<'a> for CollectorWriter {
567    type Writer = Self;
568
569    fn make_writer(&'a self) -> Self::Writer {
570        self.clone()
571    }
572}
573
574impl io::Write for CollectorWriter {
575    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
576        // Although this is not documented explicitly anywhere, `io::Write`rs
577        // inside `tracing::fmt::Layer` always receives fully formatted messages
578        // at once, not by parts.
579        // Inside docs of `fmt::Layer::with_writer()`, a non-locked `io::stderr`
580        // is passed as an `io::Writer`. So, if this guarantee fails, parts of
581        // log messages will be able to interleave each other, making the result
582        // unreadable.
583        let msgs = String::from_utf8_lossy(buf);
584        for msg in msgs.split_terminator(suffix::END) {
585            if let Some((before, after)) =
586                msg.rsplit_once(suffix::NO_SCENARIO_ID)
587            {
588                if !after.is_empty() {
589                    return Err(io::Error::new(
590                        io::ErrorKind::InvalidData,
591                        "wrong separator",
592                    ));
593                }
594                _ = self.sender.unbounded_send((None, before.to_owned())).ok();
595            } else if let Some((before, after)) =
596                msg.rsplit_once(suffix::BEFORE_SCENARIO_ID)
597            {
598                let scenario_id = after.parse().map_err(|e| {
599                    io::Error::new(io::ErrorKind::InvalidData, e)
600                })?;
601                _ = self
602                    .sender
603                    .unbounded_send((Some(scenario_id), before.to_owned()))
604                    .ok();
605            } else {
606                return Err(io::Error::new(
607                    io::ErrorKind::InvalidData,
608                    "missing separator",
609                ));
610            }
611        }
612        Ok(buf.len())
613    }
614
615    fn flush(&mut self) -> io::Result<()> {
616        Ok(())
617    }
618}