mutant_client/
response.rs

1use log::{debug, error, trace, warn};
2use mutant_protocol::{
3    ErrorResponse, ExportResponse, ImportResponse, ListKeysResponse, Response, RmSuccessResponse,
4    Task, TaskCreatedResponse, TaskListResponse, TaskProgress, TaskResult, TaskResultResponse,
5    TaskStatus, TaskStoppedResponse, TaskType, TaskUpdateResponse,
6};
7
8use crate::{
9    error::ClientError, ClientTaskMap, PendingRequestKey, PendingRequestMap, PendingSender,
10    TaskChannelsMap,
11};
12
13use super::MutantClient;
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16
17impl MutantClient {
18    /// Processes a deserialized response from the server
19    pub fn process_response(
20        response: Response,
21        tasks: &ClientTaskMap,
22        task_channels: &TaskChannelsMap,
23        pending_requests: &PendingRequestMap,
24    ) {
25        match response {
26            Response::TaskCreated(TaskCreatedResponse { task_id }) => {
27                let pending_sender = pending_requests
28                    .lock()
29                    .unwrap()
30                    .remove(&PendingRequestKey::TaskCreation);
31
32                if let Some(PendingSender::TaskCreation(sender, channels, task_type)) =
33                    pending_sender
34                {
35                    tasks.lock().unwrap().insert(
36                        task_id,
37                        Task {
38                            id: task_id,
39                            task_type,
40                            status: TaskStatus::Pending,
41                            progress: None,
42                            result: TaskResult::Pending,
43                            key: None,
44                        },
45                    );
46
47                    task_channels.lock().unwrap().insert(task_id, channels);
48
49                    if sender.send(Ok(task_id)).is_err() {
50                        warn!("Failed to send TaskCreated response to waiting future (receiver dropped?)");
51                        tasks.lock().unwrap().remove(&task_id);
52                        task_channels.lock().unwrap().remove(&task_id);
53                    }
54                } else {
55                    warn!(
56                        "Received TaskCreated ({}) but no matching request was pending.",
57                        task_id
58                    );
59                }
60            }
61            Response::TaskUpdate(TaskUpdateResponse {
62                task_id,
63                status,
64                progress,
65            }) => {
66                let mut tasks_guard = tasks.lock().unwrap();
67                let task_exists = tasks_guard.contains_key(&task_id);
68
69                if let Some(task) = tasks_guard.get_mut(&task_id) {
70                    task.status = status;
71                    task.progress = progress.clone();
72
73                    if let Some(progress_update) = progress {
74                        if let Some((_, progress_tx)) = task_channels.lock().unwrap().get(&task_id)
75                        {
76                            if progress_tx.send(Ok(progress_update)).is_err() {
77                                warn!("Failed to send progress update for task {}", task_id);
78                            }
79                        }
80                    }
81                } else {
82                    warn!(
83                        "Received TaskUpdate for unknown task {}, creating entry.",
84                        task_id
85                    );
86                    let task_type = match &progress {
87                        Some(TaskProgress::Put(_)) => TaskType::Put,
88                        _ => TaskType::Get,
89                    };
90                    tasks_guard.insert(
91                        task_id,
92                        Task {
93                            id: task_id,
94                            task_type,
95                            status,
96                            progress,
97                            result: TaskResult::Pending,
98                            key: None,
99                        },
100                    );
101                }
102
103                let pending_sender = pending_requests
104                    .lock()
105                    .unwrap()
106                    .remove(&PendingRequestKey::QueryTask);
107                if let Some(PendingSender::QueryTask(sender)) = pending_sender {
108                    if let Some(task) = tasks_guard.get(&task_id) {
109                        if sender.send(Ok(task.clone())).is_err() {
110                            warn!("Failed to send TaskUpdate response to QueryTask request (receiver dropped)");
111                        }
112                    } else {
113                        warn!("Task {} not found when trying to respond to QueryTask request after TaskUpdate.", task_id);
114                        let _ = sender.send(Err(ClientError::TaskNotFound(task_id)));
115                    }
116                } else if task_exists {
117                    // No pending query, just updated the task
118                } else {
119                    // No pending query and task was created by this update - already logged warning
120                }
121            }
122            Response::TaskResult(TaskResultResponse {
123                task_id,
124                status,
125                result,
126            }) => {
127                let mut tasks_guard = tasks.lock().unwrap();
128                let _task_existed = tasks_guard.contains_key(&task_id);
129
130                if let Some(task) = tasks_guard.get_mut(&task_id) {
131                    task.status = status;
132                    task.result = result.clone();
133
134                    if let Some((completion_tx, _)) = task_channels.lock().unwrap().remove(&task_id)
135                    {
136                        if completion_tx.send(Ok(result.clone())).is_err() {
137                            warn!(
138                                "Failed to send final task result for task {} (receiver dropped)",
139                                task_id
140                            );
141                        }
142                    }
143                } else {
144                    warn!(
145                        "Received TaskResult for unknown task {}, creating entry.",
146                        task_id
147                    );
148                    // FIXME: Nonsense
149                    let task_type = if let TaskResult::Error(_) = result {
150                        TaskType::Get
151                    } else {
152                        TaskType::Put
153                    };
154                    tasks_guard.insert(
155                        task_id,
156                        Task {
157                            id: task_id,
158                            task_type,
159                            status,
160                            result,
161                            progress: None,
162                            key: None,
163                        },
164                    );
165                }
166
167                let pending_sender = pending_requests
168                    .lock()
169                    .unwrap()
170                    .remove(&PendingRequestKey::QueryTask);
171                if let Some(PendingSender::QueryTask(sender)) = pending_sender {
172                    if let Some(task) = tasks_guard.get(&task_id) {
173                        if sender.send(Ok(task.clone())).is_err() {
174                            warn!("Failed to send TaskResult response to QueryTask request (receiver dropped)");
175                        }
176                    } else {
177                        warn!("Task {} not found when trying to respond to QueryTask request after TaskResult.", task_id);
178                        let _ = sender.send(Err(ClientError::TaskNotFound(task_id)));
179                    }
180                }
181            }
182            Response::TaskList(TaskListResponse { tasks: task_list }) => {
183                let pending_sender = pending_requests
184                    .lock()
185                    .unwrap()
186                    .remove(&PendingRequestKey::ListTasks);
187                if let Some(PendingSender::ListTasks(sender)) = pending_sender {
188                    if sender.send(Ok(task_list)).is_err() {
189                        warn!("Failed to send TaskList response (receiver dropped)");
190                    }
191                } else {
192                    warn!("Received TaskList but no ListTasks request was pending");
193                }
194            }
195            Response::Error(ErrorResponse {
196                error,
197                original_request: _,
198            }) => {
199                error!(
200                    "Server error received: {}. Check server logs for details.",
201                    error
202                );
203
204                let mut requests = pending_requests.lock().unwrap();
205
206                if let Some(PendingSender::TaskCreation(sender, _, _)) =
207                    requests.remove(&PendingRequestKey::TaskCreation)
208                {
209                    error!("Error occurred during task creation: {}", error);
210                    let _ = sender.send(Err(ClientError::ServerError(error.clone())));
211                } else if let Some(PendingSender::ListTasks(sender)) =
212                    requests.remove(&PendingRequestKey::ListTasks)
213                {
214                    error!("Error occurred during task list request: {}", error);
215                    let _ = sender.send(Err(ClientError::ServerError(error.clone())));
216                } else if let Some(PendingSender::QueryTask(sender)) =
217                    requests.remove(&PendingRequestKey::QueryTask)
218                {
219                    error!("Error occurred during task query request: {}", error);
220                    let _ = sender.send(Err(ClientError::ServerError(error.clone())));
221                } else if let Some(PendingSender::Rm(sender)) =
222                    requests.remove(&PendingRequestKey::Rm)
223                {
224                    error!("Error occurred during rm request: {}", error);
225                    let _ = sender.send(Err(ClientError::ServerError(error.clone())));
226                } else if let Some(PendingSender::ListKeys(sender)) =
227                    requests.remove(&PendingRequestKey::ListKeys)
228                {
229                    error!("Error occurred during list keys request: {}", error);
230                    let _ = sender.send(Err(ClientError::ServerError(error.clone())));
231                } else if let Some(PendingSender::Stats(sender)) =
232                    requests.remove(&PendingRequestKey::Stats)
233                {
234                    error!("Error occurred during stats request: {}", error);
235                    let _ = sender.send(Err(ClientError::ServerError(error)));
236                } else if let Some(PendingSender::Import(sender)) =
237                    requests.remove(&PendingRequestKey::Import)
238                {
239                    error!("Error occurred during import request: {}", error);
240                    let _ = sender.send(Err(ClientError::ServerError(error)));
241                } else if let Some(PendingSender::Export(sender)) =
242                    requests.remove(&PendingRequestKey::Export)
243                {
244                    error!("Error occurred during export request: {}", error);
245                    let _ = sender.send(Err(ClientError::ServerError(error)));
246                } else {
247                    warn!("Received server error, but no matching pending request found.");
248                }
249            }
250            Response::RmSuccess(RmSuccessResponse { user_key: _ }) => {
251                let pending_sender = pending_requests
252                    .lock()
253                    .unwrap()
254                    .remove(&PendingRequestKey::Rm);
255                if let Some(PendingSender::Rm(sender)) = pending_sender {
256                    if sender.send(Ok(())).is_err() {
257                        warn!("Failed to send RM success response (receiver dropped)");
258                    }
259                } else {
260                    warn!("Received RM success response but no Rm request was pending");
261                }
262            }
263            Response::ListKeys(ListKeysResponse { keys }) => {
264                let pending_sender = pending_requests
265                    .lock()
266                    .unwrap()
267                    .remove(&PendingRequestKey::ListKeys);
268                if let Some(PendingSender::ListKeys(sender)) = pending_sender {
269                    if sender.send(Ok(keys)).is_err() {
270                        warn!("Failed to send ListKeys response (receiver dropped)");
271                    }
272                } else {
273                    warn!("Received ListKeys response but no ListKeys request was pending");
274                }
275            }
276            Response::Stats(stats_response) => {
277                let pending_sender = pending_requests
278                    .lock()
279                    .unwrap()
280                    .remove(&PendingRequestKey::Stats);
281                if let Some(PendingSender::Stats(sender)) = pending_sender {
282                    if sender.send(Ok(stats_response)).is_err() {
283                        warn!("Failed to send Stats response (receiver dropped)");
284                    }
285                } else {
286                    warn!("Received Stats response but no Stats request was pending");
287                }
288            }
289            Response::Import(ImportResponse { result }) => {
290                let pending_sender = pending_requests
291                    .lock()
292                    .unwrap()
293                    .remove(&PendingRequestKey::Import);
294                if let Some(PendingSender::Import(sender)) = pending_sender {
295                    if sender.send(Ok(result)).is_err() {
296                        warn!("Failed to send Import response (receiver dropped)");
297                    }
298                } else {
299                    warn!("Received Import response but no Import request was pending");
300                }
301            }
302            Response::Export(ExportResponse { result }) => {
303                let pending_sender = pending_requests
304                    .lock()
305                    .unwrap()
306                    .remove(&PendingRequestKey::Export);
307                if let Some(PendingSender::Export(sender)) = pending_sender {
308                    if sender.send(Ok(result)).is_err() {
309                        warn!("Failed to send Export response (receiver dropped)");
310                    }
311                } else {
312                    warn!("Received Export response but no Export request was pending");
313                }
314            }
315            Response::TaskStopped(res) => handle_task_stopped(res, pending_requests.clone()),
316        }
317    }
318
319    pub async fn next_response(&mut self) -> Option<Result<Response, ClientError>> {
320        if let Some(receiver) = &mut self.receiver {
321            loop {
322                match receiver.try_recv() {
323                    Some(event) => match event {
324                        ewebsock::WsEvent::Message(msg) => {
325                            if let ewebsock::WsMessage::Text(text) = msg {
326                                match serde_json::from_str::<Response>(&text) {
327                                    Ok(response) => {
328                                        Self::process_response(
329                                            response.clone(),
330                                            &self.tasks,
331                                            &self.task_channels,
332                                            &self.pending_requests,
333                                        );
334                                        return Some(Ok(response));
335                                    }
336                                    Err(e) => {
337                                        error!("Failed to deserialize response: {}", e);
338                                        return Some(Err(ClientError::DeserializationError(e)));
339                                    }
340                                }
341                            } else {
342                                debug!("Received non-text WebSocket message");
343                            }
344                        }
345                        ewebsock::WsEvent::Error(e) => {
346                            error!("WebSocket error: {}", e);
347                            return Some(Err(ClientError::WebSocketError(e.to_string())));
348                        }
349                        ewebsock::WsEvent::Closed => {
350                            debug!("WebSocket connection closed");
351                            return None;
352                        }
353                        ewebsock::WsEvent::Opened => {
354                            debug!("WebSocket connection opened");
355                            continue;
356                        }
357                    },
358                    None => {
359                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
360                        continue;
361                    }
362                }
363            }
364        } else {
365            None
366        }
367    }
368}
369
370// Correctly define the function to accept Arc<Mutex<...>>
371fn handle_task_stopped(
372    res: TaskStoppedResponse,
373    pending_requests_mutex: Arc<Mutex<HashMap<PendingRequestKey, PendingSender>>>,
374) {
375    trace!("Received TaskStopped response for task {}", res.task_id);
376    // Lock the mutex to get mutable access to the map
377    let mut pending_requests = match pending_requests_mutex.lock() {
378        Ok(guard) => guard,
379        Err(poisoned) => {
380            error!(
381                "Mutex poisoned when handling TaskStopped for task {}: {}. Recovering.",
382                res.task_id, poisoned
383            );
384            poisoned.into_inner() // Recover the data even if poisoned
385        }
386    };
387
388    // Find the corresponding sender using the correct key type
389    if let Some(PendingSender::StopTask(sender)) =
390        pending_requests.remove(&PendingRequestKey::StopTask)
391    {
392        // Sender expects Result<TaskStoppedResponse, ClientError>
393        if sender.send(Ok(res.clone())).is_err() {
394            warn!(
395                "Failed to send TaskStopped result for task {}: receiver dropped",
396                res.task_id
397            );
398        }
399    } else {
400        warn!(
401            "Received TaskStopped response for unknown/mismatched request key (task_id: {})",
402            res.task_id
403        );
404    }
405    // Lock guard is dropped here, releasing the mutex
406}