1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
use std::fmt::Debug;
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
};
use rsiot_messages_core::*;
use tracing::{debug, error, info, trace, warn};
use crate::{error::ComponentError, types::FnAuth, Cache, CmpInOut, IComponent};
/// Запуск коллекции компонентов в работу
///
/// # Примеры
///
/// ## Многопоточное окружение
///
/// TODO
///
/// ## Однопоточное окружение
///
/// TODO
///
/// ## Однопоточное окружение - WASM (Leptos)
///
/// ```rust
/// use leptos::*;
///
/// let context = LocalSet::new();
/// context.spawn_local(async move {
/// ComponentExecutor::<Message>::new(100, "example")
/// .add_cmp(cmp_websocket_client_wasm::Cmp::new(ws_client_config))
/// .add_cmp(cmp_leptos::Cmp::new(leptos_config))
/// .wait_result()
/// .await?;
/// Ok(()) as anyhow::Result<()>
/// });
/// spawn_local(context);
/// Ok(())
/// ```
pub struct ComponentExecutor<TMsg> {
task_set: JoinSet<Result<(), ComponentError>>,
cmp_in_out: CmpInOut<TMsg>,
}
/// Настройка исполнителя
pub struct ComponentExecutorConfig<TMsg> {
/// Размер буфера канала сообщения
pub buffer_size: usize,
/// Название исполнителя
pub executor_name: String,
/// Функция фильтрации сообщений в зависимости от текущей авторизации
///
/// # Примеры
///
/// ## Все сообщения блокируются
///
/// ```rust
/// |_, _| None
/// ```
///
/// ## Все сообщения разрешены
///
/// ```rust
/// |msg, _| Some(msg)
/// ```
pub fn_auth: FnAuth<TMsg>,
}
impl<TMsg> ComponentExecutor<TMsg>
where
TMsg: MsgDataBound + 'static,
{
/// Создание коллекции компонентов
pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
info!("ComponentExecutor start creation");
let (component_input_send, component_input) =
broadcast::channel::<Message<TMsg>>(config.buffer_size);
let (component_output, component_output_recv) =
mpsc::channel::<Message<TMsg>>(config.buffer_size);
let cache: Cache<TMsg> = Cache::new();
let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
let task_internal_handle = task_internal(
component_output_recv,
component_input_send.clone(),
cache.clone(),
);
if cfg!(feature = "single-thread") {
task_set.spawn_local(task_internal_handle);
} else {
task_set.spawn(task_internal_handle);
}
let cmp_in_out = CmpInOut::new(
component_input,
component_output,
cache.clone(),
&config.executor_name,
AuthPermissions::default(),
config.fn_auth.clone(),
);
Self {
task_set,
cmp_in_out,
}
}
/// Добавить компонент
#[cfg(not(feature = "single-thread"))]
pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
component.set_interface(self.cmp_in_out.clone());
self.task_set.spawn(async move { component.spawn().await });
self
}
/// Добавить компонент (?Send)
#[cfg(feature = "single-thread")]
pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
component.set_interface(self.cmp_in_out.clone());
self.task_set
.spawn_local(async move { component.spawn().await });
self
}
/// Запустить на выполнение все компоненты.
///
/// Компоненты не должны заканчивать выполнение. Если хоть один остановился (неважно по какой
/// причине - по ошибке или нет), это ошибка выполнения.
pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
let msg;
if let Some(result) = self.task_set.join_next().await {
match result {
Ok(result) => match result {
Ok(_) => msg = "Component has finished executing".to_string(),
Err(err) => {
msg = format!("Component has finished executing with error: {:?}", err);
}
},
Err(err) => {
msg = format!("Component has finished executing with error: {:?}", err);
}
};
error!(msg);
return Err(ComponentError::Execution(msg));
}
Ok(())
}
}
async fn task_internal<TMsg>(
mut input: mpsc::Receiver<Message<TMsg>>,
output: broadcast::Sender<Message<TMsg>>,
cache: Cache<TMsg>,
) -> Result<(), ComponentError>
where
TMsg: Clone + Debug,
{
debug!("Internal task of ComponentExecutor: starting");
while let Some(msg) = input.recv().await {
trace!("Internal task of ComponentExecutor: new message: {:?}", msg);
let key = msg.key.clone();
let value = msg.clone();
{
let mut lock = cache.write().await;
// let value_from_cache = lock.get(&key);
// if let Some(value_from_cache) = value_from_cache {
// // если в кеше более новое сообщение, отбрасываем
// if value.ts <= value_from_cache.ts {
// continue;
// }
// }
lock.insert(key, value);
}
output.send(msg).map_err(|err| {
let err = format!(
"Internal task of ComponentExecutor: send to channel error, {:?}",
err
);
ComponentError::Initialization(err)
})?;
}
warn!("Internal task: stop");
Ok(())
}