use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Semaphore, mpsc};
use tokio::task::JoinSet;
use tracing::Level;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::batching::{self, BatchFailurePolicy};
use crate::caching::{self, CacheClass};
use crate::cli::RunnerConfig;
use crate::duration_db::{DurationDb, DurationEstimate};
use crate::execution::{TestingRequest, build_listing_request, build_testing_request};
use crate::executor_server::SpecEnvelope;
use crate::environment::{SchedulingProfile, profile_from_labels};
use crate::listing::{IgnoredPolicy, TestCase};
use crate::orchestrator::Orchestrator;
use crate::policy::{self, Owner, QuarantineStatus, RetryPolicy};
use crate::result::{
self, Execute2Outcome, FailureClass, ProcessOutcome, RunIdentity, TestIdentity, TestVerdict,
build_test_result, decode_response,
};
use crate::spec::TargetSpec;
use crate::translator::{ListingStrategy, PerTestObservation, Translator, TranslatorRegistry};
use crate::variant::RepeatKind;
/// Process exit code returned when the run verdict is `Fail` (see `RunVerdict::exit_code`).
const NONZERO_EXIT: i32 = 32;
/// Upper bound on attempts for an action whose outcome keeps being an RE queue timeout.
const INFRA_MAX_ATTEMPTS: u32 = 3;
/// Value passed to `DurationEstimate::weight_ms` when weighing tests for batching;
/// presumably the weight assumed for tests without recorded history — confirm in `duration_db`.
const UNSEEN_WEIGHT_MS: u64 = 50;
/// Overall verdict of a runner session; converted to the process exit code by `exit_code`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunVerdict {
/// No non-quarantined test failure was recorded and no target task failed to join.
Pass,
/// At least one non-quarantined failure was recorded, or a spawned task did not complete.
Fail,
}
impl RunVerdict {
    /// Translates the verdict into the process exit status: `0` for `Pass`,
    /// `NONZERO_EXIT` for `Fail`.
    fn exit_code(self) -> i32 {
        if matches!(self, RunVerdict::Fail) {
            NONZERO_EXIT
        } else {
            0
        }
    }
}
/// Final outcome of one test, as handed to `make_finished` to build the reported result.
#[derive(Clone)]
struct TestOutcome {
/// Verdict reported for the test.
status: TestVerdict,
/// Free-form detail text; may be routed through CAS by `finalize_details` before reporting.
details: String,
/// Duration reported alongside the result.
duration: Duration,
/// Peak memory reported by the executed action, when available.
max_memory: Option<u64>,
}
/// One execution sample that the reporter records into the duration DB via `DurationDb::record`.
#[derive(Clone)]
struct DbObservation {
/// Execution time of the action that produced this observation.
duration: Duration,
/// Whether the observed verdict counted as a failure.
failed: bool,
/// Failure classification derived from the verdict via `result::failure_class`.
failure_class: Option<FailureClass>,
/// Environment (remote or local) the action ran in.
env: crate::duration_db::Environment,
}
/// Messages sent from target tasks to the single reporter task.
enum ReporterMessage {
/// Finished tests to report to the orchestrator, tally, and record in the duration DB.
Finished(Vec<FinishedTest>),
/// Test identities found by listing; the reporter records their names in the duration DB.
Discovered(Vec<TestIdentity>),
}
/// A fully-built test result plus the bookkeeping the reporter needs to process it.
struct FinishedTest {
/// Result message forwarded to the orchestrator.
result: crate::proto::test::TestResult,
/// Identity used as the key when recording duration observations.
test_id: TestIdentity,
/// Verdict of the test; drives the tally and the run verdict.
status: TestVerdict,
/// When true, a failure is tallied separately and does not fail the run.
quarantined: bool,
/// Execution samples to record in the duration DB (may be empty).
db_observations: Vec<DbObservation>,
}
/// Drives a whole runner session and returns the process exit code.
///
/// Specs arriving on `intake` are each handed to their own `run_target` task until
/// `EndOfRequests` is received or the channel closes. A separate reporter task consumes
/// results; once all target tasks have been joined the report channel is closed and the
/// reporter's verdict is awaited. The run fails if the reporter says so, if the reporter task
/// itself cannot be joined, or if any target task did not complete.
pub async fn run(
    orch: Orchestrator,
    mut intake: mpsc::UnboundedReceiver<SpecEnvelope>,
    config: RunnerConfig,
    context: crate::cli::SessionContext,
) -> i32 {
    let config = Arc::new(config);
    let limits = &config.limits;
    let global_sem = Arc::new(Semaphore::new(limits.max_inflight_test_actions));
    let listing_sem = Arc::new(Semaphore::new(limits.max_inflight_listings));
    // Shared, read-only view of the duration DB used by target tasks for estimates.
    let estimates = Arc::new(load_db(config.duration_db.as_deref()));
    let (report_tx, report_rx) = mpsc::channel::<ReporterMessage>(limits.max_report_queue);
    // Second, independently loaded instance that the reporter owns and mutates.
    let writable_db = load_db(config.duration_db.as_deref());
    let reporter = tokio::spawn(reporter_task(orch.clone(), report_rx, writable_db, context));

    let mut target_tasks = JoinSet::new();
    loop {
        let spec = match intake.recv().await {
            Some(SpecEnvelope::Spec(spec)) => spec,
            Some(SpecEnvelope::EndOfRequests) | None => break,
        };
        let ctx = TargetCtx {
            orch: orch.clone(),
            config: config.clone(),
            global_sem: global_sem.clone(),
            listing_sem: listing_sem.clone(),
            estimates: estimates.clone(),
            report_tx: report_tx.clone(),
        };
        target_tasks.spawn(async move { run_target(ctx, *spec).await });
    }

    let any_target_panicked = drain_joinset(&mut target_tasks).await;
    // Dropping the last sender lets the reporter's receive loop terminate.
    drop(report_tx);
    let reporter_verdict = reporter.await.unwrap_or(RunVerdict::Fail);
    let verdict = if any_target_panicked {
        RunVerdict::Fail
    } else {
        reporter_verdict
    };
    verdict.exit_code()
}
/// Joins every task in `tasks`, logging each join error to stderr.
///
/// Returns `true` if at least one task failed to join.
async fn drain_joinset(tasks: &mut JoinSet<()>) -> bool {
    let mut any_failed = false;
    loop {
        match tasks.join_next().await {
            None => return any_failed,
            Some(Ok(())) => {}
            Some(Err(e)) => {
                eprintln!("quokka: a spawned task did not complete: {e}");
                any_failed = true;
            }
        }
    }
}
/// Opens the duration DB stored under `dir`, or creates an ephemeral one when no directory
/// is configured.
fn load_db(dir: Option<&std::path::Path>) -> DurationDb {
    dir.map_or_else(DurationDb::ephemeral, |path| {
        DurationDb::load(path.to_path_buf())
    })
}
/// Running counters kept by the reporter and rendered by `session_summary`.
#[derive(Default)]
struct Tally {
/// Every finished test seen, regardless of verdict.
total: u64,
/// Tests whose verdict was `Pass`.
passed: u64,
/// Failures of non-quarantined tests; these fail the run.
failed: u64,
/// Failures of quarantined tests; counted separately and not part of `failed`.
quarantined_failed: u64,
/// Tests whose verdict was `Skip` or `Omitted`.
skipped: u64,
}
/// Cap on the number of `FAIL <name>` lines the reporter prints to the console per session.
const MAX_CONSOLE_FAILURES: u64 = 100;
async fn reporter_task(
orch: Orchestrator,
mut rx: mpsc::Receiver<ReporterMessage>,
mut db: DurationDb,
context: crate::cli::SessionContext,
) -> RunVerdict {
let mut verdict = RunVerdict::Pass;
let mut tally = Tally::default();
while let Some(msg) = rx.recv().await {
match msg {
ReporterMessage::Discovered(tids) => {
for tid in tids {
db.record_discovered_name(&tid);
}
}
ReporterMessage::Finished(batch) => {
for finished in batch {
tally.total += 1;
let failure = finished.status.is_failure();
match finished.status {
TestVerdict::Pass => tally.passed += 1,
TestVerdict::Skip | TestVerdict::Omitted => tally.skipped += 1,
_ => {}
}
let fail_name = if failure && !finished.quarantined {
Some(finished.result.name.clone())
} else {
None
};
if let Err(e) = orch.report_test_result(finished.result).await {
eprintln!("quokka: failed to report a test result: {e:#}");
}
if failure {
if finished.quarantined {
tally.quarantined_failed += 1;
} else {
tally.failed += 1;
verdict = RunVerdict::Fail;
}
}
if let Some(name) = fail_name
&& tally.failed <= MAX_CONSOLE_FAILURES
{
let _ = orch.console(Level::WARN, format!("FAIL {name}")).await;
}
for obs in &finished.db_observations {
db.record(
obs.env,
&finished.test_id,
obs.duration,
obs.failed,
obs.failure_class,
);
}
}
}
}
}
let summary = session_summary(&tally, &context);
let level = if tally.failed > 0 {
Level::WARN
} else {
Level::INFO
};
let _ = orch.console(level, summary.clone()).await;
let _ = orch
.report_test_session(summary, context.trace_id.clone())
.await;
if let Err(e) = db.flush() {
eprintln!("quokka: failed to flush duration DB: {e:#}");
}
verdict
}
/// Renders the one-line end-of-session summary from the tally, appending the quarantined
/// failure count (when non-zero) and the host platform / trace id (when present).
fn session_summary(tally: &Tally, context: &crate::cli::SessionContext) -> String {
    let mut summary = format!(
        "test run complete: {} tests, {} passed, {} failed, {} skipped",
        tally.total, tally.passed, tally.failed, tally.skipped
    );
    let quarantined = tally.quarantined_failed;
    if quarantined != 0 {
        summary += &format!(" ({} quarantined failure(s), not counted)", quarantined);
    }
    if let Some(platform) = context.host_platform.as_ref() {
        summary += &format!(" [platform={platform}]");
    }
    if let Some(trace) = context.trace_id.as_ref() {
        summary += &format!(" [trace={trace}]");
    }
    summary
}
/// Shared handles given to every target task (and cloned into each test-action task).
#[derive(Clone)]
struct TargetCtx {
/// Client used for Execute2 calls, result reporting, console output and CAS uploads.
orch: Orchestrator,
/// Session-wide runner configuration.
config: Arc<RunnerConfig>,
/// Bounds the number of in-flight test actions across all targets.
global_sem: Arc<Semaphore>,
/// Bounds the number of in-flight listing actions across all targets.
listing_sem: Arc<Semaphore>,
/// Duration DB instance used for estimates and flake history lookups.
estimates: Arc<DurationDb>,
/// Sending side of the channel to the reporter task.
report_tx: mpsc::Sender<ReporterMessage>,
}
/// Per-target execution plan computed once by `TargetPlan::derive` from the spec's labels
/// and the runner configuration.
struct TargetPlan {
/// The target spec this plan was derived from.
spec: Arc<TargetSpec>,
/// Translator resolved from the spec's `test_type`.
translator: Box<dyn Translator>,
/// Ignored-test policy, copied from the configuration.
ignored: IgnoredPolicy,
/// Cache class derived from the labels.
cache_class: CacheClass,
/// Retry policy derived from the labels and `config.flaky_attempts`.
retry: RetryPolicy,
/// Quarantine status derived from the labels.
quarantine: QuarantineStatus,
/// Per-test timeout: label-derived value resolved against `config.per_test_timeout`.
timeout: Duration,
/// Scheduling profile from the labels (default profile if the labels conflict).
profile: SchedulingProfile,
/// Owner derived from the labels.
owner: Owner,
/// How many times, and in which mode, each action is repeated.
repeat: RepeatKind,
}
impl TargetPlan {
    /// Builds the plan for `spec` from its labels and the runner configuration.
    ///
    /// Repeat mode: a stress mode set in the configuration wins; otherwise a label equal to
    /// `stress` (or whose part after the first `:` is `stress`) selects
    /// `config.stress_label_reps` stress repetitions; otherwise the target runs once.
    /// A label conflict while resolving the scheduling profile is logged and replaced by the
    /// default profile.
    ///
    /// # Panics
    /// Panics when the registry has no translator for the spec's `test_type`.
    fn derive(spec: Arc<TargetSpec>, config: &RunnerConfig, registry: &TranslatorRegistry) -> Self {
        let labels: &[String] = &spec.labels;

        let Some(translator) = registry.resolve(&spec.test_type, config) else {
            panic!("No translator registered for test_type: {}", spec.test_type)
        };

        let labelled_stress = || {
            labels.iter().any(|label| {
                let tail = label
                    .split_once(':')
                    .map_or(label.as_str(), |(_, rest)| rest);
                tail == "stress"
            })
        };
        let repeat = if config.stress.is_stress() {
            config.stress
        } else if labelled_stress() {
            RepeatKind::Stress(config.stress_label_reps)
        } else {
            RepeatKind::Once
        };

        let ignored = config.ignored;
        let cache_class = caching::cache_class(labels);
        let retry = policy::retry_policy(labels, config.flaky_attempts);
        let quarantine = policy::quarantine_status(labels);
        let timeout = policy::test_timeout(labels).resolve(config.per_test_timeout);
        let profile = match profile_from_labels(labels) {
            Ok(profile) => profile,
            Err(e) => {
                eprintln!(
                    "quokka: conflict resolving labels for {}: {}",
                    spec.display, e
                );
                SchedulingProfile::default()
            }
        };
        let owner = policy::owner(labels);

        TargetPlan {
            spec,
            translator,
            ignored,
            cache_class,
            retry,
            quarantine,
            timeout,
            profile,
            owner,
            repeat,
        }
    }

    /// Scheduling profile for the listing action: the default profile, with either the
    /// `local_debug` or the `listing_only` hardware flag set when the translator declares
    /// executor overrides.
    fn listing_profile(&self, config: &RunnerConfig) -> SchedulingProfile {
        let mut profile = SchedulingProfile::default();
        if self.translator.declares_executor_overrides() {
            if config.local_debug {
                profile.hardware.local_debug = true;
            } else {
                profile.hardware.listing_only = true;
            }
        }
        profile
    }

    /// Scheduling profile for test actions, starting from the label-derived profile.
    ///
    /// Without executor overrides the hardware section is reset and local resources are
    /// cleared; with overrides and `local_debug`, hardware is reset to just `local_debug`.
    fn testing_profile(&self, config: &RunnerConfig) -> SchedulingProfile {
        let mut profile = self.profile.clone();
        let overrides = self.translator.declares_executor_overrides();
        match (overrides, config.local_debug) {
            (false, _) => {
                profile.hardware = Default::default();
                profile.local_resources.clear();
            }
            (true, true) => {
                profile.hardware = Default::default();
                profile.hardware.local_debug = true;
            }
            (true, false) => {}
        }
        profile
    }

    /// Whether the target's labels mark it as quarantined.
    fn quarantined(&self) -> bool {
        matches!(self.quarantine, QuarantineStatus::Quarantined)
    }
}
/// Entry point of a per-target task: decodes the spec, derives its plan and runs it.
///
/// A spec that fails to decode is logged to stderr and dropped; nothing is reported for it.
async fn run_target(ctx: TargetCtx, spec_proto: crate::proto::test::ExternalRunnerSpec) {
    let spec = match TargetSpec::from_proto(spec_proto) {
        Ok(spec) => spec,
        Err(e) => {
            eprintln!("quokka: dropping malformed spec: {e}");
            return;
        }
    };
    let registry = TranslatorRegistry::new();
    // The registry is passed by shared reference (`&registry`).
    let plan = Arc::new(TargetPlan::derive(spec, &ctx.config, &registry));
    run_per_test_target(&ctx, plan).await;
}
/// Runs one target end to end: list its tests (when the translator lists per test), apply
/// the shard filter, announce the discovered tests, partition them into batches and execute
/// every batch `plan.repeat.count()` times, forwarding finished tests to the reporter.
///
/// Any listing problem is reported as a single target-level failure via
/// `report_target_failure` and ends processing of the target.
async fn run_per_test_target(ctx: &TargetCtx, plan: Arc<TargetPlan>) {
let config = &ctx.config;
// Phase 1: run the listing action, if the strategy needs one. The action is re-issued while
// its outcome is an RE queue timeout and fewer than INFRA_MAX_ATTEMPTS attempts were made.
let listing_outcome = match plan.translator.listing_strategy() {
ListingStrategy::PerTestListing { request_args, .. } => {
let listing_args = (request_args)(plan.ignored, &config.extra_test_args);
let mut infra_attempt = 0u32;
let outcome = loop {
let request = build_listing_request(
&plan.spec,
&listing_args,
&plan.listing_profile(config),
config.listing_timeout,
&config.extra_env,
);
let response = {
// The listing permit is held only for the duration of the RPC.
let _permit = ctx.listing_sem.acquire().await.expect("listing semaphore");
ctx.orch.execute2(request).await
};
match response {
Ok(response) => match decode_response(response) {
Execute2Outcome::CancelledQueueTimeout
if infra_attempt + 1 < INFRA_MAX_ATTEMPTS =>
{
infra_attempt += 1;
continue;
}
outcome => break Ok(outcome),
},
Err(e) => break Err(e),
}
};
Some(outcome)
}
_ => None,
};
// Phase 2: turn the strategy / listing outcome into the list of test cases. Whole-target and
// whole-binary strategies yield a single synthetic test case named by the strategy.
let tests = match plan.translator.listing_strategy() {
ListingStrategy::WholeTarget { name } | ListingStrategy::WholeBinary { name } => {
vec![TestCase {
name: (*name).to_string(),
ignored: false,
}]
}
ListingStrategy::PerTestListing { parse, .. } => match listing_outcome {
Some(Ok(Execute2Outcome::Completed(action))) => match action.status {
ProcessOutcome::Finished { exit_code: 0 } => {
match (parse)(&action.stdout, plan.ignored) {
Ok(tests) => tests,
Err(e) => {
report_target_failure(
ctx,
&plan,
TestVerdict::Fatal,
format!("listing parse failed: {e}"),
)
.await;
return;
}
}
}
ProcessOutcome::Finished { exit_code } => {
report_target_failure(
ctx,
&plan,
TestVerdict::Fatal,
format!(
"listing exited {exit_code}\n{}",
String::from_utf8_lossy(&action.stderr)
),
)
.await;
return;
}
ProcessOutcome::TimedOut { .. } => {
report_target_failure(
ctx,
&plan,
TestVerdict::Timeout,
"listing timed out".into(),
)
.await;
return;
}
},
Some(Ok(Execute2Outcome::CancelledQueueTimeout)) => {
report_target_failure(
ctx,
&plan,
TestVerdict::InfraFailure,
"listing RE queue timeout (retries exhausted)".into(),
)
.await;
return;
}
Some(Ok(Execute2Outcome::CancelledUnspecified)) => {
report_target_failure(ctx, &plan, TestVerdict::Omitted, "listing cancelled".into())
.await;
return;
}
Some(Err(e)) => {
report_target_failure(
ctx,
&plan,
TestVerdict::Fatal,
format!("listing RPC failed: {e:#}"),
)
.await;
return;
}
// Relies on `listing_strategy()` returning the same variant as in phase 1, where
// `listing_outcome` is always `Some` for `PerTestListing`.
None => unreachable!(),
},
};
// Phase 3: keep only this shard's tests. An empty shard still announces an empty discovery.
let kept = shard_filter(&plan.spec.display, &tests, config);
if kept.is_empty() {
let _ = ctx
.orch
.report_tests_discovered(plan.spec.handle_proto(), plan.spec.suite.clone(), vec![])
.await;
return;
}
let discovered: Vec<String> = kept.iter().map(|t| t.name.clone()).collect();
// NOTE(review): identities sent to the reporter are built from `tests` (before sharding),
// while the names announced to the orchestrator come from `kept` — confirm this is intended.
let discovered_tids: Vec<TestIdentity> = tests
.iter()
.map(|t| TestIdentity {
target: plan.spec.display.clone(),
name: t.name.clone(),
variant: config.variant.clone(),
})
.collect();
let _ = ctx
.report_tx
.send(ReporterMessage::Discovered(discovered_tids))
.await;
let _ = ctx
.orch
.report_tests_discovered(
plan.spec.handle_proto(),
plan.spec.suite.clone(),
discovered,
)
.await;
// List-only mode stops after discovery has been announced.
if config.libtest_list_only {
return;
}
// Phase 4: partition into batches and spawn one task per (batch, repeat) pair.
let execution_batches = build_batches(
&plan.spec.display,
&kept,
config,
&ctx.estimates,
plan.translator.parser_capability(),
);
let per_target_sem = Arc::new(Semaphore::new(config.limits.max_inflight_per_target));
let mut actions = JoinSet::new();
let repeats = plan.repeat.count();
for batch in execution_batches {
// An `All` selection stands for every kept test of the target.
let expected_members = match &batch {
batching::TestSelection::All => kept.iter().map(|t| t.name.clone()).collect(),
batching::TestSelection::Explicit(g) => g.clone(),
};
for repeat_index in 0..repeats {
let ctx = ctx.clone();
let plan = plan.clone();
let selection = batch.clone();
let expected_members = expected_members.clone();
let per_target_sem = per_target_sem.clone();
actions.spawn(async move {
let finished = execute_test_action(
&ctx,
&plan,
selection,
expected_members,
repeat_index,
per_target_sem,
)
.await;
let _ = ctx
.report_tx
.send(ReporterMessage::Finished(finished))
.await;
});
}
}
// A task that failed to join is surfaced as a target-level fatal failure.
if drain_joinset(&mut actions).await {
report_target_failure(
ctx,
&plan,
TestVerdict::Fatal,
"a test action task panicked".into(),
)
.await;
}
}
/// Returns the tests that belong to the configured shard.
///
/// Without sharding every test is kept. Otherwise a test is kept when the FNV-1a hash of
/// `"<target>\u{1}<test name>"`, taken modulo the shard count, equals the shard index.
fn shard_filter(
    target: &str,
    tests: &[crate::listing::TestCase],
    config: &RunnerConfig,
) -> Vec<crate::listing::TestCase> {
    if !config.shard.is_sharded() {
        return tests.to_vec();
    }
    let mut kept = Vec::new();
    for test in tests {
        let key = format!("{target}\u{1}{}", test.name);
        let bucket = (fnv1a(key.as_bytes()) % u64::from(config.shard.count)) as u16;
        if bucket == config.shard.index {
            kept.push(test.clone());
        }
    }
    kept
}
/// Partitions `tests` into execution batches and orders them heaviest-first.
///
/// Each test is weighted by its duration estimate from `db`. The configured batch mode is
/// used when the translator can attribute results by name; otherwise every test runs alone.
/// Batches are sorted (stably) in descending order of the largest member weight; an `All`
/// selection is weighted over every test in `tests`.
fn build_batches(
    target: &str,
    tests: &[crate::listing::TestCase],
    config: &RunnerConfig,
    db: &DurationDb,
    capability: crate::translator::DemuxCapability,
) -> Vec<batching::TestSelection<String>> {
    #[derive(Clone)]
    struct LocalBatchInput {
        name: String,
        estimate: DurationEstimate,
    }
    impl batching::Batchable for LocalBatchInput {
        fn weight_ms(&self) -> u64 {
            self.estimate.weight_ms(UNSEEN_WEIGHT_MS)
        }
        fn p50_ms(&self) -> u64 {
            self.estimate.p50_ms(0)
        }
    }

    // Single place that maps a test name to its duration estimate.
    let estimate_for = |name: &str| {
        db.estimate(
            None,
            &TestIdentity {
                target: target.to_owned(),
                name: name.to_owned(),
                variant: config.variant.clone(),
            },
        )
    };
    let weight_of = |name: &str| estimate_for(name).weight_ms(UNSEEN_WEIGHT_MS);

    let inputs: Vec<LocalBatchInput> = tests
        .iter()
        .map(|t| LocalBatchInput {
            name: t.name.to_owned(),
            estimate: estimate_for(t.name.as_str()),
        })
        .collect();

    let batch_mode = match capability {
        crate::translator::DemuxCapability::NameAttributable => config.batch_mode,
        crate::translator::DemuxCapability::SingletonOnly => batching::BatchMode::PerTest,
    };
    use crate::batching::Batcher;
    let mut batches: Vec<batching::TestSelection<String>> = batch_mode
        .partition(&inputs)
        .into_iter()
        .map(|selection| match selection {
            batching::TestSelection::All => batching::TestSelection::All,
            batching::TestSelection::Explicit(group) => {
                batching::TestSelection::Explicit(group.into_iter().map(|t| t.name).collect())
            }
        })
        .collect();

    // The key performs DB lookups and allocations for every member, so compute it once per
    // batch instead of once per comparison. `sort_by_cached_key` is stable, like `sort_by_key`.
    batches.sort_by_cached_key(|selection| {
        let heaviest = match selection {
            batching::TestSelection::All => tests.iter().map(|t| weight_of(t.name.as_str())).max(),
            batching::TestSelection::Explicit(group) => {
                group.iter().map(|n| weight_of(n.as_str())).max()
            }
        };
        std::cmp::Reverse(heaviest.unwrap_or(0))
    });
    batches
}
/// Result of running one group of tests as a single action (see `run_group`).
enum GroupOutcome {
/// The action completed and its output was parsed.
Observed {
/// Per-test observations parsed from stdout/stderr, keyed by test name.
observations: FxHashMap<String, PerTestObservation>,
/// Raw process outcome of the action (exit code or timeout).
raw: ProcessOutcome,
/// Execution time reported for the whole action.
execution_time: Duration,
/// Peak memory reported for the action, when available.
max_memory: Option<u64>,
/// Whether the action's exec kind counts as a fresh run (`is_fresh_run`).
fresh: bool,
/// Remote or local, derived from the action's exec kind.
env: crate::duration_db::Environment,
},
/// The action produced no usable output (cancellation, RPC error, exhausted infra retries);
/// the status and details apply to every test in the group.
GroupFailed {
status: TestVerdict,
details: String,
},
}
/// Observation currently retained for a test across attempts (see `merge_best`).
#[derive(Clone)]
struct BestObs {
/// Verdict of the retained observation.
status: TestVerdict,
/// Detail text of the retained observation.
details: String,
/// Execution time of the action that produced it (zero for group-level failures).
duration: Duration,
/// Peak memory of the action that produced it, when available.
max_memory: Option<u64>,
}
impl BestObs {
    /// Placeholder used when no observation was ever stored for a test: a `Fatal` verdict
    /// with zero duration and no memory figure.
    fn missing() -> Self {
        Self {
            status: TestVerdict::Fatal,
            details: String::from("test produced no result"),
            duration: Duration::ZERO,
            max_memory: None,
        }
    }

    /// Converts the retained observation into the `TestOutcome` used for reporting.
    fn into_outcome(self) -> TestOutcome {
        let BestObs {
            status,
            details,
            duration,
            max_memory,
        } = self;
        TestOutcome {
            status,
            details,
            duration,
            max_memory,
        }
    }
}
/// Stores `incoming` for `name` unless a non-failing observation is already retained.
///
/// In other words: the first non-failure sticks; failures are overwritten by whatever
/// comes next.
fn merge_best(best: &mut FxHashMap<String, BestObs>, name: &str, incoming: BestObs) {
    let already_succeeded = best
        .get(name)
        .is_some_and(|existing| !existing.status.is_failure());
    if !already_succeeded {
        best.insert(name.to_owned(), incoming);
    }
}
/// Executes the tests in `names` as one Execute2 action and returns the group outcome.
///
/// The request is rebuilt and re-sent while the outcome is an RE queue timeout and fewer
/// than INFRA_MAX_ATTEMPTS infra attempts were made. `selection` only influences the
/// `testcases` label attached to the request; `names` determines the command arguments.
async fn run_group(
ctx: &TargetCtx,
plan: &TargetPlan,
selection: &batching::TestSelection<String>,
names: &[String],
repeat_index: u32,
failure_attempt: u32,
per_target_sem: &Arc<Semaphore>,
) -> GroupOutcome {
let config = &ctx.config;
let mut infra_attempt = 0u32;
loop {
// Attempt index fed to cache resolution: failure retries plus infra retries so far.
let attempt_index = failure_attempt + infra_attempt;
// Does any test in the group have no duration estimate yet?
let mut has_unseen = false;
for name in names {
let test_id = TestIdentity {
target: plan.spec.display.to_owned(),
name: name.to_owned(),
variant: config.variant.clone(),
};
if matches!(
ctx.estimates.estimate(None, &test_id),
crate::duration_db::DurationEstimate::Unseen
) {
has_unseen = true;
break;
}
}
let mut caching = crate::caching::TestExecutionCaching::resolve(
plan.cache_class,
config.variant.is_default(),
plan.repeat.is_stress(),
attempt_index,
);
// Caching is disabled outright when the group contains an unseen test.
if has_unseen {
caching = crate::caching::TestExecutionCaching::Disabled;
}
let name_refs: Vec<&str> = names.iter().map(String::as_str).collect();
let exec_args =
plan.translator
.execution_args(&name_refs, plan.ignored, &config.extra_test_args);
// Note: the field is called `repeat_count` but carries the repeat index, and only for
// stress runs.
let repeat_count = if plan.repeat.is_stress() {
Some(u64::from(repeat_index))
} else {
None
};
// Label for the request: empty for `All`, the test name for a single test, otherwise a
// summary of up to three distinct module prefixes (text before the last `::`) plus the
// test count.
let testcases = match &selection {
batching::TestSelection::All => vec![],
batching::TestSelection::Explicit(g) => {
if g.len() == 1 {
g.clone()
} else {
let mut modules: Vec<&str> = g
.iter()
.map(|t| {
if let Some(idx) = t.rfind("::") {
&t[..idx]
} else {
"(root)"
}
})
.collect();
modules.sort_unstable();
modules.dedup();
let mods_str = if modules.len() <= 3 {
modules.join(", ")
} else {
format!(
"{}, {}, {} and {} more",
modules[0],
modules[1],
modules[2],
modules.len() - 3
)
};
vec![format!("{} ({} tests)", mods_str, g.len())]
}
}
};
let request = build_testing_request(TestingRequest {
target: crate::proto::test::ConfiguredTargetHandle {
id: plan.spec.handle.0,
},
suite: plan.spec.suite.clone(),
testcases,
cmd: crate::execution::build_cmd(&plan.spec, &exec_args),
env: crate::execution::build_env(&plan.spec, &config.extra_env),
variant: config.variant.identity(),
repeat_count,
profile: plan.testing_profile(config),
caching,
timeout: plan.timeout,
});
let response = {
// Permits are acquired per-target first, then global, and both are released as soon as
// the RPC returns.
let _target = per_target_sem
.acquire()
.await
.expect("per-target semaphore");
let _global = ctx.global_sem.acquire().await.expect("global semaphore");
ctx.orch.execute2(request).await
};
match response {
Ok(response) => match decode_response(response) {
Execute2Outcome::Completed(action) => {
return GroupOutcome::Observed {
observations: plan
.translator
.parse_results(&action.stdout, &action.stderr),
raw: action.status,
execution_time: action.execution_time,
max_memory: action.max_memory_used_bytes,
fresh: action.exec_kind.is_fresh_run(),
// Remote execution and remote cache hits are attributed to the remote
// environment; every other exec kind counts as local.
env: match action.exec_kind {
crate::result::ExecKind::RemoteExecuted
| crate::result::ExecKind::RemoteCacheHit => {
crate::duration_db::Environment::Remote
}
_ => crate::duration_db::Environment::Local,
},
};
}
Execute2Outcome::CancelledQueueTimeout => {
// Infra retry: loop again with a freshly built request.
if infra_attempt + 1 < INFRA_MAX_ATTEMPTS {
infra_attempt += 1;
continue;
}
return GroupOutcome::GroupFailed {
status: TestVerdict::InfraFailure,
details: "RE queue timeout (retries exhausted)".to_owned(),
};
}
Execute2Outcome::CancelledUnspecified => {
return GroupOutcome::GroupFailed {
status: TestVerdict::Omitted,
details: "run cancelled".to_owned(),
};
}
},
Err(e) => {
return GroupOutcome::GroupFailed {
status: TestVerdict::Fatal,
details: format!("Execute2 RPC failed: {e:#}"),
};
}
}
}
}
/// Runs one batch (or single test) for one repeat index, including failure retries, and
/// returns one `FinishedTest` per expected member.
///
/// The first attempt uses `selection`; retries run only the still-failing members as an
/// explicit selection. A member is retried while `failure_attempt + 1` is below the larger
/// of the plan's retry budget and, for tests with recorded failures in the duration DB, the
/// configured `flaky_retry.attempts`. Once retries are exhausted, a failing multi-test batch
/// is handed to `isolate_failures` when the batch failure policy asks for per-test reruns.
async fn execute_test_action(
    ctx: &TargetCtx,
    plan: &TargetPlan,
    selection: batching::TestSelection<String>,
    expected_members: Vec<String>,
    repeat_index: u32,
    per_target_sem: Arc<Semaphore>,
) -> Vec<FinishedTest> {
    // Retained observation per test (see `merge_best`) and DB samples per test.
    let mut best: FxHashMap<String, BestObs> = FxHashMap::default();
    let mut attempts: FxHashMap<String, Vec<DbObservation>> = FxHashMap::default();
    let mut pending: Vec<String> = expected_members.clone();
    let mut failure_attempt = 0u32;
    loop {
        let current_selection = if failure_attempt == 0 {
            selection.clone()
        } else {
            batching::TestSelection::Explicit(pending.clone())
        };
        match run_group(
            ctx,
            plan,
            &current_selection,
            &pending,
            repeat_index,
            failure_attempt,
            &per_target_sem,
        )
        .await
        {
            GroupOutcome::Observed {
                observations,
                raw,
                execution_time,
                max_memory,
                fresh,
                env,
            } => {
                let is_whole_target = matches!(
                    plan.translator.listing_strategy(),
                    ListingStrategy::WholeTarget { .. } | ListingStrategy::WholeBinary { .. }
                );
                // For a whole-target/binary run with the synthetic result name, the names to
                // process are whatever the harness reported; otherwise the pending names.
                let test_names: Vec<String> = if is_whole_target
                    && !observations.is_empty()
                    && pending.len() == 1
                    && (pending[0] == crate::translator::DOCTEST_RESULT_NAME
                        || pending[0] == crate::translator::BINARY_RESULT_NAME)
                {
                    observations.keys().cloned().collect()
                } else {
                    pending.clone()
                };
                let is_batched = expected_members.len() > 1;
                let singleton = !is_batched || test_names.len() == 1;
                for name in &test_names {
                    let (status, details, from_harness) = match observations.get(name) {
                        Some(obs) => (obs.status, obs.details.clone(), true),
                        None => {
                            // No per-test line from the harness: fall back to the process
                            // outcome. A non-zero exit can only be attributed to the test
                            // when it ran alone.
                            let (status, details) = match raw {
                                ProcessOutcome::Finished { exit_code: 0 } => {
                                    (TestVerdict::Pass, String::new())
                                }
                                ProcessOutcome::Finished { .. } if singleton => {
                                    (TestVerdict::Fail, String::new())
                                }
                                ProcessOutcome::Finished { .. } => (
                                    TestVerdict::Fatal,
                                    "test produced no result in batch output".to_owned(),
                                ),
                                ProcessOutcome::TimedOut { .. } => {
                                    (TestVerdict::Timeout, "execution timed out".to_owned())
                                }
                            };
                            (status, details, false)
                        }
                    };
                    // Only fresh runs with a harness-reported result feed the duration DB.
                    if fresh && from_harness {
                        attempts
                            .entry(name.clone())
                            .or_default()
                            .push(DbObservation {
                                duration: execution_time,
                                failed: status.is_failure(),
                                failure_class: result::failure_class(status),
                                env,
                            });
                    }
                    merge_best(
                        &mut best,
                        name,
                        BestObs {
                            status,
                            details,
                            duration: execution_time,
                            max_memory,
                        },
                    );
                }
            }
            GroupOutcome::GroupFailed { status, details } => {
                for name in &pending {
                    merge_best(
                        &mut best,
                        name,
                        BestObs {
                            status,
                            details: details.clone(),
                            duration: Duration::ZERO,
                            max_memory: None,
                        },
                    );
                }
            }
        }

        // A member with no retained observation counts as still failing.
        let still_failing: Vec<String> = expected_members
            .iter()
            .filter(|n| best.get(*n).map(|b| b.status.is_failure()).unwrap_or(true))
            .cloned()
            .collect();
        if still_failing.is_empty() {
            break;
        }

        let mut retry_pending = Vec::new();
        for name in &still_failing {
            let base_attempts = plan.retry.max_attempts();
            let is_flake = {
                let test_id = TestIdentity {
                    target: plan.spec.display.to_owned(),
                    name: name.to_owned(),
                    variant: ctx.config.variant.clone(),
                };
                ctx.estimates
                    .flake(None, &test_id)
                    .map(|f| f.failures > 0)
                    .unwrap_or(false)
            };
            let toml_attempts = if is_flake {
                ctx.config
                    .quokka_config
                    .flaky_retry
                    .as_ref()
                    .map(|c| c.attempts)
                    .unwrap_or(0)
            } else {
                0
            };
            let allowed_attempts = base_attempts.max(toml_attempts);
            if failure_attempt + 1 < allowed_attempts {
                retry_pending.push(name.clone());
            }
        }
        if !retry_pending.is_empty() {
            failure_attempt += 1;
            pending = retry_pending;
            continue;
        }

        let is_batched = expected_members.len() > 1;
        if is_batched
            && ctx.config.batch_failure_policy == BatchFailurePolicy::RerunPerTestToIsolate
        {
            return isolate_failures(
                ctx,
                plan,
                &expected_members,
                &still_failing,
                BatchAccum {
                    best: &best,
                    attempts: &mut attempts,
                },
                repeat_index,
                &per_target_sem,
            )
            .await;
        }
        break;
    }

    let mut finished = Vec::with_capacity(expected_members.len());
    for name in &expected_members {
        let outcome = best
            .get(name)
            .cloned()
            .unwrap_or_else(BestObs::missing)
            .into_outcome();
        let observations = attempts.remove(name).unwrap_or_default();
        finished.push(make_finished(ctx, plan, name, repeat_index, outcome, observations).await);
    }
    finished
}
/// Borrowed view of the state a batch run accumulated, handed to `isolate_failures`.
struct BatchAccum<'a> {
/// Retained observation per test from the batch attempts (read-only).
best: &'a FxHashMap<String, BestObs>,
/// DB samples per test; entries are removed as tests are finalized.
attempts: &'a mut FxHashMap<String, Vec<DbObservation>>,
}
/// Finalizes a failed batch by rerunning each still-failing test on its own.
///
/// Members that are not in `still_failing` are reported straight from the batch's retained
/// observations (in `all_names` order). Each failing member is then executed alone through
/// `execute_test_action`, and those results are appended in `still_failing` order. The
/// recursive call is boxed because `execute_test_action` itself calls this function.
async fn isolate_failures(
    ctx: &TargetCtx,
    plan: &TargetPlan,
    all_names: &[String],
    still_failing: &[String],
    accum: BatchAccum<'_>,
    repeat_index: u32,
    per_target_sem: &Arc<Semaphore>,
) -> Vec<FinishedTest> {
    let rerun: FxHashSet<&str> = still_failing.iter().map(String::as_str).collect();
    let mut finished = Vec::with_capacity(all_names.len());

    let settled = all_names.iter().filter(|n| !rerun.contains(n.as_str()));
    for name in settled {
        let retained = match accum.best.get(name) {
            Some(obs) => obs.clone(),
            None => BestObs::missing(),
        };
        let observations = accum.attempts.remove(name).unwrap_or_default();
        let done = make_finished(
            ctx,
            plan,
            name,
            repeat_index,
            retained.into_outcome(),
            observations,
        )
        .await;
        finished.push(done);
    }

    for name in still_failing {
        let alone = vec![name.clone()];
        let rerun_results = Box::pin(execute_test_action(
            ctx,
            plan,
            batching::TestSelection::Explicit(alone.clone()),
            alone,
            repeat_index,
            per_target_sem.clone(),
        ))
        .await;
        finished.extend(rerun_results);
    }
    finished
}
async fn make_finished(
ctx: &TargetCtx,
plan: &TargetPlan,
base_name: &str,
repeat_index: u32,
outcome: TestOutcome,
db_observations: Vec<DbObservation>,
) -> FinishedTest {
let test_id = TestIdentity {
target: plan.spec.display.to_owned(),
name: base_name.to_owned(),
variant: ctx.config.variant.clone(),
};
let run_id = RunIdentity {
test: test_id.clone(),
repeat: plan.repeat,
repeat_index,
};
let mut details = finalize_details(ctx, outcome.details).await;
if outcome.status.is_failure() {
let annotation = failure_annotation(plan, ctx, &test_id);
if !annotation.is_empty() {
details.push('\n');
details.push_str(&annotation);
}
}
let result = build_test_result(
&run_id,
plan.spec.handle_proto(),
outcome.status,
Some(outcome.duration),
details,
outcome.max_memory,
);
FinishedTest {
result,
test_id,
status: outcome.status,
quarantined: plan.quarantined(),
db_observations,
}
}
/// Builds the bracketed annotation appended to failing tests' details.
///
/// Always contains the target; adds the owning team, contacts, on-call and — when the
/// duration DB has at least one recorded run — the flaky history, separated by ` | `.
fn failure_annotation(plan: &TargetPlan, ctx: &TargetCtx, test_id: &TestIdentity) -> String {
    let spec = &plan.spec;
    let mut parts: Vec<String> = Vec::new();
    parts.push(format!("target={}", spec.display));

    if let Owner::Team(team) = &plan.owner {
        parts.push(format!("owner={team}"));
    }
    if !spec.contacts.is_empty() {
        parts.push(format!("contacts=[{}]", spec.contacts.join(",")));
    }
    if let Some(oncall) = &spec.oncall {
        parts.push(format!("oncall={oncall}"));
    }

    let history = ctx.estimates.flake(None, test_id).filter(|f| f.runs > 0);
    if let Some(flake) = history {
        let mut line = format!(
            "flaky_history: failed {}/{} recent runs",
            flake.failures, flake.runs
        );
        if let Some(class) = flake.last_failure_class {
            line += &format!(", last={class:?}");
        }
        parts.push(line);
    }

    let joined = parts.join(" | ");
    format!("[brtr: {}]", joined)
}
/// Returns the detail text to embed in a result: unchanged when `route_log` says it fits
/// inline, otherwise the replacement text produced by uploading it to CAS.
async fn finalize_details(ctx: &TargetCtx, details: String) -> String {
    let routing = result::route_log(details.len(), ctx.config.cas_inline_limit);
    if matches!(routing, result::LogRouting::UploadToCas) {
        upload_details_to_cas(ctx, details).await
    } else {
        details
    }
}
/// Writes `details` to a temp file, uploads it to CAS and returns a short replacement text:
/// a status header followed by the truncated head of the log.
///
/// The temp file is named from the process id and the FNV-1a hash of the content and is
/// removed after the upload attempt, whether it succeeded or not. On success the header is
/// also attached as an info message. Failures to write or upload are reported in the header
/// rather than propagated.
async fn upload_details_to_cas(ctx: &TargetCtx, details: String) -> String {
    let size = details.len();
    let file_name = format!(
        "brtr-log-{}-{}.txt",
        std::process::id(),
        fnv1a(details.as_bytes())
    );
    let path = std::env::temp_dir().join(file_name);

    if let Err(e) = std::fs::write(&path, &details) {
        return format!(
            "[log {size} bytes; CAS upload skipped: {e}]\n{}",
            truncate(&details)
        );
    }

    let upload = ctx
        .orch
        .upload_file_to_cas(
            path.to_string_lossy().into_owned(),
            7 * 24 * 3600,
            "rust-test".to_owned(),
        )
        .await;
    // Best-effort cleanup; done before anything else in both outcomes.
    let _ = std::fs::remove_file(&path);

    match upload {
        Ok(digest) => {
            let msg = format!(
                "[log {size} bytes uploaded to CAS: {}/{}]",
                digest.hash, digest.size_bytes
            );
            let _ = ctx.orch.attach_info_message(msg.clone()).await;
            format!("{msg}\n{}", truncate(&details))
        }
        Err(e) => format!(
            "[log {size} bytes; CAS upload failed: {e:#}]\n{}",
            truncate(&details)
        ),
    }
}
/// Returns `details` unchanged when it is at most 4096 bytes; otherwise its longest prefix of
/// at most 4096 bytes that ends on a UTF-8 character boundary, followed by a truncation
/// marker on a new line.
fn truncate(details: &str) -> String {
    const HEAD: usize = 4096;
    if details.len() <= HEAD {
        return details.to_owned();
    }
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=HEAD)
        .rev()
        .find(|&idx| details.is_char_boundary(idx))
        .unwrap_or(0);
    let head = &details[..cut];
    format!("{head}\n…[truncated, full log in CAS]")
}
/// Reports a target-level failure (listing problems, panicked action tasks) as a single
/// synthetic test named `"<suite> (listing)"`.
///
/// The synthetic name is first announced as discovered, then a result with the given
/// `status` and `details` is sent to the reporter under that same name, so the result
/// matches the discovered entry. No duration observations are attached. Errors from the
/// orchestrator or a closed report channel are ignored.
async fn report_target_failure(
    ctx: &TargetCtx,
    plan: &TargetPlan,
    status: TestVerdict,
    details: String,
) {
    let name = format!("{} (listing)", plan.spec.suite);
    let _ = ctx
        .orch
        .report_tests_discovered(
            plan.spec.handle_proto(),
            plan.spec.suite.clone(),
            vec![name.clone()],
        )
        .await;
    let details = finalize_details(ctx, details).await;
    // Use the announced synthetic name, not the target display string, as the test name.
    let test_id = TestIdentity {
        target: plan.spec.display.clone(),
        name,
        variant: ctx.config.variant.clone(),
    };
    let run_id = RunIdentity {
        test: test_id.clone(),
        repeat: plan.repeat,
        repeat_index: 0,
    };
    let result = build_test_result(
        &run_id,
        plan.spec.handle_proto(),
        status,
        None,
        details,
        None,
    );
    let _ = ctx
        .report_tx
        .send(ReporterMessage::Finished(vec![FinishedTest {
            result,
            test_id,
            status,
            quarantined: plan.quarantined(),
            db_observations: Vec::new(),
        }]))
        .await;
}
/// 64-bit FNV-1a hash of `bytes`: starting from the offset basis, each byte is XORed into
/// the hash, which is then multiplied (wrapping) by the FNV prime.
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}