Skip to main content

roboplc/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use core::{fmt, num};
4use std::io::Write;
5use std::panic::PanicHookInfo;
6use std::sync::atomic::{self, AtomicBool, Ordering};
7use std::{env, sync::Arc, time::Duration};
8
9use colored::Colorize as _;
10use thread_rt::{RTParams, Scheduling};
11
12pub use atomic_timer::AtomicTimer;
13#[cfg(feature = "logicline")]
14pub use logicline;
15
16pub use log::LevelFilter;
17pub use rtsc::{DataChannel, DataPolicy};
18
19#[cfg(feature = "locking-default")]
20pub use parking_lot as locking;
21
22#[cfg(feature = "locking-rt")]
23pub use parking_lot_rt as locking;
24
25#[cfg(all(feature = "locking-rt-safe", not(target_os = "linux")))]
26pub use parking_lot_rt as locking;
27#[cfg(all(feature = "locking-rt-safe", target_os = "linux"))]
28pub use rtsc::pi as locking;
29
30#[cfg(feature = "metrics")]
31pub use metrics;
32
33pub use rtsc::policy_channel_async;
34pub use rtsc::time;
35
36/// Wrapper around [`rtsc::buf`] with the chosen locking policy
37pub mod buf {
38    /// Type alias for [`rtsc::buf::DataBuffer`] with the chosen locking policy
39    pub type DataBuffer = rtsc::buf::DataBuffer<crate::locking::RawMutex>;
40}
41
42/// Wrapper around [`rtsc::channel`] with the chosen locking policy
43pub mod channel {
44
45    /// Type alias for [`rtsc::channel::Sender`] with the chosen locking policy
46    pub type Sender<T> =
47        rtsc::channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
48    /// Type alias for [`rtsc::channel::Receiver`] with the chosen locking policy
49    pub type Receiver<T> =
50        rtsc::channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
51
52    /// Function alias for [`rtsc::channel::bounded`] with the chosen locking policy
53    #[inline]
54    pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
55        rtsc::channel::bounded(capacity)
56    }
57}
58
59/// Wrapper around [`rtsc::policy_channel`] with the chosen locking policy
60pub mod policy_channel {
61    use crate::DataDeliveryPolicy;
62
63    /// Type alias for [`rtsc::policy_channel::Sender`] with the chosen locking policy
64    pub type Sender<T> =
65        rtsc::policy_channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
66    /// Type alias for [`rtsc::policy_channel::Receiver`] with the chosen locking policy
67    pub type Receiver<T> =
68        rtsc::policy_channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
69
70    /// Function alias for [`rtsc::policy_channel::bounded`] with the chosen locking policy
71    #[inline]
72    pub fn bounded<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
73        rtsc::policy_channel::bounded(capacity)
74    }
75
76    /// Function alias for [`rtsc::policy_channel::ordered`] with the chosen locking policy
77    #[inline]
78    pub fn ordered<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
79        rtsc::policy_channel::ordered(capacity)
80    }
81}
82
83/// Wrapper around [`rtsc::semaphore`] with the chosen locking policy
84pub mod semaphore {
85    /// Type alias for [`rtsc::semaphore::Semaphore`] with the chosen locking policy
86    pub type Semaphore =
87        rtsc::semaphore::Semaphore<crate::locking::RawMutex, crate::locking::Condvar>;
88    /// Type alias for [`rtsc::semaphore::SemaphoreGuard`] with the chosen locking policy
89    #[allow(clippy::module_name_repetitions)]
90    pub type SemaphoreGuard =
91        rtsc::semaphore::SemaphoreGuard<crate::locking::RawMutex, crate::locking::Condvar>;
92}
93
94pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
95
96/// Reliable TCP/Serial communications
97pub mod comm;
98/// Controller and workers
99pub mod controller;
100/// HMI (Human-Machine Interface) API
101#[cfg(feature = "hmi")]
102pub mod hmi;
103/// In-process data communication pub/sub hub, synchronous edition
104pub mod hub;
105/// In-process data communication pub/sub hub, asynchronous edition
106#[cfg(feature = "async")]
107pub mod hub_async;
108/// I/O
109pub mod io;
110/// Task supervisor to manage real-time threads
111pub mod supervisor;
112/// Linux system tools
113pub mod system;
114/// Real-time thread functions to work with [`supervisor::Supervisor`] and standalone, Linux only
115pub mod thread_rt;
116
117/// State helper functions
118#[cfg(any(feature = "json", feature = "msgpack"))]
119pub mod state;
120
121/// The crate result type
122pub type Result<T> = std::result::Result<T, Error>;
123
/// Process-wide flag: `true` (the initial value) while real-time functions are enabled
static REALTIME_MODE: AtomicBool = AtomicBool::new(true);

/// Switches the process to simulated mode.
///
/// Intended for test environments: real-time functions get disabled while all methods keep
/// running with no errors. The flag is only ever cleared here, never set back.
pub fn set_simulated() {
    REALTIME_MODE.store(false, atomic::Ordering::Relaxed);
}

/// Returns `true` unless [`set_simulated`] has been called
fn is_realtime() -> bool {
    REALTIME_MODE.load(atomic::Ordering::Relaxed)
}
135
136/// The crate error type
137#[derive(thiserror::Error, Debug)]
138pub enum Error {
139    /// the channel is full and the value can not be sent
140    #[error("channel full")]
141    ChannelFull,
142    /// the channel is full, an optional value is skipped. the error can be ignored but should be
143    /// logged
144    #[error("channel message skipped")]
145    ChannelSkipped,
146    /// The channel is closed (all transmitters/receivers gone)
147    #[error("channel closed")]
148    ChannelClosed,
149    /// Receive attempt failed because the channel is empty
150    #[error("channel empty")]
151    ChannelEmpty,
152    /// Hub send errors
153    #[error("hub send error {0}")]
154    HubSend(Box<Error>),
155    /// Hub client with the given name is already registered
156    #[error("hub client already registered: {0}")]
157    HubAlreadyRegistered(Arc<str>),
158    /// Timeouts
159    #[error("timed out")]
160    Timeout,
161    /// Standard I/O errors
162    #[error("I/O error: {0}")]
163    IO(#[from] std::io::Error),
164    /// Non-standard I/O errors
165    #[error("Communication error: {0}")]
166    Comm(String),
167    /// 3rd party API errors
168    #[error("API error {0}: {1}")]
169    API(String, i64),
170    /// Real-time engine error: unable to get the system thread id
171    #[error("RT SYS_gettid {0}")]
172    RTGetTId(libc::c_int),
173    /// Real-time engine error: unable to set the thread scheduler affinity
174    #[error("RT sched_setaffinity {0}")]
175    RTSchedSetAffinity(String),
176    /// Real-time engine error: unable to set the thread scheduler policy
177    #[error("RT sched_setscheduler {0}")]
178    RTSchedSetSchduler(String),
179    /// Supervisor error: task name is not specified in the thread builder
180    #[error("Task name must be specified when spawning by a supervisor")]
181    SupervisorNameNotSpecified,
182    /// Supervisor error: task with the given name is already registered
183    #[error("Task already registered: `{0}`")]
184    SupervisorDuplicateTask(String),
185    /// Supervisor error: task with the given name is not found
186    #[error("Task not found")]
187    SupervisorTaskNotFound,
188    /// Invalid data receied / parameters provided
189    #[error("Invalid data")]
190    InvalidData(String),
191    /// [binrw](https://crates.io/crates/binrw) crate errors
192    #[error("binrw {0}")]
193    BinRw(String),
194    /// The requested operation is not implemented
195    #[error("not implemented")]
196    Unimplemented,
197    /// This error never happens and is used as a compiler hint only
198    #[error("never happens")]
199    Infallible(#[from] std::convert::Infallible),
200    /// Syscall / internal API access denied
201    #[error("access denied")]
202    AccessDenied,
203    /// All other errors
204    #[error("operation failed: {0}")]
205    Failed(String),
206}
207
208impl From<rtsc::Error> for Error {
209    fn from(err: rtsc::Error) -> Self {
210        match err {
211            rtsc::Error::ChannelFull => Error::ChannelFull,
212            rtsc::Error::ChannelSkipped => Error::ChannelSkipped,
213            rtsc::Error::ChannelClosed => Error::ChannelClosed,
214            rtsc::Error::ChannelEmpty => Error::ChannelEmpty,
215            rtsc::Error::Unimplemented => Error::Unimplemented,
216            rtsc::Error::Timeout => Error::Timeout,
217            rtsc::Error::InvalidData(msg) => Error::InvalidData(msg),
218            rtsc::Error::Failed(msg) => Error::Failed(msg),
219            rtsc::Error::AccessDenied => Error::AccessDenied,
220            rtsc::Error::RTSchedSetAffinity(msg) => Error::RTSchedSetAffinity(msg),
221            rtsc::Error::RTSchedSetScheduler(msg) => Error::RTSchedSetSchduler(msg),
222            rtsc::Error::IO(err) => Error::IO(err),
223        }
224    }
225}
226
227impl From<Error> for rtsc::Error {
228    fn from(err: Error) -> Self {
229        match err {
230            Error::ChannelFull => rtsc::Error::ChannelFull,
231            Error::ChannelSkipped => rtsc::Error::ChannelSkipped,
232            Error::ChannelClosed => rtsc::Error::ChannelClosed,
233            Error::ChannelEmpty => rtsc::Error::ChannelEmpty,
234            Error::Unimplemented => rtsc::Error::Unimplemented,
235            Error::Timeout => rtsc::Error::Timeout,
236            Error::InvalidData(msg) => rtsc::Error::InvalidData(msg),
237            Error::AccessDenied => rtsc::Error::AccessDenied,
238            Error::RTSchedSetAffinity(msg) => rtsc::Error::RTSchedSetAffinity(msg),
239            Error::RTSchedSetSchduler(msg) => rtsc::Error::RTSchedSetScheduler(msg),
240            Error::IO(err) => rtsc::Error::IO(err),
241            _ => rtsc::Error::Failed(err.to_string()),
242        }
243    }
244}
245
246macro_rules! impl_error {
247    ($t: ty, $key: ident) => {
248        impl From<$t> for Error {
249            fn from(err: $t) -> Self {
250                Error::$key(err.to_string())
251            }
252        }
253    };
254}
255
256#[cfg(feature = "modbus")]
257impl_error!(rmodbus::ErrorKind, Comm);
258impl_error!(oneshot::RecvError, Comm);
259impl_error!(num::ParseIntError, InvalidData);
260impl_error!(num::ParseFloatError, InvalidData);
261impl_error!(binrw::Error, BinRw);
262
263impl Error {
264    /// Returns true if the data is skipped
265    pub fn is_data_skipped(&self) -> bool {
266        matches!(self, Error::ChannelSkipped)
267    }
268    /// Creates new invalid data error
269    pub fn invalid_data<S: fmt::Display>(msg: S) -> Self {
270        Error::InvalidData(msg.to_string())
271    }
272    /// Creates new I/O error (for non-standard I/O)
273    pub fn io<S: fmt::Display>(msg: S) -> Self {
274        Error::Comm(msg.to_string())
275    }
276    /// Creates new function failed error
277    pub fn failed<S: fmt::Display>(msg: S) -> Self {
278        Error::Failed(msg.to_string())
279    }
280}
281
282/// Immediately kills the current process and all its subprocesses with a message to stderr
283pub fn critical(msg: &str) -> ! {
284    eprintln!("{}", msg.red().bold());
285    thread_rt::suicide_myself(Duration::from_secs(0), false);
286    std::process::exit(1);
287}
288
289/// Terminates the current process and all its subprocesses in the specified period of time with
290/// SIGKILL command. Useful if a process is unable to shut it down gracefully within a specified
291/// period of time.
292///
293/// Prints warnings to STDOUT if warn is true
294pub fn suicide(delay: Duration, warn: bool) {
295    let mut builder = thread_rt::Builder::new().name("suicide").rt_params(
296        RTParams::new()
297            .set_priority(99)
298            .set_scheduling(Scheduling::FIFO)
299            .set_cpu_ids(&[0]),
300    );
301    builder.park_on_errors = true;
302    let res = builder.spawn(move || {
303        thread_rt::suicide_myself(delay, warn);
304    });
305    if res.is_err() {
306        std::thread::spawn(move || {
307            thread_rt::suicide_myself(delay, warn);
308        });
309    }
310}
311
312#[cfg(feature = "rvideo")]
313pub use rvideo;
314
315#[cfg(feature = "rflow")]
316pub use rflow;
317
#[cfg(feature = "rvideo")]
/// Runs the default [`rvideo`] server, bound to TCP `0.0.0.0:3001`
pub fn serve_rvideo() -> std::result::Result<(), rvideo::Error> {
    const BIND_ADDR: &str = "0.0.0.0:3001";
    rvideo::serve(BIND_ADDR)
}
323
#[cfg(feature = "rflow")]
/// Runs the default [`rflow`] server, bound to TCP `0.0.0.0:4001`
pub fn serve_rflow() -> std::result::Result<(), rflow::Error> {
    const BIND_ADDR: &str = "0.0.0.0:4001";
    rflow::serve(BIND_ADDR)
}
329
/// Creates a fresh [Prometheus metrics exporter
/// builder](https://docs.rs/metrics-exporter-prometheus/) for further customization
///
/// # Example
///
/// ```rust,no_run
/// roboplc::metrics_exporter()
///   .set_bucket_duration(std::time::Duration::from_secs(300)).unwrap()
///   .install().unwrap();
/// ```
#[cfg(feature = "metrics")]
pub fn metrics_exporter() -> metrics_exporter_prometheus::PrometheusBuilder {
    use metrics_exporter_prometheus::PrometheusBuilder;

    PrometheusBuilder::new()
}
344
/// Installs Prometheus metrics exporter together with [Scope
/// exporter](https://docs.rs/metrics-exporter-scope)
///
/// The Prometheus recorder is registered as the fallback of the scope exporter, while the
/// exporter future is driven by a dedicated `metrics_exporter` thread on its own
/// current-thread tokio runtime.
///
/// # Errors
///
/// Returns an error if the runtime can not be built, the exporter fails to build or install, or
/// the worker thread can not be spawned.
///
/// # Panics
///
/// The `metrics_exporter` thread panics if the exporter future completes with an error
#[cfg(feature = "metrics")]
pub fn metrics_exporter_install(
    builder: metrics_exporter_prometheus::PrometheusBuilder,
) -> Result<()> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();
    let runtime = runtime_builder.build()?;
    // the exporter is built inside the runtime context, which is left right after
    let entered = runtime.enter();
    let built = builder.build().map_err(Error::failed);
    drop(entered);
    let (recorder, exporter_task) = built?;
    let scope_builder =
        metrics_exporter_scope::ScopeBuilder::new().with_fallback(Box::new(recorder));
    scope_builder.install().map_err(Error::failed)?;
    let worker = std::thread::Builder::new().name(String::from("metrics_exporter"));
    worker.spawn(move || {
        runtime.block_on(exporter_task).unwrap();
    })?;
    Ok(())
}
373
374static PANIC_PREVENT: atomic::AtomicI32 = atomic::AtomicI32::new(0);
375static PANIC_DELAY_NS: atomic::AtomicU64 = atomic::AtomicU64::new(0);
376
377/// Sets panic handler to immediately kill the process and its childs with SIGKILL. The process is
378/// killed when panic happens in ANY thread
379pub fn setup_panic() {
380    std::panic::set_hook(Box::new(move |info| {
381        panic(info);
382    }));
383}
384
385/// Sets the delay before killing the process on panic. The default is 0, which means the process
386/// is killed immediately.
387///
388/// # Panics
389///
390/// Panics if the delay is longer than `u64::MAX` nanoseconds (about 584 years).
391pub fn set_panic_delay(delay: Duration) {
392    PANIC_DELAY_NS.store(
393        delay.as_nanos().try_into().unwrap(),
394        atomic::Ordering::Relaxed,
395    );
396}
397
398/// Prevent other threads to kill the process on panic (the setter still has the ability)
399pub fn prevent_panic_suicide() {
400    #[cfg(target_os = "linux")]
401    {
402        let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) };
403        PANIC_PREVENT.store(tid, atomic::Ordering::SeqCst);
404    }
405}
406
407/// Allow any thread to kill the process on panic (on by default)
408pub fn allow_panic_suicide() {
409    PANIC_PREVENT.store(0, atomic::Ordering::SeqCst);
410}
411
412fn panic(info: &PanicHookInfo) -> ! {
413    eprintln!("{}", info.to_string().red().bold());
414    #[cfg(target_os = "linux")]
415    {
416        let mut can_suicide = true;
417        let pp = PANIC_PREVENT.load(atomic::Ordering::SeqCst);
418        if pp != 0 {
419            let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) };
420            can_suicide = tid == pp;
421        }
422        if can_suicide {
423            let panic_delay = Duration::from_nanos(PANIC_DELAY_NS.load(atomic::Ordering::Relaxed));
424            thread_rt::suicide_myself(panic_delay, false);
425        }
426    }
427    loop {
428        std::thread::park();
429    }
430}
431
/// Tells whether the process runs in production mode (as a systemd unit), which is detected by
/// a non-empty `INVOCATION_ID` environment variable
pub fn is_production() -> bool {
    match env::var("INVOCATION_ID") {
        Ok(invocation_id) => !invocation_id.is_empty(),
        Err(_) => false,
    }
}
436
437/// Configures stdout logger with the given filter. If started in production mode, does not logs
438/// timestamps
439pub fn configure_logger(filter: LevelFilter) {
440    let mut builder = env_logger::Builder::new();
441    builder.target(env_logger::Target::Stderr);
442    builder.filter_level(filter);
443    if is_production()
444        && !env::var("ROBOPLC_LOG_STDOUT").is_ok_and(|v| v == "1")
445        && !env::var("ROBOPLC_MODE").is_ok_and(|m| m == "exec")
446    {
447        builder.format(|buf, record| writeln!(buf, "{} {}", record.level(), record.args()));
448    }
449    builder.init();
450}
451
452/// Reload the current executable (performs execvp syscall, Linux only)
453#[cfg(target_os = "linux")]
454pub fn reload_executable() -> Result<()> {
455    let mut current_exe = std::env::current_exe()?;
456    // handle a case if the executable is deleted
457    let fname = current_exe
458        .file_name()
459        .ok_or_else(|| Error::Failed("No file name".to_owned()))?
460        .to_string_lossy()
461        .trim_end_matches(" (deleted)")
462        .to_owned();
463    current_exe = current_exe.with_file_name(fname);
464    let _ = std::os::unix::process::CommandExt::exec(&mut std::process::Command::new(current_exe));
465    Ok(())
466}
467
468/// Reload the current executable (performs execvp syscall, Linux only)
469#[cfg(not(target_os = "linux"))]
470pub fn reload_executable() -> Result<()> {
471    Err(Error::Unimplemented)
472}
473
474/// Prelude module
475pub mod prelude {
476    pub use super::suicide;
477    pub use crate::controller::*;
478    pub use crate::hub::prelude::*;
479    pub use crate::io::prelude::*;
480    pub use crate::supervisor::prelude::*;
481    pub use crate::time::DurationRT;
482    pub use bma_ts::{Monotonic, Timestamp};
483    pub use rtsc::DataPolicy;
484    pub use std::time::Duration;
485}