cp_microservice/logic/
dispatch.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::mem;
4use std::mem::Discriminant;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_channel::{Receiver, Sender};
9use log::info;
10use tokio::time::timeout;
11use tokio_util::sync::CancellationToken;
12
13use crate::core::error::Error;
14use crate::logic::executor::Executor;
15
16pub struct Dispatch<LogicRequestType: Debug, StorageRequestType> {
17    logic_request_receiver: Receiver<LogicRequestType>,
18    executors:
19        HashMap<Discriminant<LogicRequestType>, Executor<LogicRequestType, StorageRequestType>>,
20    storage_request_sender: Sender<StorageRequestType>,
21    cancellation_token: CancellationToken,
22}
23
24impl<LogicRequestType: Debug, StorageRequestType> Dispatch<LogicRequestType, StorageRequestType> {
25    pub fn new(
26        logic_request_receiver: Receiver<LogicRequestType>,
27        executors: HashMap<
28            Discriminant<LogicRequestType>,
29            Executor<LogicRequestType, StorageRequestType>,
30        >,
31        storage_request_sender: Sender<StorageRequestType>,
32        cancellation_token: CancellationToken,
33    ) -> Dispatch<LogicRequestType, StorageRequestType> {
34        Dispatch {
35            logic_request_receiver,
36            executors,
37            storage_request_sender,
38            cancellation_token,
39        }
40    }
41
42    pub async fn run(self) {
43        loop {
44            if self.cancellation_token.is_cancelled() && self.logic_request_receiver.is_empty() {
45                info!(
46                    "cancellation token is cancelled and logic request receiver is empty, logic dispatch is stopping"
47                );
48
49                break;
50            }
51
52            let logic_request = match self.logic_request_receiver.recv().await {
53                Ok(logic_request) => logic_request,
54                Err(_) => {
55                    info!("failed to receive logic request");
56                    continue;
57                }
58            };
59
60            let executor = match self.executors.get(&mem::discriminant(&logic_request)) {
61                Some(executor) => executor,
62                None => {
63                    info!(
64                        "failed to find discriminant for logic request: {:?}",
65                        logic_request
66                    );
67                    continue;
68                }
69            };
70
71            if let Err(error) = executor(logic_request, self.storage_request_sender.clone()).await {
72                info!("logic executor returned error: {}", &error);
73            }
74        }
75    }
76}
77
/// Test executor: ignores the incoming logic request and emits a single
/// storage request carrying `TEST_STORAGE_REQUEST_VALUE`.
///
/// Panics if the storage request cannot be sent.
#[cfg(test)]
async fn dummy_executor(
    _value: LogicRequest,
    storage_sender: Sender<StorageRequest>,
) -> Result<(), Error> {
    let storage_request = StorageRequest::DummyElement(TEST_STORAGE_REQUEST_VALUE.to_string());

    let send_outcome = storage_sender.send(storage_request).await;
    send_outcome.expect("failed to send storage request");

    Ok(())
}
92
/// Payload the test executor places in the storage request it emits.
///
/// Gated on `cfg(test)` because its visible users are test-only; ungated, the
/// private constant is dead code in regular builds.
#[cfg(test)]
const TEST_STORAGE_REQUEST_VALUE: &str = "ok";
94
/// Logic request type used by the dispatch test in this file.
#[derive(Debug)]
pub enum LogicRequest {
    /// Single variant carrying an arbitrary string payload.
    DummyElement(String),
}
99
/// Storage request type used by the dispatch test in this file.
///
/// Derives `Debug` like its sibling `LogicRequest` so values can be printed
/// in assertions and log output.
#[derive(Debug)]
pub enum StorageRequest {
    /// Single variant carrying an arbitrary string payload.
    DummyElement(String),
}
103
104#[tokio::test]
105pub async fn run_expected_executors() {
106    let exec: Executor<LogicRequest, StorageRequest> =
107        Arc::new(|logic_request, storage_request_sender| {
108            Box::pin(dummy_executor(logic_request, storage_request_sender))
109        });
110
111    let executors: HashMap<Discriminant<LogicRequest>, Executor<LogicRequest, StorageRequest>> =
112        HashMap::from([(
113            mem::discriminant(&LogicRequest::DummyElement("".to_string())),
114            exec,
115        )]);
116
117    let (sender, receiver) = async_channel::unbounded::<LogicRequest>();
118    let (storage_request_sender, storage_request_receiver) =
119        async_channel::unbounded::<StorageRequest>();
120
121    let dispatch: Dispatch<LogicRequest, StorageRequest> = Dispatch::new(
122        receiver,
123        executors,
124        storage_request_sender,
125        CancellationToken::new(),
126    );
127
128    tokio::spawn(dispatch.run());
129
130    sender
131        .send(LogicRequest::DummyElement("random".to_string()))
132        .await
133        .expect("failed to send logic request");
134
135    let request: StorageRequest = timeout(
136        Duration::from_millis(200u64),
137        storage_request_receiver.recv(),
138    )
139    .await
140    .expect("timeout waiting for storage request")
141    .expect("failed to receive storage request");
142
143    match request {
144        StorageRequest::DummyElement(value) => assert_eq!(TEST_STORAGE_REQUEST_VALUE, value),
145        _ => panic!("unexpected storage request value received"),
146    }
147}