use std::{collections::HashMap, fmt, io, iter};
use derive_more::with_trait::Debug;
use futures::channel::{mpsc, oneshot};
use itertools::Either;
use tracing::{
Dispatch, Event, Span, Subscriber,
field::{Field, Visit},
span,
};
use tracing_subscriber::{
field::RecordFields,
filter::LevelFilter,
fmt::{
FmtContext, FormatEvent, FormatFields, MakeWriter,
format::{self, Format},
},
layer::{self, Layer, Layered, SubscriberExt as _},
registry::LookupSpan,
util::SubscriberInitExt as _,
};
use crate::{
Cucumber, Parser, Runner, ScenarioType, World, Writer,
event::{self, HookType, Source},
runner::{
self,
basic::{RetryOptions, ScenarioId},
},
};
impl<W, P, I, Wr, Cli, WhichSc, Before, After>
    Cucumber<W, P, I, runner::Basic<W, WhichSc, Before, After>, Wr, Cli>
where
    W: World,
    P: Parser<I>,
    runner::Basic<W, WhichSc, Before, After>: Runner<W>,
    Wr: Writer<W>,
    Cli: clap::Args,
{
    /// Initializes a global [`tracing`] subscriber with sensible defaults:
    /// the default fields formatter, the default event [`Format`], and a
    /// [`LevelFilter::INFO`] filter placed on top of a
    /// [`tracing_subscriber::registry()`].
    ///
    /// This is a shortcut for [`Self::configure_and_init_tracing()`].
    #[must_use]
    pub fn init_tracing(self) -> Self {
        let fields = format::DefaultFields::new();
        let event_format = Format::default();

        self.configure_and_init_tracing(fields, event_format, |layer| {
            tracing_subscriber::registry()
                .with(LevelFilter::INFO.and_then(layer))
        })
    }

    /// Configures and initializes a global [`tracing`] subscriber whose
    /// output is routed into this [`Cucumber`]'s runner.
    ///
    /// - `fmt_fields` formats span/event fields (wrapped into
    ///   [`SkipScenarioIdSpan`]).
    /// - `event_format` formats events (wrapped into
    ///   [`AppendScenarioMsg`]).
    /// - `configure` receives the assembled layer and must return the final
    ///   [`Subscriber`], which is then installed via `init()`.
    ///
    /// A [`Collector`] holding the receiving ends of the logs and span-close
    /// channels is stored into the runner, replacing (and dropping) whatever
    /// was stored there before.
    ///
    /// NOTE(review): `init()` installs a global default subscriber, which
    /// `tracing_subscriber` documents as panicking when one is already set,
    /// so this is presumably meant to be called once per process.
    #[must_use]
    pub fn configure_and_init_tracing<Event, Fields, Sub, Conf, Out>(
        self,
        fmt_fields: Fields,
        event_format: Event,
        configure: Conf,
    ) -> Self
    where
        Fields: for<'a> FormatFields<'a> + 'static,
        Event: FormatEvent<Sub, SkipScenarioIdSpan<Fields>> + 'static,
        Sub: Subscriber + for<'a> LookupSpan<'a>,
        Out: Subscriber + Send + Sync,
        Conf: FnOnce(
            Layered<
                tracing_subscriber::fmt::Layer<
                    Sub,
                    SkipScenarioIdSpan<Fields>,
                    AppendScenarioMsg<Event>,
                    CollectorWriter,
                >,
                RecordScenarioId,
                Sub,
            >,
        ) -> Out,
    {
        // One channel carries formatted log messages, the other one carries
        // IDs of closed spans.
        let (logs_tx, logs_rx) = mpsc::unbounded();
        let (closed_tx, closed_rx) = mpsc::unbounded();

        let record_ids = RecordScenarioId::new(closed_tx);
        let fmt_layer = tracing_subscriber::fmt::layer()
            .fmt_fields(SkipScenarioIdSpan(fmt_fields))
            .event_format(AppendScenarioMsg(event_format))
            .with_writer(CollectorWriter::new(logs_tx));
        let layer = record_ids.and_then(fmt_layer);

        Dispatch::new(configure(layer)).init();

        let collector = Collector::new(logs_rx, closed_rx);
        drop(self.runner.logs_collector.swap(Box::new(Some(collector))));

        self
    }
}
/// Scenarios tracked by a [`Collector`], keyed by their [`ScenarioId`].
///
/// Each value holds the feature, the optional rule, the scenario itself and
/// its optional [`RetryOptions`].
type Scenarios = HashMap<
ScenarioId,
(
Source<gherkin::Feature>,
Option<Source<gherkin::Rule>>,
Source<gherkin::Scenario>,
Option<RetryOptions>,
),
>;
/// Per-span bookkeeping of a [`Collector`]: the [`Callback`]s registered for
/// a span (if any), and whether the close event of that span has been
/// received already.
type SpanEventsCallbacks =
HashMap<span::Id, (Option<Vec<Callback>>, IsReceived)>;
/// Indicator whether a span-close event has been received.
type IsReceived = bool;
/// One-shot notification fired once the awaited span is known to be closed.
type Callback = oneshot::Sender<()>;
/// Collector of `tracing` output: receives formatted log messages and
/// span-close notifications, and maps the former onto tracked scenarios.
#[derive(Debug)]
pub(crate) struct Collector {
/// Scenarios registered via `start_scenarios()` and not yet removed via
/// `finish_scenario()`.
scenarios: Scenarios,
/// Receiver of log messages paired with the optional ID of the scenario
/// they were emitted in.
logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
/// Pending span-close bookkeeping (registered callbacks + received flag).
span_events: SpanEventsCallbacks,
/// Receiver of IDs of closed spans.
span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
/// Sender cloned into every [`SpanCloseWaiter`] handed out.
wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
/// Receiver of span-close subscriptions made through a
/// [`SpanCloseWaiter`].
wait_span_event_receiver: mpsc::UnboundedReceiver<(span::Id, Callback)>,
}
impl Collector {
    /// Creates a new [`Collector`] reading log messages from `logs_receiver`
    /// and IDs of closed spans from `span_close_receiver`.
    ///
    /// An internal channel for span-close subscriptions is created as well;
    /// its sending side is exposed via
    /// [`Collector::scenario_span_event_waiter()`].
    pub(crate) fn new(
        logs_receiver: mpsc::UnboundedReceiver<(Option<ScenarioId>, String)>,
        span_close_receiver: mpsc::UnboundedReceiver<span::Id>,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded();
        Self {
            scenarios: HashMap::new(),
            logs_receiver,
            span_events: HashMap::new(),
            span_close_receiver,
            wait_span_event_sender: sender,
            wait_span_event_receiver: receiver,
        }
    }

    /// Returns a new [`SpanCloseWaiter`] subscribing to span-close events
    /// processed by this [`Collector`].
    pub(crate) fn scenario_span_event_waiter(&self) -> SpanCloseWaiter {
        SpanCloseWaiter {
            wait_span_event_sender: self.wait_span_event_sender.clone(),
        }
    }

    /// Starts tracking the provided `runnable` scenarios, so log messages can
    /// be attributed to them.
    ///
    /// An already tracked scenario with the same [`ScenarioId`] is
    /// overwritten.
    pub(crate) fn start_scenarios(
        &mut self,
        runnable: impl AsRef<
            [(
                ScenarioId,
                Source<gherkin::Feature>,
                Option<Source<gherkin::Rule>>,
                Source<gherkin::Scenario>,
                ScenarioType,
                Option<RetryOptions>,
            )],
        >,
    ) {
        for (id, f, r, s, _, ret) in runnable.as_ref() {
            drop(
                self.scenarios
                    .insert(*id, (f.clone(), r.clone(), s.clone(), *ret)),
            );
        }
    }

    /// Stops tracking the scenario with the provided `id`.
    pub(crate) fn finish_scenario(&mut self, id: ScenarioId) {
        drop(self.scenarios.remove(&id));
    }

    /// Processes pending span-close events, then takes at most one log
    /// message from the queue and turns it into [`event::Scenario::Log`]
    /// events.
    ///
    /// Returns [`None`] if no log message is available at the moment.
    ///
    /// If the message carries an ID of a tracked scenario, a single event for
    /// that scenario is produced. Otherwise (no ID, or the ID is not tracked)
    /// the message is duplicated for every tracked scenario, which may yield
    /// an empty [`Vec`].
    pub(crate) fn emitted_logs<W>(
        &mut self,
    ) -> Option<Vec<event::Cucumber<W>>> {
        self.notify_about_closing_spans();

        let (id, msg) = self.logs_receiver.try_next().ok().flatten()?;

        let targets = id.and_then(|k| self.scenarios.get(&k)).map_or_else(
            || Either::Left(self.scenarios.values()),
            |p| Either::Right(iter::once(p)),
        );
        Some(
            targets
                .map(|(f, r, s, opt)| {
                    event::Cucumber::scenario(
                        f.clone(),
                        r.clone(),
                        s.clone(),
                        event::RetryableScenario {
                            event: event::Scenario::Log(msg.clone()),
                            retries: opt.map(|o| o.retries),
                        },
                    )
                })
                .collect(),
        )
    }

    /// Notifies all the subscribers whose awaited span has been closed.
    ///
    /// An entry is resolved (and removed) only once both the close event has
    /// been received and at least one [`Callback`] has been registered for
    /// it, in whichever order those two arrive.
    fn notify_about_closing_spans(&mut self) {
        // Drain every pending close event, not just one: the close channel
        // receives an ID for each closed span, so taking a single item per
        // call lets a backlog build up and delays the waiters.
        while let Some(id) =
            self.span_close_receiver.try_next().ok().flatten()
        {
            self.span_events.entry(id).or_default().1 = true;
        }

        while let Some((id, callback)) =
            self.wait_span_event_receiver.try_next().ok().flatten()
        {
            self.span_events
                .entry(id)
                .or_default()
                .0
                .get_or_insert_with(Vec::new)
                .push(callback);
        }

        self.span_events.retain(|_, (callbacks, is_received)| {
            if !*is_received {
                return true;
            }
            // Closed, but nobody is waiting yet: keep the entry around.
            let Some(callbacks) = callbacks.take() else {
                return true;
            };
            for callback in callbacks {
                // The waiter may have been dropped already; that's fine.
                _ = callback.send(()).ok();
            }
            false
        });
    }
}
#[expect( // related to `tracing` capabilities only
clippy::multiple_inherent_impl,
reason = "related to `tracing` capabilities only"
)]
impl ScenarioId {
/// Name of the span field carrying a [`ScenarioId`].
///
/// Must stay in sync with the field identifier used in
/// `scenario_span()`.
const SPAN_FIELD_NAME: &'static str = "__cucumber_scenario_id";
/// Creates an error-level span named `scenario`, carrying this
/// [`ScenarioId`] in its `__cucumber_scenario_id` field.
pub(crate) fn scenario_span(self) -> Span {
tracing::error_span!("scenario", __cucumber_scenario_id = self.0)
}
/// Creates an error-level span for a step: named `background step` if
/// `is_background`, or `step` otherwise.
///
/// The [`ScenarioId`] itself is not recorded into this span.
#[expect(clippy::unused_self, reason = "API uniformity")]
pub(crate) fn step_span(self, is_background: bool) -> Span {
if is_background {
tracing::error_span!("background step")
} else {
tracing::error_span!("step")
}
}
/// Creates an error-level span for a hook: named `before hook` or
/// `after hook`, depending on the provided `hook_ty`.
///
/// The [`ScenarioId`] itself is not recorded into this span.
#[expect(clippy::unused_self, reason = "API uniformity")]
pub(crate) fn hook_span(self, hook_ty: HookType) -> Span {
match hook_ty {
HookType::Before => tracing::error_span!("before hook"),
HookType::After => tracing::error_span!("after hook"),
}
}
}
/// Cheaply cloneable handle for awaiting the closing of a particular span.
#[derive(Clone, Debug)]
pub(crate) struct SpanCloseWaiter {
/// Sender of span-close subscriptions: the awaited span ID along with the
/// [`Callback`] to fire.
wait_span_event_sender: mpsc::UnboundedSender<(span::Id, Callback)>,
}
impl SpanCloseWaiter {
    /// Waits until the span with the provided `id` is reported as closed.
    ///
    /// Registers a one-shot [`Callback`] for the `id` and awaits it. Both the
    /// registration and the awaiting are best-effort: if the receiving side
    /// of either channel is gone, this method just returns.
    pub(crate) async fn wait_for_span_close(&self, id: span::Id) {
        let (notify, notified) = oneshot::channel();
        let subscription = (id, notify);

        _ = self.wait_span_event_sender.unbounded_send(subscription).ok();
        _ = notified.await.ok();
    }
}
/// [`Layer`] storing a [`ScenarioId`] found in span fields into the span's
/// extensions, and reporting IDs of closed spans.
#[derive(Debug)]
pub struct RecordScenarioId {
/// Sender of IDs of closed spans.
span_close_sender: mpsc::UnboundedSender<span::Id>,
}
impl RecordScenarioId {
/// Creates a new [`RecordScenarioId`] reporting IDs of closed spans via
/// the provided `span_close_sender`.
const fn new(span_close_sender: mpsc::UnboundedSender<span::Id>) -> Self {
Self { span_close_sender }
}
}
impl<S> Layer<S> for RecordScenarioId
where
S: for<'a> LookupSpan<'a> + Subscriber,
{
fn on_new_span(
&self,
attrs: &span::Attributes<'_>,
id: &span::Id,
ctx: layer::Context<'_, S>,
) {
if let Some(span) = ctx.span(id) {
let mut visitor = GetScenarioId(None);
attrs.values().record(&mut visitor);
if let Some(scenario_id) = visitor.0 {
let mut ext = span.extensions_mut();
_ = ext.replace(scenario_id);
}
}
}
fn on_record(
&self,
id: &span::Id,
values: &span::Record<'_>,
ctx: layer::Context<'_, S>,
) {
if let Some(span) = ctx.span(id) {
let mut visitor = GetScenarioId(None);
values.record(&mut visitor);
if let Some(scenario_id) = visitor.0 {
let mut ext = span.extensions_mut();
_ = ext.replace(scenario_id);
}
}
}
fn on_close(&self, id: span::Id, _ctx: layer::Context<'_, S>) {
_ = self.span_close_sender.unbounded_send(id).ok();
}
}
/// [`Visit`]or extracting a [`ScenarioId`] from the visited fields, if any.
#[derive(Debug)]
struct GetScenarioId(Option<ScenarioId>);
impl Visit for GetScenarioId {
    /// Captures the `value` as a [`ScenarioId`] if the `field` is named
    /// [`ScenarioId::SPAN_FIELD_NAME`].
    fn record_u64(&mut self, field: &Field, value: u64) {
        let is_scenario_id = field.name() == ScenarioId::SPAN_FIELD_NAME;
        if is_scenario_id {
            self.0 = Some(ScenarioId(value));
        }
    }

    /// Intentionally does nothing: only `u64` fields are of interest.
    fn record_debug(&mut self, _: &Field, _: &dyn Debug) {}
}
/// Wrapper around a fields formatter, which formats nothing for field sets
/// containing the [`ScenarioId`] field, and delegates to the wrapped
/// formatter otherwise.
#[derive(Debug)]
pub struct SkipScenarioIdSpan<F>(pub F);
impl<'w, F: FormatFields<'w>> FormatFields<'w> for SkipScenarioIdSpan<F> {
    /// Formats the `fields` with the wrapped formatter, unless they contain
    /// a field named [`ScenarioId::SPAN_FIELD_NAME`], in which case nothing
    /// is written at all.
    fn format_fields<R: RecordFields>(
        &self,
        writer: format::Writer<'w>,
        fields: R,
    ) -> fmt::Result {
        let mut probe = IsScenarioIdSpan(false);
        fields.record(&mut probe);

        if probe.0 {
            return Ok(());
        }
        self.0.format_fields(writer, fields)
    }
}
/// [`Visit`]or checking whether the visited fields contain the one named
/// [`ScenarioId::SPAN_FIELD_NAME`].
#[derive(Debug)]
struct IsScenarioIdSpan(bool);
impl Visit for IsScenarioIdSpan {
    /// Raises the flag once a field named [`ScenarioId::SPAN_FIELD_NAME`] is
    /// visited. The flag is never lowered back.
    fn record_debug(&mut self, field: &Field, _: &dyn Debug) {
        self.0 |= field.name() == ScenarioId::SPAN_FIELD_NAME;
    }
}
/// Wrapper around an event formatter, appending a machine-readable suffix
/// (the [`ScenarioId`] marker followed by the end marker) to every formatted
/// event.
#[derive(Debug)]
pub struct AppendScenarioMsg<F>(pub F);
impl<S, N, F> FormatEvent<S, N> for AppendScenarioMsg<F>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N: for<'a> FormatFields<'a> + 'static,
    F: FormatEvent<S, N>,
{
    /// Formats the `event` with the wrapped formatter and appends a suffix
    /// identifying the scenario the `event` belongs to.
    ///
    /// The suffix is [`suffix::BEFORE_SCENARIO_ID`] followed by the ID when
    /// some span in the event's scope (searched from the root) has a
    /// [`ScenarioId`] in its extensions, or [`suffix::NO_SCENARIO_ID`]
    /// otherwise. In both cases [`suffix::END`] is written last.
    fn format_event(
        &self,
        ctx: &FmtContext<'_, S, N>,
        mut writer: format::Writer<'_>,
        event: &Event<'_>,
    ) -> fmt::Result {
        self.0.format_event(ctx, writer.by_ref(), event)?;

        let scenario_id = ctx.event_scope().and_then(|scope| {
            scope
                .from_root()
                .find_map(|span| span.extensions().get::<ScenarioId>().copied())
        });

        match scenario_id {
            Some(id) => {
                write!(writer, "{}{id}", suffix::BEFORE_SCENARIO_ID)?;
            }
            None => write!(writer, "{}", suffix::NO_SCENARIO_ID)?,
        }
        write!(writer, "{}", suffix::END)
    }
}
/// Markers appended to formatted events by [`AppendScenarioMsg`] and parsed
/// back by [`CollectorWriter`].
mod suffix {
/// Terminator written at the very end of every formatted event.
pub(crate) const END: &str = "__cucumber__scenario";
/// Separator written between the formatted event and its scenario ID.
pub(crate) const BEFORE_SCENARIO_ID: &str = "__";
/// Marker written instead of a scenario ID when no scenario is known.
pub(crate) const NO_SCENARIO_ID: &str = "__unknown";
}
/// [`io::Write`]r splitting the written output into separate log messages
/// and sending them, along with their optional [`ScenarioId`], into a
/// channel.
#[derive(Clone, Debug)]
pub struct CollectorWriter {
/// Sender of parsed log messages.
sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
}
impl CollectorWriter {
/// Creates a new [`CollectorWriter`] sending parsed log messages via the
/// provided `sender`.
const fn new(
sender: mpsc::UnboundedSender<(Option<ScenarioId>, String)>,
) -> Self {
Self { sender }
}
}
impl<'a> MakeWriter<'a> for CollectorWriter {
type Writer = Self;
/// Returns a clone of this writer, sharing the same underlying channel
/// sender.
fn make_writer(&'a self) -> Self::Writer {
self.clone()
}
}
impl io::Write for CollectorWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let msgs = String::from_utf8_lossy(buf);
for msg in msgs.split_terminator(suffix::END) {
if let Some((before, after)) =
msg.rsplit_once(suffix::NO_SCENARIO_ID)
{
if !after.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"wrong separator",
));
}
_ = self.sender.unbounded_send((None, before.to_owned())).ok();
} else if let Some((before, after)) =
msg.rsplit_once(suffix::BEFORE_SCENARIO_ID)
{
let scenario_id = after.parse().map_err(|e| {
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
_ = self
.sender
.unbounded_send((Some(scenario_id), before.to_owned()))
.ok();
} else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"missing separator",
));
}
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}