rsiot_component_core/
cmp_in_out.rs1use std::{cmp::max, fmt::Debug};
2
3use tracing::{info, trace};
4use uuid::Uuid;
5
6use rsiot_messages_core::{system_messages::*, *};
7
8use crate::{
9 types::{CmpInput, CmpOutput, FnAuth},
10 Cache, ComponentError,
11};
12
13#[derive(Debug)]
14pub struct CmpInOut<TMsg> {
15 input: CmpInput<TMsg>,
16 output: CmpOutput<TMsg>,
17 pub cache: Cache<TMsg>,
18 name: String,
19 id: Uuid,
20 auth_perm: AuthPermissions,
21 fn_auth: FnAuth<TMsg>,
22}
23
24impl<TMsg> CmpInOut<TMsg>
25where
26 TMsg: MsgDataBound,
27{
28 pub fn new(
29 input: CmpInput<TMsg>,
30 output: CmpOutput<TMsg>,
31 cache: Cache<TMsg>,
32 name: &str,
33 id: Uuid,
34 auth_perm: AuthPermissions,
35 fn_auth: FnAuth<TMsg>,
36 ) -> Self {
37 info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
38 Self {
39 input,
40 output,
41 cache,
42 id,
43 name: name.into(),
44 auth_perm,
45 fn_auth,
46 }
47 }
48
49 pub fn clone_with_new_id(&self, name: &str, auth_perm: AuthPermissions) -> Self {
50 let name = format!("{}::{}", self.name, name);
51 let id = MsgTrace::generate_uuid();
52 info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
53 Self {
54 input: self.input.resubscribe(),
55 output: self.output.clone(),
56 cache: self.cache.clone(),
57 name,
58 id,
59 auth_perm,
60 fn_auth: self.fn_auth,
61 }
62 }
63
64 pub async fn recv_input(&mut self) -> Result<Message<TMsg>, ComponentError> {
66 loop {
67 let msg = self
68 .input
69 .recv()
70 .await
71 .map_err(|e| ComponentError::CmpInput(e.to_string()))?;
72
73 if let MsgData::System(System::AuthResponseOk(value)) = &msg.data {
76 if !value.trace_ids.contains(&self.id) {
77 continue;
78 }
79 self.auth_perm = max(self.auth_perm, value.perm);
80 }
81 if let MsgData::System(System::AuthResponseErr(value)) = &msg.data {
82 if !value.trace_ids.contains(&self.id) {
83 continue;
84 }
85 }
86
87 if msg.contains_trace_item(&self.id) {
89 continue;
90 }
91
92 let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
94 continue;
95 };
96
97 msg.add_trace_item(&self.id, &self.name);
98 return Ok(msg);
99 }
100 }
101
102 pub async fn recv_cache_all(&self) -> Vec<Message<TMsg>> {
104 let lock = self.cache.read().await;
105 lock.values()
106 .cloned()
107 .filter_map(|m| (self.fn_auth)(m, &self.auth_perm))
108 .collect()
109 }
110
111 pub async fn recv_cache_msg(&self, key: &str) -> Option<Message<TMsg>> {
113 let cache = self.cache.read().await;
114 cache.get(key).map(|m| m.to_owned())
115 }
116
117 pub async fn send_output(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
119 trace!("Start send to output: {msg:?}");
120 let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
122 trace!("No authorization. Auth: {:?}", self.auth_perm);
123 return Ok(());
124 };
125
126 msg.add_trace_item(&self.id, &self.name);
127 self.output
128 .send(msg)
129 .await
130 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
131 }
132}
133
134impl<TMsg> Clone for CmpInOut<TMsg>
135where
136 TMsg: Clone,
137{
138 fn clone(&self) -> Self {
139 Self {
140 input: self.input.resubscribe(),
141 output: self.output.clone(),
142 cache: self.cache.clone(),
143 id: self.id,
144 name: self.name.clone(),
145 auth_perm: self.auth_perm,
146 fn_auth: self.fn_auth,
147 }
148 }
149}