use std::path::PathBuf;
use std::time::Duration;
use std::num::NonZeroU32;
use std::num::NonZeroUsize;
use clap::Parser;
use crate::batching::{BatchFailurePolicy, BatchMode};
use crate::listing::IgnoredPolicy;
use crate::translator::{
ListFormat, RunFormat, libtest_user_ignored_policies, libtest_user_requests_listing_only,
};
use crate::variant::{RepeatKind, Variant};
// Options that appear before the `--` separator on the command line (see `parse`).
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments
// into help/about text, which would change the generated command.
#[derive(Debug, Parser)]
#[command(disable_help_flag = true)]
struct OuterCli {
// Copied verbatim into `SessionContext::trace_id`.
#[arg(long)]
buck_trace_id: Option<String>,
// Repeatable. The first entry starting with `host=` supplies
// `SessionContext::host_platform`; other entries are not read in this file.
#[arg(long = "config-entry")]
config_entry: Vec<String>,
// `--executor-fd` and `--orchestrator-fd` together select `Transport::UnixFds`.
#[arg(long)]
executor_fd: Option<i32>,
#[arg(long)]
orchestrator_fd: Option<i32>,
// `--executor-addr` and `--orchestrator-addr` together select `Transport::Tcp`,
// but only when the fd pair above is not complete (see `resolve_transport`).
#[arg(long)]
executor_addr: Option<String>,
#[arg(long)]
orchestrator_addr: Option<String>,
}
// Options that appear after the `--` separator on the command line (see `parse`).
// The raw strings and numbers collected here are validated and converted by
// `resolve_config`; the `libtest_*` fields are re-emitted by `direct_libtest_args`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments
// into help/about text, which would change the generated command.
#[derive(Debug, Parser)]
#[command(disable_help_flag = true)]
struct TpxConfig {
// Accepted (and hidden) but never read anywhere in this file.
#[arg(long, hide = true)]
buck_test_info: Option<String>,
// Repeatable `NAME=VALUE` entries; split at the first `=` by `resolve_config`.
#[arg(long)]
env: Vec<String>,
// Takes one or more values per occurrence, and those values may start with `-`.
// These come first in `RunnerConfig::extra_test_args`.
#[arg(long, num_args = 1.., allow_hyphen_values = true)]
test_arg: Vec<String>,
// Seconds; becomes `RunnerConfig::per_test_timeout`.
#[arg(long, default_value_t = 600)]
timeout: u64,
// Seconds; becomes `RunnerConfig::listing_timeout`.
#[arg(long, default_value_t = 600)]
listing_timeout: u64,
// Requests `IgnoredPolicy::IncludeIgnored`.
#[arg(long)]
include_ignored: bool,
// `--ignored` and `--ignored-only` both request `IgnoredPolicy::IgnoredOnly`.
#[arg(long = "ignored")]
ignored: bool,
#[arg(long)]
ignored_only: bool,
// Must be `text` or `json`.
#[arg(long, default_value = "json")]
list_format: String,
// Must be `text` or `json`.
#[arg(long, default_value = "json")]
run_format: String,
// Must be `per-test`, `duration-bucketed`, `fixed-chunk` or `target`.
#[arg(long, default_value = "fixed-chunk")]
batch_mode: String,
// Only used by `fixed-chunk`; a value of 0 is raised to 1.
#[arg(long, default_value_t = 32)]
chunk_size: usize,
// Only used by `duration-bucketed`, as `BatchMode::DurationBucketed::p50_lt_ms`.
#[arg(long, default_value_t = 20)]
batch_threshold_ms: u64,
// Must be `isolate` or `fail-all`.
#[arg(long, default_value = "isolate")]
batch_failure_policy: String,
// Handed to `Variant::parse` unchanged.
#[arg(long, default_value = "default")]
variant: String,
// 0 means `RepeatKind::Once`; any other value becomes `RepeatKind::Stress(n)`.
#[arg(long, default_value_t = 0)]
stress: u32,
// A value of 0 is replaced by 1.
#[arg(long, default_value_t = 50)]
stress_label_reps: u32,
// Must be strictly less than the (clamped) shard count.
#[arg(long, default_value_t = 0)]
shard_index: u16,
// A value of 0 is raised to 1.
#[arg(long, default_value_t = 1)]
shard_count: u16,
#[arg(long)]
local_debug: bool,
// A value of 0 is raised to 1.
#[arg(long, default_value_t = 3)]
flaky_attempts: u32,
#[arg(long)]
duration_db: Option<PathBuf>,
#[arg(long, default_value_t = 65_536)]
cas_inline_limit: usize,
// Everything below is captured only so that `direct_libtest_args` can forward it
// to the test binary. Values are kept as strings and not validated here.
#[arg(long = "force-run-in-process")]
libtest_force_run_in_process: bool,
#[arg(long = "exclude-should-panic")]
libtest_exclude_should_panic: bool,
#[arg(long = "test")]
libtest_test: bool,
#[arg(long = "bench")]
libtest_bench: bool,
#[arg(long = "list")]
libtest_list: bool,
#[arg(long = "fail-fast")]
libtest_fail_fast: bool,
// The built-in help flag is disabled above, so `-h`/`--help` lands here and is
// forwarded as `--help`.
#[arg(short = 'h', long = "help")]
libtest_help: bool,
#[arg(long = "logfile")]
libtest_logfile: Option<String>,
// Both spellings are accepted; `--no-capture` is what gets forwarded.
#[arg(long = "no-capture", alias = "nocapture")]
libtest_no_capture: bool,
#[arg(long = "test-threads")]
libtest_test_threads: Option<String>,
#[arg(long = "skip")]
libtest_skip: Vec<String>,
#[arg(short = 'q', long = "quiet")]
libtest_quiet: bool,
#[arg(long = "exact")]
libtest_exact: bool,
#[arg(long = "color")]
libtest_color: Option<String>,
#[arg(long = "format")]
libtest_format: Option<String>,
#[arg(long = "show-output")]
libtest_show_output: bool,
#[arg(short = 'Z', value_name = "FLAG")]
libtest_unstable_options: Vec<String>,
#[arg(long = "report-time")]
libtest_report_time: bool,
#[arg(long = "ensure-time")]
libtest_ensure_time: bool,
#[arg(long = "shuffle")]
libtest_shuffle: bool,
#[arg(long = "shuffle-seed")]
libtest_shuffle_seed: Option<String>,
// Positional arguments; forwarded first, ahead of every flag.
#[arg(value_name = "FILTER")]
libtest_filters: Vec<String>,
}
impl TpxConfig {
    /// Rebuilds the libtest arguments that were given directly (outside `--test-arg`).
    ///
    /// Positional filters come first, followed by the flags in a fixed order that is
    /// independent of the order they were typed in. Flags are always emitted in their
    /// long spelling (`-q` becomes `--quiet`, `nocapture` becomes `--no-capture`);
    /// only `-Z` keeps its short form. `--ignored` and `--include-ignored` are not
    /// emitted here.
    fn direct_libtest_args(&self) -> Vec<String> {
        /// Appends `name` when the boolean switch was set.
        fn switch(out: &mut Vec<String>, name: &str, enabled: bool) {
            if enabled {
                out.push(name.to_owned());
            }
        }

        /// Appends `name` followed by its value.
        fn valued(out: &mut Vec<String>, name: &str, value: &str) {
            out.extend([name.to_owned(), value.to_owned()]);
        }

        /// Appends `name value` when the option was supplied.
        fn optional(out: &mut Vec<String>, name: &str, value: Option<&str>) {
            if let Some(value) = value {
                valued(out, name, value);
            }
        }

        let mut out = self.libtest_filters.clone();

        switch(&mut out, "--exact", self.libtest_exact);
        for pattern in &self.libtest_skip {
            valued(&mut out, "--skip", pattern);
        }
        switch(
            &mut out,
            "--force-run-in-process",
            self.libtest_force_run_in_process,
        );
        switch(
            &mut out,
            "--exclude-should-panic",
            self.libtest_exclude_should_panic,
        );
        switch(&mut out, "--test", self.libtest_test);
        switch(&mut out, "--bench", self.libtest_bench);
        switch(&mut out, "--list", self.libtest_list);
        switch(&mut out, "--fail-fast", self.libtest_fail_fast);
        switch(&mut out, "--help", self.libtest_help);
        optional(&mut out, "--logfile", self.libtest_logfile.as_deref());
        switch(&mut out, "--no-capture", self.libtest_no_capture);
        optional(
            &mut out,
            "--test-threads",
            self.libtest_test_threads.as_deref(),
        );
        switch(&mut out, "--quiet", self.libtest_quiet);
        optional(&mut out, "--color", self.libtest_color.as_deref());
        optional(&mut out, "--format", self.libtest_format.as_deref());
        switch(&mut out, "--show-output", self.libtest_show_output);
        for unstable in &self.libtest_unstable_options {
            valued(&mut out, "-Z", unstable);
        }
        switch(&mut out, "--report-time", self.libtest_report_time);
        switch(&mut out, "--ensure-time", self.libtest_ensure_time);
        switch(&mut out, "--shuffle", self.libtest_shuffle);
        optional(
            &mut out,
            "--shuffle-seed",
            self.libtest_shuffle_seed.as_deref(),
        );

        out
    }
}
/// The pair of endpoints for the executor and the orchestrator.
///
/// Built by `resolve_transport` from the options given before the `--` separator.
#[derive(Debug, Clone)]
pub enum Transport {
/// Chosen when both `--executor-fd` and `--orchestrator-fd` are given.
UnixFds {
executor_fd: i32,
orchestrator_fd: i32,
},
/// Chosen when the fd pair is incomplete and both `--executor-addr` and
/// `--orchestrator-addr` are given. The addresses are stored as the raw
/// command-line strings; nothing in this file parses or validates them.
Tcp {
executor_addr: String,
orchestrator_addr: String,
},
}
/// Which shard of a run this invocation is, out of how many.
///
/// When produced by `resolve_config`, `count` is at least 1 and `index < count`;
/// values constructed by hand carry no such guarantee.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ShardSpec {
    /// Zero-based shard position (taken from `--shard-index`).
    pub index: u16,
    /// Total number of shards (taken from `--shard-count`).
    pub count: u16,
}

impl ShardSpec {
    /// Returns `true` when there are at least two shards.
    pub fn is_sharded(&self) -> bool {
        self.count >= 2
    }
}
/// Upper bounds carried in `RunnerConfig::limits`.
///
/// NOTE(review): this file only stores the numbers. Judging by the field names they
/// cap concurrent listings, concurrent test actions (overall and per target) and the
/// report queue length; confirm against the scheduler that consumes them.
#[derive(Debug, Clone, Copy)]
pub struct SchedulerLimits {
    pub max_inflight_listings: usize,
    pub max_inflight_test_actions: usize,
    pub max_inflight_per_target: usize,
    pub max_report_queue: usize,
}

impl Default for SchedulerLimits {
    /// The limits `resolve_config` always uses; there is no command-line override.
    fn default() -> Self {
        const LISTINGS: usize = 256;
        const TEST_ACTIONS: usize = 2_000;
        const PER_TARGET: usize = 128;
        const REPORT_QUEUE: usize = 1_024;

        SchedulerLimits {
            max_inflight_listings: LISTINGS,
            max_inflight_test_actions: TEST_ACTIONS,
            max_inflight_per_target: PER_TARGET,
            max_report_queue: REPORT_QUEUE,
        }
    }
}
/// Validated runner settings produced by `resolve_config`.
#[derive(Debug, Clone)]
pub struct RunnerConfig {
/// From `--timeout`, interpreted as seconds.
pub per_test_timeout: Duration,
/// From `--listing-timeout`, interpreted as seconds.
pub listing_timeout: Duration,
/// The single ignored-test policy agreed on by all ignored-related flags.
pub ignored: IgnoredPolicy,
pub list_format: ListFormat,
pub run_format: RunFormat,
pub batch_mode: BatchMode,
pub batch_failure_policy: BatchFailurePolicy,
pub variant: Variant,
/// `RepeatKind::Once` when `--stress` is 0, otherwise `RepeatKind::Stress(n)`.
pub stress: RepeatKind,
/// From `--stress-label-reps`; 0 is replaced by 1.
pub stress_label_reps: NonZeroU32,
pub shard: ShardSpec,
pub local_debug: bool,
/// From `--flaky-attempts`, raised to at least 1.
pub flaky_attempts: u32,
/// Always `SchedulerLimits::default()` when built by `resolve_config`.
pub limits: SchedulerLimits,
pub cas_inline_limit: usize,
pub duration_db: Option<PathBuf>,
/// Result of `libtest_user_requests_listing_only` applied to `extra_test_args`.
pub libtest_list_only: bool,
/// `--test-arg` values followed by the libtest arguments given directly.
pub extra_test_args: Vec<String>,
/// `--env` entries split at their first `=` into (name, value).
pub extra_env: Vec<(String, String)>,
/// Whatever `crate::config::load_config()` returned when the config was resolved.
pub quokka_config: crate::config::QuokkaConfig,
}
/// Per-invocation metadata taken from the options given before the `--` separator.
#[derive(Debug, Clone)]
pub struct SessionContext {
/// Remainder of the first `--config-entry` value that starts with `host=`, if any.
pub host_platform: Option<String>,
/// Value of `--buck-trace-id`, if given.
pub trace_id: Option<String>,
}
/// Everything `parse` extracts from one command line.
#[derive(Debug)]
pub struct Invocation {
pub transport: Transport,
pub config: RunnerConfig,
pub context: SessionContext,
}
/// Reasons `parse` can reject a command line.
#[derive(Debug, thiserror::Error)]
pub enum CliError {
/// Neither a complete fd pair nor a complete address pair was supplied.
#[error(
"missing transport: provide --executor-fd/--orchestrator-fd or --executor-addr/--orchestrator-addr"
)]
MissingTransport,
/// A string-valued option held an unrecognised value, or the ignored-test flags
/// disagreed with each other (reported with `field` set to `ignored`).
#[error("invalid {field}: `{value}`")]
InvalidValue { field: &'static str, value: String },
/// The resolved policy is `IgnoredPolicy::IgnoredOnly` but `--list-format text` was chosen.
#[error(
"--ignored-only requires --list-format json: stable text `--list` cannot \
report a test's ignored status, so an ignored-only run would list and run zero tests"
)]
IgnoredOnlyNeedsJsonListing,
/// An `--env` entry could not be split into a name and a value; carries the entry.
#[error("invalid --env entry `{0}` (expected NAME=VALUE)")]
InvalidEnv(String),
/// `--shard-index` is not below the shard count (after a count of 0 is raised to 1).
#[error("--shard-index {index} is out of range for --shard-count {count}")]
ShardOutOfRange { index: u16, count: u16 },
/// Error reported by clap while parsing either half of the command line.
#[error(transparent)]
Parse(#[from] clap::Error),
}
/// Parses a full command line into an [`Invocation`].
///
/// `argv` is cut at the first literal `--`. Everything before it (including the
/// program name) is parsed as the outer options. Everything after it is parsed as
/// the runner options, where the first element is consumed as a program name by
/// clap. If there is no `--`, or nothing follows it, the runner options take their
/// defaults.
///
/// # Errors
///
/// Returns [`CliError::Parse`] when clap rejects either half, and otherwise
/// whatever `resolve_transport` or `resolve_config` report, checked in that order.
pub fn parse(argv: Vec<String>) -> Result<Invocation, CliError> {
    let mut outer_args = argv;
    let mut inner_args = match outer_args.iter().position(|arg| arg == "--") {
        Some(separator) => {
            let tail = outer_args.split_off(separator + 1);
            // The separator is now the last element of the front half; discard it.
            outer_args.pop();
            tail
        }
        None => Vec::new(),
    };
    if inner_args.is_empty() {
        // clap needs a program name even when no runner options were given.
        inner_args.push("tpx".to_owned());
    }

    let outer = OuterCli::try_parse_from(outer_args)?;
    let tpx = TpxConfig::try_parse_from(inner_args)?;

    let transport = resolve_transport(&outer)?;
    let (config, context) = resolve_config(&outer, tpx)?;
    Ok(Invocation {
        transport,
        config,
        context,
    })
}
/// Picks the transport described by the outer options.
///
/// A complete fd pair wins over a complete address pair. A pair with only one half
/// present is treated as absent.
///
/// # Errors
///
/// Returns [`CliError::MissingTransport`] when neither pair is complete.
fn resolve_transport(outer: &OuterCli) -> Result<Transport, CliError> {
    if let Some((executor_fd, orchestrator_fd)) = outer.executor_fd.zip(outer.orchestrator_fd) {
        return Ok(Transport::UnixFds {
            executor_fd,
            orchestrator_fd,
        });
    }

    let addrs = outer
        .executor_addr
        .as_ref()
        .zip(outer.orchestrator_addr.as_ref());
    if let Some((executor_addr, orchestrator_addr)) = addrs {
        return Ok(Transport::Tcp {
            executor_addr: executor_addr.clone(),
            orchestrator_addr: orchestrator_addr.clone(),
        });
    }

    Err(CliError::MissingTransport)
}
/// Validates the parsed options and converts them into a [`RunnerConfig`] and a
/// [`SessionContext`].
///
/// Checks run in a fixed order and the first failure is returned: conflicting
/// ignored-test flags, `--list-format`, ignored-only with text listing,
/// `--run-format`, `--batch-mode`, `--batch-failure-policy`, shard range, and
/// finally the `--env` entries.
///
/// Out-of-range numeric values are clamped rather than rejected: `--chunk-size`,
/// `--shard-count`, `--flaky-attempts` and `--stress-label-reps` are each raised
/// to at least 1.
///
/// # Errors
///
/// * [`CliError::InvalidValue`] for an unknown string option value or for
///   ignored-test flags that request different policies.
/// * [`CliError::IgnoredOnlyNeedsJsonListing`] for ignored-only with `--list-format text`.
/// * [`CliError::ShardOutOfRange`] when `--shard-index` is not below the shard count.
/// * [`CliError::InvalidEnv`] for an `--env` entry without `=` or with an empty name.
fn resolve_config(
    outer: &OuterCli,
    tpx: TpxConfig,
) -> Result<(RunnerConfig, SessionContext), CliError> {
    // `--test-arg` values come first, then the libtest flags given directly. The
    // direct args are computed before `test_arg` is moved out, so no copy is needed.
    let direct_args = tpx.direct_libtest_args();
    let mut extra_test_args = tpx.test_arg;
    extra_test_args.extend(direct_args);

    // Gather every request for a non-default ignored policy: our own flags first,
    // then whatever the translator finds in the forwarded libtest arguments.
    let mut ignored = IgnoredPolicy::ExcludeIgnored;
    let mut ignored_policies = Vec::new();
    if tpx.include_ignored {
        ignored_policies.push(IgnoredPolicy::IncludeIgnored);
    }
    if tpx.ignored || tpx.ignored_only {
        ignored_policies.push(IgnoredPolicy::IgnoredOnly);
    }
    ignored_policies.extend(libtest_user_ignored_policies(&extra_test_args));
    // All requests must agree; the first one wins and any different one is an error.
    for policy in ignored_policies {
        if ignored == IgnoredPolicy::ExcludeIgnored {
            ignored = policy;
        } else if ignored != policy {
            return Err(CliError::InvalidValue {
                field: "ignored",
                value: "--include-ignored with --ignored".to_owned(),
            });
        }
    }

    let list_format = match tpx.list_format.as_str() {
        "text" => ListFormat::Text,
        "json" => ListFormat::Json,
        other => {
            return Err(CliError::InvalidValue {
                field: "list-format",
                value: other.to_owned(),
            });
        }
    };
    if ignored == IgnoredPolicy::IgnoredOnly && list_format == ListFormat::Text {
        return Err(CliError::IgnoredOnlyNeedsJsonListing);
    }

    let run_format = match tpx.run_format.as_str() {
        "text" => RunFormat::Text,
        "json" => RunFormat::Json,
        other => {
            return Err(CliError::InvalidValue {
                field: "run-format",
                value: other.to_owned(),
            });
        }
    };

    let batch_mode = match tpx.batch_mode.as_str() {
        "per-test" => BatchMode::PerTest,
        "duration-bucketed" => BatchMode::DurationBucketed {
            p50_lt_ms: tpx.batch_threshold_ms,
        },
        "fixed-chunk" => BatchMode::FixedChunk {
            // A chunk size of 0 is treated as 1 rather than rejected.
            size: NonZeroUsize::new(tpx.chunk_size.max(1)).expect("clamped to >=1"),
        },
        "target" => BatchMode::Target,
        other => {
            return Err(CliError::InvalidValue {
                field: "batch-mode",
                value: other.to_owned(),
            });
        }
    };

    let batch_failure_policy = match tpx.batch_failure_policy.as_str() {
        "isolate" => BatchFailurePolicy::RerunPerTestToIsolate,
        "fail-all" => BatchFailurePolicy::FailAll,
        other => {
            return Err(CliError::InvalidValue {
                field: "batch-failure-policy",
                value: other.to_owned(),
            });
        }
    };

    let variant = Variant::parse(&tpx.variant);
    let stress = match NonZeroU32::new(tpx.stress) {
        Some(n) => RepeatKind::Stress(n),
        None => RepeatKind::Once,
    };
    let stress_label_reps = NonZeroU32::new(tpx.stress_label_reps).unwrap_or(NonZeroU32::MIN);

    // The count is clamped before the range check, so `--shard-count 0` behaves
    // like a single shard and only index 0 is accepted.
    let shard = ShardSpec {
        index: tpx.shard_index,
        count: tpx.shard_count.max(1),
    };
    if shard.index >= shard.count {
        return Err(CliError::ShardOutOfRange {
            index: shard.index,
            count: shard.count,
        });
    }

    // Split each entry at its first `=`; the value may itself contain `=` and may be
    // empty. An empty NAME (`=value`) is rejected because it does not match the
    // documented NAME=VALUE shape and is not a usable environment variable name.
    let extra_env = tpx
        .env
        .iter()
        .map(|entry| match entry.split_once('=') {
            Some((name, value)) if !name.is_empty() => Ok((name.to_owned(), value.to_owned())),
            _ => Err(CliError::InvalidEnv(entry.clone())),
        })
        .collect::<Result<Vec<_>, _>>()?;

    let host_platform = outer
        .config_entry
        .iter()
        .find_map(|e| e.strip_prefix("host=").map(str::to_owned));

    let config = RunnerConfig {
        per_test_timeout: Duration::from_secs(tpx.timeout),
        listing_timeout: Duration::from_secs(tpx.listing_timeout),
        ignored,
        list_format,
        run_format,
        batch_mode,
        batch_failure_policy,
        variant,
        stress,
        stress_label_reps,
        shard,
        local_debug: tpx.local_debug,
        flaky_attempts: tpx.flaky_attempts.max(1),
        limits: SchedulerLimits::default(),
        cas_inline_limit: tpx.cas_inline_limit,
        duration_db: tpx.duration_db,
        // Evaluated before `extra_test_args` is moved into the struct on the next line.
        libtest_list_only: libtest_user_requests_listing_only(&extra_test_args),
        extra_test_args,
        extra_env,
        quokka_config: crate::config::load_config(),
    };
    let context = SessionContext {
        host_platform,
        trace_id: outer.buck_trace_id.clone(),
    };
    Ok((config, context))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned argument vector from string literals.
    fn argv(items: &[&str]) -> Vec<String> {
        items.iter().map(|s| s.to_string()).collect()
    }

    // In every command line below, the first element after `--` is a placeholder:
    // clap consumes it as the program name of the inner parser.

    /// Full outer-option shape plus `--buck-test-info`; everything else defaults.
    #[test]
    fn parses_buck2_exact_arg_shape() {
        let inv = parse(argv(&[
            "quokka",
            "--buck-trace-id",
            "abc-123",
            "--config-entry",
            "host=linux",
            "--config-entry",
            "config=foo;bar",
            "--executor-fd",
            "7",
            "--orchestrator-fd",
            "9",
            "--",
            "ignored",
            "--buck-test-info",
            "ignored",
        ]))
        .expect("must parse buck2's arg shape");
        match inv.transport {
            Transport::UnixFds {
                executor_fd,
                orchestrator_fd,
            } => {
                assert_eq!(executor_fd, 7);
                assert_eq!(orchestrator_fd, 9);
            }
            _ => panic!("expected unix fds"),
        }
        assert_eq!(inv.context.trace_id.as_deref(), Some("abc-123"));
        assert_eq!(inv.context.host_platform.as_deref(), Some("linux"));
        assert_eq!(inv.config.per_test_timeout, Duration::from_secs(600));
        assert_eq!(inv.config.ignored, IgnoredPolicy::ExcludeIgnored);
        assert_eq!(inv.config.run_format, RunFormat::Json);
        assert_eq!(
            inv.config.batch_mode,
            BatchMode::FixedChunk {
                size: NonZeroUsize::new(32).unwrap()
            }
        );
    }

    /// `--chunk-size` must override the default of 32, so a different value is used;
    /// passing the default would succeed even if the flag were ignored.
    #[test]
    fn fixed_chunk_size_is_configurable() {
        let inv = parse(argv(&[
            "runner",
            "--executor-fd",
            "7",
            "--orchestrator-fd",
            "9",
            "--",
            "ignored",
            "--chunk-size",
            "8",
        ]))
        .unwrap();
        assert_eq!(
            inv.config.batch_mode,
            BatchMode::FixedChunk {
                size: NonZeroUsize::new(8).unwrap()
            }
        );
    }

    /// Address pair selects TCP; a mix of runner flags lands in the config.
    #[test]
    fn tcp_transport_and_feature_flags() {
        let inv = parse(argv(&[
            "runner",
            "--executor-addr",
            "127.0.0.1:5001",
            "--orchestrator-addr",
            "127.0.0.1:5002",
            "--",
            "ignored",
            "--include-ignored",
            "--batch-mode",
            "duration-bucketed",
            "--batch-threshold-ms",
            "30",
            "--stress",
            "5",
            "--shard-index",
            "1",
            "--shard-count",
            "4",
            "--local-debug",
        ]))
        .unwrap();
        assert!(matches!(inv.transport, Transport::Tcp { .. }));
        assert_eq!(inv.config.ignored, IgnoredPolicy::IncludeIgnored);
        assert_eq!(
            inv.config.batch_mode,
            BatchMode::DurationBucketed { p50_lt_ms: 30 }
        );
        assert!(inv.config.stress.is_stress());
        assert_eq!(inv.config.shard.index, 1);
        assert_eq!(inv.config.shard.count, 4);
        assert!(inv.config.local_debug);
    }

    /// Ignored-only combined with text listing is refused up front.
    #[test]
    fn ignored_only_with_text_listing_is_rejected() {
        let err = parse(argv(&[
            "runner",
            "--executor-fd",
            "1",
            "--orchestrator-fd",
            "2",
            "--",
            "ignored",
            "--ignored-only",
            "--list-format",
            "text",
        ]))
        .unwrap_err();
        assert!(matches!(err, CliError::IgnoredOnlyNeedsJsonListing));
    }

    /// Libtest flags given directly are forwarded (filters first), while `--ignored`
    /// is absorbed into the ignored policy instead of being forwarded.
    #[test]
    fn standard_libtest_args_are_accepted_directly() {
        let inv = parse(argv(&[
            "runner",
            "--executor-fd",
            "1",
            "--orchestrator-fd",
            "2",
            "--",
            "ignored",
            "alpha",
            "--exact",
            "--skip",
            "beta",
            "--no-capture",
            "--ignored",
        ]))
        .unwrap();
        assert_eq!(inv.config.ignored, IgnoredPolicy::IgnoredOnly);
        assert_eq!(
            inv.config.extra_test_args,
            vec![
                "alpha".to_owned(),
                "--exact".to_owned(),
                "--skip".to_owned(),
                "beta".to_owned(),
                "--no-capture".to_owned()
            ]
        );
    }

    /// `--env NAME=VALUE` is split into a (name, value) pair.
    #[test]
    fn extra_env_is_parsed() {
        let inv = parse(argv(&[
            "runner",
            "--executor-fd",
            "1",
            "--orchestrator-fd",
            "2",
            "--",
            "ignored",
            "--env",
            "RUST_LOG=debug",
        ]))
        .unwrap();
        assert_eq!(
            inv.config.extra_env,
            vec![("RUST_LOG".to_owned(), "debug".to_owned())]
        );
    }

    /// No fd pair and no address pair is an error.
    #[test]
    fn missing_transport_errors() {
        let err = parse(argv(&["runner", "--", "ignored"])).unwrap_err();
        assert!(matches!(err, CliError::MissingTransport));
    }

    /// The shard index must be strictly below the shard count.
    #[test]
    fn shard_out_of_range_errors() {
        let err = parse(argv(&[
            "runner",
            "--executor-fd",
            "1",
            "--orchestrator-fd",
            "2",
            "--",
            "ignored",
            "--shard-index",
            "4",
            "--shard-count",
            "4",
        ]))
        .unwrap_err();
        assert!(matches!(err, CliError::ShardOutOfRange { .. }));
    }
}