// epics-base-rs 0.18.3
//
// Pure Rust EPICS IOC core — record system, database, iocsh, calc engine
// Documentation
use std::future::Future;
use std::time::Duration;
use tokio::task::JoinHandle;

pub use tokio::runtime::Handle as RuntimeHandle;
pub use tokio::time::interval;

/// Spawn `future` as a task on the current tokio runtime.
///
/// Forwards directly to tokio's task spawner. The returned
/// [`JoinHandle`] resolves to the future's output once the task
/// completes. Like the underlying tokio call, this must be invoked
/// from within a runtime context.
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // `tokio::spawn` is the crate-root re-export of this same function.
    tokio::task::spawn(future)
}

/// Run the blocking closure `f` on the current runtime's blocking
/// thread pool.
///
/// The returned [`JoinHandle`] resolves to the closure's return value.
/// Must be called from within a tokio runtime context, because it
/// looks up the current runtime handle.
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    // Same path tokio's free `task::spawn_blocking` takes: resolve the
    // ambient runtime handle, then submit the closure through it.
    tokio::runtime::Handle::current().spawn_blocking(f)
}

/// Asynchronously wait for `duration` using tokio's timer.
///
/// Only the calling task is suspended; no OS thread is blocked.
pub async fn sleep(duration: Duration) {
    let timer = tokio::time::sleep(duration);
    timer.await
}

/// Asynchronously wait until the given standard-library `deadline`.
///
/// The `std::time::Instant` is converted to tokio's own instant type
/// before being handed to tokio's timer.
pub async fn sleep_until(deadline: std::time::Instant) {
    let wake_at = tokio::time::Instant::from_std(deadline);
    tokio::time::sleep_until(wake_at).await
}

/// Return a handle to the tokio runtime the caller is running in.
///
/// Delegates to tokio's `Handle::current()`, so it must be called
/// from within a runtime context.
pub fn runtime_handle() -> tokio::runtime::Handle {
    use tokio::runtime::Handle;

    Handle::current()
}

// ---------------------------------------------------------------------------
// EPICS thread priority abstraction
//
// C parity: `modules/libcom/src/osi/epicsThread.h:73-92` defines an
// integer priority space `0..=99` (`epicsThreadPriorityMin/Max`) with a
// set of named levels, plus three stack-size classes.
// `osi/os/posix/osdThread.c` maps an EPICS priority `p` onto the OS
// SCHED_FIFO range with `oss = p * (max-min)/100 + min` and falls back
// to a non-RT (default-policy) thread when the process lacks permission
// to use SCHED_FIFO.
//
// The Rust port runs work as tokio tasks on a shared pool, so there is
// no per-task OS thread to re-prioritise for `spawn`. What is portably
// achievable is: (a) the priority enum + named levels as a first-class
// type, (b) a stack-size class with the C size table, and (c) a
// best-effort OS-scheduler priority applied to the *current* OS thread
// (used by dedicated `spawn_blocking` threads and the runtime's worker
// threads). `apply_to_current_thread` reports whether the OS actually
// honoured the request.
// ---------------------------------------------------------------------------

/// Minimum EPICS thread priority (`epicsThreadPriorityMin`).
pub const PRIORITY_MIN: u8 = 0;
/// Maximum EPICS thread priority (`epicsThreadPriorityMax`).
pub const PRIORITY_MAX: u8 = 99;

/// EPICS thread priority — an integer `0..=99` with the named levels
/// from `epicsThreadPriority*` (`epicsThread.h:73-83`). Lower values
/// are lower priority; the CA server bands sit below the scan bands so
/// scan threads preempt CA-server threads on a loaded IOC.
///
/// # Ordering
///
/// `PartialOrd`/`Ord` compare by the effective numeric priority
/// ([`ThreadPriority::value`]), so `Custom(5) < Low < Custom(15)`.
/// A derived ordering would instead follow variant declaration order
/// and rank every `Custom(_)` above every named level regardless of
/// its number.
///
/// Equality and hashing stay structural: `Custom(10) != Low` even
/// though both resolve to priority 10. To keep `Ord` consistent with
/// that `Eq`, ties on the effective value are broken deterministically:
/// a named level sorts before a `Custom` of the same value, and two
/// `Custom`s that clamp to the same value sort by their raw payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ThreadPriority {
    /// `epicsThreadPriorityLow` = 10.
    Low,
    /// `epicsThreadPriorityCAServerLow` = 20.
    CaServerLow,
    /// `epicsThreadPriorityCAServerHigh` = 40.
    CaServerHigh,
    /// `epicsThreadPriorityMedium` = 50.
    Medium,
    /// `epicsThreadPriorityScanLow` = 60.
    ScanLow,
    /// `epicsThreadPriorityScanHigh` = 70.
    ScanHigh,
    /// `epicsThreadPriorityHigh` = 90.
    High,
    /// `epicsThreadPriorityIocsh` = 91.
    Iocsh,
    /// An explicit priority value, clamped to `0..=99` on use.
    Custom(u8),
}

impl ThreadPriority {
    /// The raw EPICS priority value `0..=99`, matching the
    /// `epicsThreadPriority*` constants in `epicsThread.h`.
    ///
    /// `Custom` payloads above [`PRIORITY_MAX`] are clamped down to it.
    pub fn value(self) -> u8 {
        let raw = match self {
            ThreadPriority::Low => 10,
            ThreadPriority::CaServerLow => 20,
            ThreadPriority::CaServerHigh => 40,
            ThreadPriority::Medium => 50,
            ThreadPriority::ScanLow => 60,
            ThreadPriority::ScanHigh => 70,
            ThreadPriority::High => 90,
            ThreadPriority::Iocsh => 91,
            ThreadPriority::Custom(v) => v,
        };
        raw.min(PRIORITY_MAX)
    }

    /// Total-order key: effective value first, then "is custom", then
    /// the raw custom payload. Two priorities share a key only when
    /// they are structurally equal, which keeps `Ord` consistent with
    /// the derived `Eq`.
    fn ordering_key(self) -> (u8, bool, u8) {
        let (is_custom, payload) = match self {
            ThreadPriority::Custom(raw) => (true, raw),
            _ => (false, 0),
        };
        (self.value(), is_custom, payload)
    }
}

impl Ord for ThreadPriority {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.ordering_key().cmp(&other.ordering_key())
    }
}

impl PartialOrd for ThreadPriority {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Stack-size class — `epicsThreadStackSizeClass` (`epicsThread.h:91`).
///
/// C leaves the byte size implementation-dependent. This port follows
/// the POSIX table `STACK_SIZE(f) = f * 0x10000 * sizeof(void*)`
/// (`osdThread.c:506-509`) with multipliers Small = 1, Medium = 2,
/// Big = 4. On a 64-bit target one unit is `0x10000 * 8` bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StackSizeClass {
    /// One stack unit.
    Small,
    /// Two stack units.
    Medium,
    /// Four stack units.
    Big,
}

impl StackSizeClass {
    /// Number of stack bytes for this class, following the POSIX
    /// `stackSizeTable` in `osdThread.c`; the unit scales with the
    /// target's `usize` width.
    pub fn bytes(self) -> usize {
        // The `f` in STACK_SIZE(f) = f * 0x10000 * sizeof(void*).
        let factor: usize = match self {
            StackSizeClass::Small => 1,
            StackSizeClass::Medium => 2,
            StackSizeClass::Big => 4,
        };
        factor * 0x10000 * std::mem::size_of::<usize>()
    }
}

/// Result of a best-effort attempt to change the OS-scheduler priority
/// of a thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriorityApplied {
    /// The request was honoured: the thread now runs in the real-time
    /// SCHED_FIFO band.
    Realtime,
    /// No portable scheduler priority API is available on this
    /// platform (e.g. Windows here, or a non-Unix target), so nothing
    /// was changed.
    Unsupported,
    /// The API exists but the request was rejected — typically because
    /// the process lacks `CAP_SYS_NICE`/root for SCHED_FIFO. C's
    /// `osdThread.c` makes the same best-effort fall back to a non-RT
    /// thread (`osdThread.c:647` "Try again without SCHED_FIFO").
    BestEffortFailed,
}

impl PriorityApplied {
    /// Whether a real-time priority was actually applied; `false` for
    /// every other outcome.
    pub fn is_realtime(self) -> bool {
        self == PriorityApplied::Realtime
    }
}

/// Best-effort: set the OS-scheduler priority of the **calling OS
/// thread** from an EPICS [`ThreadPriority`].
///
/// The priority is reduced to its numeric `0..=99` value and handed to
/// the platform-specific implementation.
///
/// C parity: follows `osdThread.c`'s SCHED_FIFO mapping
/// `oss = p * (max-min)/100 + min` over the kernel's
/// `sched_get_priority_min/max(SCHED_FIFO)` range, including the
/// fallback to a non-RT thread when the request is refused.
///
/// The returned [`PriorityApplied`] says what the platform allowed.
/// Without RT permission the thread simply keeps running at the
/// default policy, as a C IOC's would.
///
/// Note: tasks started with [`spawn`] share worker threads, so this is
/// useful inside [`spawn_blocking`] closures and when tuning runtime
/// worker threads at startup — not for individual async tasks.
///
/// Platform support: the scheduler change is wired on Linux only
/// (where the crate links `libc`). Every other target reports
/// [`PriorityApplied::Unsupported`] and changes nothing.
pub fn apply_to_current_thread(priority: ThreadPriority) -> PriorityApplied {
    let epics_level = priority.value();
    apply_priority_impl(epics_level)
}

/// Linux implementation: map an EPICS priority `0..=99` onto the
/// kernel's SCHED_FIFO range and apply it to the calling thread.
///
/// Returns [`PriorityApplied::Unsupported`] when the kernel reports no
/// usable SCHED_FIFO range, [`PriorityApplied::Realtime`] when the
/// change was accepted, and [`PriorityApplied::BestEffortFailed`] when
/// `pthread_setschedparam` refused it (the thread is left untouched).
#[cfg(target_os = "linux")]
fn apply_priority_impl(epics_priority: u8) -> PriorityApplied {
    let policy = libc::SCHED_FIFO;

    // SAFETY: sched_get_priority_min/max take only an int policy and
    // have no preconditions.
    let (min, max) = unsafe {
        (
            libc::sched_get_priority_min(policy),
            libc::sched_get_priority_max(policy),
        )
    };
    if min < 0 || max < 0 || max < min {
        return PriorityApplied::Unsupported;
    }

    // C `osdThread.c:138-139`: slope over a 0..100 EPICS range. The
    // float form is kept on purpose for parity with the C mapping.
    let slope = f64::from(max - min) / 100.0;
    // `min <= max` was checked above, so `clamp` cannot panic.
    let oss = ((f64::from(epics_priority) * slope) as i32 + min).clamp(min, max);

    let param = libc::sched_param {
        sched_priority: oss,
    };
    // SAFETY: pthread_setschedparam operates on the calling thread
    // (`pthread_self()`) and only reads the stack-local `param`, which
    // outlives the call.
    let rc = unsafe { libc::pthread_setschedparam(libc::pthread_self(), policy, &param) };
    if rc == 0 {
        PriorityApplied::Realtime
    } else {
        // EPERM (no RT permission) or EINVAL — C falls back to a
        // non-RT thread here. Leave the thread at the default
        // policy and report best-effort failure.
        tracing::debug!(
            target: "epics_base_rs::runtime",
            epics_priority,
            oss,
            errno = rc,
            "SCHED_FIFO priority not applied; thread stays at default policy"
        );
        PriorityApplied::BestEffortFailed
    }
}

/// Fallback for targets where no OS-scheduler priority API is wired.
#[cfg(not(target_os = "linux"))]
fn apply_priority_impl(_epics_priority: u8) -> PriorityApplied {
    // The crate links `libc` only on Linux; no portable OS-scheduler
    // priority API is wired on other targets.
    PriorityApplied::Unsupported
}

/// Snapshot of the calling thread's scheduling policy and parameters,
/// restored when the guard is dropped.
///
/// Used so that a pooled blocking thread does not keep a real-time
/// priority after the closure that asked for it has finished.
#[cfg(target_os = "linux")]
struct SchedRestoreGuard {
    policy: libc::c_int,
    param: libc::sched_param,
}

#[cfg(target_os = "linux")]
impl SchedRestoreGuard {
    /// Capture the current thread's scheduling state, or `None` if the
    /// OS refuses to report it (in which case nothing is restored).
    fn capture() -> Option<Self> {
        let mut policy: libc::c_int = 0;
        // SAFETY: `sched_param` is a plain C struct of integers, so
        // the all-zero bit pattern is a valid value.
        let mut param: libc::sched_param = unsafe { std::mem::zeroed() };
        // SAFETY: both out-pointers refer to live, writable stack
        // locals, and `pthread_self()` always names the calling thread.
        let rc = unsafe {
            libc::pthread_getschedparam(libc::pthread_self(), &mut policy, &mut param)
        };
        if rc == 0 {
            Some(Self { policy, param })
        } else {
            None
        }
    }
}

#[cfg(target_os = "linux")]
impl Drop for SchedRestoreGuard {
    fn drop(&mut self) {
        // SAFETY: operates on the calling thread and only reads the
        // `sched_param` owned by this guard.
        let rc = unsafe {
            libc::pthread_setschedparam(libc::pthread_self(), self.policy, &self.param)
        };
        if rc != 0 {
            tracing::debug!(
                target: "epics_base_rs::runtime",
                errno = rc,
                "could not restore thread scheduling policy after prioritised blocking task"
            );
        }
    }
}

/// Run a blocking closure on tokio's blocking pool with the given
/// EPICS [`ThreadPriority`] applied to the executing thread for the
/// duration of `f`.
///
/// The priority application is best effort (see
/// [`apply_to_current_thread`]); `f` runs regardless of whether the OS
/// honoured the request. This is the priority-aware counterpart of
/// [`spawn_blocking`] for IOC threads (CA server, scan) that a C IOC
/// would run in a distinct SCHED band.
///
/// Blocking-pool threads are reused by tokio for later, unrelated
/// closures. On Linux the thread's previous scheduling policy is
/// therefore captured before the change and restored once `f` returns
/// or unwinds, so a real-time priority does not leak to whatever runs
/// on the same thread next.
pub fn spawn_blocking_with_priority<F, R>(priority: ThreadPriority, f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    tokio::task::spawn_blocking(move || {
        // Declared before the priority change and dropped after `f`'s
        // result has been produced (or during unwinding).
        #[cfg(target_os = "linux")]
        let _restore = SchedRestoreGuard::capture();
        let _ = apply_to_current_thread(priority);
        f()
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A spawned future's output comes back through the join handle.
    #[tokio::test]
    async fn test_spawn() {
        let joined = spawn(async { 42 }).await;
        assert_eq!(joined.unwrap(), 42);
    }

    /// A blocking closure's return value comes back through the join
    /// handle.
    #[tokio::test]
    async fn test_spawn_blocking() {
        let joined = spawn_blocking(|| 123).await;
        assert_eq!(joined.unwrap(), 123);
    }

    /// `sleep` waits at least the requested duration.
    #[tokio::test]
    async fn test_sleep() {
        let pause = Duration::from_millis(10);
        let began = std::time::Instant::now();
        sleep(pause).await;
        assert!(began.elapsed() >= pause);
    }

    #[test]
    fn priority_named_levels_match_epics_thread_h() {
        // epicsThread.h:73-83 named-level constants.
        let expected = [
            (ThreadPriority::Low, 10u8),
            (ThreadPriority::CaServerLow, 20),
            (ThreadPriority::CaServerHigh, 40),
            (ThreadPriority::Medium, 50),
            (ThreadPriority::ScanLow, 60),
            (ThreadPriority::ScanHigh, 70),
            (ThreadPriority::High, 90),
            (ThreadPriority::Iocsh, 91),
        ];
        for (level, raw) in expected {
            assert_eq!(level.value(), raw, "{level:?}");
        }
    }

    #[test]
    fn priority_ordering_ca_server_below_scan() {
        // Real-time invariant: scan threads must outrank CA-server
        // threads so scans preempt the CA server on a loaded IOC.
        let scan_floor = ThreadPriority::ScanLow.value();
        for ca_level in [ThreadPriority::CaServerHigh, ThreadPriority::CaServerLow] {
            assert!(ca_level.value() < scan_floor, "{ca_level:?}");
        }
    }

    #[test]
    fn priority_custom_clamps_to_max() {
        let cases = [(200u8, PRIORITY_MAX), (99, 99), (0, PRIORITY_MIN)];
        for (raw, effective) in cases {
            assert_eq!(ThreadPriority::Custom(raw).value(), effective);
        }
    }

    #[test]
    fn stack_size_classes_ordered() {
        let [small, medium, big] = [
            StackSizeClass::Small,
            StackSizeClass::Medium,
            StackSizeClass::Big,
        ]
        .map(StackSizeClass::bytes);

        // STACK_SIZE table is strictly increasing Small < Medium < Big.
        assert!(small < medium);
        assert!(medium < big);
        // Small = 0x10000 * sizeof(usize).
        assert_eq!(small, 0x10000 * std::mem::size_of::<usize>());
    }

    #[test]
    fn apply_priority_returns_a_defined_outcome() {
        // The result depends on the platform + permissions of the test
        // host; the only requirement is that the call returns one of
        // the defined outcomes without panicking. On a CI box without
        // CAP_SYS_NICE this is typically BestEffortFailed — which is
        // C-parity behaviour.
        match apply_to_current_thread(ThreadPriority::ScanHigh) {
            PriorityApplied::Realtime
            | PriorityApplied::Unsupported
            | PriorityApplied::BestEffortFailed => {}
        }
    }

    /// The closure runs whether or not the priority could be applied.
    #[tokio::test]
    async fn spawn_blocking_with_priority_runs_closure() {
        let joined = spawn_blocking_with_priority(ThreadPriority::CaServerHigh, || 7).await;
        assert_eq!(joined.unwrap(), 7);
    }
}