#![allow(clippy::unused_self)]
#![allow(clippy::cast_possible_wrap)]
mod logger;
use scx_loader::dbus::LoaderClientProxy;
use scx_loader::{config, SchedMode, SupportedSched};
use std::process::ExitStatus;
use std::process::Stdio;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use sysinfo::System;
use tokio::process::Child;
use tokio::process::Command;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::Duration;
use tokio::time::Instant;
use zbus::interface;
use zbus::message::Header;
use zbus::Connection;
use zbus_polkit::policykit1::{AuthorityProxy, CheckAuthorizationFlags, Subject};
/// Requests sent from the D-Bus interface (`ScxLoader`) and the Ctrl-C handler to `worker_loop`
/// over an unbounded channel.
#[derive(Debug, PartialEq)]
enum ScxMessage {
    /// Make `worker_loop` return (sent by the Ctrl-C handler).
    Quit,
    /// Stop the currently running scheduler, if any.
    StopSched,
    /// Start a scheduler using the flags the config associates with the given mode.
    StartSched((SupportedSched, SchedMode)),
    /// Start a scheduler with an explicit argument list.
    StartSchedArgs((SupportedSched, Vec<String>)),
    /// Stop any running scheduler, then start this one with the mode's configured flags.
    SwitchSched((SupportedSched, SchedMode)),
    /// Stop any running scheduler, then start this one with an explicit argument list.
    SwitchSchedArgs((SupportedSched, Vec<String>)),
    /// Restart a scheduler: explicit args win if present, otherwise the mode's configured flags
    /// are used.
    RestartSched((SupportedSched, Option<Vec<String>>, SchedMode)),
}
/// Commands sent from `worker_loop` to `handle_child_process`, with scheduler arguments already
/// resolved to a concrete argument list.
#[derive(Debug, PartialEq)]
enum RunnerMessage {
    /// Stop the current scheduler task (if any) and start the given one.
    Switch((SupportedSched, Vec<String>)),
    /// Start the given scheduler; rejected by the runner while it still holds a scheduler task.
    Start((SupportedSched, Vec<String>)),
    /// Stop the current scheduler task, if any.
    Stop,
    /// Stop the current scheduler task (if any) and start the given one again.
    Restart((SupportedSched, Vec<String>)),
}
/// State behind the `org.scx.Loader` D-Bus object served at `/org/scx/Loader`.
///
/// The `current_*` fields record what was last requested through this interface; they are
/// updated when a request is sent and do not track whether the child process is actually alive.
struct ScxLoader {
    /// Scheduler last requested, or `None` after a stop / before any request.
    current_scx: Option<SupportedSched>,
    /// Mode last requested (set to `SchedMode::Auto` when explicit args were used).
    current_mode: SchedMode,
    /// Explicit argument list last requested, or `None` when mode-based flags are used.
    current_args: Option<Vec<String>>,
    /// Sender side of the channel drained by `worker_loop`.
    channel: UnboundedSender<ScxMessage>,
    /// Default scheduler taken from the loaded config, if configured.
    default_sched: Option<SupportedSched>,
    /// Default mode taken from the loaded config (falls back to `SchedMode::Auto` in `main`).
    default_mode: SchedMode,
}
/// Command-line arguments.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    /// Run the CPU-utilization monitor (`monitor_cpu_util`) instead of the D-Bus service.
    #[clap(long, short, action)]
    auto: bool,
}
/// Polkit action checked before every state-changing D-Bus method of `ScxLoader`.
const ROOT_ACTION_ID: &str = "org.scx.loader.manage-schedulers";
#[interface(name = "org.scx.Loader")]
impl ScxLoader {
#[zbus(property)]
fn current_scheduler(&self) -> String {
if let Some(current_scx) = &self.current_scx {
let current_scx: &str = current_scx.clone().into();
log::info!("called {current_scx:?}");
current_scx.into()
} else {
"unknown".into()
}
}
#[zbus(property)]
fn scheduler_mode(&self) -> SchedMode {
self.current_mode
}
#[zbus(property)]
fn current_scheduler_args(&self) -> Vec<String> {
self.current_args.clone().unwrap_or_default()
}
#[zbus(property)]
fn supported_schedulers(&self) -> Vec<&'static str> {
vec![
"scx_beerland",
"scx_bpfland",
"scx_cake",
"scx_cosmos",
"scx_flash",
"scx_flow",
"scx_lavd",
"scx_pandemonium",
"scx_p2dq",
"scx_tickless",
"scx_rustland",
"scx_rusty",
]
}
#[zbus(property)]
fn default_scheduler(&self) -> String {
if let Some(default_scx) = &self.default_sched {
let default_scx: &str = default_scx.clone().into();
default_scx.into()
} else {
"unknown".into()
}
}
#[zbus(property)]
fn default_mode(&self) -> SchedMode {
self.default_mode
}
async fn start_scheduler(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
scx_name: SupportedSched,
sched_mode: SchedMode,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
log::info!("starting {scx_name:?} with mode {sched_mode:?}..");
let _ = self
.channel
.send(ScxMessage::StartSched((scx_name.clone(), sched_mode)));
self.current_scx = Some(scx_name);
self.current_mode = sched_mode;
self.current_args = None;
Ok(())
}
async fn start_scheduler_with_args(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
scx_name: SupportedSched,
scx_args: Vec<String>,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
log::info!("starting {scx_name:?} with args {scx_args:?}..");
let _ = self.channel.send(ScxMessage::StartSchedArgs((
scx_name.clone(),
scx_args.clone(),
)));
self.current_scx = Some(scx_name);
self.current_mode = SchedMode::Auto;
self.current_args = Some(scx_args);
Ok(())
}
async fn switch_scheduler(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
scx_name: SupportedSched,
sched_mode: SchedMode,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
log::info!("switching {scx_name:?} with mode {sched_mode:?}..");
let _ = self
.channel
.send(ScxMessage::SwitchSched((scx_name.clone(), sched_mode)));
self.current_scx = Some(scx_name);
self.current_mode = sched_mode;
self.current_args = None;
Ok(())
}
async fn switch_scheduler_with_args(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
scx_name: SupportedSched,
scx_args: Vec<String>,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
log::info!("switching {scx_name:?} with args {scx_args:?}..");
let _ = self.channel.send(ScxMessage::SwitchSchedArgs((
scx_name.clone(),
scx_args.clone(),
)));
self.current_scx = Some(scx_name);
self.current_mode = SchedMode::Auto;
self.current_args = Some(scx_args);
Ok(())
}
async fn stop_scheduler(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
if let Some(current_scx) = &self.current_scx {
let scx_name: &str = current_scx.clone().into();
log::info!("stopping {scx_name:?}..");
let _ = self.channel.send(ScxMessage::StopSched);
self.current_scx = None;
self.current_args = None;
}
Ok(())
}
async fn restart_scheduler(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
if let Some(current_scx) = &self.current_scx {
let scx_name: &str = current_scx.clone().into();
log::info!("restarting {scx_name:?}..");
let _ = self.channel.send(ScxMessage::RestartSched((
current_scx.clone(),
self.current_args.clone(),
self.current_mode,
)));
Ok(())
} else {
Err(zbus::fdo::Error::Failed(
"No scheduler is currently running to restart".to_string(),
))
}
}
async fn restore_default(
&mut self,
#[zbus(connection)] conn: &Connection,
#[zbus(header)] hdr: Header<'_>,
) -> zbus::fdo::Result<()> {
check_authorization_inter(conn, &hdr, ROOT_ACTION_ID).await?;
if let Some(default_scx) = &self.default_sched {
let scx_name: &str = default_scx.clone().into();
log::info!(
"restoring default scheduler {scx_name:?} with mode {:?}..",
self.default_mode
);
let _ = self.channel.send(ScxMessage::SwitchSched((
default_scx.clone(),
self.default_mode,
)));
self.current_scx = Some(default_scx.clone());
self.current_mode = self.default_mode;
self.current_args = None;
Ok(())
} else {
Err(zbus::fdo::Error::Failed(
"No default scheduler is configured".to_string(),
))
}
}
}
/// Standalone `--auto` mode: starts `scx_lavd` under sustained high CPU load and stops it again
/// once load stays low.
///
/// CPU usage is sampled once per second. `scx_lavd` is spawned once any CPU has been above 90%
/// for more than 5 continuous seconds. It is killed once no CPU has exceeded 90% for more than
/// 30 continuous seconds.
///
/// Loops forever under normal operation.
///
/// # Errors
/// Returns an error if spawning, killing, or waiting on `scx_lavd` fails.
async fn monitor_cpu_util() -> Result<()> {
    let mut system = System::new_all();
    let mut running_sched: Option<Child> = None;
    let mut cpu_above_threshold_since: Option<Instant> = None;
    let mut cpu_below_threshold_since: Option<Instant> = None;
    let high_utilization_threshold = 90.0;
    let low_utilization_threshold_duration = Duration::from_secs(30);
    let high_utilization_trigger_duration = Duration::from_secs(5);
    loop {
        system.refresh_cpu_all();
        let any_cpu_above_threshold = system
            .cpus()
            .iter()
            .any(|cpu| cpu.cpu_usage() > high_utilization_threshold);
        if any_cpu_above_threshold {
            // Any busy sample breaks the "quiet" streak, even a spike too short to trigger a
            // start; otherwise lavd could be stopped right after a burst.
            cpu_below_threshold_since = None;
            let above_since = *cpu_above_threshold_since.get_or_insert_with(Instant::now);
            if above_since.elapsed() > high_utilization_trigger_duration && running_sched.is_none()
            {
                log::info!("CPU Utilization exceeded 90% for 5 seconds, starting scx_lavd");
                let scx_name: &str = SupportedSched::Lavd.into();
                let child = Command::new(scx_name)
                    .spawn()
                    .context("Failed to start scx_lavd")?;
                running_sched = Some(child);
            }
        } else {
            cpu_above_threshold_since = None;
            let below_since = *cpu_below_threshold_since.get_or_insert_with(Instant::now);
            if below_since.elapsed() > low_utilization_threshold_duration {
                if let Some(mut child) = running_sched.take() {
                    log::info!(
                        "CPU utilization dropped below 90% for more than 30 seconds, exiting latency-aware scheduler"
                    );
                    child.kill().await.context("Failed to kill scx_lavd")?;
                    let lavd_exit_status =
                        child.wait().await.context("Failed to wait on scx_lavd")?;
                    log::info!("scx_lavd exited with status: {lavd_exit_status}");
                }
            }
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Entry point.
///
/// Initializes logging and config. With `--auto`, it runs the CPU monitor and exits when that
/// returns. Otherwise it serves `org.scx.Loader` on the system bus. If a default scheduler is
/// configured, it asks itself (over D-Bus) to switch to it, then processes requests in
/// `worker_loop` until it quits.
#[tokio::main]
async fn main() -> Result<()> {
    logger::init_logger().expect("Failed to initialize logger");
    let args = Args::parse();
    let config = config::init_config().context("Failed to initialize config")?;

    if args.auto {
        log::info!("Starting scx_loader monitor as standard process without dbus interface");
        return monitor_cpu_util().await;
    }

    log::info!("Starting as dbus interface");
    let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel::<ScxMessage>();

    // Ctrl-C asks the worker loop to return; the handler owns its own sender clone.
    let quit_tx = msg_tx.clone();
    ctrlc::set_handler(move || {
        log::info!("shutting down..");
        let _ = quit_tx.send(ScxMessage::Quit);
    })
    .context("Error setting Ctrl-C handler")?;

    let default_mode = config.default_mode.unwrap_or(SchedMode::Auto);

    let connection = Connection::system().await?;
    let loader = ScxLoader {
        current_scx: None,
        current_mode: SchedMode::Auto,
        current_args: None,
        channel: msg_tx.clone(),
        default_sched: config.default_sched.clone(),
        default_mode,
    };
    connection.object_server().at("/org/scx/Loader", loader).await?;
    connection.request_name("org.scx.Loader").await?;

    // Go through our own D-Bus interface so the tracked state is updated like any client request.
    if let Some(default_sched) = config.default_sched.as_ref() {
        log::info!("Starting default scheduler: {default_sched:?}");
        LoaderClientProxy::new(&connection)
            .await?
            .switch_scheduler(default_sched.clone(), default_mode)
            .await?;
    }

    worker_loop(config, msg_rx).await
}
async fn worker_loop(
config: config::Config,
mut receiver: UnboundedReceiver<ScxMessage>,
) -> Result<()> {
let (runner_tx, runner_rx) = tokio::sync::mpsc::channel::<RunnerMessage>(1);
let run_sched_future = tokio::spawn(async move { handle_child_process(runner_rx).await });
tokio::pin!(run_sched_future);
loop {
let msg = tokio::select! {
msg = receiver.recv() => {
match msg {
None => return Ok(()),
Some(m) => m,
}
}
res = &mut run_sched_future => {
log::info!("Sched future finished");
let _ = res?;
continue;
}
};
log::debug!("Got msg : {msg:?}");
match msg {
ScxMessage::Quit => return Ok(()),
ScxMessage::StopSched => {
log::info!("Got event to stop scheduler!");
runner_tx.send(RunnerMessage::Stop).await?;
}
ScxMessage::StartSched((scx_sched, sched_mode)) => {
log::info!("Got event to start scheduler!");
let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
runner_tx
.send(RunnerMessage::Start((scx_sched, args)))
.await?;
}
ScxMessage::StartSchedArgs((scx_sched, sched_args)) => {
log::info!("Got event to start scheduler with args!");
runner_tx
.send(RunnerMessage::Start((scx_sched, sched_args)))
.await?;
}
ScxMessage::SwitchSched((scx_sched, sched_mode)) => {
log::info!("Got event to switch scheduler!");
let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
runner_tx
.send(RunnerMessage::Switch((scx_sched, args)))
.await?;
}
ScxMessage::SwitchSchedArgs((scx_sched, sched_args)) => {
log::info!("Got event to switch scheduler with args!");
runner_tx
.send(RunnerMessage::Switch((scx_sched, sched_args)))
.await?;
}
ScxMessage::RestartSched((scx_sched, current_args, current_mode)) => {
log::info!("Got event to restart scheduler!");
let args = if let Some(args) = current_args {
args
} else {
config::get_scx_flags_for_mode(&config, &scx_sched, current_mode)
};
runner_tx
.send(RunnerMessage::Restart((scx_sched, args)))
.await?;
}
}
}
}
/// Owns the single scheduler task and applies `RunnerMessage`s in arrival order.
///
/// `Switch` and `Restart` stop any current task before starting a new one. `Start` is rejected
/// (logged) while a scheduler task is still running; a task that has already finished is reaped
/// first, so it does not block the start.
///
/// Runs until every sender of `rx` is dropped, then returns `Ok(())`.
async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver<RunnerMessage>) -> Result<()> {
    let mut task: Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> = None;
    let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
    while let Some(message) = rx.recv().await {
        match message {
            RunnerMessage::Switch((scx_sched, sched_args)) => {
                stop_scheduler(&mut task, &mut cancel_token).await;
                let handle = start_scheduler(scx_sched, sched_args, cancel_token.clone());
                task = Some(handle);
                log::debug!("Scheduler started");
            }
            RunnerMessage::Start((scx_sched, sched_args)) => {
                // A scheduler task that already ended (clean exit or retries exhausted) must not
                // block a new start forever; reap its handle first.
                let previous_finished = task
                    .as_ref()
                    .is_some_and(tokio::task::JoinHandle::is_finished);
                if previous_finished {
                    if let Some(finished) = task.take() {
                        let status = finished.await;
                        log::debug!("Previous scheduler already exited with status: {status:?}");
                    }
                }
                if task.is_some() {
                    log::error!("Scheduler wasn't finished yet. Stop already running scheduler!");
                    continue;
                }
                let handle = start_scheduler(scx_sched, sched_args, cancel_token.clone());
                task = Some(handle);
                log::debug!("Scheduler started");
            }
            RunnerMessage::Stop => {
                stop_scheduler(&mut task, &mut cancel_token).await;
            }
            RunnerMessage::Restart((scx_sched, sched_args)) => {
                log::info!("Got event to restart scheduler!");
                stop_scheduler(&mut task, &mut cancel_token).await;
                let handle = start_scheduler(scx_sched, sched_args, cancel_token.clone());
                task = Some(handle);
                log::debug!("Scheduler restarted");
            }
        }
    }
    Ok(())
}
fn start_scheduler(
scx_crate: SupportedSched,
args: Vec<String>,
cancel_token: Arc<tokio_util::sync::CancellationToken>,
) -> tokio::task::JoinHandle<Result<Option<ExitStatus>>> {
tokio::spawn(async move {
let mut retries = 0u32;
let max_retries = 5u32;
let mut last_status: Option<ExitStatus> = None;
while retries < max_retries {
let child = spawn_scheduler(scx_crate.clone(), args.clone());
let mut failed = false;
if let Ok(mut child) = child {
tokio::select! {
status = child.wait() => {
let status = status.expect("child process encountered an error");
last_status = Some(status);
if !status.success() {
failed = true;
}
log::debug!("Child process exited with status: {status:?}");
}
() = cancel_token.cancelled() => {
log::debug!("Received cancellation signal");
if let Some(child_id) = child.id() {
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(child_id as i32),
nix::sys::signal::SIGINT,
).context("Failed to send termination signal to the child")?;
}
let status = child.wait().await.expect("child process encountered an error");
last_status = Some(status);
break;
}
};
} else {
log::debug!("Failed to spawn child process");
failed = true;
}
if !failed {
break;
}
retries += 1;
log::error!("Failed to start scheduler (attempt {retries}/{max_retries})");
}
Ok(last_status)
})
}
/// Launches the scheduler binary named after `scx_crate` with `args` and stdin set to null.
///
/// # Errors
/// Returns the spawn failure, annotated with the binary name so "not found" / permission errors
/// are attributable.
fn spawn_scheduler(scx_crate: SupportedSched, args: Vec<String>) -> Result<Child> {
    let sched_bin_name: &str = scx_crate.into();
    log::info!("starting {sched_bin_name} command");
    let mut cmd = Command::new(sched_bin_name);
    cmd.args(args).stdin(Stdio::null());
    cmd.spawn()
        .with_context(|| format!("Failed to spawn scheduler binary `{sched_bin_name}`"))
}
/// Stops the scheduler task held in `task`, if there is one.
///
/// Cancels `cancel_token` (which makes the task `SIGINT` its child) and awaits the task. It then
/// installs a fresh token, because a cancelled token stays cancelled and would immediately stop
/// the next scheduler. Does nothing when `task` is `None`.
async fn stop_scheduler(
    task: &mut Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>>,
    cancel_token: &mut Arc<tokio_util::sync::CancellationToken>,
) {
    let Some(running) = task.take() else {
        return;
    };
    log::debug!("Stopping already running scheduler..");
    cancel_token.cancel();
    let outcome = running.await;
    log::debug!("Scheduler was stopped with status: {outcome:?}");
    *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
}
/// Asks polkit whether the sender of the message described by `header` may perform `action_id`.
///
/// Interactive authentication is allowed (`AllowUserInteraction`).
///
/// # Errors
/// Fails if:
/// - the polkit proxy cannot be created;
/// - a polkit subject cannot be built from the header;
/// - the polkit call fails;
/// - polkit reports the caller as not authorized (`"Not allowed!"`).
async fn check_authorization(
    connection: &Connection,
    header: &Header<'_>,
    action_id: &str,
) -> anyhow::Result<()> {
    log::debug!("Checking auth");
    let proxy = AuthorityProxy::new(connection).await?;
    // A header we cannot turn into a subject must reject this request, not panic the daemon.
    let subject = Subject::new_for_message_header(header)
        .map_err(|err| anyhow::anyhow!("Failed to create polkit subject: {err:?}"))?;
    let auth_result = proxy
        .check_authorization(
            &subject,
            action_id,
            &std::collections::HashMap::new(),
            CheckAuthorizationFlags::AllowUserInteraction.into(),
            "",
        )
        .await?;
    if !auth_result.is_authorized {
        anyhow::bail!("Not allowed!");
    }
    log::debug!("Auth allowed");
    Ok(())
}
async fn check_authorization_inter(
connection: &Connection,
header: &Header<'_>,
action_id: &str,
) -> zbus::fdo::Result<()> {
if let Err(auth_err) = check_authorization(connection, header, action_id).await {
return Err(zbus::fdo::Error::Failed(auth_err.to_string()));
}
Ok(())
}