use anyhow::Result;
use ktstr::assert::AssertResult;
use ktstr::ktstr_test;
use ktstr::prelude::{KernelOpReplyPayload, KernelOpValue, VmResult};
use ktstr::scenario::Ctx;
use ktstr::scenario::ops::{
HoldSpec, KernelTarget, KernelValue, KernelValueWidth, Op, Step, execute_steps,
};
use ktstr::test_support::{Scheduler, SchedulerSpec};
use ktstr::workload::{
AffinityIntent, SchedPolicy, WorkPhase, WorkType, WorkloadConfig, WorkloadHandle,
};
use std::time::Duration;
/// Scheduler under test: registered as `ktstr_sched`, with its binary given as
/// `SchedulerSpec::Discover("scx-ktstr")`.
const KTSTR_SCHED: Scheduler = Scheduler::named("ktstr_sched")
    .binary(SchedulerSpec::Discover("scx-ktstr"));
/// Value written into the worker's `scx.dsq_vtime` before it is read back:
/// thirty days expressed in nanoseconds.
const SEED_VTIME_NS: u64 = 30 * 24 * 60 * 60 * 1_000_000_000;

/// Reply tag for the read-back of the seeded task field (`scx.dsq_vtime`).
const TAG_TASK_FIELD: &'static str = "verify_dsq_vtime";
/// Reply tag for the per-CPU field read (`runqueues.nr_running`).
const TAG_PER_CPU_FIELD: &'static str = "verify_rq_nr_running";
/// Reply tag for the symbol read (`jiffies_64`).
const TAG_SYMBOL: &'static str = "verify_jiffies_64";
/// Returns the start time of process `pid` in nanoseconds, derived from field 22
/// (`starttime`) of `/proc/<pid>/stat`.
///
/// `/proc` reports `starttime` in clock ticks (`sysconf(_SC_CLK_TCK)` ticks per second).
/// The tick count is converted to nanoseconds exactly, using 128-bit arithmetic. An
/// out-of-range value is reported as an error instead of being silently clamped.
///
/// # Errors
///
/// Fails in any of these cases:
/// - the stat file cannot be read;
/// - it has no closing `)` after `comm`, or is truncated before `starttime`;
/// - `starttime` is not an unsigned integer;
/// - `sysconf` reports a non-positive tick rate;
/// - the resulting nanosecond value does not fit in `u64`.
fn read_start_time_ns(pid: libc::pid_t) -> Result<u64> {
    let stat = std::fs::read_to_string(format!("/proc/{pid}/stat"))?;
    // `comm` (field 2) is parenthesised and may itself contain spaces or ')', so split on
    // the LAST ')' and parse the remaining fields positionally.
    let (_, after_comm) = stat
        .rsplit_once(')')
        .ok_or_else(|| anyhow::anyhow!("/proc/{pid}/stat missing closing ')' for comm"))?;
    // The first token after `comm` is `state` (field 3), so `starttime` (field 22) sits at
    // zero-based index 22 - 3 = 19.
    let token = after_comm
        .split_whitespace()
        .nth(19)
        .ok_or_else(|| anyhow::anyhow!("/proc/{pid}/stat truncated before starttime"))?;
    let ticks: u64 = token
        .parse()
        .map_err(|e| anyhow::anyhow!("starttime '{token}' not a u64: {e}"))?;

    // SAFETY: `sysconf` takes a plain integer name and touches no caller-owned memory;
    // on failure it returns -1, which the check below rejects.
    let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
    anyhow::ensure!(clk_tck > 0, "sysconf(_SC_CLK_TCK) returned {clk_tck}");
    // Strictly positive after the check above, so this conversion cannot fail.
    let ticks_per_sec = u128::try_from(clk_tck)?;

    // u64::MAX * 1e9 is far below u128::MAX, so the product cannot overflow. The divisor
    // is non-zero, so the division cannot panic.
    let ns = u128::from(ticks) * 1_000_000_000 / ticks_per_sec;
    u64::try_from(ns).map_err(|_| {
        anyhow::anyhow!("starttime {ticks} ticks at {clk_tck} Hz overflows u64 nanoseconds")
    })
}
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 5,
watchdog_timeout_s = 60,
auto_repro = false,
post_vm = assert_cold_path_roundtrips,
)]
fn cold_path_op_handler_roundtrips(ctx: &Ctx) -> Result<AssertResult> {
let block_dur = ctx.duration + Duration::from_secs(10);
let config = WorkloadConfig {
num_workers: 1,
affinity: AffinityIntent::Inherit,
work_type: WorkType::Sequence {
first: WorkPhase::Sleep(block_dur),
rest: vec![],
},
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut handle = WorkloadHandle::spawn(&config)?;
handle.start();
std::thread::sleep(Duration::from_millis(500));
let pids = handle.worker_pids();
let pid_i32 = *pids
.first()
.ok_or_else(|| anyhow::anyhow!("worker_pids returned no workers"))?;
anyhow::ensure!(pid_i32 > 0, "worker pid {pid_i32} unset");
let pid: u32 = u32::try_from(pid_i32)
.map_err(|e| anyhow::anyhow!("worker pid {pid_i32} does not fit u32: {e}"))?;
let start_time_ns = read_start_time_ns(pid_i32)?;
let task_target = || KernelTarget::task_field(pid, start_time_ns, "scx.dsq_vtime");
let steps = vec![Step::new(
vec![
Op::write_kernel_cold(task_target(), KernelValue::u64(SEED_VTIME_NS)),
Op::read_kernel_cold(TAG_TASK_FIELD, task_target(), KernelValueWidth::u64()),
Op::read_kernel_cold(
TAG_PER_CPU_FIELD,
KernelTarget::per_cpu_field("runqueues", "nr_running", 0),
KernelValueWidth::u32(),
),
Op::read_kernel_cold(
TAG_SYMBOL,
KernelTarget::symbol("jiffies_64"),
KernelValueWidth::u64(),
),
],
HoldSpec::FULL,
)];
let result = execute_steps(ctx, steps);
let _ = handle.stop_and_collect();
result
}
/// Post-VM check, wired in through the `post_vm` attribute of the scenario.
///
/// The check proceeds in this order:
/// 1. Drains the captured kernel-op replies.
/// 2. Requires that the guest did not time out, did not report a crash message and
///    exited with code 0.
/// 3. Validates the task-field, per-CPU-field and symbol replies, stopping at the
///    first failure.
fn assert_cold_path_roundtrips(result: &VmResult) -> Result<()> {
    // Drain up front, before any of the early returns below can fire.
    let replies = result.snapshot_bridge.drain_kernel_ops();

    if result.timed_out {
        anyhow::bail!("guest timed out under the watchdog");
    }
    if result.crash_message.is_some() {
        anyhow::bail!(
            "guest panicked: crash_message = {:?}",
            result.crash_message,
        );
    }
    if result.exit_code != 0 {
        anyhow::bail!(
            "guest exit_code = {} (expected 0)",
            result.exit_code,
        );
    }

    assert_task_field(&replies)
        .and_then(|()| assert_per_cpu_field(&replies))
        .and_then(|()| assert_symbol(&replies))
}
/// Returns the first reply whose tag equals `tag`.
///
/// # Errors
///
/// If no reply carries `tag`, the error lists every tag that was captured, to ease
/// diagnosis.
fn find_reply<'a>(
    replies: &'a [(String, KernelOpReplyPayload)],
    tag: &str,
) -> Result<&'a KernelOpReplyPayload> {
    for (candidate, reply) in replies {
        if candidate == tag {
            return Ok(reply);
        }
    }
    let tags: Vec<&str> = replies.iter().map(|(t, _)| t.as_str()).collect();
    Err(anyhow::anyhow!("no reply for tag `{tag}`; captured={tags:?}"))
}
fn assert_task_field(replies: &[(String, KernelOpReplyPayload)]) -> Result<()> {
let reply = find_reply(replies, TAG_TASK_FIELD)?;
anyhow::ensure!(reply.success, "TaskField read rejected: {}", reply.reason);
let value = reply
.read_values
.first()
.ok_or_else(|| anyhow::anyhow!("TaskField reply read_values empty"))?;
let observed = match value {
KernelOpValue::U64(v) => *v,
other => anyhow::bail!("TaskField expected U64, got {other:?}"),
};
anyhow::ensure!(
observed >= SEED_VTIME_NS,
"TaskField scx.dsq_vtime regressed: seed={SEED_VTIME_NS}, observed={observed} — \
cold-path write did not stick, or the scheduler reset the value"
);
Ok(())
}
/// Checks the `TAG_PER_CPU_FIELD` reply: it must have succeeded, and its first read value
/// must be a `U32`. The numeric value itself is not checked.
fn assert_per_cpu_field(replies: &[(String, KernelOpReplyPayload)]) -> Result<()> {
    let reply = find_reply(replies, TAG_PER_CPU_FIELD)?;
    if !reply.success {
        anyhow::bail!("PerCpuField read rejected: {}", reply.reason);
    }
    match reply.read_values.first() {
        Some(KernelOpValue::U32(_)) => Ok(()),
        Some(other) => anyhow::bail!("PerCpuField expected U32, got {other:?}"),
        None => anyhow::bail!("PerCpuField reply read_values empty"),
    }
}
/// Checks the `TAG_SYMBOL` reply: it must have succeeded, and its first read value must
/// be a non-zero `U64`.
fn assert_symbol(replies: &[(String, KernelOpReplyPayload)]) -> Result<()> {
    let reply = find_reply(replies, TAG_SYMBOL)?;
    if !reply.success {
        anyhow::bail!("Symbol read rejected: {}", reply.reason);
    }
    match reply.read_values.first() {
        Some(KernelOpValue::U64(0)) => anyhow::bail!("jiffies_64 read as 0"),
        Some(KernelOpValue::U64(_)) => Ok(()),
        Some(other) => anyhow::bail!("Symbol expected U64, got {other:?}"),
        None => anyhow::bail!("Symbol reply read_values empty"),
    }
}