rsiot_component_core/
component_executor.rs

1use tokio::{
2    sync::{broadcast, mpsc},
3    task::JoinSet,
4};
5
6use rsiot_messages_core::{system_messages::*, *};
7use tracing::{debug, error, info, trace, warn};
8use uuid::Uuid;
9
10use crate::{error::ComponentError, types::FnAuth, Cache, CmpInOut, IComponent};
11
12/// Запуск коллекции компонентов в работу
13///
14/// # Примеры
15///
16/// ## Многопоточное окружение
17///
18/// TODO
19///
20/// ## Однопоточное окружение
21///
22/// TODO
23///
24/// ## Однопоточное окружение - WASM (Leptos)
25///
26/// ```rust
27/// use leptos::*;
28///
29/// let context = LocalSet::new();
30/// context.spawn_local(async move {
31///     ComponentExecutor::<Message>::new(100, "example")
32///         .add_cmp(cmp_websocket_client_wasm::Cmp::new(ws_client_config))
33///         .add_cmp(cmp_leptos::Cmp::new(leptos_config))
34///         .wait_result()
35///         .await?;
36///     Ok(()) as anyhow::Result<()>
37/// });
38/// spawn_local(context);
39/// Ok(())
40/// ```
41pub struct ComponentExecutor<TMsg> {
42    task_set: JoinSet<Result<(), ComponentError>>,
43    cmp_in_out: CmpInOut<TMsg>,
44}
45
46/// Настройка исполнителя
47pub struct ComponentExecutorConfig<TMsg> {
48    /// Размер буфера канала сообщения
49    pub buffer_size: usize,
50
51    /// Название исполнителя
52    pub executor_name: String,
53
54    /// Функция фильтрации сообщений в зависимости от текущей авторизации
55    ///
56    /// # Примеры
57    ///
58    /// ## Все сообщения блокируются
59    ///
60    /// ```rust
61    /// |_, _| None
62    /// ```
63    ///
64    /// ## Все сообщения разрешены
65    ///
66    /// ```rust
67    /// |msg, _| Some(msg)
68    /// ```
69    pub fn_auth: FnAuth<TMsg>,
70}
71
72impl<TMsg> ComponentExecutor<TMsg>
73where
74    TMsg: MsgDataBound + 'static,
75{
76    /// Создание коллекции компонентов
77    pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
78        info!("ComponentExecutor start creation");
79        let id = MsgTrace::generate_uuid();
80        let (component_input_send, component_input) =
81            broadcast::channel::<Message<TMsg>>(config.buffer_size);
82        let (component_output, component_output_recv) =
83            mpsc::channel::<Message<TMsg>>(config.buffer_size);
84        let cache: Cache<TMsg> = Cache::new();
85        let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
86
87        let task_internal_handle = task_internal(
88            component_output_recv,
89            component_input_send.clone(),
90            cache.clone(),
91            config.executor_name.clone(),
92            id,
93        );
94
95        if cfg!(feature = "single-thread") {
96            task_set.spawn_local(task_internal_handle);
97        } else {
98            task_set.spawn(task_internal_handle);
99        }
100
101        let cmp_in_out = CmpInOut::new(
102            component_input,
103            component_output,
104            cache.clone(),
105            &config.executor_name,
106            id,
107            AuthPermissions::default(),
108            config.fn_auth,
109        );
110
111        Self {
112            task_set,
113            cmp_in_out,
114        }
115    }
116
117    /// Добавить компонент
118    #[cfg(not(feature = "single-thread"))]
119    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
120        component.set_interface(self.cmp_in_out.clone());
121
122        self.task_set.spawn(async move { component.spawn().await });
123
124        self
125    }
126    /// Добавить компонент (?Send)
127    #[cfg(feature = "single-thread")]
128    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
129        component.set_interface(self.cmp_in_out.clone());
130
131        self.task_set
132            .spawn_local(async move { component.spawn().await });
133        self
134    }
135
136    /// Запустить на выполнение все компоненты.
137    ///
138    /// Компоненты не должны заканчивать выполнение. Если хоть один остановился (неважно по какой
139    /// причине - по ошибке или нет), это ошибка выполнения.
140    pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
141        let msg;
142        if let Some(result) = self.task_set.join_next().await {
143            match result {
144                Ok(result) => match result {
145                    Ok(_) => msg = "Component has finished executing".to_string(),
146                    Err(err) => {
147                        msg = format!("Component has finished executing with error: {:?}", err);
148                    }
149                },
150                Err(err) => {
151                    msg = format!("Component has finished executing with error: {:?}", err);
152                }
153            };
154            error!(msg);
155            return Err(ComponentError::Execution(msg));
156        }
157        Ok(())
158    }
159}
160
161async fn task_internal<TMsg>(
162    mut input: mpsc::Receiver<Message<TMsg>>,
163    output: broadcast::Sender<Message<TMsg>>,
164    cache: Cache<TMsg>,
165    executor_name: String,
166    executor_id: Uuid,
167) -> Result<(), ComponentError>
168where
169    TMsg: MsgDataBound,
170{
171    debug!("Internal task of ComponentExecutor: starting");
172    while let Some(mut msg) = input.recv().await {
173        trace!("ComponentExecutor: new message: {:?}", msg);
174        msg.add_trace_item(&executor_id, &format!("{executor_name}::internal_bus"));
175        save_msg_in_cache(&msg, &cache).await;
176        output.send(msg).map_err(|err| {
177            let err = format!(
178                "Internal task of ComponentExecutor: send to channel error, {:?}",
179                err
180            );
181            ComponentError::Initialization(err)
182        })?;
183    }
184    warn!("Internal task: stop");
185    Ok(())
186}
187
188/// Сохраняем сообщение в кеше
189async fn save_msg_in_cache<TMsg>(msg: &Message<TMsg>, cache: &Cache<TMsg>)
190where
191    TMsg: MsgDataBound,
192{
193    // Фильтруем сообщения авторизации
194    match &msg.data {
195        MsgData::System(data) => match data {
196            System::AuthRequestByLogin(_) => return,
197            System::AuthRequestByToken(_) => return,
198            System::AuthResponseErr(_) => return,
199            System::AuthResponseOk(_) => return,
200        },
201        _ => (),
202    }
203    let key = msg.key.clone();
204    let value = msg.clone();
205    {
206        let mut lock = cache.write().await;
207        // let value_from_cache = lock.get(&key);
208        // if let Some(value_from_cache) = value_from_cache {
209        //     // если в кеше более новое сообщение, отбрасываем
210        //     if value.ts <= value_from_cache.ts {
211        //         continue;
212        //     }
213        // }
214        lock.insert(key, value);
215    }
216}