rsiot_component_core/
component_executor.rs1use tokio::{
2 sync::{broadcast, mpsc},
3 task::JoinSet,
4};
5
6use rsiot_messages_core::{system_messages::*, *};
7use tracing::{debug, error, info, trace, warn};
8use uuid::Uuid;
9
10use crate::{error::ComponentError, types::FnAuth, Cache, CmpInOut, IComponent};
11
12pub struct ComponentExecutor<TMsg> {
42 task_set: JoinSet<Result<(), ComponentError>>,
43 cmp_in_out: CmpInOut<TMsg>,
44}
45
46pub struct ComponentExecutorConfig<TMsg> {
48 pub buffer_size: usize,
50
51 pub executor_name: String,
53
54 pub fn_auth: FnAuth<TMsg>,
70}
71
72impl<TMsg> ComponentExecutor<TMsg>
73where
74 TMsg: MsgDataBound + 'static,
75{
76 pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
78 info!("ComponentExecutor start creation");
79 let id = MsgTrace::generate_uuid();
80 let (component_input_send, component_input) =
81 broadcast::channel::<Message<TMsg>>(config.buffer_size);
82 let (component_output, component_output_recv) =
83 mpsc::channel::<Message<TMsg>>(config.buffer_size);
84 let cache: Cache<TMsg> = Cache::new();
85 let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
86
87 let task_internal_handle = task_internal(
88 component_output_recv,
89 component_input_send.clone(),
90 cache.clone(),
91 config.executor_name.clone(),
92 id,
93 );
94
95 if cfg!(feature = "single-thread") {
96 task_set.spawn_local(task_internal_handle);
97 } else {
98 task_set.spawn(task_internal_handle);
99 }
100
101 let cmp_in_out = CmpInOut::new(
102 component_input,
103 component_output,
104 cache.clone(),
105 &config.executor_name,
106 id,
107 AuthPermissions::default(),
108 config.fn_auth,
109 );
110
111 Self {
112 task_set,
113 cmp_in_out,
114 }
115 }
116
117 #[cfg(not(feature = "single-thread"))]
119 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
120 component.set_interface(self.cmp_in_out.clone());
121
122 self.task_set.spawn(async move { component.spawn().await });
123
124 self
125 }
126 #[cfg(feature = "single-thread")]
128 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
129 component.set_interface(self.cmp_in_out.clone());
130
131 self.task_set
132 .spawn_local(async move { component.spawn().await });
133 self
134 }
135
136 pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
141 let msg;
142 if let Some(result) = self.task_set.join_next().await {
143 match result {
144 Ok(result) => match result {
145 Ok(_) => msg = "Component has finished executing".to_string(),
146 Err(err) => {
147 msg = format!("Component has finished executing with error: {:?}", err);
148 }
149 },
150 Err(err) => {
151 msg = format!("Component has finished executing with error: {:?}", err);
152 }
153 };
154 error!(msg);
155 return Err(ComponentError::Execution(msg));
156 }
157 Ok(())
158 }
159}
160
161async fn task_internal<TMsg>(
162 mut input: mpsc::Receiver<Message<TMsg>>,
163 output: broadcast::Sender<Message<TMsg>>,
164 cache: Cache<TMsg>,
165 executor_name: String,
166 executor_id: Uuid,
167) -> Result<(), ComponentError>
168where
169 TMsg: MsgDataBound,
170{
171 debug!("Internal task of ComponentExecutor: starting");
172 while let Some(mut msg) = input.recv().await {
173 trace!("ComponentExecutor: new message: {:?}", msg);
174 msg.add_trace_item(&executor_id, &format!("{executor_name}::internal_bus"));
175 save_msg_in_cache(&msg, &cache).await;
176 output.send(msg).map_err(|err| {
177 let err = format!(
178 "Internal task of ComponentExecutor: send to channel error, {:?}",
179 err
180 );
181 ComponentError::Initialization(err)
182 })?;
183 }
184 warn!("Internal task: stop");
185 Ok(())
186}
187
188async fn save_msg_in_cache<TMsg>(msg: &Message<TMsg>, cache: &Cache<TMsg>)
190where
191 TMsg: MsgDataBound,
192{
193 match &msg.data {
195 MsgData::System(data) => match data {
196 System::AuthRequestByLogin(_) => return,
197 System::AuthRequestByToken(_) => return,
198 System::AuthResponseErr(_) => return,
199 System::AuthResponseOk(_) => return,
200 },
201 _ => (),
202 }
203 let key = msg.key.clone();
204 let value = msg.clone();
205 {
206 let mut lock = cache.write().await;
207 lock.insert(key, value);
215 }
216}