cp_microservice/logic/
executor.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_channel::Sender;
7use tokio::time::timeout;
8
9use crate::core::error::{Error, ErrorKind};
10
11pub type Executor<LogicRequestType, StorageRequestType> = Arc<
12 dyn Fn(
13 LogicRequestType,
14 Sender<StorageRequestType>,
15 ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync>>
16 + Send
17 + Sync,
18>;
19
20pub async fn timeout_send_storage_request<StorageRequestType, OkResultType>(
21 timeout_after_milliseconds: u64,
22 storage_request: StorageRequestType,
23 sender: &Sender<StorageRequestType>,
24 api_replier: tokio::sync::oneshot::Sender<Result<OkResultType, Error>>,
25) -> Result<tokio::sync::oneshot::Sender<Result<OkResultType, Error>>, Error> {
26 match timeout(
27 Duration::from_millis(timeout_after_milliseconds),
28 sender.send(storage_request),
29 )
30 .await
31 {
32 Ok(result) => match result {
33 Ok(_) => (),
34 Err(error) => {
35 let error = Error::new(
36 ErrorKind::LogicError,
37 format!("failed to send storage request: {}", &error),
38 );
39
40 if let Err(_) = api_replier.send(Err(error.clone())) {
41 log::warn!("failed to reply to api with an error");
42 }
43
44 return Err(error);
45 }
46 },
47 Err(error) => {
48 let error = Error::new(
49 ErrorKind::LogicError,
50 format!("timed out sending storage request: {}", &error),
51 );
52
53 if let Err(_) = api_replier.send(Err(error.clone())) {
54 log::warn!("failed to reply to api with an error");
55 }
56
57 return Err(error);
58 }
59 }
60
61 Ok(api_replier)
62}
63
64pub async fn timeout_receive_storage_response<StorageOkResultType, LogicOkResultType>(
65 timeout_after_milliseconds: u64,
66 storage_receiver: tokio::sync::oneshot::Receiver<Result<StorageOkResultType, Error>>,
67 api_replier: tokio::sync::oneshot::Sender<Result<LogicOkResultType, Error>>,
68) -> Result<
69 (
70 tokio::sync::oneshot::Sender<Result<LogicOkResultType, Error>>,
71 StorageOkResultType,
72 ),
73 Error,
74> {
75 let ok_result = match timeout(
76 Duration::from_millis(timeout_after_milliseconds),
77 storage_receiver,
78 )
79 .await
80 {
81 Ok(result) => match result {
82 Ok(result) => match result {
83 Ok(ok_result) => ok_result,
84 Err(error) => {
85 let error = Error::new(
86 ErrorKind::LogicError,
87 format!("storage failed to handle request: {}", &error),
88 );
89
90 if let Err(_) = api_replier.send(Err(error.clone())) {
91 log::warn!("failed to reply to api with an error");
92 }
93
94 return Err(error);
95 }
96 },
97 Err(error) => {
98 let error = Error::new(
99 ErrorKind::LogicError,
100 format!("failed to receive response from storage: {}", &error),
101 );
102
103 if let Err(_) = api_replier.send(Err(error.clone())) {
104 log::warn!("failed to reply to api with an error")
105 }
106
107 return Err(error);
108 }
109 },
110 Err(error) => {
111 let error = Error::new(
112 ErrorKind::LogicError,
113 format!("timed out receiving response from storage: {}", &error),
114 );
115
116 if let Err(_) = api_replier.send(Err(error.clone())) {
117 log::warn!("failed to reply to api with an error");
118 }
119
120 return Err(error);
121 }
122 };
123
124 Ok((api_replier, ok_result))
125}