cp_microservice/logic/
executor.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_channel::Sender;
7use tokio::time::timeout;
8
9use crate::core::error::{Error, ErrorKind};
10
11pub type Executor<LogicRequestType, StorageRequestType> = Arc<
12    dyn Fn(
13            LogicRequestType,
14            Sender<StorageRequestType>,
15        ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync>>
16        + Send
17        + Sync,
18>;
19
20pub async fn timeout_send_storage_request<StorageRequestType, OkResultType>(
21    timeout_after_milliseconds: u64,
22    storage_request: StorageRequestType,
23    sender: &Sender<StorageRequestType>,
24    api_replier: tokio::sync::oneshot::Sender<Result<OkResultType, Error>>,
25) -> Result<tokio::sync::oneshot::Sender<Result<OkResultType, Error>>, Error> {
26    match timeout(
27        Duration::from_millis(timeout_after_milliseconds),
28        sender.send(storage_request),
29    )
30    .await
31    {
32        Ok(result) => match result {
33            Ok(_) => (),
34            Err(error) => {
35                let error = Error::new(
36                    ErrorKind::LogicError,
37                    format!("failed to send storage request: {}", &error),
38                );
39
40                if let Err(_) = api_replier.send(Err(error.clone())) {
41                    log::warn!("failed to reply to api with an error");
42                }
43
44                return Err(error);
45            }
46        },
47        Err(error) => {
48            let error = Error::new(
49                ErrorKind::LogicError,
50                format!("timed out sending storage request: {}", &error),
51            );
52
53            if let Err(_) = api_replier.send(Err(error.clone())) {
54                log::warn!("failed to reply to api with an error");
55            }
56
57            return Err(error);
58        }
59    }
60
61    Ok(api_replier)
62}
63
64pub async fn timeout_receive_storage_response<StorageOkResultType, LogicOkResultType>(
65    timeout_after_milliseconds: u64,
66    storage_receiver: tokio::sync::oneshot::Receiver<Result<StorageOkResultType, Error>>,
67    api_replier: tokio::sync::oneshot::Sender<Result<LogicOkResultType, Error>>,
68) -> Result<
69    (
70        tokio::sync::oneshot::Sender<Result<LogicOkResultType, Error>>,
71        StorageOkResultType,
72    ),
73    Error,
74> {
75    let ok_result = match timeout(
76        Duration::from_millis(timeout_after_milliseconds),
77        storage_receiver,
78    )
79    .await
80    {
81        Ok(result) => match result {
82            Ok(result) => match result {
83                Ok(ok_result) => ok_result,
84                Err(error) => {
85                    let error = Error::new(
86                        ErrorKind::LogicError,
87                        format!("storage failed to handle request: {}", &error),
88                    );
89
90                    if let Err(_) = api_replier.send(Err(error.clone())) {
91                        log::warn!("failed to reply to api with an error");
92                    }
93
94                    return Err(error);
95                }
96            },
97            Err(error) => {
98                let error = Error::new(
99                    ErrorKind::LogicError,
100                    format!("failed to receive response from storage: {}", &error),
101                );
102
103                if let Err(_) = api_replier.send(Err(error.clone())) {
104                    log::warn!("failed to reply to api with an error")
105                }
106
107                return Err(error);
108            }
109        },
110        Err(error) => {
111            let error = Error::new(
112                ErrorKind::LogicError,
113                format!("timed out receiving response from storage: {}", &error),
114            );
115
116            if let Err(_) = api_replier.send(Err(error.clone())) {
117                log::warn!("failed to reply to api with an error");
118            }
119
120            return Err(error);
121        }
122    };
123
124    Ok((api_replier, ok_result))
125}