rsiot_extra_components/
cmp_external_fn_process.rs

1//! Тестирование документации:
2//!
3//! ```bash
4//! cargo test -p rsiot-extra-components --doc cmp_external_fn_process; cargo test -p rsiot-extra-components --doc cmp_external_fn_process --features single-thread
5//! ```
6
7use async_trait::async_trait;
8
9#[cfg(feature = "single-thread")]
10use futures::future::LocalBoxFuture;
11
12#[cfg(not(feature = "single-thread"))]
13use futures::future::BoxFuture;
14
15use rsiot_component_core::{
16    CmpInOut, Component, ComponentError, ComponentResult, IComponentProcess,
17};
18use rsiot_messages_core::*;
19
20#[cfg(feature = "single-thread")]
21type FnProcess<TMsg> = Box<dyn Fn(CmpInOut<TMsg>) -> LocalBoxFuture<'static, ComponentResult>>;
22
23#[cfg(not(feature = "single-thread"))]
24type FnProcess<TMsg> =
25    Box<dyn Fn(CmpInOut<TMsg>) -> BoxFuture<'static, ComponentResult> + Send + Sync>;
26
27pub struct Config<TMsg> {
28    /// Внешняя функция для выполнения
29    ///
30    /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
31    ///
32    /// ```rust
33    /// # use rsiot_extra_components::cmp_external_fn_process;
34    /// # // insert-start test single_thread
35    /// use std::time::Duration;
36    ///
37    /// use futures::future::LocalBoxFuture;
38    /// use tokio::time::sleep;
39    /// use tracing::info;
40    ///
41    /// use rsiot_component_core::{Cache, CmpInput, CmpOutput, ComponentResult};
42    /// use rsiot_messages_core::{example_message::*, *};
43    ///
44    /// fn fn_process_wrapper<TMsg>(
45    ///     input: CmpInput<TMsg>,
46    ///     output: CmpOutput<TMsg>,
47    ///     cache: Cache<TMsg>,
48    /// ) -> LocalBoxFuture<'static, ComponentResult>
49    /// where
50    ///     TMsg: MsgDataBound + 'static,
51    /// {
52    ///     Box::pin(async { fn_process(input, output, cache).await })
53    /// }
54    /// async fn fn_process<TMsg>(
55    ///     _input: CmpInput<TMsg>,
56    ///     _output: CmpOutput<TMsg>,
57    ///     _cache: Cache<TMsg>,
58    /// ) -> ComponentResult {
59    ///     loop {
60    ///         info!("External fn process");
61    ///         sleep(Duration::from_secs(2)).await;
62    ///     }
63    /// }
64    ///
65    /// let _config = cmp_external_fn_process::Config {
66    ///     fn_process: Box::new(fn_process_wrapper::<Custom>),
67    /// };
68    /// # // insert-end
69    /// ```
70    #[cfg(feature = "single-thread")]
71    pub fn_process: FnProcess<TMsg>,
72
73    /// Внешняя функция для выполнения
74    ///
75    /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
76    ///
77    /// ```rust
78    /// # use rsiot_extra_components::cmp_external_fn_process;
79    /// # // insert-start test multi_thread
80    /// use std::time::Duration;
81    ///
82    /// use futures::future::BoxFuture;
83    /// use tokio::time::sleep;
84    /// use tracing::info;
85    ///
86    /// use rsiot_component_core::{Cache, CmpInput, CmpOutput, ComponentResult};
87    /// use rsiot_messages_core::{example_message::*, *};
88    ///
89    /// fn fn_process_wrapper<TMsg>(
90    ///     input: CmpInput<TMsg>,
91    ///     output: CmpOutput<TMsg>,
92    ///     cache: Cache<TMsg>,
93    /// ) -> BoxFuture<'static, ComponentResult>
94    /// where
95    ///     TMsg: MsgDataBound + 'static,
96    /// {
97    ///     Box::pin(async { fn_process(input, output, cache).await })
98    /// }
99    ///
100    /// async fn fn_process<TMsg>(
101    ///     _input: CmpInput<TMsg>,
102    ///     _output: CmpOutput<TMsg>,
103    ///     _cache: Cache<TMsg>,
104    /// ) -> ComponentResult {
105    ///     loop {
106    ///         info!("External fn process");
107    ///         sleep(Duration::from_secs(2)).await;
108    ///     }
109    /// }
110    ///
111    /// let _config = cmp_external_fn_process::Config {
112    ///     fn_process: Box::new(fn_process_wrapper::<Custom>),
113    /// };
114    /// # // insert-end
115    /// ```
116    #[cfg(not(feature = "single-thread"))]
117    pub fn_process: FnProcess<TMsg>,
118}
119
120#[cfg_attr(not(feature = "single-thread"), async_trait)]
121#[cfg_attr(feature = "single-thread", async_trait(?Send))]
122#[async_trait(?Send)]
123impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
124where
125    TMsg: MsgDataBound,
126{
127    async fn process(
128        &self,
129        config: Config<TMsg>,
130        in_out: CmpInOut<TMsg>,
131    ) -> Result<(), ComponentError> {
132        (config.fn_process)(
133            in_out.clone_with_new_id("cmp_extrenal_fn_process", AuthPermissions::FullAccess),
134        )
135        .await
136    }
137}
138
139pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;
140
141#[cfg(test)]
142mod tests {
143
144    use super::super::cmp_external_fn_process;
145
146    #[cfg(feature = "single-thread")]
147    #[test]
148    fn single_thread() {
149        use std::time::Duration;
150
151        use futures::future::LocalBoxFuture;
152        use tokio::time::sleep;
153        use tracing::info;
154
155        use rsiot_component_core::{CmpInOut, ComponentResult};
156        use rsiot_messages_core::{example_message::*, *};
157
158        fn fn_process_wrapper<TMsg>(
159            in_out: CmpInOut<TMsg>,
160        ) -> LocalBoxFuture<'static, ComponentResult>
161        where
162            TMsg: MsgDataBound + 'static,
163        {
164            Box::pin(async { fn_process(in_out).await })
165        }
166        async fn fn_process<TMsg>(_in_out: CmpInOut<TMsg>) -> ComponentResult {
167            loop {
168                info!("External fn process");
169                sleep(Duration::from_secs(2)).await;
170            }
171        }
172
173        let _config = cmp_external_fn_process::Config {
174            fn_process: Box::new(fn_process_wrapper::<Custom>),
175        };
176    }
177
178    #[cfg(not(feature = "single-thread"))]
179    #[test]
180    fn multi_thread() {
181        use std::time::Duration;
182
183        use futures::future::BoxFuture;
184        use tokio::time::sleep;
185        use tracing::info;
186
187        use rsiot_component_core::{CmpInOut, ComponentResult};
188        use rsiot_messages_core::{example_message::*, *};
189
190        fn fn_process_wrapper<TMsg>(in_out: CmpInOut<TMsg>) -> BoxFuture<'static, ComponentResult>
191        where
192            TMsg: MsgDataBound + 'static,
193        {
194            Box::pin(async { fn_process(in_out).await })
195        }
196
197        async fn fn_process<TMsg>(_in_out: CmpInOut<TMsg>) -> ComponentResult {
198            loop {
199                info!("External fn process");
200                sleep(Duration::from_secs(2)).await;
201            }
202        }
203
204        let _config = cmp_external_fn_process::Config {
205            fn_process: Box::new(fn_process_wrapper::<Custom>),
206        };
207    }
208}