cucumber/
tracing.rs

1//! [`tracing`] integration layer.
2
3use std::{collections::HashMap, fmt, io, iter};
4
5use derive_more::with_trait::Debug;
6use futures::channel::{mpsc, oneshot};
7use itertools::Either;
8use tracing::{
9    Dispatch, Event, Span, Subscriber,
10    field::{Field, Visit},
11    span,
12};
13use tracing_subscriber::{
14    field::RecordFields,
15    filter::LevelFilter,
16    fmt::{
17        FmtContext, FormatEvent, FormatFields, MakeWriter,
18        format::{self, Format},
19    },
20    layer::{self, Layer, Layered, SubscriberExt as _},
21    registry::LookupSpan,
22    util::SubscriberInitExt as _,
23};
24
25use crate::{
26    Cucumber, Parser, Runner, ScenarioType, World, Writer,
27    event::{self, HookType, Source},
28    runner::{
29        self,
30        basic::{RetryOptions, ScenarioId},
31    },
32};
33
34impl<W, P, I, Wr, Cli, WhichSc, Before, After>
35    Cucumber<W, P, I, runner::Basic<W, WhichSc, Before, After>, Wr, Cli>
36where
37    W: World,
38    P: Parser<I>,
39    runner::Basic<W, WhichSc, Before, After>: Runner<W>,
40    Wr: Writer<W>,
41    Cli: clap::Args,
42{
43    /// Initializes a global [`tracing::Subscriber`] with a default
44    /// [`fmt::Layer`] and [`LevelFilter::INFO`].
45    ///
46    /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer
47    #[must_use]
48    pub fn init_tracing(self) -> Self {
49        self.configure_and_init_tracing(
50            format::DefaultFields::new(),
51            Format::default(),
52            |layer| {
53                tracing_subscriber::registry()
54                    .with(LevelFilter::INFO.and_then(layer))
55            },
56        )
57    }
58
59    /// Configures a [`fmt::Layer`], additionally wraps it (for example, into a
60    /// [`LevelFilter`]), and initializes as a global [`tracing::Subscriber`].
61    ///
62    /// # Example
63    ///
64    /// ```rust
65    /// # use cucumber::{Cucumber, World as _};
66    /// # use tracing_subscriber::{
67    /// #     filter::LevelFilter,
68    /// #     fmt::format::{self, Format},
69    /// #     layer::SubscriberExt,
70    /// #     Layer,
71    /// # };
72    /// #
73    /// # #[derive(Debug, Default, cucumber::World)]
74    /// # struct World;
75    /// #
76    /// # let _ = async {
77    /// World::cucumber()
78    ///     .configure_and_init_tracing(
79    ///         format::DefaultFields::new(),
80    ///         Format::default(),
81    ///         |fmt_layer| {
82    ///             tracing_subscriber::registry()
83    ///                 .with(LevelFilter::INFO.and_then(fmt_layer))
84    ///         },
85    ///     )
86    ///     .run_and_exit("./tests/features/doctests.feature")
87    ///     .await
88    /// # };
89    /// ```
90    ///
91    /// [`fmt::Layer`]: tracing_subscriber::fmt::Layer
92    #[must_use]
93    pub fn configure_and_init_tracing<Event, Fields, Sub, Conf, Out>(
94        self,
95        fmt_fields: Fields,
96        event_format: Event,
97        configure: Conf,
98    ) -> Self
99    where
100        Fields: for<'a> FormatFields<'a> + 'static,
101        Event: FormatEvent<Sub, SkipScenarioIdSpan<Fields>> + 'static,
102        Sub: Subscriber + for<'a> LookupSpan<'a>,
103        Out: Subscriber + Send + Sync,
104        // TODO: Replace the inner type with TAIT, once stabilized:
105        //       https://github.com/rust-lang/rust/issues/63063
106        Conf: FnOnce(
107            Layered<
108                tracing_subscriber::fmt::Layer<
109                    Sub,
110                    SkipScenarioIdSpan<Fields>,
111                    AppendScenarioMsg<Event>,
112                    CollectorWriter,
113                >,
114                RecordScenarioId,
115                Sub,
116            >,
117        ) -> Out,
118    {
119        let (logs_sender, logs_receiver) = mpsc::unbounded();
120        let (span_close_sender, span_close_receiver) = mpsc::unbounded();
121
122        let layer = RecordScenarioId::new(span_close_sender).and_then(
123            tracing_subscriber::fmt::layer()
124                .fmt_fields(SkipScenarioIdSpan(fmt_fields))
125                .event_format(AppendScenarioMsg(event_format))
126                .with_writer(CollectorWriter::new(logs_sender)),
127        );
128        Dispatch::new(configure(layer)).init();
129
130        drop(self.runner.logs_collector.swap(Box::new(Some(Collector::new(
131            logs_receiver,
132            span_close_receiver,
133        )))));
134
135        self
136    }
137}
138
139/// [`HashMap`] from a [`ScenarioId`] to its [`Scenario`] and full path.
140///
141/// [`Scenario`]: gherkin::Scenario
142type Scenarios = HashMap<
143    ScenarioId,
144    (
145        Source<gherkin::Feature>,
146        Option<Source<gherkin::Rule>>,
147        Source<gherkin::Scenario>,
148        Option<RetryOptions>,
149    ),
150>;
151
152/// All [`Callback`]s for [`Span`]s closing events with their completion status.
153type SpanEventsCallbacks =
154    HashMap<span::Id, (Option<Vec<Callback>>, IsReceived)>;
155
156/// Indication whether a [`Span`] closing event was received.
157type IsReceived = bool;
158
159/// Callback for notifying a [`Runner`] about a [`Span`] being closed.
160type Callback = oneshot::Sender<()>;
161
162/// Collector of [`tracing::Event`]s.
163#[derive(Debug)]
164pub(crate) struct Collector {
165    /// [`Scenarios`] with their IDs.
166    scenarios: Scenarios,
167
168    /// Receiver of [`tracing::Event`]s messages with optional corresponding
169    /// [`ScenarioId`].
170    logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
171
172    /// All [`Callback`]s for [`Span`]s closing events with their completion
173    /// status.
174    span_events: SpanEventsCallbacks,
175
176    /// Receiver of a [`Span`] closing event.
177    span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
178
179    /// Sender for subscribing to a [`Span`] closing event.
180    wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
181
182    /// Receiver for subscribing to a [`Span`] closing event.
183    wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>,
184}
185
186impl Collector {
187    /// Creates a new [`tracing::Event`]s [`Collector`].
188    pub(crate) fn new(
189        logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
190        span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
191    ) -> Self {
192        let (sender, receiver) = mpsc::unbounded();
193        Self {
194            scenarios: HashMap::new(),
195            logs_receiver,
196            span_events: HashMap::new(),
197            span_close_receiver,
198            wait_span_event_sender: sender,
199            wait_span_event_receiver: receiver,
200        }
201    }
202
203    /// Creates a new [`SpanCloseWaiter`].
204    pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter {
205        SpanCloseWaiter {
206            wait_span_event_sender: self.wait_span_event_sender.clone(),
207        }
208    }
209
210    /// Starts [`Scenario`]s from the provided `runnable`.
211    ///
212    /// [`Scenario`]: gherkin::Scenario
213    pub(crate) fn start_scenarios(
214        &mut self,
215        runnable: impl AsRef<
216            [(
217                ScenarioId,
218                Source<gherkin::Feature>,
219                Option<Source<gherkin::Rule>>,
220                Source<gherkin::Scenario>,
221                ScenarioType,
222                Option<RetryOptions>,
223            )],
224        >,
225    ) {
226        for (id, f, r, s, _, ret) in runnable.as_ref() {
227            drop(
228                self.scenarios
229                    .insert(*id, (f.clone(), r.clone(), s.clone(), *ret)),
230            );
231        }
232    }
233
234    /// Marks a [`Scenario`] as finished, by its ID.
235    ///
236    /// [`Scenario`]: gherkin::Scenario
237    pub(crate) fn finish_scenario(&mut self, id: ScenarioId) {
238        drop(self.scenarios.remove(&id));
239    }
240
241    /// Returns all the emitted [`event::Scenario::Log`]s since this method was
242    /// last called.
243    ///
244    /// In case a received [`tracing::Event`] doesn't contain a [`Scenario`]'s
245    /// [`Span`], such [`tracing::Event`] will be forwarded to all active
246    /// [`Scenario`]s.
247    ///
248    /// [`Scenario`]: gherkin::Scenario
249    pub(crate) fn emitted_logs<W>(
250        &mut self,
251    ) -> Option<Vec<event::Cucumber<W>>> {
252        self.notify_about_closing_spans();
253
254        self.logs_receiver.try_next().ok().flatten().map(|(id, msg)| {
255            id.and_then(|k| self.scenarios.get(&k))
256                .map_or_else(
257                    || Either::Left(self.scenarios.values()),
258                    |p| Either::Right(iter::once(p)),
259                )
260                .map(|(f, r, s, opt)| {
261                    event::Cucumber::scenario(
262                        f.clone(),
263                        r.clone(),
264                        s.clone(),
265                        event::RetryableScenario {
266                            event: event::Scenario::Log(msg.clone()),
267                            retries: opt.map(|o| o.retries),
268                        },
269                    )
270                })
271                .collect()
272        })
273    }
274
275    /// Notifies all its subscribers about closing [`Span`]s via [`Callback`]s.
276    fn notify_about_closing_spans(&mut self) {
277        if let Some(id) = self.span_close_receiver.try_next().ok().flatten() {
278            self.span_events.entry(id).or_default().1 = true;
279        }
280        while let Some((id, callback)) =
281            self.wait_span_event_receiver.try_next().ok().flatten()
282        {
283            self.span_events
284                .entry(id)
285                .or_default()
286                .0
287                .get_or_insert(Vec::new())
288                .push(callback);
289        }
290        self.span_events.retain(|_, (callbacks, is_received)| {
291            if callbacks.is_some() && *is_received {
292                for callback in callbacks
293                    .take()
294                    .unwrap_or_else(|| unreachable!("`callbacks.is_some()`"))
295                {
296                    _ = callback.send(()).ok();
297                }
298                false
299            } else {
300                true
301            }
302        });
303    }
304}
305
306#[expect( // related to `tracing` capabilities only
307    clippy::multiple_inherent_impl,
308    reason = "related to `tracing` capabilities only"
309)]
310impl ScenarioId {
311    /// Name of the [`ScenarioId`] [`Span`] field.
312    const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id";
313
314    /// Creates a new [`Span`] for running a [`Scenario`] with this
315    /// [`ScenarioId`].
316    ///
317    /// [`Scenario`]: gherkin::Scenario
318    pub(crate) fn scenario_span(self) -> Span {
319        // `Level::ERROR` is used to minimize the chance of the user-provided
320        // filter to skip it.
321        tracing::error_span!("scenario", __cucumber_scenario_id = self.0)
322    }
323
324    /// Creates a new [`Span`] for a running [`Step`].
325    ///
326    /// [`Step`]: gherkin::Step
327    #[expect(clippy::unused_self, reason = "API uniformity")]
328    pub(crate) fn step_span(self, is_background: bool) -> Span {
329        // `Level::ERROR` is used to minimize the chance of the user-provided
330        // filter to skip it.
331        if is_background {
332            tracing::error_span!("background step")
333        } else {
334            tracing::error_span!("step")
335        }
336    }
337
338    /// Creates a new [`Span`] for running a [`Hook`].
339    ///
340    /// [`Hook`]: event::Hook
341    #[expect(clippy::unused_self, reason = "API uniformity")]
342    pub(crate) fn hook_span(self, hook_ty: HookType) -> Span {
343        // `Level::ERROR` is used to minimize the chance of the user-provided
344        // filter to skip it.
345        match hook_ty {
346            HookType::Before => tracing::error_span!("before hook"),
347            HookType::After => tracing::error_span!("after hook"),
348        }
349    }
350}
351
352/// Waiter for a particular [`Span`] to be closed, wich is required because a
353/// [`CollectorWriter`] can notify about an [`event::Scenario::Log`] after a
354/// [`Scenario`]/[`Step`] is considered [`Finished`] already, due to
355/// implementation details of a [`Subscriber`].
356///
357/// [`Finished`]: event::Scenario::Finished
358/// [`Scenario`]: gherkin::Scenario
359/// [`Step`]: gherkin::Step
360#[derive(Clone, Debug)]
361pub(crate) struct SpanCloseWaiter {
362    /// Sender for subscribing to the [`Span`] closing.
363    wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
364}
365
366impl SpanCloseWaiter {
367    /// Waits for the [`Span`] being closed.
368    pub(crate) async fn wait_for_span_close(&self, id: span::Id) {
369        let (sender, receiver) = oneshot::channel();
370        _ = self.wait_span_event_sender.unbounded_send((id, sender)).ok();
371        _ = receiver.await.ok();
372    }
373}
374
375/// [`Layer`] recording a [`ScenarioId`] into [`Span`]'s [`Extensions`].
376///
377/// [`Extensions`]: tracing_subscriber::registry::Extensions
378#[derive(Debug)]
379pub struct RecordScenarioId {
380    /// Sender for [`Span`] closing events.
381    span_close_sender: mpsc::UnboundedSender<span::Id>,
382}
383
384impl RecordScenarioId {
385    /// Creates a new [`RecordScenarioId`] [`Layer`].
386    const fn new(span_close_sender: mpsc::UnboundedSender<span::Id>) -> Self {
387        Self { span_close_sender }
388    }
389}
390
391impl<S> Layer<S> for RecordScenarioId
392where
393    S: for<'a> LookupSpan<'a> + Subscriber,
394{
395    fn on_new_span(
396        &self,
397        attrs: &span::Attributes<'_>,
398        id: &span::Id,
399        ctx: layer::Context<'_, S>,
400    ) {
401        if let Some(span) = ctx.span(id) {
402            let mut visitor = GetScenarioId(None);
403            attrs.values().record(&mut visitor);
404
405            if let Some(scenario_id) = visitor.0 {
406                let mut ext = span.extensions_mut();
407                _ = ext.replace(scenario_id);
408            }
409        }
410    }
411
412    fn on_record(
413        &self,
414        id: &span::Id,
415        values: &span::Record<'_>,
416        ctx: layer::Context<'_, S>,
417    ) {
418        if let Some(span) = ctx.span(id) {
419            let mut visitor = GetScenarioId(None);
420            values.record(&mut visitor);
421
422            if let Some(scenario_id) = visitor.0 {
423                let mut ext = span.extensions_mut();
424                _ = ext.replace(scenario_id);
425            }
426        }
427    }
428
429    fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) {
430        _ = self.span_close_sender.unbounded_send(id).ok();
431    }
432}
433
434/// [`Visit`]or extracting a [`ScenarioId`] from a
435/// [`ScenarioId::SPAN_FIELD_NAME`]d [`Field`], in case it's present.
436#[derive(Debug)]
437struct GetScenarioId(Option<ScenarioId>);
438
439impl Visit for GetScenarioId {
440    fn record_u64(&mut self, field: &Field, value: u64) {
441        if field.name() == ScenarioId::SPAN_FIELD_NAME {
442            self.0 = Some(ScenarioId(value));
443        }
444    }
445
446    fn record_debug(&mut self, _: &Field, _: &dyn Debug) {}
447}
448
449/// [`FormatFields`] wrapper skipping [`Span`]s with a [`ScenarioId`].
450#[derive(Debug)]
451pub struct SkipScenarioIdSpan<F>(pub F);
452
453impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan<F> {
454    fn format_fields<R: RecordFields>(
455        &self,
456        writer: format::Writer<'w>,
457        fields: R,
458    ) -> fmt::Result {
459        let mut is_scenario_span = IsScenarioIdSpan(false);
460        fields.record(&mut is_scenario_span);
461        if !is_scenario_span.0 {
462            self.0.format_fields(writer, fields)?;
463        }
464        Ok(())
465    }
466}
467
468/// [`Visit`]or checking whether a [`Span`] has a [`Field`] with the
469/// [`ScenarioId::SPAN_FIELD_NAME`].
470#[derive(Debug)]
471struct IsScenarioIdSpan(bool);
472
473impl Visit for IsScenarioIdSpan {
474    fn record_debug(&mut self, field: &Field, _: &dyn Debug) {
475        if field.name() == ScenarioId::SPAN_FIELD_NAME {
476            self.0 = true;
477        }
478    }
479}
480
481/// [`FormatEvent`] wrapper, appending [`tracing::Event`]s with some markers,
482/// to parse them later and retrieve optional [`ScenarioId`].
483///
484/// [`Scenario`]: gherkin::Scenario
485#[derive(Debug)]
486pub struct AppendScenarioMsg<F>(pub F);
487
488impl<S, N, F> FormatEvent<S, N> for AppendScenarioMsg<F>
489where
490    S: Subscriber + for<'a> LookupSpan<'a>,
491    N: for<'a> FormatFields<'a> + 'static,
492    F: FormatEvent<S, N>,
493{
494    fn format_event(
495        &self,
496        ctx: &FmtContext<'_, S, N>,
497        mut writer: format::Writer<'_>,
498        event: &Event<'_>,
499    ) -> fmt::Result {
500        self.0.format_event(ctx, writer.by_ref(), event)?;
501
502        if let Some(scenario_id) = ctx.event_scope().and_then(|scope| {
503            scope
504                .from_root()
505                .find_map(|span| span.extensions().get::<ScenarioId>().copied())
506        }) {
507            writer.write_fmt(format_args!(
508                "{}{scenario_id}",
509                suffix::BEFORE_SCENARIO_ID,
510            ))?;
511        } else {
512            writer.write_fmt(format_args!("{}", suffix::NO_SCENARIO_ID))?;
513        }
514        writer.write_fmt(format_args!("{}", suffix::END))
515    }
516}
517
518mod suffix {
519    //! [`str`]ings appending [`tracing::Event`]s to separate them later.
520    //!
521    //! Every [`tracing::Event`] ends with:
522    //!
523    //! ([`BEFORE_SCENARIO_ID`][`ScenarioId`][`END`]|[`NO_SCENARIO_ID`][`END`])
524    //!
525    //! [`ScenarioId`]: super::ScenarioId
526
527    /// End of a [`tracing::Event`] message.
528    pub(crate) const END: &str = "__cucumber__scenario";
529
530    /// Separator before a [`ScenarioId`].
531    ///
532    /// [`ScenarioId`]: super::ScenarioId
533    pub(crate) const BEFORE_SCENARIO_ID: &str = "__";
534
535    /// Separator in case there is no [`ScenarioId`].
536    ///
537    /// [`ScenarioId`]: super::ScenarioId
538    pub(crate) const NO_SCENARIO_ID: &str = "__unknown";
539}
540
541/// [`io::Write`]r sending [`tracing::Event`]s to a `Collector`.
542#[derive(Clone, Debug)]
543pub struct CollectorWriter {
544    /// Sender for notifying the [`Collector`] about [`tracing::Event`]s via.
545    sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
546}
547
548impl CollectorWriter {
549    /// Creates a new [`CollectorWriter`].
550    const fn new(
551        sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
552    ) -> Self {
553        Self { sender }
554    }
555}
556
557impl<'a> MakeWriter<'a> for CollectorWriter {
558    type Writer = Self;
559
560    fn make_writer(&'a self) -> Self::Writer {
561        self.clone()
562    }
563}
564
565impl io::Write for CollectorWriter {
566    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
567        // Although this is not documented explicitly anywhere, `io::Write`rs
568        // inside `tracing::fmt::Layer` always receives fully formatted messages
569        // at once, not by parts.
570        // Inside docs of `fmt::Layer::with_writer()`, a non-locked `io::stderr`
571        // is passed as an `io::Writer`. So, if this guarantee fails, parts of
572        // log messages will be able to interleave each other, making the result
573        // unreadable.
574        let msgs = String::from_utf8_lossy(buf);
575        for msg in msgs.split_terminator(suffix::END) {
576            if let Some((before, after)) =
577                msg.rsplit_once(suffix::NO_SCENARIO_ID)
578            {
579                if !after.is_empty() {
580                    return Err(io::Error::new(
581                        io::ErrorKind::InvalidData,
582                        "wrong separator",
583                    ));
584                }
585                _ = self.sender.unbounded_send((None, before.to_owned())).ok();
586            } else if let Some((before, after)) =
587                msg.rsplit_once(suffix::BEFORE_SCENARIO_ID)
588            {
589                let scenario_id = after.parse().map_err(|e| {
590                    io::Error::new(io::ErrorKind::InvalidData, e)
591                })?;
592                _ = self
593                    .sender
594                    .unbounded_send((Some(scenario_id), before.to_owned()))
595                    .ok();
596            } else {
597                return Err(io::Error::new(
598                    io::ErrorKind::InvalidData,
599                    "missing separator",
600                ));
601            }
602        }
603        Ok(buf.len())
604    }
605
606    fn flush(&mut self) -> io::Result<()> {
607        Ok(())
608    }
609}