use std::fmt::Debug;
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
};
use rsiot_messages_core::*;
use tracing::{debug, error, info, trace, warn};
use crate::{error::ComponentError, Cache, CmpInOut, IComponent};
pub struct ComponentExecutor<TMsg> {
task_set: JoinSet<Result<(), ComponentError>>,
cache: Cache<TMsg>,
cmp_in_out: CmpInOut<TMsg>,
}
impl<TMsg> ComponentExecutor<TMsg>
where
TMsg: MsgDataBound + 'static,
{
pub fn new(buffer_size: usize, executor_name: &str) -> Self {
info!("ComponentExecutor start creation");
let (component_input_send, component_input) =
broadcast::channel::<Message<TMsg>>(buffer_size);
let (component_output, component_output_recv) = mpsc::channel::<Message<TMsg>>(buffer_size);
let cache: Cache<TMsg> = Cache::new();
let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
let task_internal_handle = task_internal(
component_output_recv,
component_input_send.clone(),
cache.clone(),
);
if cfg!(feature = "single-thread") {
task_set.spawn_local(task_internal_handle);
} else {
task_set.spawn(task_internal_handle);
}
let cmp_in_out = CmpInOut::new(
component_input,
component_output,
executor_name,
AuthPermissions::Admin,
);
Self {
task_set,
cache,
cmp_in_out,
}
}
#[cfg(not(feature = "single-thread"))]
pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
component.set_interface(self.cmp_in_out.clone(), self.cache.clone());
self.task_set.spawn(async move { component.spawn().await });
self
}
#[cfg(feature = "single-thread")]
pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
component.set_interface(self.cmp_in_out.clone(), self.cache.clone());
self.task_set
.spawn_local(async move { component.spawn().await });
self
}
pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
let msg;
if let Some(result) = self.task_set.join_next().await {
match result {
Ok(result) => match result {
Ok(_) => msg = "Component has finished executing".to_string(),
Err(err) => {
msg = format!("Component has finished executing with error: {:?}", err);
}
},
Err(err) => {
msg = format!("Component has finished executing with error: {:?}", err);
}
};
error!(msg);
return Err(ComponentError::Execution(msg));
}
Ok(())
}
}
async fn task_internal<TMsg>(
mut input: mpsc::Receiver<Message<TMsg>>,
output: broadcast::Sender<Message<TMsg>>,
cache: Cache<TMsg>,
) -> Result<(), ComponentError>
where
TMsg: Clone + Debug,
{
debug!("Internal task of ComponentExecutor: starting");
while let Some(msg) = input.recv().await {
trace!("Internal task of ComponentExecutor: new message: {:?}", msg);
let key = msg.key.clone();
let value = msg.clone();
{
let mut lock = cache.write().await;
lock.insert(key, value);
}
output.send(msg).map_err(|err| {
let err = format!(
"Internal task of ComponentExecutor: send to channel error, {:?}",
err
);
ComponentError::Initialization(err)
})?;
}
warn!("Internal task: stop");
Ok(())
}