1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use core::{fmt, num};
4use std::io::Write;
5use std::panic::PanicHookInfo;
6use std::sync::atomic::{self, AtomicBool, Ordering};
7use std::{env, sync::Arc, time::Duration};
8
9use colored::Colorize as _;
10use thread_rt::{RTParams, Scheduling};
11
12pub use atomic_timer::AtomicTimer;
13#[cfg(feature = "logicline")]
14pub use logicline;
15
16pub use log::LevelFilter;
17pub use rtsc::{DataChannel, DataPolicy};
18
19#[cfg(feature = "locking-default")]
20pub use parking_lot as locking;
21
22#[cfg(feature = "locking-rt")]
23pub use parking_lot_rt as locking;
24
25#[cfg(all(feature = "locking-rt-safe", not(target_os = "linux")))]
26pub use parking_lot_rt as locking;
27#[cfg(all(feature = "locking-rt-safe", target_os = "linux"))]
28pub use rtsc::pi as locking;
29
30#[cfg(feature = "metrics")]
31pub use metrics;
32
33pub use rtsc::policy_channel_async;
34pub use rtsc::time;
35
36pub mod buf {
38 pub type DataBuffer = rtsc::buf::DataBuffer<crate::locking::RawMutex>;
40}
41
42pub mod channel {
44
45 pub type Sender<T> =
47 rtsc::channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
48 pub type Receiver<T> =
50 rtsc::channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
51
52 #[inline]
54 pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
55 rtsc::channel::bounded(capacity)
56 }
57}
58
59pub mod policy_channel {
61 use crate::DataDeliveryPolicy;
62
63 pub type Sender<T> =
65 rtsc::policy_channel::Sender<T, crate::locking::RawMutex, crate::locking::Condvar>;
66 pub type Receiver<T> =
68 rtsc::policy_channel::Receiver<T, crate::locking::RawMutex, crate::locking::Condvar>;
69
70 #[inline]
72 pub fn bounded<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
73 rtsc::policy_channel::bounded(capacity)
74 }
75
76 #[inline]
78 pub fn ordered<T: DataDeliveryPolicy>(capacity: usize) -> (Sender<T>, Receiver<T>) {
79 rtsc::policy_channel::ordered(capacity)
80 }
81}
82
83pub mod semaphore {
85 pub type Semaphore =
87 rtsc::semaphore::Semaphore<crate::locking::RawMutex, crate::locking::Condvar>;
88 #[allow(clippy::module_name_repetitions)]
90 pub type SemaphoreGuard =
91 rtsc::semaphore::SemaphoreGuard<crate::locking::RawMutex, crate::locking::Condvar>;
92}
93
94pub use rtsc::data_policy::{DataDeliveryPolicy, DeliveryPolicy};
95
// NOTE(review): the module bodies live in separate files that are not visible here. The
// one-line summaries below are inferred from module names, feature gates and the way this
// file uses each module; confirm them against the module sources.
/// Communication helpers.
pub mod comm;
/// Controller items; all public items are re-exported by [`prelude`].
pub mod controller;
/// HMI support, compiled only when the `hmi` feature is enabled.
#[cfg(feature = "hmi")]
pub mod hmi;
/// Hub items; the module has its own `prelude`, re-exported by [`prelude`].
pub mod hub;
/// Asynchronous hub, compiled only when the `async` feature is enabled.
#[cfg(feature = "async")]
pub mod hub_async;
/// I/O items; the module has its own `prelude`, re-exported by [`prelude`].
pub mod io;
/// Task supervisor; the module has its own `prelude`, re-exported by [`prelude`].
pub mod supervisor;
/// System-level helpers.
pub mod system;
/// Real-time thread support (`Builder`, `RTParams`, `Scheduling`, `suicide_myself`).
pub mod thread_rt;

/// State handling, compiled only when the `json` or `msgpack` feature is enabled.
#[cfg(any(feature = "json", feature = "msgpack"))]
pub mod state;
120
/// Crate-wide result alias whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
123
/// Process-wide mode flag. It starts as `true` (real-time) and is only ever cleared by
/// [`set_simulated`]; nothing in this file sets it back.
static REALTIME_MODE: AtomicBool = AtomicBool::new(true);

/// Switches the process into simulated mode by clearing the real-time flag.
///
/// The call is idempotent: repeating it leaves the flag cleared.
pub fn set_simulated() {
    REALTIME_MODE.store(false, Ordering::Relaxed);
}

/// Returns `true` until [`set_simulated`] has been called.
fn is_realtime() -> bool {
    REALTIME_MODE.load(Ordering::Relaxed)
}
135
136#[derive(thiserror::Error, Debug)]
138pub enum Error {
139 #[error("channel full")]
141 ChannelFull,
142 #[error("channel message skipped")]
145 ChannelSkipped,
146 #[error("channel closed")]
148 ChannelClosed,
149 #[error("channel empty")]
151 ChannelEmpty,
152 #[error("hub send error {0}")]
154 HubSend(Box<Error>),
155 #[error("hub client already registered: {0}")]
157 HubAlreadyRegistered(Arc<str>),
158 #[error("timed out")]
160 Timeout,
161 #[error("I/O error: {0}")]
163 IO(#[from] std::io::Error),
164 #[error("Communication error: {0}")]
166 Comm(String),
167 #[error("API error {0}: {1}")]
169 API(String, i64),
170 #[error("RT SYS_gettid {0}")]
172 RTGetTId(libc::c_int),
173 #[error("RT sched_setaffinity {0}")]
175 RTSchedSetAffinity(String),
176 #[error("RT sched_setscheduler {0}")]
178 RTSchedSetSchduler(String),
179 #[error("Task name must be specified when spawning by a supervisor")]
181 SupervisorNameNotSpecified,
182 #[error("Task already registered: `{0}`")]
184 SupervisorDuplicateTask(String),
185 #[error("Task not found")]
187 SupervisorTaskNotFound,
188 #[error("Invalid data")]
190 InvalidData(String),
191 #[error("binrw {0}")]
193 BinRw(String),
194 #[error("not implemented")]
196 Unimplemented,
197 #[error("never happens")]
199 Infallible(#[from] std::convert::Infallible),
200 #[error("access denied")]
202 AccessDenied,
203 #[error("operation failed: {0}")]
205 Failed(String),
206}
207
208impl From<rtsc::Error> for Error {
209 fn from(err: rtsc::Error) -> Self {
210 match err {
211 rtsc::Error::ChannelFull => Error::ChannelFull,
212 rtsc::Error::ChannelSkipped => Error::ChannelSkipped,
213 rtsc::Error::ChannelClosed => Error::ChannelClosed,
214 rtsc::Error::ChannelEmpty => Error::ChannelEmpty,
215 rtsc::Error::Unimplemented => Error::Unimplemented,
216 rtsc::Error::Timeout => Error::Timeout,
217 rtsc::Error::InvalidData(msg) => Error::InvalidData(msg),
218 rtsc::Error::Failed(msg) => Error::Failed(msg),
219 rtsc::Error::AccessDenied => Error::AccessDenied,
220 rtsc::Error::RTSchedSetAffinity(msg) => Error::RTSchedSetAffinity(msg),
221 rtsc::Error::RTSchedSetScheduler(msg) => Error::RTSchedSetSchduler(msg),
222 rtsc::Error::IO(err) => Error::IO(err),
223 }
224 }
225}
226
227impl From<Error> for rtsc::Error {
228 fn from(err: Error) -> Self {
229 match err {
230 Error::ChannelFull => rtsc::Error::ChannelFull,
231 Error::ChannelSkipped => rtsc::Error::ChannelSkipped,
232 Error::ChannelClosed => rtsc::Error::ChannelClosed,
233 Error::ChannelEmpty => rtsc::Error::ChannelEmpty,
234 Error::Unimplemented => rtsc::Error::Unimplemented,
235 Error::Timeout => rtsc::Error::Timeout,
236 Error::InvalidData(msg) => rtsc::Error::InvalidData(msg),
237 Error::AccessDenied => rtsc::Error::AccessDenied,
238 Error::RTSchedSetAffinity(msg) => rtsc::Error::RTSchedSetAffinity(msg),
239 Error::RTSchedSetSchduler(msg) => rtsc::Error::RTSchedSetScheduler(msg),
240 Error::IO(err) => rtsc::Error::IO(err),
241 _ => rtsc::Error::Failed(err.to_string()),
242 }
243 }
244}
245
// Generates `impl From<$source> for Error` that stores the source error's `Display` text
// in the tuple variant `Error::$variant` (which must hold a single `String`).
macro_rules! impl_error {
    ($source: ty, $variant: ident) => {
        impl From<$source> for Error {
            fn from(source: $source) -> Self {
                Self::$variant(ToString::to_string(&source))
            }
        }
    };
}
255
// `From` conversions generated by `impl_error!`: each one stores the source error's
// `Display` text in the named `Error` variant.
// The `modbus` feature gate applies only to the `rmodbus::ErrorKind` line directly below it.
#[cfg(feature = "modbus")]
impl_error!(rmodbus::ErrorKind, Comm);
impl_error!(oneshot::RecvError, Comm);
impl_error!(num::ParseIntError, InvalidData);
impl_error!(num::ParseFloatError, InvalidData);
impl_error!(binrw::Error, BinRw);
262
263impl Error {
264 pub fn is_data_skipped(&self) -> bool {
266 matches!(self, Error::ChannelSkipped)
267 }
268 pub fn invalid_data<S: fmt::Display>(msg: S) -> Self {
270 Error::InvalidData(msg.to_string())
271 }
272 pub fn io<S: fmt::Display>(msg: S) -> Self {
274 Error::Comm(msg.to_string())
275 }
276 pub fn failed<S: fmt::Display>(msg: S) -> Self {
278 Error::Failed(msg.to_string())
279 }
280}
281
282pub fn critical(msg: &str) -> ! {
284 eprintln!("{}", msg.red().bold());
285 thread_rt::suicide_myself(Duration::from_secs(0), false);
286 std::process::exit(1);
287}
288
289pub fn suicide(delay: Duration, warn: bool) {
295 let mut builder = thread_rt::Builder::new().name("suicide").rt_params(
296 RTParams::new()
297 .set_priority(99)
298 .set_scheduling(Scheduling::FIFO)
299 .set_cpu_ids(&[0]),
300 );
301 builder.park_on_errors = true;
302 let res = builder.spawn(move || {
303 thread_rt::suicide_myself(delay, warn);
304 });
305 if res.is_err() {
306 std::thread::spawn(move || {
307 thread_rt::suicide_myself(delay, warn);
308 });
309 }
310}
311
312#[cfg(feature = "rvideo")]
313pub use rvideo;
314
315#[cfg(feature = "rflow")]
316pub use rflow;
317
/// Calls `rvideo::serve` on `0.0.0.0:3001` (all interfaces, TCP port 3001) and returns its
/// result. Available only with the `rvideo` feature.
#[cfg(feature = "rvideo")]
pub fn serve_rvideo() -> std::result::Result<(), rvideo::Error> {
    const BIND_ADDRESS: &str = "0.0.0.0:3001";
    rvideo::serve(BIND_ADDRESS)
}
323
/// Calls `rflow::serve` on `0.0.0.0:4001` (all interfaces, TCP port 4001) and returns its
/// result. Available only with the `rflow` feature.
#[cfg(feature = "rflow")]
pub fn serve_rflow() -> std::result::Result<(), rflow::Error> {
    const BIND_ADDRESS: &str = "0.0.0.0:4001";
    rflow::serve(BIND_ADDRESS)
}
329
/// Returns a fresh `PrometheusBuilder`, to be customized by the caller and then handed to
/// [`metrics_exporter_install`]. Available only with the `metrics` feature.
#[cfg(feature = "metrics")]
pub fn metrics_exporter() -> metrics_exporter_prometheus::PrometheusBuilder {
    use metrics_exporter_prometheus::PrometheusBuilder;

    PrometheusBuilder::new()
}
344
/// Builds the Prometheus exporter from `builder`, installs its recorder as the fallback of
/// a `metrics_exporter_scope` recorder and runs the exporter on a dedicated thread named
/// `metrics_exporter`. Available only with the `metrics` feature.
///
/// # Errors
///
/// Returns an error if the Tokio runtime cannot be built, if the exporter cannot be built,
/// if the scoped recorder cannot be installed, or if the thread cannot be spawned.
///
/// # Panics
///
/// The exporter thread panics if the exporter future completes with an error.
#[cfg(feature = "metrics")]
pub fn metrics_exporter_install(
    builder: metrics_exporter_prometheus::PrometheusBuilder,
) -> Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    let (recorder, exporter_task) = {
        // The exporter is built while the runtime context is entered; the guard leaves the
        // context again at the end of this block.
        let _runtime_context = runtime.enter();
        builder.build().map_err(Error::failed)?
    };
    let scope = metrics_exporter_scope::ScopeBuilder::new().with_fallback(Box::new(recorder));
    scope.install().map_err(Error::failed)?;
    let exporter_thread = std::thread::Builder::new().name("metrics_exporter".to_owned());
    exporter_thread.spawn(move || {
        runtime.block_on(exporter_task).unwrap();
    })?;
    Ok(())
}
373
374static PANIC_PREVENT: atomic::AtomicI32 = atomic::AtomicI32::new(0);
375static PANIC_DELAY_NS: atomic::AtomicU64 = atomic::AtomicU64::new(0);
376
377pub fn setup_panic() {
380 std::panic::set_hook(Box::new(move |info| {
381 panic(info);
382 }));
383}
384
385pub fn set_panic_delay(delay: Duration) {
392 PANIC_DELAY_NS.store(
393 delay.as_nanos().try_into().unwrap(),
394 atomic::Ordering::Relaxed,
395 );
396}
397
398pub fn prevent_panic_suicide() {
400 #[cfg(target_os = "linux")]
401 {
402 let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) };
403 PANIC_PREVENT.store(tid, atomic::Ordering::SeqCst);
404 }
405}
406
407pub fn allow_panic_suicide() {
409 PANIC_PREVENT.store(0, atomic::Ordering::SeqCst);
410}
411
412fn panic(info: &PanicHookInfo) -> ! {
413 eprintln!("{}", info.to_string().red().bold());
414 #[cfg(target_os = "linux")]
415 {
416 let mut can_suicide = true;
417 let pp = PANIC_PREVENT.load(atomic::Ordering::SeqCst);
418 if pp != 0 {
419 let tid = unsafe { i32::try_from(libc::syscall(libc::SYS_gettid)).unwrap_or(-200) };
420 can_suicide = tid == pp;
421 }
422 if can_suicide {
423 let panic_delay = Duration::from_nanos(PANIC_DELAY_NS.load(atomic::Ordering::Relaxed));
424 thread_rt::suicide_myself(panic_delay, false);
425 }
426 }
427 loop {
428 std::thread::park();
429 }
430}
431
/// Returns `true` when the `INVOCATION_ID` environment variable is set to a non-empty
/// value.
///
/// An unset variable, an empty value and a value that is not valid Unicode all yield
/// `false`.
pub fn is_production() -> bool {
    matches!(env::var("INVOCATION_ID"), Ok(value) if !value.is_empty())
}
436
437pub fn configure_logger(filter: LevelFilter) {
440 let mut builder = env_logger::Builder::new();
441 builder.target(env_logger::Target::Stderr);
442 builder.filter_level(filter);
443 if is_production()
444 && !env::var("ROBOPLC_LOG_STDOUT").is_ok_and(|v| v == "1")
445 && !env::var("ROBOPLC_MODE").is_ok_and(|m| m == "exec")
446 {
447 builder.format(|buf, record| writeln!(buf, "{} {}", record.level(), record.args()));
448 }
449 builder.init();
450}
451
452#[cfg(target_os = "linux")]
454pub fn reload_executable() -> Result<()> {
455 let mut current_exe = std::env::current_exe()?;
456 let fname = current_exe
458 .file_name()
459 .ok_or_else(|| Error::Failed("No file name".to_owned()))?
460 .to_string_lossy()
461 .trim_end_matches(" (deleted)")
462 .to_owned();
463 current_exe = current_exe.with_file_name(fname);
464 let _ = std::os::unix::process::CommandExt::exec(&mut std::process::Command::new(current_exe));
465 Ok(())
466}
467
468#[cfg(not(target_os = "linux"))]
470pub fn reload_executable() -> Result<()> {
471 Err(Error::Unimplemented)
472}
473
474pub mod prelude {
476 pub use super::suicide;
477 pub use crate::controller::*;
478 pub use crate::hub::prelude::*;
479 pub use crate::io::prelude::*;
480 pub use crate::supervisor::prelude::*;
481 pub use crate::time::DurationRT;
482 pub use bma_ts::{Monotonic, Timestamp};
483 pub use rtsc::DataPolicy;
484 pub use std::time::Duration;
485}