use std::{
sync::{
atomic::{AtomicI8, Ordering},
Arc,
},
thread,
time::Duration,
};
#[cfg(target_os = "linux")]
use crate::suicide;
use crate::{
critical,
hub::Hub,
supervisor::Supervisor,
thread_rt::{Builder, RTParams, Scheduling},
Error, Result,
};
pub use roboplc_derive::WorkerOpts;
use rtsc::data_policy::DataDeliveryPolicy;
#[cfg(target_os = "linux")]
use signal_hook::{
consts::{SIGINT, SIGTERM, SIGUSR2},
iterator::Signals,
};
use tracing::error;
/// Convenience module: a glob import of it brings the controller, its context, the worker traits,
/// the worker result alias and the `WorkerOpts` item from `roboplc_derive` into scope.
pub mod prelude {
pub use super::{Context, Controller, WResult, Worker, WorkerOptions};
pub use roboplc_derive::WorkerOpts;
}
/// Result returned by [`Worker::run`]: `Ok(())` on a clean exit, otherwise a boxed error that can
/// be sent across threads.
pub type WResult = std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Result returned by the reload handler passed to
/// [`Controller::register_signals_with_handlers`].
pub type HandlerResult =
std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
/// Interval [`Controller::block_while_online`] sleeps between two checks of the controller state.
pub const SLEEP_STEP: Duration = Duration::from_millis(100);
/// Shared handle to the controller state.
///
/// The state lives in a reference-counted atomic, so every clone of a `State` observes and
/// modifies the same value.
#[derive(Clone)]
pub struct State {
    state: Arc<AtomicI8>,
}

impl State {
    /// Creates a handle whose initial value is [`ControllerStateKind::Starting`].
    fn new() -> Self {
        let initial = ControllerStateKind::Starting as i8;
        Self {
            state: Arc::new(AtomicI8::new(initial)),
        }
    }

    /// Stores `state` (as its `i8` discriminant) for all handles sharing this value.
    pub fn set(&self, state: ControllerStateKind) {
        let raw = state as i8;
        self.state.store(raw, Ordering::SeqCst);
    }

    /// Loads the current raw value and converts it back into a [`ControllerStateKind`].
    pub fn get(&self) -> ControllerStateKind {
        let raw = self.state.load(Ordering::SeqCst);
        raw.into()
    }

    /// Returns `true` while the current state does not compare below
    /// [`ControllerStateKind::Starting`].
    pub fn is_online(&self) -> bool {
        ControllerStateKind::Starting <= self.get()
    }
}

impl Default for State {
    /// Same as the private constructor: the state starts as `Starting`.
    fn default() -> Self {
        Self::new()
    }
}
/// Lifecycle state of a controller, stored as an `i8` discriminant.
///
/// The derived ordering follows the discriminants: `Starting`, `Active` and `Running` compare
/// greater than or equal to `Starting`, while `Stopping`, `Stopped` and `Unknown` compare below
/// it.
#[derive(Default, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)]
#[repr(i8)]
#[allow(clippy::module_name_repetitions)]
pub enum ControllerStateKind {
    /// Initial state of a freshly created controller.
    #[default]
    Starting = 0,
    /// Online state; not set by this module itself.
    Active = 1,
    /// Online state; not set by this module itself.
    Running = 2,
    /// Termination has been requested.
    Stopping = -1,
    /// The controller has finished blocking and is considered stopped.
    Stopped = -100,
    /// Fallback for raw values that match no other variant.
    Unknown = -128,
}

impl From<i8> for ControllerStateKind {
    /// Converts a raw discriminant back into a state.
    ///
    /// Every declared variant round-trips through `as i8`; any other value maps to
    /// [`ControllerStateKind::Unknown`].
    fn from(v: i8) -> Self {
        match v {
            0 => ControllerStateKind::Starting,
            1 => ControllerStateKind::Active,
            2 => ControllerStateKind::Running,
            // Without this arm a stored `Stopping` state would be read back as `Unknown`.
            -1 => ControllerStateKind::Stopping,
            -100 => ControllerStateKind::Stopped,
            _ => ControllerStateKind::Unknown,
        }
    }
}
/// Central object that owns the task supervisor, the data hub, the shared state and the shared
/// variables, and hands out [`Context`] values to the workers it spawns.
///
/// `D` is the payload type of the hub; `V` is the type of the variables shared (behind an `Arc`)
/// with every worker.
pub struct Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
/// Supervisor used to spawn workers, tasks and the signal-handling thread.
supervisor: Supervisor<()>,
/// Hub cloned into every context.
hub: Hub<D>,
/// Shared controller state, cloned into every context.
state: State,
/// Shared variables, reference-counted so contexts can hold them too.
variables: Arc<V>,
}
impl<D, V> Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
pub fn new() -> Self
where
V: Default,
{
Self {
supervisor: <_>::default(),
hub: <_>::default(),
state: State::new(),
variables: <_>::default(),
}
}
pub fn new_with_variables(variables: V) -> Self {
Self {
supervisor: <_>::default(),
hub: <_>::default(),
state: State::new(),
variables: Arc::new(variables),
}
}
/// Spawns `worker` on its own supervised thread.
///
/// The thread is configured from the worker's [`WorkerOptions`]: scheduling policy, optional
/// priority and CPU ids, name, blocking flag and optional stack size. The worker receives a
/// fresh [`Context`]. If its `run` method returns an error, the error is logged and reported
/// through `critical`.
///
/// # Errors
///
/// Returns the error produced by the supervisor if the thread cannot be spawned.
pub fn spawn_worker<W: Worker<D, V> + WorkerOptions + 'static>(
    &mut self,
    mut worker: W,
) -> Result<()> {
    let ctx = self.context();

    // Real-time parameters: scheduling is always set, priority and CPU ids only when provided.
    let base_params = RTParams::new().set_scheduling(worker.worker_scheduling());
    let prioritized = match worker.worker_priority() {
        Some(priority) => base_params.set_priority(priority),
        None => base_params,
    };
    let rt_params = match worker.worker_cpu_ids() {
        Some(cpu_ids) => prioritized.set_cpu_ids(cpu_ids),
        None => prioritized,
    };

    let named = Builder::new()
        .name(worker.worker_name())
        .rt_params(rt_params)
        .blocking(worker.worker_is_blocking());
    let builder = match worker.worker_stack_size() {
        Some(stack_size) => named.stack_size(stack_size),
        None => named,
    };

    self.supervisor.spawn(builder, move || match worker.run(&ctx) {
        Ok(()) => {}
        Err(e) => {
            error!(worker=worker.worker_name(), error=%e, "worker terminated");
            critical(&format!(
                "Worker {} terminated: {}",
                worker.worker_name(),
                e
            ));
        }
    })?;
    Ok(())
}
/// Spawns the closure `f` as a supervised task named `name`, using a builder with no further
/// customisation.
///
/// # Errors
///
/// Returns the error produced by the supervisor if the task cannot be spawned.
pub fn spawn_task<F>(&mut self, name: &str, f: F) -> Result<()>
where
    F: FnOnce() + Send + 'static,
{
    let builder = Builder::new().name(name);
    self.supervisor.spawn(builder, f)?;
    Ok(())
}
pub fn register_signals(&mut self, shutdown_timeout: Duration) -> Result<()> {
self.register_signals_with_handlers(|_| {}, |_| Ok(()), shutdown_timeout)
}
/// Spawns a supervised thread that reacts to SIGTERM, SIGINT and SIGUSR2 (Linux only).
///
/// * SIGTERM / SIGINT: `suicide(shutdown_timeout, true)` is called first, then
///   `shutdown_handler_fn`, then the context is asked to terminate.
/// * SIGUSR2: `reload_handler_fn` is called; if it succeeds, `crate::reload_executable()` is
///   invoked, and a failure of that call panics. If the handler fails, the error is logged and
///   no reload is attempted.
///
/// The thread is first spawned with FIFO scheduling, priority 99, pinned to CPU 0. If that fails
/// with `Error::RTSchedSetSchduler`, a second attempt is made with default real-time parameters.
/// On non-Linux targets the spawned closure is empty.
///
/// # Errors
///
/// Returns any spawn error other than the scheduler error that triggers the fallback, the error
/// of the fallback spawn itself, or (on Linux) the error of creating the signal iterator.
pub fn register_signals_with_handlers<SH, RH>(
&mut self,
#[allow(unused_variables)] shutdown_handler_fn: SH,
#[allow(unused_variables)] reload_handler_fn: RH,
#[allow(unused_variables)] shutdown_timeout: Duration,
) -> Result<()>
where
SH: Fn(&Context<D, V>) + Send + Sync + 'static,
RH: Fn(&Context<D, V>) -> HandlerResult + Send + Sync + 'static,
{
// The handlers are reference-counted because up to two closures (the real-time attempt and the
// fallback) may each need their own handle.
let shutdown_handler = Arc::new(shutdown_handler_fn);
let reload_handler = Arc::new(reload_handler_fn);
// First attempt: highest FIFO priority on CPU 0.
let mut builder = Builder::new().name("RoboPLCSigRT").rt_params(
RTParams::new()
.set_priority(99)
.set_scheduling(Scheduling::FIFO)
.set_cpu_ids(&[0]),
);
// NOTE(review): semantics of `park_on_errors` are defined by the builder type — confirm there.
builder.park_on_errors = true;
// Builds the thread body. It is a macro rather than a function so it can be expanded twice,
// each expansion creating its own context and signal iterator, and so that the `?` inside
// returns from this method.
macro_rules! sig_handler {
($shutdown_handler: expr, $reload_handler: expr) => {{
#[cfg(target_os = "linux")]
{
let context = self.context();
let mut signals = Signals::new([SIGTERM, SIGINT, SIGUSR2])?;
move || {
// NOTE(review): only the first delivered signal is processed; afterwards the closure
// returns. After a failed reload handler, later signals are therefore not handled by
// this thread — confirm this is intended.
if let Some(sig) = signals.forever().next() {
match sig {
SIGTERM | SIGINT => {
// Called before the user handler; presumably arms a forced exit after
// `shutdown_timeout` — verify against `suicide`.
suicide(shutdown_timeout, true);
$shutdown_handler(&context);
context.terminate();
}
SIGUSR2 => {
tracing::warn!("Performing live reload");
if let Err(e) = $reload_handler(&context) {
error!(error=%e, "reload handler");
} else if let Err(e) = crate::reload_executable() {
panic!("Live reload failed: {}", e);
}
}
// Only the three signals registered above can be delivered by this iterator.
_ => unreachable!(),
}
}
}
}
#[cfg(not(target_os = "linux"))]
{
move || {}
}
}};
}
// On non-Linux targets the macro ignores its arguments, hence the `allow` attributes.
#[allow(unused_variables)]
let sh = shutdown_handler.clone();
#[allow(unused_variables)]
let rh = reload_handler.clone();
if let Err(e) = self.supervisor.spawn(builder.clone(), sig_handler!(sh, rh)) {
// Only a failure to set the scheduler triggers the fallback; anything else is fatal.
if !matches!(e, Error::RTSchedSetSchduler(_)) {
return Err(e);
}
} else {
return Ok(());
}
// Fallback: same thread body, different name, default real-time parameters.
let builder = builder.name("RoboPLCSig").rt_params(RTParams::new());
self.supervisor
.spawn(builder, sig_handler!(shutdown_handler, reload_handler))?;
Ok(())
}
fn context(&self) -> Context<D, V> {
Context {
hub: self.hub.clone(),
state: self.state.clone(),
variables: self.variables.clone(),
}
}
/// Blocks until the supervisor's `join_all` returns, then marks the controller as `Stopped`.
pub fn block(&mut self) {
self.supervisor.join_all();
// Set only after joining, so the state becomes `Stopped` once `join_all` has returned.
self.state.set(ControllerStateKind::Stopped);
}
/// Blocks the calling thread while the controller is online, polling the state every
/// [`SLEEP_STEP`], and marks the controller as `Stopped` once it is no longer online.
///
/// Unlike [`Controller::block`], this does not join the supervised tasks.
pub fn block_while_online(&self) {
    loop {
        if !self.state.is_online() {
            break;
        }
        thread::sleep(SLEEP_STEP);
    }
    self.state.set(ControllerStateKind::Stopped);
}
/// Returns `true` while the controller state is online, i.e. does not compare below
/// `ControllerStateKind::Starting`.
///
/// Previously the result of the state query was computed and discarded, so the method returned
/// `()` and could not be used for its evident purpose. Call sites that invoke it as a plain
/// statement keep compiling.
pub fn is_online(&self) -> bool {
    self.state.is_online()
}
/// Requests termination by setting the shared state to `Stopping`, which makes `is_online`
/// return `false` for the controller and every context.
pub fn terminate(&mut self) {
self.state.set(ControllerStateKind::Stopping);
}
/// Returns a reference to the shared controller state handle.
pub fn state(&self) -> &State {
&self.state
}
/// Returns a reference to the controller's hub.
pub fn hub(&self) -> &Hub<D> {
&self.hub
}
/// Returns a reference to the supervisor that owns the spawned workers and tasks.
pub fn supervisor(&self) -> &Supervisor<()> {
&self.supervisor
}
/// Returns a reference to the reference-counted shared variables.
pub fn variables(&self) -> &Arc<V> {
&self.variables
}
}
/// `Default` is available whenever the shared variables type is itself `Default`.
impl<D, V> Default for Controller<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send + Sync + 'static + Default,
{
/// Equivalent to [`Controller::new`].
fn default() -> Self {
Self::new()
}
}
/// View of the controller handed to workers and signal handlers: it shares the controller's hub,
/// state and variables.
pub struct Context<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send,
{
/// Clone of the controller's hub.
hub: Hub<D>,
/// Handle to the same state value the controller uses.
state: State,
/// Shared variables, reference-counted together with the controller's copy.
variables: Arc<V>,
}
impl<D, V> Clone for Context<D, V>
where
D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
V: Send,
{
fn clone(&self) -> Self {
Self {
hub: self.hub.clone(),
state: self.state.clone(),
variables: self.variables.clone(),
}
}
}
impl<D, V> Context<D, V>
where
    D: DataDeliveryPolicy + Clone + Send + Sync + 'static,
    V: Send,
{
    /// Returns a reference to the hub shared with the controller.
    pub fn hub(&self) -> &Hub<D> {
        &self.hub
    }

    /// Returns a reference to the reference-counted shared variables.
    pub fn variables(&self) -> &Arc<V> {
        &self.variables
    }

    /// Returns the current controller state.
    pub fn get_state(&self) -> ControllerStateKind {
        self.state.get()
    }

    /// Sets the controller state; the change is visible to the controller and all contexts.
    pub fn set_state(&self, state: ControllerStateKind) {
        self.state.set(state);
    }

    /// Returns `true` while the controller state is online.
    pub fn is_online(&self) -> bool {
        self.state.is_online()
    }

    /// Requests termination by setting the controller state to `Stopping`.
    pub fn terminate(&self) {
        self.set_state(ControllerStateKind::Stopping);
    }
}
/// A unit of work that [`Controller::spawn_worker`] runs on its own supervised thread.
pub trait Worker<D: DataDeliveryPolicy + Clone + Send + Sync + 'static, V: Send>:
Send + Sync
{
/// Worker body. It receives the controller context; returning an error makes
/// `spawn_worker`'s thread wrapper log it and report it through `critical`.
fn run(&mut self, context: &Context<D, V>) -> WResult;
}
/// Thread configuration of a worker, read by [`Controller::spawn_worker`] before the worker is
/// spawned. Only the name is mandatory; every other option has a default.
pub trait WorkerOptions {
/// Name passed to the thread builder; also used in the log message if the worker fails.
fn worker_name(&self) -> &str;
/// Stack size passed to the thread builder; `None` (the default) leaves the builder untouched.
fn worker_stack_size(&self) -> Option<usize> {
None
}
/// Scheduling policy for the worker thread; defaults to `Scheduling::default()`.
fn worker_scheduling(&self) -> Scheduling {
Scheduling::default()
}
/// Priority for the worker thread; `None` (the default) means no priority is set.
fn worker_priority(&self) -> Option<i32> {
None
}
/// CPU ids passed to the real-time parameters; `None` (the default) means none are set.
fn worker_cpu_ids(&self) -> Option<&[usize]> {
None
}
/// Value passed to the builder's `blocking` setting; defaults to `false`.
fn worker_is_blocking(&self) -> bool {
false
}
}