1use std::{collections::HashMap, fmt, io, iter};
4
5use derive_more::with_trait::Debug;
6use futures::channel::{mpsc, oneshot};
7use itertools::Either;
8use tracing::{
9 Dispatch, Event, Span, Subscriber,
10 field::{Field, Visit},
11 span,
12};
13use tracing_subscriber::{
14 field::RecordFields,
15 filter::LevelFilter,
16 fmt::{
17 FmtContext, FormatEvent, FormatFields, MakeWriter,
18 format::{self, Format},
19 },
20 layer::{self, Layer, Layered, SubscriberExt as _},
21 registry::LookupSpan,
22 util::SubscriberInitExt as _,
23};
24
25use crate::{
26 Cucumber, Parser, Runner, ScenarioType, World, Writer,
27 event::{self, HookType, Source},
28 runner::{
29 self,
30 basic::{RetryOptions, ScenarioId},
31 },
32};
33
34impl<W, P, I, Wr, Cli, WhichSc, Before, After>
35 Cucumber<W, P, I, runner::Basic<W, WhichSc, Before, After>, Wr, Cli>
36where
37 W: World,
38 P: Parser<I>,
39 runner::Basic<W, WhichSc, Before, After>: Runner<W>,
40 Wr: Writer<W>,
41 Cli: clap::Args,
42{
43 #[must_use]
48 pub fn init_tracing(self) -> Self {
49 self.configure_and_init_tracing(
50 format::DefaultFields::new(),
51 Format::default(),
52 |layer| {
53 tracing_subscriber::registry()
54 .with(LevelFilter::INFO.and_then(layer))
55 },
56 )
57 }
58
59 #[must_use]
93 pub fn configure_and_init_tracing<Event, Fields, Sub, Conf, Out>(
94 self,
95 fmt_fields: Fields,
96 event_format: Event,
97 configure: Conf,
98 ) -> Self
99 where
100 Fields: for<'a> FormatFields<'a> + 'static,
101 Event: FormatEvent<Sub, SkipScenarioIdSpan<Fields>> + 'static,
102 Sub: Subscriber + for<'a> LookupSpan<'a>,
103 Out: Subscriber + Send + Sync,
104 Conf: FnOnce(
107 Layered<
108 tracing_subscriber::fmt::Layer<
109 Sub,
110 SkipScenarioIdSpan<Fields>,
111 AppendScenarioMsg<Event>,
112 CollectorWriter,
113 >,
114 RecordScenarioId,
115 Sub,
116 >,
117 ) -> Out,
118 {
119 let (logs_sender, logs_receiver) = mpsc::unbounded();
120 let (span_close_sender, span_close_receiver) = mpsc::unbounded();
121
122 let layer = RecordScenarioId::new(span_close_sender).and_then(
123 tracing_subscriber::fmt::layer()
124 .fmt_fields(SkipScenarioIdSpan(fmt_fields))
125 .event_format(AppendScenarioMsg(event_format))
126 .with_writer(CollectorWriter::new(logs_sender)),
127 );
128 Dispatch::new(configure(layer)).init();
129
130 drop(self.runner.logs_collector.swap(Box::new(Some(Collector::new(
131 logs_receiver,
132 span_close_receiver,
133 )))));
134
135 self
136 }
137}
138
139type Scenarios = HashMap<
143 ScenarioId,
144 (
145 Source<gherkin::Feature>,
146 Option<Source<gherkin::Rule>>,
147 Source<gherkin::Scenario>,
148 Option<RetryOptions>,
149 ),
150>;
151
152type SpanEventsCallbacks =
154 HashMap<span::Id, (Option<Vec<Callback>>, IsReceived)>;
155
156type IsReceived = bool;
158
159type Callback = oneshot::Sender<()>;
161
162#[derive(Debug)]
164pub(crate) struct Collector {
165 scenarios: Scenarios,
167
168 logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
171
172 span_events: SpanEventsCallbacks,
175
176 span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
178
179 wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
181
182 wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>,
184}
185
186impl Collector {
187 pub(crate) fn new(
189 logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
190 span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
191 ) -> Self {
192 let (sender, receiver) = mpsc::unbounded();
193 Self {
194 scenarios: HashMap::new(),
195 logs_receiver,
196 span_events: HashMap::new(),
197 span_close_receiver,
198 wait_span_event_sender: sender,
199 wait_span_event_receiver: receiver,
200 }
201 }
202
203 pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter {
205 SpanCloseWaiter {
206 wait_span_event_sender: self.wait_span_event_sender.clone(),
207 }
208 }
209
210 pub(crate) fn start_scenarios(
214 &mut self,
215 runnable: impl AsRef<
216 [(
217 ScenarioId,
218 Source<gherkin::Feature>,
219 Option<Source<gherkin::Rule>>,
220 Source<gherkin::Scenario>,
221 ScenarioType,
222 Option<RetryOptions>,
223 )],
224 >,
225 ) {
226 for (id, f, r, s, _, ret) in runnable.as_ref() {
227 drop(
228 self.scenarios
229 .insert(*id, (f.clone(), r.clone(), s.clone(), *ret)),
230 );
231 }
232 }
233
234 pub(crate) fn finish_scenario(&mut self, id: ScenarioId) {
238 drop(self.scenarios.remove(&id));
239 }
240
241 pub(crate) fn emitted_logs<W>(
250 &mut self,
251 ) -> Option<Vec<event::Cucumber<W>>> {
252 self.notify_about_closing_spans();
253
254 self.logs_receiver.try_next().ok().flatten().map(|(id, msg)| {
255 id.and_then(|k| self.scenarios.get(&k))
256 .map_or_else(
257 || Either::Left(self.scenarios.values()),
258 |p| Either::Right(iter::once(p)),
259 )
260 .map(|(f, r, s, opt)| {
261 event::Cucumber::scenario(
262 f.clone(),
263 r.clone(),
264 s.clone(),
265 event::RetryableScenario {
266 event: event::Scenario::Log(msg.clone()),
267 retries: opt.map(|o| o.retries),
268 },
269 )
270 })
271 .collect()
272 })
273 }
274
275 fn notify_about_closing_spans(&mut self) {
277 if let Some(id) = self.span_close_receiver.try_next().ok().flatten() {
278 self.span_events.entry(id).or_default().1 = true;
279 }
280 while let Some((id, callback)) =
281 self.wait_span_event_receiver.try_next().ok().flatten()
282 {
283 self.span_events
284 .entry(id)
285 .or_default()
286 .0
287 .get_or_insert(Vec::new())
288 .push(callback);
289 }
290 self.span_events.retain(|_, (callbacks, is_received)| {
291 if callbacks.is_some() && *is_received {
292 for callback in callbacks
293 .take()
294 .unwrap_or_else(|| unreachable!("`callbacks.is_some()`"))
295 {
296 _ = callback.send(()).ok();
297 }
298 false
299 } else {
300 true
301 }
302 });
303 }
304}
305
306#[expect( clippy::multiple_inherent_impl,
308 reason = "related to `tracing` capabilities only"
309)]
310impl ScenarioId {
311 const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id";
313
314 pub(crate) fn scenario_span(self) -> Span {
319 tracing::error_span!("scenario", __cucumber_scenario_id = self.0)
322 }
323
324 #[expect(clippy::unused_self, reason = "API uniformity")]
328 pub(crate) fn step_span(self, is_background: bool) -> Span {
329 if is_background {
332 tracing::error_span!("background step")
333 } else {
334 tracing::error_span!("step")
335 }
336 }
337
338 #[expect(clippy::unused_self, reason = "API uniformity")]
342 pub(crate) fn hook_span(self, hook_ty: HookType) -> Span {
343 match hook_ty {
346 HookType::Before => tracing::error_span!("before hook"),
347 HookType::After => tracing::error_span!("after hook"),
348 }
349 }
350}
351
352#[derive(Clone, Debug)]
361pub(crate) struct SpanCloseWaiter {
362 wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
364}
365
366impl SpanCloseWaiter {
367 pub(crate) async fn wait_for_span_close(&self, id: span::Id) {
369 let (sender, receiver) = oneshot::channel();
370 _ = self.wait_span_event_sender.unbounded_send((id, sender)).ok();
371 _ = receiver.await.ok();
372 }
373}
374
375#[derive(Debug)]
379pub struct RecordScenarioId {
380 span_close_sender: mpsc::UnboundedSender<span::Id>,
382}
383
384impl RecordScenarioId {
385 const fn new(span_close_sender: mpsc::UnboundedSender<span::Id>) -> Self {
387 Self { span_close_sender }
388 }
389}
390
391impl<S> Layer<S> for RecordScenarioId
392where
393 S: for<'a> LookupSpan<'a> + Subscriber,
394{
395 fn on_new_span(
396 &self,
397 attrs: &span::Attributes<'_>,
398 id: &span::Id,
399 ctx: layer::Context<'_, S>,
400 ) {
401 if let Some(span) = ctx.span(id) {
402 let mut visitor = GetScenarioId(None);
403 attrs.values().record(&mut visitor);
404
405 if let Some(scenario_id) = visitor.0 {
406 let mut ext = span.extensions_mut();
407 _ = ext.replace(scenario_id);
408 }
409 }
410 }
411
412 fn on_record(
413 &self,
414 id: &span::Id,
415 values: &span::Record<'_>,
416 ctx: layer::Context<'_, S>,
417 ) {
418 if let Some(span) = ctx.span(id) {
419 let mut visitor = GetScenarioId(None);
420 values.record(&mut visitor);
421
422 if let Some(scenario_id) = visitor.0 {
423 let mut ext = span.extensions_mut();
424 _ = ext.replace(scenario_id);
425 }
426 }
427 }
428
429 fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) {
430 _ = self.span_close_sender.unbounded_send(id).ok();
431 }
432}
433
434#[derive(Debug)]
437struct GetScenarioId(Option<ScenarioId>);
438
439impl Visit for GetScenarioId {
440 fn record_u64(&mut self, field: &Field, value: u64) {
441 if field.name() == ScenarioId::SPAN_FIELD_NAME {
442 self.0 = Some(ScenarioId(value));
443 }
444 }
445
446 fn record_debug(&mut self, _: &Field, _: &dyn Debug) {}
447}
448
449#[derive(Debug)]
451pub struct SkipScenarioIdSpan<F>(pub F);
452
453impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan<F> {
454 fn format_fields<R: RecordFields>(
455 &self,
456 writer: format::Writer<'w>,
457 fields: R,
458 ) -> fmt::Result {
459 let mut is_scenario_span = IsScenarioIdSpan(false);
460 fields.record(&mut is_scenario_span);
461 if !is_scenario_span.0 {
462 self.0.format_fields(writer, fields)?;
463 }
464 Ok(())
465 }
466}
467
468#[derive(Debug)]
471struct IsScenarioIdSpan(bool);
472
473impl Visit for IsScenarioIdSpan {
474 fn record_debug(&mut self, field: &Field, _: &dyn Debug) {
475 if field.name() == ScenarioId::SPAN_FIELD_NAME {
476 self.0 = true;
477 }
478 }
479}
480
481#[derive(Debug)]
486pub struct AppendScenarioMsg<F>(pub F);
487
488impl<S, N, F> FormatEvent<S, N> for AppendScenarioMsg<F>
489where
490 S: Subscriber + for<'a> LookupSpan<'a>,
491 N: for<'a> FormatFields<'a> + 'static,
492 F: FormatEvent<S, N>,
493{
494 fn format_event(
495 &self,
496 ctx: &FmtContext<'_, S, N>,
497 mut writer: format::Writer<'_>,
498 event: &Event<'_>,
499 ) -> fmt::Result {
500 self.0.format_event(ctx, writer.by_ref(), event)?;
501
502 if let Some(scenario_id) = ctx.event_scope().and_then(|scope| {
503 scope
504 .from_root()
505 .find_map(|span| span.extensions().get::<ScenarioId>().copied())
506 }) {
507 writer.write_fmt(format_args!(
508 "{}{scenario_id}",
509 suffix::BEFORE_SCENARIO_ID,
510 ))?;
511 } else {
512 writer.write_fmt(format_args!("{}", suffix::NO_SCENARIO_ID))?;
513 }
514 writer.write_fmt(format_args!("{}", suffix::END))
515 }
516}
517
518mod suffix {
519 pub(crate) const END: &str = "__cucumber__scenario";
529
530 pub(crate) const BEFORE_SCENARIO_ID: &str = "__";
534
535 pub(crate) const NO_SCENARIO_ID: &str = "__unknown";
539}
540
541#[derive(Clone, Debug)]
543pub struct CollectorWriter {
544 sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
546}
547
548impl CollectorWriter {
549 const fn new(
551 sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
552 ) -> Self {
553 Self { sender }
554 }
555}
556
557impl<'a> MakeWriter<'a> for CollectorWriter {
558 type Writer = Self;
559
560 fn make_writer(&'a self) -> Self::Writer {
561 self.clone()
562 }
563}
564
565impl io::Write for CollectorWriter {
566 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
567 let msgs = String::from_utf8_lossy(buf);
575 for msg in msgs.split_terminator(suffix::END) {
576 if let Some((before, after)) =
577 msg.rsplit_once(suffix::NO_SCENARIO_ID)
578 {
579 if !after.is_empty() {
580 return Err(io::Error::new(
581 io::ErrorKind::InvalidData,
582 "wrong separator",
583 ));
584 }
585 _ = self.sender.unbounded_send((None, before.to_owned())).ok();
586 } else if let Some((before, after)) =
587 msg.rsplit_once(suffix::BEFORE_SCENARIO_ID)
588 {
589 let scenario_id = after.parse().map_err(|e| {
590 io::Error::new(io::ErrorKind::InvalidData, e)
591 })?;
592 _ = self
593 .sender
594 .unbounded_send((Some(scenario_id), before.to_owned()))
595 .ok();
596 } else {
597 return Err(io::Error::new(
598 io::ErrorKind::InvalidData,
599 "missing separator",
600 ));
601 }
602 }
603 Ok(buf.len())
604 }
605
606 fn flush(&mut self) -> io::Result<()> {
607 Ok(())
608 }
609}