use std::future::Future;
use std::time::Duration;
use tokio::task::JoinHandle;
pub use tokio::runtime::Handle as RuntimeHandle;
pub use tokio::time::interval;
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(future)
}
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
tokio::task::spawn_blocking(f)
}
pub async fn sleep(duration: Duration) {
tokio::time::sleep(duration).await;
}
pub async fn sleep_until(deadline: std::time::Instant) {
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
}
pub fn runtime_handle() -> tokio::runtime::Handle {
tokio::runtime::Handle::current()
}
pub const PRIORITY_MIN: u8 = 0;
pub const PRIORITY_MAX: u8 = 99;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ThreadPriority {
Low,
CaServerLow,
CaServerHigh,
Medium,
ScanLow,
ScanHigh,
High,
Iocsh,
Custom(u8),
}
impl ThreadPriority {
pub fn value(self) -> u8 {
let v = match self {
ThreadPriority::Low => 10,
ThreadPriority::CaServerLow => 20,
ThreadPriority::CaServerHigh => 40,
ThreadPriority::Medium => 50,
ThreadPriority::ScanLow => 60,
ThreadPriority::ScanHigh => 70,
ThreadPriority::High => 90,
ThreadPriority::Iocsh => 91,
ThreadPriority::Custom(v) => v,
};
v.min(PRIORITY_MAX)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StackSizeClass {
Small,
Medium,
Big,
}
impl StackSizeClass {
pub fn bytes(self) -> usize {
let unit = 0x10000usize * std::mem::size_of::<usize>();
match self {
StackSizeClass::Small => unit,
StackSizeClass::Medium => 2 * unit,
StackSizeClass::Big => 4 * unit,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriorityApplied {
Realtime,
Unsupported,
BestEffortFailed,
}
impl PriorityApplied {
pub fn is_realtime(self) -> bool {
matches!(self, PriorityApplied::Realtime)
}
}
pub fn apply_to_current_thread(priority: ThreadPriority) -> PriorityApplied {
apply_priority_impl(priority.value())
}
#[cfg(target_os = "linux")]
fn apply_priority_impl(epics_priority: u8) -> PriorityApplied {
unsafe {
let policy = libc::SCHED_FIFO;
let min = libc::sched_get_priority_min(policy);
let max = libc::sched_get_priority_max(policy);
if min < 0 || max < 0 || max < min {
return PriorityApplied::Unsupported;
}
let slope = (max - min) as f64 / 100.0;
let mut oss = (epics_priority as f64 * slope) as i32 + min;
if oss < min {
oss = min;
}
if oss > max {
oss = max;
}
let param = libc::sched_param {
sched_priority: oss,
};
let rc = libc::pthread_setschedparam(libc::pthread_self(), policy, ¶m);
if rc == 0 {
PriorityApplied::Realtime
} else {
tracing::debug!(
target: "epics_base_rs::runtime",
epics_priority,
oss,
errno = rc,
"SCHED_FIFO priority not applied; thread stays at default policy"
);
PriorityApplied::BestEffortFailed
}
}
}
#[cfg(not(target_os = "linux"))]
fn apply_priority_impl(_epics_priority: u8) -> PriorityApplied {
PriorityApplied::Unsupported
}
pub fn spawn_blocking_with_priority<F, R>(priority: ThreadPriority, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
tokio::task::spawn_blocking(move || {
let _ = apply_to_current_thread(priority);
f()
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_spawn() {
let handle = spawn(async { 42 });
assert_eq!(handle.await.unwrap(), 42);
}
#[tokio::test]
async fn test_spawn_blocking() {
let handle = spawn_blocking(|| 123);
assert_eq!(handle.await.unwrap(), 123);
}
#[tokio::test]
async fn test_sleep() {
let start = std::time::Instant::now();
sleep(Duration::from_millis(10)).await;
assert!(start.elapsed() >= Duration::from_millis(10));
}
#[test]
fn priority_named_levels_match_epics_thread_h() {
assert_eq!(ThreadPriority::Low.value(), 10);
assert_eq!(ThreadPriority::CaServerLow.value(), 20);
assert_eq!(ThreadPriority::CaServerHigh.value(), 40);
assert_eq!(ThreadPriority::Medium.value(), 50);
assert_eq!(ThreadPriority::ScanLow.value(), 60);
assert_eq!(ThreadPriority::ScanHigh.value(), 70);
assert_eq!(ThreadPriority::High.value(), 90);
assert_eq!(ThreadPriority::Iocsh.value(), 91);
}
#[test]
fn priority_ordering_ca_server_below_scan() {
assert!(ThreadPriority::CaServerHigh.value() < ThreadPriority::ScanLow.value());
assert!(ThreadPriority::CaServerLow.value() < ThreadPriority::ScanLow.value());
}
#[test]
fn priority_custom_clamps_to_max() {
assert_eq!(ThreadPriority::Custom(200).value(), PRIORITY_MAX);
assert_eq!(ThreadPriority::Custom(99).value(), 99);
assert_eq!(ThreadPriority::Custom(0).value(), PRIORITY_MIN);
}
#[test]
fn stack_size_classes_ordered() {
assert!(StackSizeClass::Small.bytes() < StackSizeClass::Medium.bytes());
assert!(StackSizeClass::Medium.bytes() < StackSizeClass::Big.bytes());
assert_eq!(
StackSizeClass::Small.bytes(),
0x10000 * std::mem::size_of::<usize>()
);
}
#[test]
fn apply_priority_returns_a_defined_outcome() {
let outcome = apply_to_current_thread(ThreadPriority::ScanHigh);
assert!(matches!(
outcome,
PriorityApplied::Realtime
| PriorityApplied::Unsupported
| PriorityApplied::BestEffortFailed
));
}
#[tokio::test]
async fn spawn_blocking_with_priority_runs_closure() {
let handle = spawn_blocking_with_priority(ThreadPriority::CaServerHigh, || 7);
assert_eq!(handle.await.unwrap(), 7);
}
}