1use log::{debug, error, trace, warn};
2use mutant_protocol::{
3 ErrorResponse, ExportResponse, ImportResponse, ListKeysResponse, Response, RmSuccessResponse,
4 Task, TaskCreatedResponse, TaskListResponse, TaskProgress, TaskResult, TaskResultResponse,
5 TaskStatus, TaskStoppedResponse, TaskType, TaskUpdateResponse,
6};
7
8use crate::{
9 error::ClientError, ClientTaskMap, PendingRequestKey, PendingRequestMap, PendingSender,
10 TaskChannelsMap,
11};
12
13use super::MutantClient;
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16
17impl MutantClient {
18 pub fn process_response(
20 response: Response,
21 tasks: &ClientTaskMap,
22 task_channels: &TaskChannelsMap,
23 pending_requests: &PendingRequestMap,
24 ) {
25 match response {
26 Response::TaskCreated(TaskCreatedResponse { task_id }) => {
27 let pending_sender = pending_requests
28 .lock()
29 .unwrap()
30 .remove(&PendingRequestKey::TaskCreation);
31
32 if let Some(PendingSender::TaskCreation(sender, channels, task_type)) =
33 pending_sender
34 {
35 tasks.lock().unwrap().insert(
36 task_id,
37 Task {
38 id: task_id,
39 task_type,
40 status: TaskStatus::Pending,
41 progress: None,
42 result: TaskResult::Pending,
43 key: None,
44 },
45 );
46
47 task_channels.lock().unwrap().insert(task_id, channels);
48
49 if sender.send(Ok(task_id)).is_err() {
50 warn!("Failed to send TaskCreated response to waiting future (receiver dropped?)");
51 tasks.lock().unwrap().remove(&task_id);
52 task_channels.lock().unwrap().remove(&task_id);
53 }
54 } else {
55 warn!(
56 "Received TaskCreated ({}) but no matching request was pending.",
57 task_id
58 );
59 }
60 }
61 Response::TaskUpdate(TaskUpdateResponse {
62 task_id,
63 status,
64 progress,
65 }) => {
66 let mut tasks_guard = tasks.lock().unwrap();
67 let task_exists = tasks_guard.contains_key(&task_id);
68
69 if let Some(task) = tasks_guard.get_mut(&task_id) {
70 task.status = status;
71 task.progress = progress.clone();
72
73 if let Some(progress_update) = progress {
74 if let Some((_, progress_tx)) = task_channels.lock().unwrap().get(&task_id)
75 {
76 if progress_tx.send(Ok(progress_update)).is_err() {
77 warn!("Failed to send progress update for task {}", task_id);
78 }
79 }
80 }
81 } else {
82 warn!(
83 "Received TaskUpdate for unknown task {}, creating entry.",
84 task_id
85 );
86 let task_type = match &progress {
87 Some(TaskProgress::Put(_)) => TaskType::Put,
88 _ => TaskType::Get,
89 };
90 tasks_guard.insert(
91 task_id,
92 Task {
93 id: task_id,
94 task_type,
95 status,
96 progress,
97 result: TaskResult::Pending,
98 key: None,
99 },
100 );
101 }
102
103 let pending_sender = pending_requests
104 .lock()
105 .unwrap()
106 .remove(&PendingRequestKey::QueryTask);
107 if let Some(PendingSender::QueryTask(sender)) = pending_sender {
108 if let Some(task) = tasks_guard.get(&task_id) {
109 if sender.send(Ok(task.clone())).is_err() {
110 warn!("Failed to send TaskUpdate response to QueryTask request (receiver dropped)");
111 }
112 } else {
113 warn!("Task {} not found when trying to respond to QueryTask request after TaskUpdate.", task_id);
114 let _ = sender.send(Err(ClientError::TaskNotFound(task_id)));
115 }
116 } else if task_exists {
117 } else {
119 }
121 }
122 Response::TaskResult(TaskResultResponse {
123 task_id,
124 status,
125 result,
126 }) => {
127 let mut tasks_guard = tasks.lock().unwrap();
128 let _task_existed = tasks_guard.contains_key(&task_id);
129
130 if let Some(task) = tasks_guard.get_mut(&task_id) {
131 task.status = status;
132 task.result = result.clone();
133
134 if let Some((completion_tx, _)) = task_channels.lock().unwrap().remove(&task_id)
135 {
136 if completion_tx.send(Ok(result.clone())).is_err() {
137 warn!(
138 "Failed to send final task result for task {} (receiver dropped)",
139 task_id
140 );
141 }
142 }
143 } else {
144 warn!(
145 "Received TaskResult for unknown task {}, creating entry.",
146 task_id
147 );
148 let task_type = if let TaskResult::Error(_) = result {
150 TaskType::Get
151 } else {
152 TaskType::Put
153 };
154 tasks_guard.insert(
155 task_id,
156 Task {
157 id: task_id,
158 task_type,
159 status,
160 result,
161 progress: None,
162 key: None,
163 },
164 );
165 }
166
167 let pending_sender = pending_requests
168 .lock()
169 .unwrap()
170 .remove(&PendingRequestKey::QueryTask);
171 if let Some(PendingSender::QueryTask(sender)) = pending_sender {
172 if let Some(task) = tasks_guard.get(&task_id) {
173 if sender.send(Ok(task.clone())).is_err() {
174 warn!("Failed to send TaskResult response to QueryTask request (receiver dropped)");
175 }
176 } else {
177 warn!("Task {} not found when trying to respond to QueryTask request after TaskResult.", task_id);
178 let _ = sender.send(Err(ClientError::TaskNotFound(task_id)));
179 }
180 }
181 }
182 Response::TaskList(TaskListResponse { tasks: task_list }) => {
183 let pending_sender = pending_requests
184 .lock()
185 .unwrap()
186 .remove(&PendingRequestKey::ListTasks);
187 if let Some(PendingSender::ListTasks(sender)) = pending_sender {
188 if sender.send(Ok(task_list)).is_err() {
189 warn!("Failed to send TaskList response (receiver dropped)");
190 }
191 } else {
192 warn!("Received TaskList but no ListTasks request was pending");
193 }
194 }
195 Response::Error(ErrorResponse {
196 error,
197 original_request: _,
198 }) => {
199 error!(
200 "Server error received: {}. Check server logs for details.",
201 error
202 );
203
204 let mut requests = pending_requests.lock().unwrap();
205
206 if let Some(PendingSender::TaskCreation(sender, _, _)) =
207 requests.remove(&PendingRequestKey::TaskCreation)
208 {
209 error!("Error occurred during task creation: {}", error);
210 let _ = sender.send(Err(ClientError::ServerError(error.clone())));
211 } else if let Some(PendingSender::ListTasks(sender)) =
212 requests.remove(&PendingRequestKey::ListTasks)
213 {
214 error!("Error occurred during task list request: {}", error);
215 let _ = sender.send(Err(ClientError::ServerError(error.clone())));
216 } else if let Some(PendingSender::QueryTask(sender)) =
217 requests.remove(&PendingRequestKey::QueryTask)
218 {
219 error!("Error occurred during task query request: {}", error);
220 let _ = sender.send(Err(ClientError::ServerError(error.clone())));
221 } else if let Some(PendingSender::Rm(sender)) =
222 requests.remove(&PendingRequestKey::Rm)
223 {
224 error!("Error occurred during rm request: {}", error);
225 let _ = sender.send(Err(ClientError::ServerError(error.clone())));
226 } else if let Some(PendingSender::ListKeys(sender)) =
227 requests.remove(&PendingRequestKey::ListKeys)
228 {
229 error!("Error occurred during list keys request: {}", error);
230 let _ = sender.send(Err(ClientError::ServerError(error.clone())));
231 } else if let Some(PendingSender::Stats(sender)) =
232 requests.remove(&PendingRequestKey::Stats)
233 {
234 error!("Error occurred during stats request: {}", error);
235 let _ = sender.send(Err(ClientError::ServerError(error)));
236 } else if let Some(PendingSender::Import(sender)) =
237 requests.remove(&PendingRequestKey::Import)
238 {
239 error!("Error occurred during import request: {}", error);
240 let _ = sender.send(Err(ClientError::ServerError(error)));
241 } else if let Some(PendingSender::Export(sender)) =
242 requests.remove(&PendingRequestKey::Export)
243 {
244 error!("Error occurred during export request: {}", error);
245 let _ = sender.send(Err(ClientError::ServerError(error)));
246 } else {
247 warn!("Received server error, but no matching pending request found.");
248 }
249 }
250 Response::RmSuccess(RmSuccessResponse { user_key: _ }) => {
251 let pending_sender = pending_requests
252 .lock()
253 .unwrap()
254 .remove(&PendingRequestKey::Rm);
255 if let Some(PendingSender::Rm(sender)) = pending_sender {
256 if sender.send(Ok(())).is_err() {
257 warn!("Failed to send RM success response (receiver dropped)");
258 }
259 } else {
260 warn!("Received RM success response but no Rm request was pending");
261 }
262 }
263 Response::ListKeys(ListKeysResponse { keys }) => {
264 let pending_sender = pending_requests
265 .lock()
266 .unwrap()
267 .remove(&PendingRequestKey::ListKeys);
268 if let Some(PendingSender::ListKeys(sender)) = pending_sender {
269 if sender.send(Ok(keys)).is_err() {
270 warn!("Failed to send ListKeys response (receiver dropped)");
271 }
272 } else {
273 warn!("Received ListKeys response but no ListKeys request was pending");
274 }
275 }
276 Response::Stats(stats_response) => {
277 let pending_sender = pending_requests
278 .lock()
279 .unwrap()
280 .remove(&PendingRequestKey::Stats);
281 if let Some(PendingSender::Stats(sender)) = pending_sender {
282 if sender.send(Ok(stats_response)).is_err() {
283 warn!("Failed to send Stats response (receiver dropped)");
284 }
285 } else {
286 warn!("Received Stats response but no Stats request was pending");
287 }
288 }
289 Response::Import(ImportResponse { result }) => {
290 let pending_sender = pending_requests
291 .lock()
292 .unwrap()
293 .remove(&PendingRequestKey::Import);
294 if let Some(PendingSender::Import(sender)) = pending_sender {
295 if sender.send(Ok(result)).is_err() {
296 warn!("Failed to send Import response (receiver dropped)");
297 }
298 } else {
299 warn!("Received Import response but no Import request was pending");
300 }
301 }
302 Response::Export(ExportResponse { result }) => {
303 let pending_sender = pending_requests
304 .lock()
305 .unwrap()
306 .remove(&PendingRequestKey::Export);
307 if let Some(PendingSender::Export(sender)) = pending_sender {
308 if sender.send(Ok(result)).is_err() {
309 warn!("Failed to send Export response (receiver dropped)");
310 }
311 } else {
312 warn!("Received Export response but no Export request was pending");
313 }
314 }
315 Response::TaskStopped(res) => handle_task_stopped(res, pending_requests.clone()),
316 }
317 }
318
319 pub async fn next_response(&mut self) -> Option<Result<Response, ClientError>> {
320 if let Some(receiver) = &mut self.receiver {
321 loop {
322 match receiver.try_recv() {
323 Some(event) => match event {
324 ewebsock::WsEvent::Message(msg) => {
325 if let ewebsock::WsMessage::Text(text) = msg {
326 match serde_json::from_str::<Response>(&text) {
327 Ok(response) => {
328 Self::process_response(
329 response.clone(),
330 &self.tasks,
331 &self.task_channels,
332 &self.pending_requests,
333 );
334 return Some(Ok(response));
335 }
336 Err(e) => {
337 error!("Failed to deserialize response: {}", e);
338 return Some(Err(ClientError::DeserializationError(e)));
339 }
340 }
341 } else {
342 debug!("Received non-text WebSocket message");
343 }
344 }
345 ewebsock::WsEvent::Error(e) => {
346 error!("WebSocket error: {}", e);
347 return Some(Err(ClientError::WebSocketError(e.to_string())));
348 }
349 ewebsock::WsEvent::Closed => {
350 debug!("WebSocket connection closed");
351 return None;
352 }
353 ewebsock::WsEvent::Opened => {
354 debug!("WebSocket connection opened");
355 continue;
356 }
357 },
358 None => {
359 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
360 continue;
361 }
362 }
363 }
364 } else {
365 None
366 }
367 }
368}
369
370fn handle_task_stopped(
372 res: TaskStoppedResponse,
373 pending_requests_mutex: Arc<Mutex<HashMap<PendingRequestKey, PendingSender>>>,
374) {
375 trace!("Received TaskStopped response for task {}", res.task_id);
376 let mut pending_requests = match pending_requests_mutex.lock() {
378 Ok(guard) => guard,
379 Err(poisoned) => {
380 error!(
381 "Mutex poisoned when handling TaskStopped for task {}: {}. Recovering.",
382 res.task_id, poisoned
383 );
384 poisoned.into_inner() }
386 };
387
388 if let Some(PendingSender::StopTask(sender)) =
390 pending_requests.remove(&PendingRequestKey::StopTask)
391 {
392 if sender.send(Ok(res.clone())).is_err() {
394 warn!(
395 "Failed to send TaskStopped result for task {}: receiver dropped",
396 res.task_id
397 );
398 }
399 } else {
400 warn!(
401 "Received TaskStopped response for unknown/mismatched request key (task_id: {})",
402 res.task_id
403 );
404 }
405 }