rsiot_extra_components/cmp_external_fn_process.rs
//! Running the documentation tests:
2//!
3//! ```bash
4//! cargo test -p rsiot-extra-components --doc cmp_external_fn_process; cargo test -p rsiot-extra-components --doc cmp_external_fn_process --features single-thread
5//! ```
6
7use async_trait::async_trait;
8
9#[cfg(feature = "single-thread")]
10use futures::future::LocalBoxFuture;
11
12#[cfg(not(feature = "single-thread"))]
13use futures::future::BoxFuture;
14
15use rsiot_component_core::{
16 CmpInOut, Component, ComponentError, ComponentResult, IComponentProcess,
17};
18use rsiot_messages_core::*;
19
/// Boxed callback stored in `Config::fn_process` when the `single-thread` feature is enabled.
///
/// The callback takes a `CmpInOut<TMsg>` and returns a `LocalBoxFuture` that resolves to a
/// `ComponentResult`.
#[cfg(feature = "single-thread")]
type FnProcess<TMsg> =
    Box<dyn Fn(CmpInOut<TMsg>) -> LocalBoxFuture<'static, ComponentResult>>;
22
23#[cfg(not(feature = "single-thread"))]
24type FnProcess<TMsg> =
25 Box<dyn Fn(CmpInOut<TMsg>) -> BoxFuture<'static, ComponentResult> + Send + Sync>;
26
27pub struct Config<TMsg> {
28 /// Внешняя функция для выполнения
29 ///
30 /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
31 ///
32 /// ```rust
33 /// # use rsiot_extra_components::cmp_external_fn_process;
34 /// # // insert-start test single_thread
35 /// use std::time::Duration;
36 ///
37 /// use futures::future::LocalBoxFuture;
38 /// use tokio::time::sleep;
39 /// use tracing::info;
40 ///
41 /// use rsiot_component_core::{Cache, CmpInput, CmpOutput, ComponentResult};
42 /// use rsiot_messages_core::{example_message::*, *};
43 ///
44 /// fn fn_process_wrapper<TMsg>(
45 /// input: CmpInput<TMsg>,
46 /// output: CmpOutput<TMsg>,
47 /// cache: Cache<TMsg>,
48 /// ) -> LocalBoxFuture<'static, ComponentResult>
49 /// where
50 /// TMsg: MsgDataBound + 'static,
51 /// {
52 /// Box::pin(async { fn_process(input, output, cache).await })
53 /// }
54 /// async fn fn_process<TMsg>(
55 /// _input: CmpInput<TMsg>,
56 /// _output: CmpOutput<TMsg>,
57 /// _cache: Cache<TMsg>,
58 /// ) -> ComponentResult {
59 /// loop {
60 /// info!("External fn process");
61 /// sleep(Duration::from_secs(2)).await;
62 /// }
63 /// }
64 ///
65 /// let _config = cmp_external_fn_process::Config {
66 /// fn_process: Box::new(fn_process_wrapper::<Custom>),
67 /// };
68 /// # // insert-end
69 /// ```
70 #[cfg(feature = "single-thread")]
71 pub fn_process: FnProcess<TMsg>,
72
73 /// Внешняя функция для выполнения
74 ///
75 /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
76 ///
77 /// ```rust
78 /// # use rsiot_extra_components::cmp_external_fn_process;
79 /// # // insert-start test multi_thread
80 /// use std::time::Duration;
81 ///
82 /// use futures::future::BoxFuture;
83 /// use tokio::time::sleep;
84 /// use tracing::info;
85 ///
86 /// use rsiot_component_core::{Cache, CmpInput, CmpOutput, ComponentResult};
87 /// use rsiot_messages_core::{example_message::*, *};
88 ///
89 /// fn fn_process_wrapper<TMsg>(
90 /// input: CmpInput<TMsg>,
91 /// output: CmpOutput<TMsg>,
92 /// cache: Cache<TMsg>,
93 /// ) -> BoxFuture<'static, ComponentResult>
94 /// where
95 /// TMsg: MsgDataBound + 'static,
96 /// {
97 /// Box::pin(async { fn_process(input, output, cache).await })
98 /// }
99 ///
100 /// async fn fn_process<TMsg>(
101 /// _input: CmpInput<TMsg>,
102 /// _output: CmpOutput<TMsg>,
103 /// _cache: Cache<TMsg>,
104 /// ) -> ComponentResult {
105 /// loop {
106 /// info!("External fn process");
107 /// sleep(Duration::from_secs(2)).await;
108 /// }
109 /// }
110 ///
111 /// let _config = cmp_external_fn_process::Config {
112 /// fn_process: Box::new(fn_process_wrapper::<Custom>),
113 /// };
114 /// # // insert-end
115 /// ```
116 #[cfg(not(feature = "single-thread"))]
117 pub fn_process: FnProcess<TMsg>,
118}
119
120#[cfg_attr(not(feature = "single-thread"), async_trait)]
121#[cfg_attr(feature = "single-thread", async_trait(?Send))]
122#[async_trait(?Send)]
123impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
124where
125 TMsg: MsgDataBound,
126{
127 async fn process(
128 &self,
129 config: Config<TMsg>,
130 in_out: CmpInOut<TMsg>,
131 ) -> Result<(), ComponentError> {
132 (config.fn_process)(
133 in_out.clone_with_new_id("cmp_extrenal_fn_process", AuthPermissions::FullAccess),
134 )
135 .await
136 }
137}
138
139pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;
140
#[cfg(test)]
mod tests {
    // Both tests only verify that a `Config` can be built from a wrapped async function for the
    // active feature set; the wrapped function is never executed.

    use super::super::cmp_external_fn_process;

    /// Builds a `Config` from a wrapper returning a `LocalBoxFuture`.
    #[cfg(feature = "single-thread")]
    #[test]
    fn single_thread() {
        use std::time::Duration;

        use futures::future::LocalBoxFuture;
        use tokio::time::sleep;
        use tracing::info;

        use rsiot_component_core::{CmpInOut, ComponentResult};
        use rsiot_messages_core::{example_message::*, *};

        // Endless worker: log a line, sleep two seconds, repeat.
        async fn fn_process<TMsg>(_in_out: CmpInOut<TMsg>) -> ComponentResult {
            loop {
                info!("External fn process");
                sleep(Duration::from_secs(2)).await;
            }
        }

        // Adapter that puts the async worker into the boxed-future shape stored in the config.
        fn fn_process_wrapper<TMsg: MsgDataBound + 'static>(
            in_out: CmpInOut<TMsg>,
        ) -> LocalBoxFuture<'static, ComponentResult> {
            Box::pin(fn_process(in_out))
        }

        let _config = cmp_external_fn_process::Config {
            fn_process: Box::new(fn_process_wrapper::<Custom>),
        };
    }

    /// Builds a `Config` from a wrapper returning a `BoxFuture`.
    #[cfg(not(feature = "single-thread"))]
    #[test]
    fn multi_thread() {
        use std::time::Duration;

        use futures::future::BoxFuture;
        use tokio::time::sleep;
        use tracing::info;

        use rsiot_component_core::{CmpInOut, ComponentResult};
        use rsiot_messages_core::{example_message::*, *};

        // Endless worker: log a line, sleep two seconds, repeat.
        async fn fn_process<TMsg>(_in_out: CmpInOut<TMsg>) -> ComponentResult {
            loop {
                info!("External fn process");
                sleep(Duration::from_secs(2)).await;
            }
        }

        // Adapter that puts the async worker into the boxed-future shape stored in the config.
        fn fn_process_wrapper<TMsg: MsgDataBound + 'static>(
            in_out: CmpInOut<TMsg>,
        ) -> BoxFuture<'static, ComponentResult> {
            Box::pin(fn_process(in_out))
        }

        let _config = cmp_external_fn_process::Config {
            fn_process: Box::new(fn_process_wrapper::<Custom>),
        };
    }
}