#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
#![deny(missing_docs)]
use core::{fmt, num};
use std::io::Write;
use std::panic::PanicHookInfo;
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::{env, sync::Arc, time::Duration};
use colored::Colorize as _;
use thread_rt::{RTParams, Scheduling};
pub use atomic_timer::AtomicTimer;
#[cfg(feature = "logicline")]
pub use logicline;
pub use log::LevelFilter;
pub use rtsc::{DataChannel, DataPolicy};
#[cfg(feature = "locking-default")]
pub use parking_lot as locking;
#[cfg(feature = "locking-rt")]
pub use parking_lot_rt as locking;
#[cfg(all(feature = "locking-rt-safe", not(target_os = "linux")))]
pub use parking_lot_rt as locking;
#[cfg(all(feature = "locking-rt-safe", target_os = "linux"))]
pub use rtsc::pi as locking;
#[cfg(feature = "metrics")]
pub use metrics;
pub use rtsc::policy_channel_async;
pub use rtsc::time;
pub mod buf {
pub type DataBuffer = rtsc::buf::DataBuffer<crate::locking::RawMutex>;
}
pub mod channel {
pub type Sender<T> =
rtsc::channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
pub type Receiver<T> =
rtsc::channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
#[inline]
pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
rtsc::channel::bounded(capacity)
}
}
pub mod policy_channel {
use crate::DataDeliveryPolicy;
pub type Sender<T> =
rtsc::policy_channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
pub type Receiver<T> =
rtsc::policy_channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
#[inline]
pub fn bounded<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
rtsc::policy_channel::bounded(capacity)
}
#[inline]
pub fn ordered<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
rtsc::policy_channel::ordered(capacity)
}
}
pub mod semaphore {
pub type Semaphore =
rtsc::semaphore::Semaphore<crate::locking::RawMutex, crate::locking::Condvar>;
#[allow(clippy::module_name_repetitions)]
pub type SemaphoreGuard =
rtsc::semaphore::SemaphoreGuard<crate::locking::RawMutex, crate::locking::Condvar>;
}
pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
pub mod comm;
pub mod controller;
#[cfg(feature = "hmi")]
pub mod hmi;
pub mod hub;
#[cfg(feature = "async")]
pub mod hub_async;
pub mod io;
pub mod supervisor;
pub mod system;
pub mod thread_rt;
#[cfg(any(feature = "json", feature = "msgpack"))]
pub mod state;
/// Result alias whose error type is the crate [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Process-wide real-time flag. Starts as `true` and is only ever cleared by
/// [`set_simulated`].
static REALTIME_MODE: AtomicBool = AtomicBool::new(true);

/// Switches the process into simulated (non real-time) mode by clearing the real-time flag.
///
/// There is no counterpart in this file that sets the flag back to `true`.
pub fn set_simulated() {
    let realtime = false;
    REALTIME_MODE.store(realtime, Ordering::Relaxed);
}

/// Reports whether the real-time flag is still set, i.e. [`set_simulated`] has not been called.
fn is_realtime() -> bool {
    let realtime = REALTIME_MODE.load(Ordering::Relaxed);
    realtime
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("channel full")]
ChannelFull,
#[error("channel message skipped")]
ChannelSkipped,
#[error("channel closed")]
ChannelClosed,
#[error("channel empty")]
ChannelEmpty,
#[error("hub send error {0}")]
HubSend(Box<Error>),
#[error("hub client already registered: {0}")]
HubAlreadyRegistered(Arc<str>),
#[error("timed out")]
Timeout,
#[error("I/O error: {0}")]
IO(#[from] std::io::Error),
#[error("Communication error: {0}")]
Comm(String),
#[error("API error {0}: {1}")]
API(String, i64),
#[error("RT SYS_gettid {0}")]
RTGetTId(libc::c_int),
#[error("RT sched_setaffinity {0}")]
RTSchedSetAffinity(String),
#[error("RT sched_setscheduler {0}")]
RTSchedSetSchduler(String),
#[error("Task name must be specified when spawning by a supervisor")]
SupervisorNameNotSpecified,
#[error("Task already registered: `{0}`")]
SupervisorDuplicateTask(String),
#[error("Task not found")]
SupervisorTaskNotFound,
#[error("Invalid data")]
InvalidData(String),
#[error("binrw {0}")]
BinRw(String),
#[error("not implemented")]
Unimplemented,
#[error("never happens")]
Infallible(#[from] std::convert::Infallible),
#[error("access denied")]
AccessDenied,
#[error("operation failed: {0}")]
Failed(String),
}
impl From<rtsc::Error> for Error {
fn from(err: rtsc::Error) -> Self {
match err {
rtsc::Error::ChannelFull => Error::ChannelFull,
rtsc::Error::ChannelSkipped => Error::ChannelSkipped,
rtsc::Error::ChannelClosed => Error::ChannelClosed,
rtsc::Error::ChannelEmpty => Error::ChannelEmpty,
rtsc::Error::Unimplemented => Error::Unimplemented,
rtsc::Error::Timeout => Error::Timeout,
rtsc::Error::InvalidData(msg) => Error::InvalidData(msg),
rtsc::Error::Failed(msg) => Error::Failed(msg),
rtsc::Error::AccessDenied => Error::AccessDenied,
rtsc::Error::RTSchedSetAffinity(msg) => Error::RTSchedSetAffinity(msg),
rtsc::Error::RTSchedSetScheduler(msg) => Error::RTSchedSetSchduler(msg),
rtsc::Error::IO(err) => Error::IO(err),
}
}
}
impl From<Error> for rtsc::Error {
fn from(err: Error) -> Self {
match err {
Error::ChannelFull => rtsc::Error::ChannelFull,
Error::ChannelSkipped => rtsc::Error::ChannelSkipped,
Error::ChannelClosed => rtsc::Error::ChannelClosed,
Error::ChannelEmpty => rtsc::Error::ChannelEmpty,
Error::Unimplemented => rtsc::Error::Unimplemented,
Error::Timeout => rtsc::Error::Timeout,
Error::InvalidData(msg) => rtsc::Error::InvalidData(msg),
Error::AccessDenied => rtsc::Error::AccessDenied,
Error::RTSchedSetAffinity(msg) => rtsc::Error::RTSchedSetAffinity(msg),
Error::RTSchedSetSchduler(msg) => rtsc::Error::RTSchedSetScheduler(msg),
Error::IO(err) => rtsc::Error::IO(err),
_ => rtsc::Error::Failed(err.to_string()),
}
}
}
/// Generates `impl From<$source> for Error` that stores the source error's `to_string()`
/// output in the given single-`String` variant of [`Error`].
macro_rules! impl_error {
    ($source:ty, $variant:ident) => {
        impl From<$source> for Error {
            fn from(err: $source) -> Self {
                Self::$variant(err.to_string())
            }
        }
    };
}
// String-based conversions into `Error`, generated by `impl_error!`.
// Only the `rmodbus` conversion is feature-gated; the attribute applies to that single
// invocation.
#[cfg(feature = "modbus")]
impl_error!(rmodbus::ErrorKind, Comm);
// Oneshot receive failures are reported as communication errors.
impl_error!(oneshot::RecvError, Comm);
// Number parsing failures are reported as invalid data.
impl_error!(num::ParseIntError, InvalidData);
impl_error!(num::ParseFloatError, InvalidData);
impl_error!(binrw::Error, BinRw);
impl Error {
pub fn is_data_skipped(&self) -> bool {
matches!(self, Error::ChannelSkipped)
}
pub fn invalid_data<S: fmt::Display>(msg: S) -> Self {
Error::InvalidData(msg.to_string())
}
pub fn io<S: fmt::Display>(msg: S) -> Self {
Error::Comm(msg.to_string())
}
pub fn failed<S: fmt::Display>(msg: S) -> Self {
Error::Failed(msg.to_string())
}
}
/// Reports a critical error and terminates the process; never returns.
///
/// The message is written to stderr in bold red, then `thread_rt::suicide_myself` is called
/// with a zero delay and no warning. If that call returns, the process exits with code 1.
pub fn critical(msg: &str) -> ! {
    let highlighted = msg.red().bold();
    eprintln!("{}", highlighted);
    let no_delay = Duration::from_secs(0);
    thread_rt::suicide_myself(no_delay, false);
    std::process::exit(1)
}
/// Starts a background thread that runs `thread_rt::suicide_myself(delay, warn)`.
///
/// The thread is named `suicide` and is requested with FIFO scheduling, priority 99 and
/// affinity to CPU 0. If that spawn fails, a plain `std::thread` is started instead with the
/// same job. The function itself returns immediately and does not join the thread.
pub fn suicide(delay: Duration, warn: bool) {
    let rt_params = RTParams::new()
        .set_priority(99)
        .set_scheduling(Scheduling::FIFO)
        .set_cpu_ids(&[0]);
    let mut builder = thread_rt::Builder::new()
        .name("suicide")
        .rt_params(rt_params);
    // NOTE(review): presumably makes the thread park instead of failing when the real-time
    // parameters cannot be applied — confirm against `thread_rt::Builder`.
    builder.park_on_errors = true;
    let spawned = builder.spawn(move || {
        thread_rt::suicide_myself(delay, warn);
    });
    if spawned.is_err() {
        // Fallback without real-time parameters.
        std::thread::spawn(move || {
            thread_rt::suicide_myself(delay, warn);
        });
    }
}
#[cfg(feature = "rvideo")]
pub use rvideo;
#[cfg(feature = "rflow")]
pub use rflow;
/// Runs `rvideo::serve` on all interfaces, port 3001, and returns its result.
#[cfg(feature = "rvideo")]
pub fn serve_rvideo() -> std::result::Result<(), rvideo::Error> {
    const LISTEN_ADDR: &str = "0.0.0.0:3001";
    rvideo::serve(LISTEN_ADDR)
}
/// Runs `rflow::serve` on all interfaces, port 4001, and returns its result.
#[cfg(feature = "rflow")]
pub fn serve_rflow() -> std::result::Result<(), rflow::Error> {
    const LISTEN_ADDR: &str = "0.0.0.0:4001";
    rflow::serve(LISTEN_ADDR)
}
/// Returns a fresh Prometheus exporter builder, to be customised by the caller and then
/// passed to [`metrics_exporter_install`].
#[cfg(feature = "metrics")]
pub fn metrics_exporter() -> metrics_exporter_prometheus::PrometheusBuilder {
    use metrics_exporter_prometheus::PrometheusBuilder;

    PrometheusBuilder::new()
}
/// Installs the Prometheus exporter described by `builder` and starts serving it.
///
/// Steps:
/// 1. a dedicated current-thread Tokio runtime is created;
/// 2. the builder is built while that runtime is entered, yielding a recorder and an
///    exporter future;
/// 3. the recorder is installed as the fallback of a `metrics_exporter_scope` recorder;
/// 4. a thread named `metrics_exporter` drives the exporter future on the runtime.
///
/// # Errors
///
/// Returns an error if the runtime or the thread cannot be created ([`Error::IO`]), or if
/// building/installing the recorder fails ([`Error::Failed`]).
///
/// # Panics
///
/// The exporter thread (not the caller) panics if the exporter future resolves to an error.
#[cfg(feature = "metrics")]
pub fn metrics_exporter_install(
    builder: metrics_exporter_prometheus::PrometheusBuilder,
) -> Result<()> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    let runtime = runtime_builder.build()?;

    let (recorder, exporter) = {
        // The guard keeps the runtime context entered for the duration of `build()`.
        let _guard = runtime.enter();
        builder.build().map_err(|e| Error::failed(e))?
    };

    let scope = metrics_exporter_scope::ScopeBuilder::new().with_fallback(Box::new(recorder));
    scope.install().map_err(|e| Error::failed(e))?;

    let exporter_thread = std::thread::Builder::new().name("metrics_exporter".to_owned());
    exporter_thread.spawn(move || {
        runtime.block_on(exporter).unwrap();
    })?;
    Ok(())
}
/// Thread id stored by `prevent_panic_suicide` and read by the panic hook; `0` (the initial
/// value, restored by `allow_panic_suicide`) means no thread id is recorded.
static PANIC_PREVENT: atomic::AtomicI32 = atomic::AtomicI32::new(0);
/// Delay in nanoseconds, set by `set_panic_delay`, that the panic hook passes to
/// `thread_rt::suicide_myself`.
static PANIC_DELAY_NS: atomic::AtomicU64 = atomic::AtomicU64::new(0);
pub fn setup_panic() {
std::panic::set_hook(Box::new(move |info| {
panic(info);
}));
}
/// Sets the delay the panic hook passes to `thread_rt::suicide_myself`.
///
/// The value is stored with nanosecond resolution in a 64-bit counter. Durations that do not
/// fit (more than `u64::MAX` nanoseconds, roughly 584 years) are clamped to the maximum
/// instead of panicking.
pub fn set_panic_delay(delay: Duration) {
    let nanos: u64 = delay.as_nanos().try_into().unwrap_or(u64::MAX);
    PANIC_DELAY_NS.store(nanos, atomic::Ordering::Relaxed);
}
/// Records the calling thread's id in `PANIC_PREVENT`, which the panic hook consults before
/// terminating the process.
///
/// Only one thread id is kept: a later call from another thread overwrites the earlier one.
/// On targets other than Linux the function does nothing.
pub fn prevent_panic_suicide() {
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `SYS_gettid` takes no arguments and only returns the caller's thread id.
        let raw_tid = unsafe { libc::syscall(libc::SYS_gettid) };
        // -200 is a non-zero placeholder for an id that does not fit into `i32`; zero is
        // reserved for "nothing recorded".
        let tid = i32::try_from(raw_tid).unwrap_or(-200);
        PANIC_PREVENT.store(tid, atomic::Ordering::SeqCst);
    }
}
/// Clears the thread id recorded by [`prevent_panic_suicide`] by resetting `PANIC_PREVENT`
/// to zero, its initial "nothing recorded" value.
pub fn allow_panic_suicide() {
    const NOTHING_RECORDED: i32 = 0;
    PANIC_PREVENT.store(NOTHING_RECORDED, atomic::Ordering::SeqCst);
}
/// Panic hook body: prints the panic information to stderr in bold red and, unless
/// prevented, terminates the process. Never returns.
///
/// On Linux the process is terminated via `thread_rt::suicide_myself` using the delay set by
/// `set_panic_delay`. Termination is skipped when the panicking thread is the one whose id
/// was recorded by `prevent_panic_suicide`; panics in any other thread still terminate the
/// process.
///
/// In all remaining cases (termination prevented, `suicide_myself` returned, or a non-Linux
/// target) the panicking thread is parked forever.
fn panic(info: &PanicHookInfo) -> ! {
    eprintln!("{}", info.to_string().red().bold());
    #[cfg(target_os = "linux")]
    {
        let protected_tid = PANIC_PREVENT.load(atomic::Ordering::SeqCst);
        // Zero means no thread asked for protection.
        let can_suicide = if protected_tid == 0 {
            true
        } else {
            // SAFETY: `SYS_gettid` takes no arguments and only returns the caller's thread id.
            let raw_tid = unsafe { libc::syscall(libc::SYS_gettid) };
            // Same -200 fallback as in `prevent_panic_suicide`, so both sides agree when the
            // id does not fit into `i32`.
            let tid = i32::try_from(raw_tid).unwrap_or(-200);
            // Only the protected thread itself is exempt from terminating the process.
            tid != protected_tid
        };
        if can_suicide {
            let panic_delay = Duration::from_nanos(PANIC_DELAY_NS.load(atomic::Ordering::Relaxed));
            thread_rt::suicide_myself(panic_delay, false);
        }
    }
    // `park` may wake up spuriously, hence the loop.
    loop {
        std::thread::park();
    }
}
/// Returns `true` when the `INVOCATION_ID` environment variable is set to a non-empty value.
///
/// An unset variable, an empty value, or a value that is not valid Unicode all yield `false`.
/// systemd exports `INVOCATION_ID` to the services it starts, so this presumably detects
/// "running as a systemd unit".
pub fn is_production() -> bool {
    match env::var("INVOCATION_ID") {
        Ok(invocation_id) => !invocation_id.is_empty(),
        Err(_) => false,
    }
}
/// Initialises the global `env_logger` logger with the given level filter.
///
/// Records are written to stderr. When [`is_production`] is true, and neither
/// `ROBOPLC_LOG_STDOUT=1` nor `ROBOPLC_MODE=exec` is set, a reduced `LEVEL message` line
/// format is used instead of the `env_logger` default.
///
/// NOTE(review): the `ROBOPLC_LOG_STDOUT` variable name suggests the logger may have been
/// meant to write to stdout, while the target configured here is stderr — confirm which one
/// is intended.
///
/// # Panics
///
/// `env_logger::Builder::init` panics if a global logger has already been installed.
pub fn configure_logger(filter: LevelFilter) {
    // True only when the variable is set (and valid Unicode) with exactly the expected value.
    let env_equals = |name: &str, expected: &str| env::var(name).is_ok_and(|v| v == expected);

    let mut builder = env_logger::Builder::new();
    builder.target(env_logger::Target::Stderr);
    builder.filter_level(filter);

    let compact_format = is_production()
        && !env_equals("ROBOPLC_LOG_STDOUT", "1")
        && !env_equals("ROBOPLC_MODE", "exec");
    if compact_format {
        builder.format(|buf, record| writeln!(buf, "{} {}", record.level(), record.args()));
    }
    builder.init();
}
#[cfg(target_os = "linux")]
pub fn reload_executable() -> Result<()> {
let mut current_exe = std::env::current_exe()?;
let fname = current_exe
.file_name()
.ok_or_else(|| Error::Failed("No file name".to_owned()))?
.to_string_lossy()
.trim_end_matches(" (deleted)")
.to_owned();
current_exe = current_exe.with_file_name(fname);
let _ = std::os::unix::process::CommandExt::exec(&mut std::process::Command::new(current_exe));
Ok(())
}
#[cfg(not(target_os = "linux"))]
pub fn reload_executable() -> Result<()> {
Err(Error::Unimplemented)
}
/// Convenience re-exports intended to be glob-imported (`use ...::prelude::*`).
pub mod prelude {
    // Crate-level helpers and module preludes.
    pub use super::suicide;
    pub use crate::controller::*;
    pub use crate::hub::prelude::*;
    pub use crate::io::prelude::*;
    pub use crate::supervisor::prelude::*;
    pub use crate::time::DurationRT;
    // Items from external crates and the standard library.
    pub use bma_ts::{Monotonic, Timestamp};
    pub use rtsc::DataPolicy;
    pub use std::time::Duration;
}