mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
#[rustfmt::skip]
mod bpf;
use bpf::*;
mod stats;
use std::collections::BTreeSet;
use std::io::{self};
use std::mem::MaybeUninit;
use std::time::Duration;
use std::time::SystemTime;
use anyhow::Result;
use clap::Parser;
use libbpf_rs::OpenObject;
use log::info;
use log::warn;
use procfs::process::Process;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::UserExitInfo;
use stats::Metrics;
// Human-readable scheduler name, printed in the version banner and in the
// log line emitted when a `Scheduler` is dropped.
const SCHEDULER_NAME: &str = "RustLand";
// Command-line options, parsed with clap's derive API.
//
// NOTE(review): the comments below are plain `//` on purpose. clap turns `///` doc comments
// into `--help` text, so promoting them would change the program's output.
#[derive(Debug, Parser)]
struct Opts {
// Scheduling slice in microseconds; converted to nanoseconds in `Scheduler::init` and used
// by `update_enqueued` to bound how far a task's vtime may lag and to cap its exec_runtime.
#[clap(short = 's', long, default_value = "20000")]
slice_us: u64,
// Minimum slice in microseconds; converted to nanoseconds, forwarded to `BpfScheduler::init`
// and used by `dispatch_task` as the base value scaled by the task weight.
#[clap(short = 'S', long, default_value = "1000")]
slice_us_min: u64,
// When set, `dispatch_task` sends each task to the CPU recorded in its `QueuedTask` instead
// of asking the BPF side to select one.
#[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
percpu_local: bool,
// Forwarded unchanged to `BpfScheduler::init`.
// NOTE(review): presumably restricts CPU selection to the task's NUMA node — confirm.
#[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
numa_local: bool,
// Forwarded unchanged to `BpfScheduler::init`.
// NOTE(review): presumably enables partial mode (only opted-in tasks are scheduled) — confirm.
#[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
partial: bool,
// Forwarded unchanged to `BpfScheduler::init`.
// NOTE(review): presumably the exit debug dump buffer length, 0 meaning default — confirm.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
// Forwarded unchanged to `BpfScheduler::init`; it does not change the log level set in `main`.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
verbose: bool,
// Interval in seconds for the background stats monitor thread; the scheduler still runs.
#[clap(long)]
stats: Option<f64>,
// Interval in seconds for monitor-only mode: `main` waits for the monitor thread and returns
// without starting the scheduler. Takes precedence over `stats` when both are given.
#[clap(long)]
monitor: Option<f64>,
// Print the description of the stats metadata to stdout and exit.
#[clap(long)]
help_stats: bool,
// Print the scheduler and scx_rustland_core versions and exit.
#[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
version: bool,
// libbpf options, flattened into this command line and converted to BPF open options in
// `Scheduler::init`.
#[clap(flatten, next_help_heading = "Libbpf Options")]
pub libbpf: LibbpfOpts,
}
// Nanoseconds per microsecond; used to convert the `*_us` command-line options to nanoseconds.
const NSEC_PER_USEC: u64 = 1_000;
/// A queued task together with the keys that order it inside the scheduler's task pool.
#[derive(Debug, PartialEq, Eq, Clone)]
struct Task {
    /// Task descriptor as returned by `BpfScheduler::dequeue_task`.
    qtask: QueuedTask,
    /// Primary ordering key, computed by `Scheduler::update_enqueued`.
    deadline: u64,
    /// Value of `Scheduler::now()` when the task was inserted; first tie-breaker.
    timestamp: u64,
}
/// Total order used by the task pool: earliest deadline first, then earliest insertion
/// timestamp, then lowest pid.
impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Tuples compare lexicographically, which matches the key priority above.
        let lhs = (self.deadline, self.timestamp, self.qtask.pid);
        let rhs = (other.deadline, other.timestamp, other.qtask.pid);
        lhs.cmp(&rhs)
    }
}
// Delegates to `Ord::cmp`, so the partial and total orderings of `Task` always agree and
// `partial_cmp` never returns `None`.
impl PartialOrd for Task {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// State of the user-space scheduler.
struct Scheduler<'a> {
    /// Connector to the BPF side: dequeues tasks, dispatches them and exposes counters.
    bpf: BpfScheduler<'a>,
    /// Parsed command-line options.
    opts: &'a Opts,
    /// Stats server answering requests with `Metrics` snapshots.
    stats_server: StatsServer<(), Metrics>,
    /// Tasks waiting to be dispatched, kept sorted by `Task`'s ordering.
    tasks: BTreeSet<Task>,
    /// Global vruntime, advanced in `update_enqueued` for every enqueued task.
    vruntime_now: u64,
    /// Page-fault count captured by `get_metrics` as the baseline for `nr_page_faults`.
    init_page_faults: u64,
    /// Slice duration in nanoseconds (from `--slice-us`).
    slice_ns: u64,
    /// Minimum slice duration in nanoseconds (from `--slice-us-min`).
    slice_ns_min: u64,
}
impl<'a> Scheduler<'a> {
/// Builds a scheduler instance: launches the stats server, converts the slice options to
/// nanoseconds, initializes the BPF connector and logs the version banner.
///
/// # Errors
///
/// Returns an error if the stats server cannot be launched or if `BpfScheduler::init` fails.
fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
    let stats_server = StatsServer::new(stats::server_data()).launch()?;

    // Both slice options come straight from the command line: saturate rather than overflow
    // (which would panic in debug builds and silently wrap in release builds).
    let slice_ns = opts.slice_us.saturating_mul(NSEC_PER_USEC);
    let slice_ns_min = opts.slice_us_min.saturating_mul(NSEC_PER_USEC);

    let bpf = BpfScheduler::init(
        open_object,
        opts.libbpf.clone().into_bpf_open_opts(),
        opts.exit_dump_len,
        opts.partial,
        opts.verbose,
        // NOTE(review): the meaning of this positional flag is not visible from this file;
        // confirm against `BpfScheduler::init`.
        true,
        opts.numa_local,
        slice_ns_min,
        "rustland",
    )?;

    info!(
        "{} version {} - scx_rustland_core {}",
        SCHEDULER_NAME,
        build_id::full_version(env!("CARGO_PKG_VERSION")),
        scx_rustland_core::VERSION
    );

    Ok(Self {
        bpf,
        opts,
        stats_server,
        tasks: BTreeSet::new(),
        vruntime_now: 0,
        init_page_faults: 0,
        slice_ns,
        slice_ns_min,
    })
}
/// Builds a `Metrics` snapshot from the BPF counters and this process's page-fault count.
///
/// `nr_page_faults` is reported relative to a baseline captured the first time a non-zero
/// page-fault count is observed.
fn get_metrics(&mut self) -> Metrics {
    // A failed read of the process stats is reported as 0 faults.
    let page_faults = Self::get_page_faults().unwrap_or_default();
    if self.init_page_faults == 0 {
        self.init_page_faults = page_faults;
    }
    // Saturate: if a later read fails (yielding 0) after a non-zero baseline was stored, a
    // plain subtraction would underflow and either panic or report a bogus huge value.
    let nr_page_faults = page_faults.saturating_sub(self.init_page_faults);

    Metrics {
        nr_running: *self.bpf.nr_running_mut(),
        nr_cpus: *self.bpf.nr_online_cpus_mut(),
        nr_queued: *self.bpf.nr_queued_mut(),
        nr_scheduled: *self.bpf.nr_scheduled_mut(),
        nr_page_faults,
        nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
        nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
        nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
        nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
        nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
        nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
    }
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// The value is only used as an ordering tie-breaker, so a system clock set before the epoch
/// yields 0 instead of panicking, and values that do not fit in a `u64` are clamped.
fn now() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            elapsed.as_nanos().min(u128::from(u64::MAX)) as u64
        })
}
/// Scales `value` proportionally to the task's weight, where a weight of 100 is the identity
/// (`value * weight / 100`).
///
/// The product is computed in 128 bits and the result is clamped to `u64::MAX`, so large
/// inputs cannot overflow.
fn scale_by_task_weight(task: &QueuedTask, value: u64) -> u64 {
    let scaled = u128::from(value) * u128::from(task.weight) / 100;
    scaled.min(u128::from(u64::MAX)) as u64
}
/// Scales `value` inversely to the task's weight, where a weight of 100 is the identity
/// (`value * 100 / weight`).
///
/// The product is computed in 128 bits and the result is clamped to `u64::MAX`. A weight of 0
/// is treated as 1 so that a malformed task cannot trigger a division-by-zero panic.
fn scale_by_task_weight_inverse(task: &QueuedTask, value: u64) -> u64 {
    let weight = u128::from(task.weight.max(1));
    let scaled = u128::from(value) * 100 / weight;
    scaled.min(u128::from(u64::MAX)) as u64
}
/// Updates the vruntime of a freshly enqueued task and returns its deadline.
///
/// The task's `vtime` is first re-aligned (a zero `vtime` starts at the global vruntime;
/// otherwise it may lag the global vruntime by at most one slice), then both the task's
/// `vtime` and the global vruntime are charged with the weight-scaled time the task ran
/// (`stop_ts - start_ts`). The returned deadline is the updated `vtime` plus the task's
/// `exec_runtime`, capped at 100 slices.
fn update_enqueued(&mut self, task: &mut QueuedTask) -> u64 {
    if task.vtime == 0 {
        task.vtime = self.vruntime_now;
    } else {
        let vtime_floor = self.vruntime_now.saturating_sub(self.slice_ns);
        if task.vtime < vtime_floor {
            task.vtime = vtime_floor;
        }
    }

    // Time actually consumed by the task, scaled inversely to its weight.
    let ran_ns = task.stop_ts.saturating_sub(task.start_ts);
    let charged = Self::scale_by_task_weight_inverse(task, ran_ns);
    task.vtime += charged;
    self.vruntime_now += charged;

    let runtime_cap = self.slice_ns.saturating_mul(100);
    task.vtime + task.exec_runtime.min(runtime_cap)
}
/// Dispatches the first task of the pool (if any) to the BPF side.
///
/// Returns `true` when the pool is empty or the task was dispatched, and `false` when the
/// BPF side rejected the task; in that case the task is put back into the pool.
fn dispatch_task(&mut self) -> bool {
    let task = match self.tasks.pop_first() {
        Some(next) => next,
        None => return true,
    };

    let mut outgoing = DispatchedTask::new(&task.qtask);
    // The time slice is the minimum slice scaled by the task's weight; the deadline is
    // propagated as the dispatched task's vtime.
    outgoing.slice_ns = Self::scale_by_task_weight(&task.qtask, self.slice_ns_min);
    outgoing.vtime = task.deadline;
    outgoing.cpu = if self.opts.percpu_local {
        // Per-CPU mode: stay on the CPU recorded in the queued task.
        task.qtask.cpu
    } else {
        let picked = self
            .bpf
            .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
        // A negative result means no CPU was picked: let the task run anywhere.
        if picked >= 0 {
            picked
        } else {
            RL_CPU_ANY
        }
    };

    match self.bpf.dispatch_task(&outgoing) {
        Ok(_) => true,
        Err(_) => {
            self.tasks.insert(task);
            false
        }
    }
}
/// Moves every task currently queued on the BPF side into the task pool.
///
/// Each task gets its deadline from `update_enqueued` and an insertion timestamp from
/// `now()`. Draining stops when the queue is empty or on the first dequeue error, which is
/// logged as a warning.
fn drain_queued_tasks(&mut self) {
    loop {
        let mut queued = match self.bpf.dequeue_task() {
            Ok(Some(next)) => next,
            Ok(None) => return,
            Err(err) => {
                warn!("Error: {err}");
                return;
            }
        };

        let deadline = self.update_enqueued(&mut queued);
        let timestamp = Self::now();
        self.tasks.insert(Task {
            qtask: queued,
            deadline,
            timestamp,
        });
    }
}
/// Runs one scheduling round: drain the queued tasks into the pool, dispatch at most one
/// task, then tell the BPF side how many tasks are still waiting in the pool.
fn schedule(&mut self) {
    self.drain_queued_tasks();
    self.dispatch_task();

    let nr_pending = self.tasks.len() as u64;
    self.bpf.notify_complete(nr_pending);
}
/// Returns the total number of page faults (minor + major) of the current process, as
/// reported by its procfs stat entry.
///
/// # Errors
///
/// Any procfs failure is wrapped into an `io::Error`.
fn get_page_faults() -> Result<u64, io::Error> {
    let stat = Process::myself()
        .and_then(|me| me.stat())
        .map_err(io::Error::other)?;
    Ok(stat.minflt + stat.majflt)
}
/// Main loop: keeps running scheduling rounds until the BPF side reports it has exited,
/// answering at most one pending stats request per round, then shuts the BPF side down and
/// returns its exit report.
///
/// # Errors
///
/// Fails if a metrics response cannot be sent or if `shutdown_and_report` fails.
fn run(&mut self) -> Result<UserExitInfo> {
    let (res_ch, req_ch) = self.stats_server.channels();

    loop {
        if self.bpf.exited() {
            break;
        }
        self.schedule();

        // Non-blocking poll: skip the reply when no stats request is pending.
        if req_ch.try_recv().is_err() {
            continue;
        }
        res_ch.send(self.get_metrics())?;
    }

    self.bpf.shutdown_and_report()
}
}
// Logs a message when a `Scheduler` instance goes away. No other cleanup is done here;
// the fields are released by their own destructors.
impl Drop for Scheduler<'_> {
fn drop(&mut self) {
info!("Unregister {SCHEDULER_NAME} scheduler");
}
}
fn main() -> Result<()> {
let opts = Opts::parse();
if opts.version {
println!(
"{} version {} - scx_rustland_core {}",
SCHEDULER_NAME,
build_id::full_version(env!("CARGO_PKG_VERSION")),
scx_rustland_core::VERSION
);
return Ok(());
}
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
let loglevel = simplelog::LevelFilter::Info;
let mut lcfg = simplelog::ConfigBuilder::new();
lcfg.set_time_offset_to_local()
.expect("Failed to set local time offset")
.set_time_level(simplelog::LevelFilter::Error)
.set_location_level(simplelog::LevelFilter::Off)
.set_target_level(simplelog::LevelFilter::Off)
.set_thread_level(simplelog::LevelFilter::Off);
simplelog::TermLogger::init(
loglevel,
lcfg.build(),
simplelog::TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)?;
if let Some(intv) = opts.monitor.or(opts.stats) {
let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
if opts.monitor.is_some() {
let _ = jh.join();
return Ok(());
}
}
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;
if !sched.run()?.should_restart() {
break;
}
}
Ok(())
}