rsiot_component_core/
cmp_in_out.rs

1use std::{cmp::max, fmt::Debug};
2
3use tracing::{info, trace};
4use uuid::Uuid;
5
6use rsiot_messages_core::{system_messages::*, *};
7
8use crate::{
9    types::{CmpInput, CmpOutput, FnAuth},
10    Cache, ComponentError,
11};
12
13#[derive(Debug)]
14pub struct CmpInOut<TMsg> {
15    input: CmpInput<TMsg>,
16    output: CmpOutput<TMsg>,
17    pub cache: Cache<TMsg>,
18    name: String,
19    id: Uuid,
20    auth_perm: AuthPermissions,
21    fn_auth: FnAuth<TMsg>,
22}
23
24impl<TMsg> CmpInOut<TMsg>
25where
26    TMsg: MsgDataBound,
27{
28    pub fn new(
29        input: CmpInput<TMsg>,
30        output: CmpOutput<TMsg>,
31        cache: Cache<TMsg>,
32        name: &str,
33        id: Uuid,
34        auth_perm: AuthPermissions,
35        fn_auth: FnAuth<TMsg>,
36    ) -> Self {
37        info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
38        Self {
39            input,
40            output,
41            cache,
42            id,
43            name: name.into(),
44            auth_perm,
45            fn_auth,
46        }
47    }
48
49    pub fn clone_with_new_id(&self, name: &str, auth_perm: AuthPermissions) -> Self {
50        let name = format!("{}::{}", self.name, name);
51        let id = MsgTrace::generate_uuid();
52        info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
53        Self {
54            input: self.input.resubscribe(),
55            output: self.output.clone(),
56            cache: self.cache.clone(),
57            name,
58            id,
59            auth_perm,
60            fn_auth: self.fn_auth,
61        }
62    }
63
64    /// Получение сообщений со входа
65    pub async fn recv_input(&mut self) -> Result<Message<TMsg>, ComponentError> {
66        loop {
67            let msg = self
68                .input
69                .recv()
70                .await
71                .map_err(|e| ComponentError::CmpInput(e.to_string()))?;
72
73            // Обновляем уровень авторизации при получении системного сообщения. Пропускаем
74            // сообщение, если запрос на авторизацию не проходил через данный компонент
75            if let MsgData::System(System::AuthResponseOk(value)) = &msg.data {
76                if !value.trace_ids.contains(&self.id) {
77                    continue;
78                }
79                self.auth_perm = max(self.auth_perm, value.perm);
80            }
81            if let MsgData::System(System::AuthResponseErr(value)) = &msg.data {
82                if !value.trace_ids.contains(&self.id) {
83                    continue;
84                }
85            }
86
87            // Если данное сообщение было сгенерировано данным сервисом, пропускаем
88            if msg.contains_trace_item(&self.id) {
89                continue;
90            }
91
92            // Если нет авторизации, пропускаем
93            let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
94                continue;
95            };
96
97            msg.add_trace_item(&self.id, &self.name);
98            return Ok(msg);
99        }
100    }
101
102    /// Возвращает копию сообщений из кеша
103    pub async fn recv_cache_all(&self) -> Vec<Message<TMsg>> {
104        let lock = self.cache.read().await;
105        lock.values()
106            .cloned()
107            .filter_map(|m| (self.fn_auth)(m, &self.auth_perm))
108            .collect()
109    }
110
111    /// Возвращает сообщение из кеша по ключу
112    pub async fn recv_cache_msg(&self, key: &str) -> Option<Message<TMsg>> {
113        let cache = self.cache.read().await;
114        cache.get(key).map(|m| m.to_owned())
115    }
116
117    /// Отправка сообщений на выход
118    pub async fn send_output(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
119        trace!("Start send to output: {msg:?}");
120        // Если нет авторизации, пропускаем
121        let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
122            trace!("No authorization. Auth: {:?}", self.auth_perm);
123            return Ok(());
124        };
125
126        msg.add_trace_item(&self.id, &self.name);
127        self.output
128            .send(msg)
129            .await
130            .map_err(|e| ComponentError::CmpOutput(e.to_string()))
131    }
132}
133
134impl<TMsg> Clone for CmpInOut<TMsg>
135where
136    TMsg: Clone,
137{
138    fn clone(&self) -> Self {
139        Self {
140            input: self.input.resubscribe(),
141            output: self.output.clone(),
142            cache: self.cache.clone(),
143            id: self.id,
144            name: self.name.clone(),
145            auth_perm: self.auth_perm,
146            fn_auth: self.fn_auth,
147        }
148    }
149}