1use std::{collections::HashMap, fmt, io, iter, sync::Arc};
4
5use futures::channel::{mpsc, oneshot};
6use itertools::Either;
7use tracing::{
8 field::{Field, Visit},
9 span, Dispatch, Event, Span, Subscriber,
10};
11use tracing_subscriber::{
12 field::RecordFields,
13 filter::LevelFilter,
14 fmt::{
15 format::{self, Format},
16 FmtContext, FormatEvent, FormatFields, MakeWriter,
17 },
18 layer::{self, Layer, Layered, SubscriberExt as _},
19 registry::LookupSpan,
20 util::SubscriberInitExt as _,
21};
22
23use crate::{
24 event::{self, HookType},
25 runner::{
26 self,
27 basic::{RetryOptions, ScenarioId},
28 },
29 Cucumber, Parser, Runner, ScenarioType, World, Writer,
30};
31
32impl<W, P, I, Wr, Cli, WhichSc, Before, After>
33 Cucumber<W, P, I, runner::Basic<W, WhichSc, Before, After>, Wr, Cli>
34where
35 W: World,
36 P: Parser<I>,
37 runner::Basic<W, WhichSc, Before, After>: Runner<W>,
38 Wr: Writer<W>,
39 Cli: clap::Args,
40{
41 #[must_use]
46 pub fn init_tracing(self) -> Self {
47 self.configure_and_init_tracing(
48 format::DefaultFields::new(),
49 Format::default(),
50 |layer| {
51 tracing_subscriber::registry()
52 .with(LevelFilter::INFO.and_then(layer))
53 },
54 )
55 }
56
57 #[must_use]
91 pub fn configure_and_init_tracing<Event, Fields, Sub, Conf, Out>(
92 self,
93 fmt_fields: Fields,
94 event_format: Event,
95 configure: Conf,
96 ) -> Self
97 where
98 Fields: for<'a> FormatFields<'a> + 'static,
99 Event: FormatEvent<Sub, SkipScenarioIdSpan<Fields>> + 'static,
100 Sub: Subscriber + for<'a> LookupSpan<'a>,
101 Out: Subscriber + Send + Sync,
102 Conf: FnOnce(
105 Layered<
106 tracing_subscriber::fmt::Layer<
107 Sub,
108 SkipScenarioIdSpan<Fields>,
109 AppendScenarioMsg<Event>,
110 CollectorWriter,
111 >,
112 RecordScenarioId,
113 Sub,
114 >,
115 ) -> Out,
116 {
117 let (logs_sender, logs_receiver) = mpsc::unbounded();
118 let (span_close_sender, span_close_receiver) = mpsc::unbounded();
119
120 let layer = RecordScenarioId::new(span_close_sender).and_then(
121 tracing_subscriber::fmt::layer()
122 .fmt_fields(SkipScenarioIdSpan(fmt_fields))
123 .event_format(AppendScenarioMsg(event_format))
124 .with_writer(CollectorWriter::new(logs_sender)),
125 );
126 Dispatch::new(configure(layer)).init();
127
128 drop(
129 self.runner
130 .logs_collector
131 .swap(Box::new(Some(Collector::new(
132 logs_receiver,
133 span_close_receiver,
134 )))),
135 );
136
137 self
138 }
139}
140
141type Scenarios = HashMap<
145 ScenarioId,
146 (
147 Arc<gherkin::Feature>,
148 Option<Arc<gherkin::Rule>>,
149 Arc<gherkin::Scenario>,
150 Option<RetryOptions>,
151 ),
152>;
153
154type SpanEventsCallbacks =
156 HashMap<span::Id, (Option<Vec<Callback>>, IsReceived)>;
157
158type IsReceived = bool;
160
161type Callback = oneshot::Sender<()>;
163
164#[derive(Debug)]
166pub(crate) struct Collector {
167 scenarios: Scenarios,
169
170 logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
173
174 span_events: SpanEventsCallbacks,
177
178 span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
180
181 wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
183
184 wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>,
186}
187
188impl Collector {
189 pub(crate) fn new(
191 logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
192 span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
193 ) -> Self {
194 let (sender, receiver) = mpsc::unbounded();
195 Self {
196 scenarios: HashMap::new(),
197 logs_receiver,
198 span_events: HashMap::new(),
199 span_close_receiver,
200 wait_span_event_sender: sender,
201 wait_span_event_receiver: receiver,
202 }
203 }
204
205 pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter {
207 SpanCloseWaiter {
208 wait_span_event_sender: self.wait_span_event_sender.clone(),
209 }
210 }
211
212 pub(crate) fn start_scenarios(
216 &mut self,
217 runnable: impl AsRef<
218 [(
219 ScenarioId,
220 Arc<gherkin::Feature>,
221 Option<Arc<gherkin::Rule>>,
222 Arc<gherkin::Scenario>,
223 ScenarioType,
224 Option<RetryOptions>,
225 )],
226 >,
227 ) {
228 for (id, f, r, s, _, ret) in runnable.as_ref() {
229 drop(
230 self.scenarios.insert(
231 *id,
232 (Arc::clone(f), r.clone(), Arc::clone(s), *ret),
233 ),
234 );
235 }
236 }
237
238 pub(crate) fn finish_scenario(&mut self, id: ScenarioId) {
242 drop(self.scenarios.remove(&id));
243 }
244
245 pub(crate) fn emitted_logs<W>(
254 &mut self,
255 ) -> Option<Vec<event::Cucumber<W>>> {
256 self.notify_about_closing_spans();
257
258 self.logs_receiver
259 .try_next()
260 .ok()
261 .flatten()
262 .map(|(id, msg)| {
263 id.and_then(|k| self.scenarios.get(&k))
264 .map_or_else(
265 || Either::Left(self.scenarios.values()),
266 |p| Either::Right(iter::once(p)),
267 )
268 .map(|(f, r, s, opt)| {
269 event::Cucumber::scenario(
270 Arc::clone(f),
271 r.clone(),
272 Arc::clone(s),
273 event::RetryableScenario {
274 event: event::Scenario::Log(msg.clone()),
275 retries: opt.map(|o| o.retries),
276 },
277 )
278 })
279 .collect()
280 })
281 }
282
283 fn notify_about_closing_spans(&mut self) {
285 if let Some(id) = self.span_close_receiver.try_next().ok().flatten() {
286 self.span_events.entry(id).or_default().1 = true;
287 }
288 while let Some((id, callback)) =
289 self.wait_span_event_receiver.try_next().ok().flatten()
290 {
291 self.span_events
292 .entry(id)
293 .or_default()
294 .0
295 .get_or_insert(Vec::new())
296 .push(callback);
297 }
298 self.span_events.retain(|_, (callbacks, is_received)| {
299 if callbacks.is_some() && *is_received {
300 for callback in callbacks
301 .take()
302 .unwrap_or_else(|| unreachable!("`callbacks.is_some()`"))
303 {
304 _ = callback.send(()).ok();
305 }
306 false
307 } else {
308 true
309 }
310 });
311 }
312}
313
314#[allow(clippy::multiple_inherent_impl)] impl ScenarioId {
317 const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id";
319
320 pub(crate) fn scenario_span(self) -> Span {
325 tracing::error_span!("scenario", __cucumber_scenario_id = self.0)
328 }
329
330 #[allow(clippy::unused_self)]
334 pub(crate) fn step_span(self, is_background: bool) -> Span {
335 if is_background {
338 tracing::error_span!("background step")
339 } else {
340 tracing::error_span!("step")
341 }
342 }
343
344 #[allow(clippy::unused_self)]
348 pub(crate) fn hook_span(self, hook_ty: HookType) -> Span {
349 match hook_ty {
352 HookType::Before => tracing::error_span!("before hook"),
353 HookType::After => tracing::error_span!("after hook"),
354 }
355 }
356}
357
358#[derive(Clone, Debug)]
367pub(crate) struct SpanCloseWaiter {
368 wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
370}
371
372impl SpanCloseWaiter {
373 pub(crate) async fn wait_for_span_close(&self, id: span::Id) {
375 let (sender, receiver) = oneshot::channel();
376 _ = self
377 .wait_span_event_sender
378 .unbounded_send((id, sender))
379 .ok();
380 _ = receiver.await.ok();
381 }
382}
383
384#[derive(Debug)]
388pub struct RecordScenarioId {
389 span_close_sender: mpsc::UnboundedSender<span::Id>,
391}
392
393impl RecordScenarioId {
394 const fn new(span_close_sender: mpsc::UnboundedSender<span::Id>) -> Self {
396 Self { span_close_sender }
397 }
398}
399
400impl<S> Layer<S> for RecordScenarioId
401where
402 S: for<'a> LookupSpan<'a> + Subscriber,
403{
404 fn on_new_span(
405 &self,
406 attr: &span::Attributes<'_>,
407 id: &span::Id,
408 ctx: layer::Context<'_, S>,
409 ) {
410 if let Some(span) = ctx.span(id) {
411 let mut visitor = GetScenarioId(None);
412 attr.values().record(&mut visitor);
413
414 if let Some(scenario_id) = visitor.0 {
415 let mut ext = span.extensions_mut();
416 _ = ext.replace(scenario_id);
417 }
418 }
419 }
420
421 fn on_record(
422 &self,
423 id: &span::Id,
424 values: &span::Record<'_>,
425 ctx: layer::Context<'_, S>,
426 ) {
427 if let Some(span) = ctx.span(id) {
428 let mut visitor = GetScenarioId(None);
429 values.record(&mut visitor);
430
431 if let Some(scenario_id) = visitor.0 {
432 let mut ext = span.extensions_mut();
433 _ = ext.replace(scenario_id);
434 }
435 }
436 }
437
438 fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) {
439 _ = self.span_close_sender.unbounded_send(id).ok();
440 }
441}
442
443#[derive(Debug)]
446struct GetScenarioId(Option<ScenarioId>);
447
448impl Visit for GetScenarioId {
449 fn record_u64(&mut self, field: &Field, value: u64) {
450 if field.name() == ScenarioId::SPAN_FIELD_NAME {
451 self.0 = Some(ScenarioId(value));
452 }
453 }
454
455 fn record_debug(&mut self, _: &Field, _: &dyn fmt::Debug) {}
456}
457
/// [`FormatFields`] wrapper skipping the fields which contain the one
/// carrying a [`ScenarioId`], and delegating anything else to the wrapped
/// formatter.
#[derive(Debug)]
pub struct SkipScenarioIdSpan<F>(pub F);
461
462impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan<F> {
463 fn format_fields<R: RecordFields>(
464 &self,
465 writer: format::Writer<'w>,
466 fields: R,
467 ) -> fmt::Result {
468 let mut is_scenario_span = IsScenarioIdSpan(false);
469 fields.record(&mut is_scenario_span);
470 if !is_scenario_span.0 {
471 self.0.format_fields(writer, fields)?;
472 }
473 Ok(())
474 }
475}
476
/// [`Visit`]or detecting whether the visited fields contain the one carrying
/// a [`ScenarioId`].
///
/// Holds `true` once such field has been seen.
#[derive(Debug)]
struct IsScenarioIdSpan(bool);
481
482impl Visit for IsScenarioIdSpan {
483 fn record_debug(&mut self, field: &Field, _: &dyn fmt::Debug) {
484 if field.name() == ScenarioId::SPAN_FIELD_NAME {
485 self.0 = true;
486 }
487 }
488}
489
/// [`FormatEvent`] wrapper appending a machine-readable suffix to every
/// formatted event, telling which scenario (if any) the event belongs to.
///
/// The suffix is parsed back by [`CollectorWriter`].
#[derive(Debug)]
pub struct AppendScenarioMsg<F>(pub F);
496
497impl<S, N, F> FormatEvent<S, N> for AppendScenarioMsg<F>
498where
499 S: Subscriber + for<'a> LookupSpan<'a>,
500 N: for<'a> FormatFields<'a> + 'static,
501 F: FormatEvent<S, N>,
502{
503 fn format_event(
504 &self,
505 ctx: &FmtContext<'_, S, N>,
506 mut writer: format::Writer<'_>,
507 event: &Event<'_>,
508 ) -> fmt::Result {
509 self.0.format_event(ctx, writer.by_ref(), event)?;
510
511 if let Some(scenario_id) = ctx.event_scope().and_then(|scope| {
512 scope
513 .from_root()
514 .find_map(|span| span.extensions().get::<ScenarioId>().copied())
515 }) {
516 writer.write_fmt(format_args!(
517 "{}{scenario_id}",
518 suffix::BEFORE_SCENARIO_ID,
519 ))?;
520 } else {
521 writer.write_fmt(format_args!("{}", suffix::NO_SCENARIO_ID))?;
522 }
523 writer.write_fmt(format_args!("{}", suffix::END))
524 }
525}
526
/// Pieces of the suffix appended to every formatted event by
/// `AppendScenarioMsg` and parsed back by `CollectorWriter`.
///
/// A complete suffix is either `__<scenario id>__cucumber__scenario` or
/// `__unknown__cucumber__scenario`.
mod suffix {
    /// Separator placed right before a scenario ID.
    pub(crate) const BEFORE_SCENARIO_ID: &str = "__";

    /// Marker used in place of a scenario ID when there is none.
    pub(crate) const NO_SCENARIO_ID: &str = "__unknown";

    /// Terminator of every message.
    pub(crate) const END: &str = "__cucumber__scenario";
}
549
550#[derive(Clone, Debug)]
552pub struct CollectorWriter {
553 sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
555}
556
557impl CollectorWriter {
558 const fn new(
560 sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
561 ) -> Self {
562 Self { sender }
563 }
564}
565
566impl<'a> MakeWriter<'a> for CollectorWriter {
567 type Writer = Self;
568
569 fn make_writer(&'a self) -> Self::Writer {
570 self.clone()
571 }
572}
573
574impl io::Write for CollectorWriter {
575 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
576 let msgs = String::from_utf8_lossy(buf);
584 for msg in msgs.split_terminator(suffix::END) {
585 if let Some((before, after)) =
586 msg.rsplit_once(suffix::NO_SCENARIO_ID)
587 {
588 if !after.is_empty() {
589 return Err(io::Error::new(
590 io::ErrorKind::InvalidData,
591 "wrong separator",
592 ));
593 }
594 _ = self.sender.unbounded_send((None, before.to_owned())).ok();
595 } else if let Some((before, after)) =
596 msg.rsplit_once(suffix::BEFORE_SCENARIO_ID)
597 {
598 let scenario_id = after.parse().map_err(|e| {
599 io::Error::new(io::ErrorKind::InvalidData, e)
600 })?;
601 _ = self
602 .sender
603 .unbounded_send((Some(scenario_id), before.to_owned()))
604 .ok();
605 } else {
606 return Err(io::Error::new(
607 io::ErrorKind::InvalidData,
608 "missing separator",
609 ));
610 }
611 }
612 Ok(buf.len())
613 }
614
615 fn flush(&mut self) -> io::Result<()> {
616 Ok(())
617 }
618}