cp_microservice/logic/
dispatch.rs1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::mem;
4use std::mem::Discriminant;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_channel::{Receiver, Sender};
9use log::info;
10use tokio::time::timeout;
11use tokio_util::sync::CancellationToken;
12
13use crate::core::error::Error;
14use crate::logic::executor::Executor;
15
16pub struct Dispatch<LogicRequestType: Debug, StorageRequestType> {
17 logic_request_receiver: Receiver<LogicRequestType>,
18 executors:
19 HashMap<Discriminant<LogicRequestType>, Executor<LogicRequestType, StorageRequestType>>,
20 storage_request_sender: Sender<StorageRequestType>,
21 cancellation_token: CancellationToken,
22}
23
24impl<LogicRequestType: Debug, StorageRequestType> Dispatch<LogicRequestType, StorageRequestType> {
25 pub fn new(
26 logic_request_receiver: Receiver<LogicRequestType>,
27 executors: HashMap<
28 Discriminant<LogicRequestType>,
29 Executor<LogicRequestType, StorageRequestType>,
30 >,
31 storage_request_sender: Sender<StorageRequestType>,
32 cancellation_token: CancellationToken,
33 ) -> Dispatch<LogicRequestType, StorageRequestType> {
34 Dispatch {
35 logic_request_receiver,
36 executors,
37 storage_request_sender,
38 cancellation_token,
39 }
40 }
41
42 pub async fn run(self) {
43 loop {
44 if self.cancellation_token.is_cancelled() && self.logic_request_receiver.is_empty() {
45 info!(
46 "cancellation token is cancelled and logic request receiver is empty, logic dispatch is stopping"
47 );
48
49 break;
50 }
51
52 let logic_request = match self.logic_request_receiver.recv().await {
53 Ok(logic_request) => logic_request,
54 Err(_) => {
55 info!("failed to receive logic request");
56 continue;
57 }
58 };
59
60 let executor = match self.executors.get(&mem::discriminant(&logic_request)) {
61 Some(executor) => executor,
62 None => {
63 info!(
64 "failed to find discriminant for logic request: {:?}",
65 logic_request
66 );
67 continue;
68 }
69 };
70
71 if let Err(error) = executor(logic_request, self.storage_request_sender.clone()).await {
72 info!("logic executor returned error: {}", &error);
73 }
74 }
75 }
76}
77
/// Test-only executor: ignores the incoming logic request and emits a single
/// `StorageRequest::DummyElement` carrying `TEST_STORAGE_REQUEST_VALUE`.
///
/// Panics if the storage request cannot be sent; otherwise returns `Ok(())`.
#[cfg(test)]
async fn dummy_executor(
    _value: LogicRequest,
    storage_sender: Sender<StorageRequest>,
) -> Result<(), Error> {
    let storage_request = StorageRequest::DummyElement(String::from(TEST_STORAGE_REQUEST_VALUE));

    let send_outcome = storage_sender.send(storage_request).await;
    send_outcome.expect("failed to send storage request");

    Ok(())
}
92
/// Payload `dummy_executor` puts into the storage request it emits; the test
/// compares the received value against it.
const TEST_STORAGE_REQUEST_VALUE: &str = "ok";
94
/// Logic request type used by the dispatch test in this file.
#[derive(Debug)]
pub enum LogicRequest {
    /// Single variant carrying an arbitrary string payload.
    DummyElement(String),
}
99
/// Storage request type used by the dispatch test in this file.
///
/// Derives `Debug` so values can be printed in logs and assertion failures,
/// matching `LogicRequest`.
#[derive(Debug)]
pub enum StorageRequest {
    /// Single variant carrying an arbitrary string payload.
    DummyElement(String),
}
103
104#[tokio::test]
105pub async fn run_expected_executors() {
106 let exec: Executor<LogicRequest, StorageRequest> =
107 Arc::new(|logic_request, storage_request_sender| {
108 Box::pin(dummy_executor(logic_request, storage_request_sender))
109 });
110
111 let executors: HashMap<Discriminant<LogicRequest>, Executor<LogicRequest, StorageRequest>> =
112 HashMap::from([(
113 mem::discriminant(&LogicRequest::DummyElement("".to_string())),
114 exec,
115 )]);
116
117 let (sender, receiver) = async_channel::unbounded::<LogicRequest>();
118 let (storage_request_sender, storage_request_receiver) =
119 async_channel::unbounded::<StorageRequest>();
120
121 let dispatch: Dispatch<LogicRequest, StorageRequest> = Dispatch::new(
122 receiver,
123 executors,
124 storage_request_sender,
125 CancellationToken::new(),
126 );
127
128 tokio::spawn(dispatch.run());
129
130 sender
131 .send(LogicRequest::DummyElement("random".to_string()))
132 .await
133 .expect("failed to send logic request");
134
135 let request: StorageRequest = timeout(
136 Duration::from_millis(200u64),
137 storage_request_receiver.recv(),
138 )
139 .await
140 .expect("timeout waiting for storage request")
141 .expect("failed to receive storage request");
142
143 match request {
144 StorageRequest::DummyElement(value) => assert_eq!(TEST_STORAGE_REQUEST_VALUE, value),
145 _ => panic!("unexpected storage request value received"),
146 }
147}