rsiot-plc 0.0.71

Компонент для исполнения логики наподобие PLC
Documentation
use std::time::Duration;

use instant::Instant;
use serde::Serialize;
use tracing::trace;

#[cfg(not(feature = "single-thread"))]
use tokio::task::spawn;
#[cfg(feature = "single-thread")]
use tokio::task::spawn_local;

#[cfg(target_arch = "wasm32")]
use gloo::timers::future::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;

use rsiot_component_core::{CmpInOut, ComponentError};
use rsiot_messages_core::MsgDataBound;

use crate::{
    config::Config,
    plc::function_block_base::{FunctionBlockBase, IFunctionBlock},
    Error,
};

type Result = std::result::Result<(), Error>;

pub async fn fn_process<TMessage, I, Q, S>(
    output: CmpInOut<TMessage>,
    config: Config<TMessage, I, Q, S>,
) -> std::result::Result<(), ComponentError>
where
    TMessage: MsgDataBound + 'static,
    I: Clone + Default + Send + Serialize + 'static + Sync,
    Q: Clone + Default + Send + Serialize + 'static + Sync,
    S: Clone + Default + Send + Serialize + 'static + Sync,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    #[cfg(feature = "single-thread")]
    let handle = spawn_local(task_main_loop::<TMessage, I, Q, S>(output, config));

    #[cfg(not(feature = "single-thread"))]
    let handle = spawn(task_main_loop::<TMessage, I, Q, S>(output, config));

    handle
        .await
        .map_err(|err| ComponentError::Execution(err.to_string()))??;
    Ok(())
}

async fn task_main_loop<TMessage, I, Q, S>(
    output: CmpInOut<TMessage>,
    config: Config<TMessage, I, Q, S>,
) -> Result
where
    TMessage: MsgDataBound + 'static,
    I: Clone + Default + Send + Serialize + Sync,
    Q: Clone + Default + Send + Serialize + Sync,
    S: Clone + Default + Send + Serialize + Sync,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    let mut fb_main = config.fb_main.clone();
    loop {
        trace!("Start PLC cycle");
        let begin = Instant::now();
        task_main::<TMessage, I, Q, S>(&output, &config, &mut fb_main).await?;
        let elapsed = begin.elapsed();
        trace!("End PLC cycle, elapsed: {:?}", elapsed);
        let sleep_time = if config.period <= elapsed {
            Duration::from_millis(10)
        } else {
            config.period - elapsed
        };
        sleep(sleep_time).await;
    }
}

async fn task_main<TMessage, I, Q, S>(
    output: &CmpInOut<TMessage>,
    config: &Config<TMessage, I, Q, S>,
    fb_main: &mut FunctionBlockBase<I, Q, S>,
) -> Result
where
    TMessage: MsgDataBound + 'static,
    I: Clone + Default + Send + Serialize,
    Q: Clone + Default + Send + Serialize,
    S: Clone + Default + Send + Serialize,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    let mut input = I::default();
    {
        let cache = output.recv_cache_all().await;
        for msg in cache {
            (config.fn_input)(&mut input, &msg);
        }
    }
    fb_main.call(input);
    let msgs = (config.fn_output)(&fb_main.output);
    for msg in msgs {
        output.send_output(msg).await.map_err(Error::CmpOutput)?;
    }
    Ok(())
}