use anyhow::Result;
use ktstr::assert::AssertResult;
use ktstr::ktstr_test;
use ktstr::prelude::*;
use ktstr::scenario::Ctx;
use ktstr::scenario::ops::{
HoldSpec, KernelTarget, KernelValue, KernelValueWidth, Op, Step, execute_steps,
};
declare_scheduler!(KTSTR_SCHED, {
name = "ktstr_sched",
binary = "scx-ktstr",
});
const TAG_PRE: &str = "panic_on_oops_pre";
const TAG_POST: &str = "panic_on_oops_post";
const SENTINEL: u32 = 0xDEAD_BEEF;
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 3,
auto_repro = false,
post_vm = assert_panic_on_oops_roundtrip,
)]
fn kaslr_write_symbol_roundtrip_panic_on_oops(ctx: &Ctx) -> Result<AssertResult> {
let target = || KernelTarget::symbol("panic_on_oops");
let steps = vec![Step::new(
vec![
Op::read_kernel_cold(TAG_PRE, target(), KernelValueWidth::u32()),
Op::write_kernel_cold(target(), KernelValue::u32(SENTINEL)),
Op::read_kernel_cold(TAG_POST, target(), KernelValueWidth::u32()),
],
HoldSpec::FULL,
)];
execute_steps(ctx, steps)
}
fn assert_panic_on_oops_roundtrip(result: &VmResult) -> Result<()> {
anyhow::ensure!(
result.kern_kaslr_offset != 0,
"kern_kaslr_offset == 0 — test must run under KASLR-on to \
validate slide-aware symbol resolution for writes"
);
let replies = result.snapshot_bridge.drain_kernel_ops();
let find = |tag: &str| {
replies
.iter()
.find(|(t, _)| t == tag)
.map(|(_, r)| r)
.ok_or_else(|| anyhow::anyhow!("no reply for tag '{tag}'"))
};
let pre = find(TAG_PRE)?;
let post = find(TAG_POST)?;
anyhow::ensure!(pre.success, "pre read failed: {}", pre.reason);
anyhow::ensure!(post.success, "post read failed: {}", post.reason);
let pre_val = match pre.read_values.first() {
Some(KernelOpValue::U32(v)) => *v,
other => anyhow::bail!("pre read_values[0] not U32: {other:?}"),
};
let post_val = match post.read_values.first() {
Some(KernelOpValue::U32(v)) => *v,
other => anyhow::bail!("post read_values[0] not U32: {other:?}"),
};
anyhow::ensure!(
pre_val <= 2,
"pre panic_on_oops = {pre_val} (sane values are 0, 1, 2) — \
read PA wrong under KASLR-on"
);
anyhow::ensure!(
post_val == SENTINEL,
"post panic_on_oops = {post_val:#x}, expected SENTINEL = \
{SENTINEL:#x}. If post == pre ({pre_val}), the write \
missed. Otherwise read and write resolved to different \
PAs (asymmetric slide application — the slide thread-through \
did not reach the write side OR direct-symbol resolution \
lost the slide)"
);
Ok(())
}
const TAG_JIFFIES: &str = "jiffies_64_read";
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 3,
auto_repro = false,
post_vm = assert_jiffies_read,
)]
fn kaslr_read_symbol_jiffies_64(ctx: &Ctx) -> Result<AssertResult> {
let steps = vec![Step::new(
vec![Op::read_kernel_cold(
TAG_JIFFIES,
KernelTarget::symbol("jiffies_64"),
KernelValueWidth::u64(),
)],
HoldSpec::FULL,
)];
execute_steps(ctx, steps)
}
fn assert_jiffies_read(result: &VmResult) -> Result<()> {
anyhow::ensure!(result.kern_kaslr_offset != 0, "KASLR-on required");
let replies = result.snapshot_bridge.drain_kernel_ops();
let reply = replies
.iter()
.find(|(t, _)| t == TAG_JIFFIES)
.map(|(_, r)| r)
.ok_or_else(|| anyhow::anyhow!("no jiffies reply"))?;
anyhow::ensure!(reply.success, "jiffies read failed: {}", reply.reason);
let v = match reply.read_values.first() {
Some(KernelOpValue::U64(v)) => *v,
other => anyhow::bail!("expected U64, got {other:?}"),
};
anyhow::ensure!(v != 0, "jiffies_64 = 0 — wrong PA or pre-boot read");
anyhow::ensure!(
v != u64::MAX,
"jiffies_64 = u64::MAX — uninitialized slab page"
);
eprintln!("jiffies_64 = {v} (raw) — KASLR-on read succeeded");
Ok(())
}
const TAG_PCPU_PRE: &str = "rq_flags_pre";
const TAG_PCPU_POST: &str = "rq_flags_post";
const HIGH_BIT: u32 = 1 << 31;
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 3,
auto_repro = false,
post_vm = assert_rq_flags_or_roundtrip,
)]
fn kaslr_write_per_cpu_field_or_roundtrip(ctx: &Ctx) -> Result<AssertResult> {
let target = || KernelTarget::per_cpu_field("runqueues", "scx.flags", 0);
let steps = vec![Step::new(
vec![
Op::read_kernel_cold(TAG_PCPU_PRE, target(), KernelValueWidth::u32()),
Op::write_kernel_cold(target(), KernelValue::or_u32(HIGH_BIT)),
Op::read_kernel_cold(TAG_PCPU_POST, target(), KernelValueWidth::u32()),
],
HoldSpec::FULL,
)];
execute_steps(ctx, steps)
}
fn assert_rq_flags_or_roundtrip(result: &VmResult) -> Result<()> {
anyhow::ensure!(
result.kern_kaslr_offset != 0,
"kern_kaslr_offset == 0 — test requires KASLR-on"
);
let replies = result.snapshot_bridge.drain_kernel_ops();
let find = |tag: &str| {
replies
.iter()
.find(|(t, _)| t == tag)
.map(|(_, r)| r)
.ok_or_else(|| anyhow::anyhow!("no reply for tag '{tag}'"))
};
let pre = find(TAG_PCPU_PRE)?;
let post = find(TAG_PCPU_POST)?;
anyhow::ensure!(pre.success, "pre read failed: {}", pre.reason);
anyhow::ensure!(post.success, "post read failed: {}", post.reason);
let pre_v = match pre.read_values.first() {
Some(KernelOpValue::U32(v)) => *v,
other => anyhow::bail!("pre not U32: {other:?}"),
};
let post_v = match post.read_values.first() {
Some(KernelOpValue::U32(v)) => *v,
other => anyhow::bail!("post not U32: {other:?}"),
};
anyhow::ensure!(
pre_v < (1 << 16),
"pre rq.scx.flags = {pre_v:#x} (>16 bits) — likely wrong PA"
);
anyhow::ensure!(
(post_v & HIGH_BIT) == HIGH_BIT,
"post rq.scx.flags = {post_v:#x} missing HIGH_BIT {HIGH_BIT:#x} — \
OrU32 RMW did not land. Either the read leg or the write leg \
resolved to the wrong PA under KASLR-on."
);
anyhow::ensure!(
(post_v ^ pre_v) == HIGH_BIT,
"post ^ pre = {:#x}, expected {HIGH_BIT:#x} — other bits \
changed mid-rendezvous; FULL hold did not block vCPUs.",
post_v ^ pre_v
);
Ok(())
}
const TAG_RQ_CPU0: &str = "rq_cpu_index_0";
const TAG_RQ_CPU1: &str = "rq_cpu_index_1";
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 3,
auto_repro = false,
post_vm = assert_rq_cpu_field,
)]
fn kaslr_compute_rq_pas_ground_truth(ctx: &Ctx) -> Result<AssertResult> {
let steps = vec![Step::new(
vec![
Op::read_kernel_cold(
TAG_RQ_CPU0,
KernelTarget::per_cpu_field("runqueues", "cpu", 0),
KernelValueWidth::u32(),
),
Op::read_kernel_cold(
TAG_RQ_CPU1,
KernelTarget::per_cpu_field("runqueues", "cpu", 1),
KernelValueWidth::u32(),
),
],
HoldSpec::FULL,
)];
execute_steps(ctx, steps)
}
fn assert_rq_cpu_field(result: &VmResult) -> Result<()> {
anyhow::ensure!(
result.kern_kaslr_offset != 0,
"KASLR-on required to validate compute_rq_pas under slide"
);
let replies = result.snapshot_bridge.drain_kernel_ops();
let read = |tag: &str| -> Result<u32> {
let r = replies
.iter()
.find(|(t, _)| t == tag)
.map(|(_, r)| r)
.ok_or_else(|| anyhow::anyhow!("no reply for {tag}"))?;
anyhow::ensure!(r.success, "{tag} read failed: {}", r.reason);
match r.read_values.first() {
Some(KernelOpValue::U32(v)) => Ok(*v),
other => anyhow::bail!("{tag} not U32: {other:?}"),
}
};
let cpu0 = read(TAG_RQ_CPU0)?;
let cpu1 = read(TAG_RQ_CPU1)?;
anyhow::ensure!(
cpu0 == 0,
"rq.cpu at cpu 0 = {cpu0} (expected 0). compute_rq_pas resolved \
to wrong per-CPU page OR the slide thread-through did not \
reach the per-CPU read path."
);
anyhow::ensure!(
cpu1 == 1,
"rq.cpu at cpu 1 = {cpu1} (expected 1). per-CPU offset array \
indexing wrong (or only 1 CPU in this test fixture)."
);
Ok(())
}