mutant_client/
macros.rs

/// Sends one request and waits for its single response.
///
/// Must be expanded inside an `async` function (or block) whose return type is
/// `Result<_, ClientError>`, because the expansion uses `.await` and contains a
/// `return Err(..)` that exits the *enclosing* function.
///
/// # Arguments
/// * `$self` - expression exposing `pending_requests` (locked with
///   `.lock().unwrap()`) and an async `send_request` method.
/// * `$key` - variant name shared by `PendingRequestKey`, `Request` and
///   `PendingSender`; it is also used verbatim in log and error messages.
/// * `$req` - remaining tokens, pasted after `mutant_protocol::` to build the
///   request payload.
///
/// # Behavior
/// 1. Registers a `PendingSender::$key` under `PendingRequestKey::$key`. If that
///    key is already registered, returns `ClientError::InternalError` from the
///    enclosing function without sending anything.
/// 2. Sends `Request::$key(..)` through `$self.send_request`.
/// 3. Awaits the oneshot receiver and evaluates to the value delivered on it.
///
/// The pending entry is removed here when sending fails or when the response
/// channel is canceled. On the success path this macro does not remove it.
///
/// # Panics
/// Panics if `pending_requests.lock()` returns an error (the result is unwrapped).
#[macro_export]
macro_rules! direct_request {
    ($self:expr, $key:ident, $($req:tt)*) => {{
        let key = PendingRequestKey::$key;
        let req = Request::$key(mutant_protocol::$($req)*);
        let (sender, receiver) = oneshot::channel();

        // Check and register under a single lock acquisition so two callers
        // cannot both pass the "already pending" check. The guard is dropped at
        // the end of this block, i.e. before any `.await` below.
        {
            let mut pending = $self.pending_requests.lock().unwrap();
            if pending.contains_key(&key) {
                return Err(ClientError::InternalError(format!(
                    "Another {} request is already pending",
                    stringify!($key)
                )));
            }
            pending.insert(key.clone(), PendingSender::$key(sender));
        }

        match $self.send_request(req).await {
            Ok(_) => {
                debug!("{} request sent, waiting for response...", stringify!($key));
                match receiver.await {
                    Ok(result) => result,
                    Err(_) => {
                        // The sender was dropped without delivering a value.
                        $self.pending_requests.lock().unwrap().remove(&key);
                        error!("{} response channel canceled", stringify!($key));
                        Err(ClientError::InternalError(format!(
                            "{} response channel canceled",
                            stringify!($key)
                        )))
                    }
                }
            }
            Err(e) => {
                // Nothing was sent, so no response will ever arrive for this entry.
                $self.pending_requests.lock().unwrap().remove(&key);
                error!("Failed to send {} request: {:?}", stringify!($key), e);
                Err(e)
            }
        }
    }};
}
44
/// Starts a long-running, task-based request and hands back a future for its
/// completion together with a progress receiver.
///
/// Must be expanded inside a function returning `Result<_, ClientError>`: the
/// expansion contains a `return Err(..)` that exits the *enclosing* function and
/// otherwise evaluates to `Ok((start_task, progress_rx))`.
///
/// # Arguments
/// * `$self` - expression exposing `pending_requests` (locked with
///   `.lock().unwrap()`) and an async `send_request` method. It is moved into
///   the returned `async move` block.
/// * `$key` - variant name shared by `Request` and `TaskType`; also used in log
///   messages.
/// * `$req` - remaining tokens, pasted after `mutant_protocol::` to build the
///   request payload.
///
/// # Behavior
/// * Every expansion registers under the single key
///   `PendingRequestKey::TaskCreation`, so only one such request can be pending
///   at a time regardless of `$key`. If the key is already registered, the
///   enclosing function returns `ClientError::InternalError`.
/// * The `PendingSender::TaskCreation` entry is inserted immediately; the
///   request itself is only sent once the returned future is polled.
/// * The future first awaits the task-creation channel (logging the task ID),
///   then awaits the completion channel and yields the value delivered on it.
/// * If sending fails, the pending entry is removed and the send error is
///   returned. A canceled task-creation or completion channel is reported as
///   `ClientError::InternalError`.
///
/// # Panics
/// Panics if `pending_requests.lock()` returns an error (the result is unwrapped).
#[macro_export]
macro_rules! long_request {
    ($self:expr, $key:ident, $($req:tt)*) => {{
        let key = PendingRequestKey::TaskCreation;
        let req = Request::$key(mutant_protocol::$($req)*);

        let (completion_tx, completion_rx) = oneshot::channel();
        let (progress_tx, progress_rx) = mpsc::unbounded_channel();
        let (task_creation_tx, task_creation_rx) = oneshot::channel();

        // Check and register under a single lock acquisition so two callers
        // cannot both pass the "already pending" check. The guard is dropped at
        // the end of this block, before the future below is created.
        {
            let mut pending = $self.pending_requests.lock().unwrap();
            if pending.contains_key(&key) {
                return Err(ClientError::InternalError(
                    "Another put/get request is already pending".to_string(),
                ));
            }
            pending.insert(
                key.clone(),
                PendingSender::TaskCreation(
                    task_creation_tx,
                    (completion_tx, progress_tx),
                    TaskType::$key,
                ),
            );
        }

        let start_task = async move {
            match $self.send_request(req).await {
                Ok(_) => {
                    debug!("{} request sent, waiting for TaskCreated response...", stringify!($key));
                    // Outer `?`: channel canceled. Inner `?`: error delivered on the channel.
                    let task_id = task_creation_rx.await.map_err(|_| {
                        ClientError::InternalError("TaskCreated channel canceled".to_string())
                    })??;

                    info!("Task created with ID: {}", task_id);

                    completion_rx.await.map_err(|_| {
                        error!("Completion channel canceled");
                        ClientError::InternalError("Completion channel canceled".to_string())
                    })?
                }
                Err(e) => {
                    error!("Failed to send {} request: {:?}", stringify!($key), e);
                    // Nothing was sent, so free the slot for the next request.
                    $self.pending_requests.lock().unwrap().remove(&key);
                    Err(e)
                }
            }
        };

        Ok((start_task, progress_rx))
    }};
}