use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::{Builder, Handle, Runtime};
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
pub control_threads: usize,
pub app_threads: usize,
pub control_priority: ControlPriority,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
control_threads: 2,
app_threads: 0,
control_priority: ControlPriority::default(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlPriority {
Fifo,
Nice,
Default,
}
impl Default for ControlPriority {
fn default() -> Self {
if cfg!(target_os = "linux") {
ControlPriority::Fifo
} else {
ControlPriority::Default
}
}
}
impl ControlPriority {
pub fn from_str(s: &str) -> Self {
match s.to_ascii_lowercase().as_str() {
"fifo" => ControlPriority::Fifo,
"nice" => ControlPriority::Nice,
"default" | "" => ControlPriority::Default,
other => {
tracing::warn!(
value = other,
"unknown control_priority, falling back to default"
);
ControlPriority::Default
}
}
}
}
pub struct SplitRuntime {
control: Runtime,
app: Runtime,
}
impl SplitRuntime {
pub fn new(cfg: RuntimeConfig) -> std::io::Result<Self> {
let total = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let control_threads = cfg.control_threads.max(1);
let app_threads = if cfg.app_threads == 0 {
total.saturating_sub(control_threads).max(1)
} else {
cfg.app_threads
};
tracing::info!(
control_threads,
app_threads,
total_cores = total,
priority = ?cfg.control_priority,
"building split runtime"
);
let ctrl_idx = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let app_idx = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let priority = cfg.control_priority;
let ctrl_idx_clone = Arc::clone(&ctrl_idx);
let control = Builder::new_multi_thread()
.worker_threads(control_threads)
.enable_all()
.thread_name_fn(move || {
let n = ctrl_idx_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
format!("ydb-ctrl-{n}")
})
.on_thread_start(move || {
if let Err(e) = apply_priority(priority) {
static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if WARNED.set(()).is_ok() {
tracing::warn!(
error = %e,
requested = ?priority,
"could not apply requested control-runtime priority; falling back to OS default"
);
}
}
})
.build()?;
let app_idx_clone = Arc::clone(&app_idx);
let app = Builder::new_multi_thread()
.worker_threads(app_threads)
.enable_all()
.thread_name_fn(move || {
let n = app_idx_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
format!("ydb-app-{n}")
})
.build()?;
Ok(Self { control, app })
}
pub fn control_handle(&self) -> Handle {
self.control.handle().clone()
}
pub fn app_handle(&self) -> Handle {
self.app.handle().clone()
}
pub fn shutdown_timeout(self, dur: Duration) {
self.app.shutdown_timeout(dur);
self.control.shutdown_timeout(dur);
}
}
#[cfg(target_os = "linux")]
fn apply_priority(priority: ControlPriority) -> std::io::Result<()> {
match priority {
ControlPriority::Default => Ok(()),
ControlPriority::Fifo => {
unsafe {
let param = libc::sched_param { sched_priority: 1 };
let rc =
libc::pthread_setschedparam(libc::pthread_self(), libc::SCHED_FIFO, ¶m);
if rc != 0 {
return apply_priority(ControlPriority::Nice);
}
}
Ok(())
}
ControlPriority::Nice => {
unsafe {
let rc = libc::setpriority(libc::PRIO_PROCESS, 0, -10);
if rc != 0 {
return Err(std::io::Error::last_os_error());
}
}
Ok(())
}
}
}
#[cfg(not(target_os = "linux"))]
fn apply_priority(priority: ControlPriority) -> std::io::Result<()> {
match priority {
ControlPriority::Default => Ok(()),
ControlPriority::Fifo | ControlPriority::Nice => {
tracing::debug!(
"control priority is Linux-only; runtime split + caps still active on this platform"
);
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_default_chooses_fifo_on_linux() {
let cfg = RuntimeConfig::default();
let expected = if cfg!(target_os = "linux") {
ControlPriority::Fifo
} else {
ControlPriority::Default
};
assert_eq!(cfg.control_priority, expected);
}
#[test]
fn priority_from_str_handles_known_values() {
assert_eq!(ControlPriority::from_str("fifo"), ControlPriority::Fifo);
assert_eq!(ControlPriority::from_str("FIFO"), ControlPriority::Fifo);
assert_eq!(ControlPriority::from_str("nice"), ControlPriority::Nice);
assert_eq!(
ControlPriority::from_str("default"),
ControlPriority::Default
);
assert_eq!(ControlPriority::from_str(""), ControlPriority::Default);
assert_eq!(
ControlPriority::from_str("garbage"),
ControlPriority::Default
);
}
#[test]
fn split_runtime_builds_with_minimal_threads() {
let cfg = RuntimeConfig {
control_threads: 1,
app_threads: 1,
control_priority: ControlPriority::Default,
};
let rt = SplitRuntime::new(cfg).expect("split runtime");
let ctrl = rt.control_handle();
let app = rt.app_handle();
let ctrl_done = ctrl.spawn(async { 1u32 });
let app_done = app.spawn(async { 2u32 });
let (c, a) = ctrl.block_on(async { (ctrl_done.await, app_done.await) });
assert_eq!(c.unwrap(), 1);
assert_eq!(a.unwrap(), 2);
rt.shutdown_timeout(Duration::from_secs(2));
}
}