/// Sends one request and waits for its single response.
///
/// Expands to a block expression that, in order:
///
/// 1. builds `PendingRequestKey::$key` and `Request::$key(mutant_protocol::<$req tokens>)`;
/// 2. under a single lock of `$self.pending_requests`, returns from the *enclosing function*
///    with `ClientError::InternalError` if the key is already registered, otherwise registers
///    a fresh `PendingSender::$key` one-shot sender under that key;
/// 3. awaits `$self.send_request(req)`;
/// 4. awaits the one-shot receiver and evaluates to the value it delivers.
///
/// If sending fails, or the response channel is canceled, the key is removed again and the
/// block evaluates to an `Err`.
///
/// # Arguments
///
/// * `$self` - expression exposing `pending_requests` and an async `send_request`; it is
///   evaluated several times, so it should be side-effect free (typically `self`).
/// * `$key` - variant name shared by `PendingRequestKey`, `Request` and `PendingSender`.
/// * `$req` - tokens appended to `mutant_protocol::` to build the request payload.
///
/// # Usage requirements
///
/// The expansion uses `.await` and `return Err(ClientError::..)`, so it must appear inside an
/// async function whose return type is a `Result` with `ClientError` as its error type.
/// `PendingRequestKey`, `Request`, `PendingSender`, `ClientError`, `mutant_protocol`,
/// `oneshot`, `debug!` and `error!` are resolved at the call site and must be in scope there.
///
/// # Panics
///
/// Panics if `pending_requests.lock()` returns an error (presumably a poisoned
/// `std::sync::Mutex` - confirm against the client type).
#[macro_export]
macro_rules! direct_request {
    ($self:expr, $key:ident, $($req:tt)*) => {{
        let key = PendingRequestKey::$key;
        let req = Request::$key(mutant_protocol::$($req)*);

        let (sender, receiver) = oneshot::channel();

        // Check and register under ONE lock acquisition so that a registration made between
        // the check and the insert cannot be overwritten. The guard lives only inside this
        // block and is therefore released before any `.await` below.
        {
            let mut pending = $self.pending_requests.lock().unwrap();
            if pending.contains_key(&key) {
                return Err(ClientError::InternalError(format!(
                    "Another {} request is already pending",
                    stringify!($key)
                )));
            }
            pending.insert(key.clone(), PendingSender::$key(sender));
        }

        match $self.send_request(req).await {
            Ok(_) => {
                debug!("{} request sent, waiting for response...", stringify!($key));
                match receiver.await {
                    // NOTE(review): the key is not removed here on success; presumably the
                    // code that fulfils the sender removes it - confirm.
                    Ok(result) => result,
                    Err(_) => {
                        // The sender was dropped without a value: clean up and report it.
                        $self.pending_requests.lock().unwrap().remove(&key);
                        error!("{} response channel canceled", stringify!($key));
                        Err(ClientError::InternalError(format!(
                            "{} response channel canceled",
                            stringify!($key)
                        )))
                    }
                }
            }
            Err(e) => {
                // Nothing was sent, so no response can arrive: unregister before failing.
                $self.pending_requests.lock().unwrap().remove(&key);
                error!("Failed to send {} request: {:?}", stringify!($key), e);
                Err(e)
            }
        }
    }};
}
44
/// Registers a long-running (task-based) request and hands back a future that drives it.
///
/// Expands to a block expression that:
///
/// 1. builds `Request::$key(mutant_protocol::<$req tokens>)`;
/// 2. under a single lock of `$self.pending_requests`, returns from the *enclosing function*
///    with `ClientError::InternalError` if `PendingRequestKey::TaskCreation` is already
///    registered, otherwise registers a `PendingSender::TaskCreation` holding the
///    task-creation sender, the `(completion, progress)` senders and `TaskType::$key`;
/// 3. evaluates to `Ok((start_task, progress_rx))`.
///
/// The registration key is always `TaskCreation`, whatever `$key` is, so at most one long
/// request can be waiting for its task id at a time.
///
/// `start_task` is an `async move` block, so nothing is sent until the caller polls it. When
/// polled it awaits `$self.send_request(req)`, then the task id from the task-creation
/// channel, then the final result from the completion channel. If sending fails it removes
/// the `TaskCreation` key and yields the send error; a canceled channel yields
/// `ClientError::InternalError`.
///
/// NOTE(review): on a canceled or failed task-creation response the key is not removed by
/// this macro; presumably whoever dropped or fulfilled the sender already removed it - confirm.
///
/// # Arguments
///
/// * `$self` - expression exposing `pending_requests` and an async `send_request`; it is
///   evaluated more than once and is moved into the returned future.
/// * `$key` - variant name shared by `Request` and `TaskType`.
/// * `$req` - tokens appended to `mutant_protocol::` to build the request payload.
///
/// # Usage requirements
///
/// Must be expanded in a function returning `Result<(_, _), ClientError>` (the expansion
/// contains `return Err(..)` and ends in `Ok((..))`). `PendingRequestKey`, `Request`,
/// `PendingSender`, `TaskType`, `ClientError`, `mutant_protocol`, `oneshot`, `mpsc`,
/// `debug!`, `info!` and `error!` are resolved at the call site.
///
/// # Panics
///
/// Panics if `pending_requests.lock()` returns an error (presumably a poisoned
/// `std::sync::Mutex` - confirm against the client type).
#[macro_export]
macro_rules! long_request {
    ($self:expr, $key:ident, $($req:tt)*) => {{
        let key = PendingRequestKey::TaskCreation;
        let req = Request::$key(mutant_protocol::$($req)*);

        let (completion_tx, completion_rx) = oneshot::channel();
        let (progress_tx, progress_rx) = mpsc::unbounded_channel();
        let (task_creation_tx, task_creation_rx) = oneshot::channel();

        // Check and register under ONE lock acquisition: with two separate acquisitions a
        // concurrent caller could register in between and have its senders overwritten
        // (and thereby dropped) by the insert below.
        {
            let mut pending = $self.pending_requests.lock().unwrap();
            if pending.contains_key(&key) {
                return Err(ClientError::InternalError(
                    "Another put/get request is already pending".to_string(),
                ));
            }
            pending.insert(
                key.clone(),
                PendingSender::TaskCreation(
                    task_creation_tx,
                    (completion_tx, progress_tx),
                    TaskType::$key,
                ),
            );
        }

        let start_task = async move {
            match $self.send_request(req).await {
                Ok(_) => {
                    debug!("{} request sent, waiting for TaskCreated response...", stringify!($key));
                    // Outer `?`: the channel was canceled. Inner `?`: the delivered value
                    // was itself an error.
                    let task_id = task_creation_rx.await.map_err(|_| {
                        ClientError::InternalError("TaskCreated channel canceled".to_string())
                    })??;

                    info!("Task created with ID: {}", task_id);

                    // Only the cancellation is unwrapped here; the delivered result is the
                    // value of this future.
                    completion_rx.await.map_err(|_| {
                        error!("Completion channel canceled");
                        ClientError::InternalError("Completion channel canceled".to_string())
                    })?
                }
                Err(e) => {
                    error!("Failed to send {} request: {:?}", stringify!($key), e);
                    // Nothing was sent, so no TaskCreated can arrive: unregister.
                    $self.pending_requests.lock().unwrap().remove(&key);
                    Err(e)
                }
            }
        };

        Ok((start_task, progress_rx))
    }};
}