1use std::future::Future;
2use std::sync::Mutex;
3use std::{collections::HashMap, sync::Arc};
4
5use futures::channel::oneshot;
6use log::{debug, error, info, warn};
7use tokio::sync::mpsc;
8use url::Url;
9
10#[cfg(target_arch = "wasm32")]
11use wasm_bindgen_futures::spawn_local;
12
13use mutant_protocol::{
14 ExportResult, HealthCheckResult, ImportResult, KeyDetails, PurgeResult, Request, StatsResponse,
15 StorageMode, SyncResult, Task, TaskId, TaskListEntry, TaskProgress, TaskResult, TaskStatus,
16 TaskStoppedResponse, TaskType,
17};
18
19pub mod error;
20mod macros;
21mod request;
22mod response;
23
24use crate::error::ClientError;
25
26type ClientTaskMap = Arc<Mutex<HashMap<TaskId, Task>>>;
28
/// Identifies the *kind* of request that is currently awaiting a response.
///
/// Used as the key of the pending-request map, so the map can hold at most
/// one entry per variant at any time.
///
/// The enum carries no data, so it is `Copy`: keys can be passed around and
/// inserted into maps by value without an explicit `.clone()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PendingRequestKey {
    /// A request that is expected to be answered with a newly created task.
    TaskCreation,
    /// Listing of tasks.
    ListTasks,
    /// Query for a single task.
    QueryTask,
    /// Removal of a key.
    Rm,
    /// Listing of keys.
    ListKeys,
    /// Statistics request.
    Stats,
    /// Sync request.
    Sync,
    /// Purge request.
    Purge,
    /// Import request.
    Import,
    /// Export request.
    Export,
    /// Health-check request.
    HealthCheck,
    /// Request to stop a task.
    StopTask,
}
45
46pub enum PendingSender {
48 TaskCreation(
49 oneshot::Sender<Result<TaskId, ClientError>>,
50 TaskChannels,
51 TaskType,
52 ),
53 ListTasks(oneshot::Sender<Result<Vec<TaskListEntry>, ClientError>>),
54 QueryTask(oneshot::Sender<Result<Task, ClientError>>),
55 Rm(oneshot::Sender<Result<(), ClientError>>),
56 ListKeys(oneshot::Sender<Result<Vec<KeyDetails>, ClientError>>),
57 Stats(oneshot::Sender<Result<StatsResponse, ClientError>>),
58 Sync(oneshot::Sender<Result<SyncResult, ClientError>>),
59 Purge(oneshot::Sender<Result<PurgeResult, ClientError>>),
60 Import(oneshot::Sender<Result<ImportResult, ClientError>>),
61 Export(oneshot::Sender<Result<ExportResult, ClientError>>),
62 HealthCheck(oneshot::Sender<Result<HealthCheckResult, ClientError>>),
63 StopTask(oneshot::Sender<Result<TaskStoppedResponse, ClientError>>),
64}
65
66type PendingRequestMap = Arc<Mutex<HashMap<PendingRequestKey, PendingSender>>>;
68
69pub type CompletionReceiver = oneshot::Receiver<Result<TaskResult, ClientError>>;
70pub type ProgressReceiver = mpsc::UnboundedReceiver<Result<TaskProgress, ClientError>>;
71
72type CompletionSender = oneshot::Sender<Result<TaskResult, ClientError>>;
73type ProgressSender = mpsc::UnboundedSender<Result<TaskProgress, ClientError>>;
74
75type TaskChannels = (CompletionSender, ProgressSender);
76type TaskChannelsMap = Arc<Mutex<HashMap<TaskId, TaskChannels>>>;
77
/// Lifecycle of the client's WebSocket connection.
///
/// The enum carries no data, so it is `Copy` (the current state can be read
/// out of its mutex guard by value) and `Eq` (comparison is total).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// No connection exists; this is the state of a freshly created client.
    Disconnected,
    /// A connection attempt has been started but has not finished.
    Connecting,
    /// The WebSocket handles have been obtained.
    Connected,
}
84
85pub struct MutantClient {
87 sender: Option<ewebsock::WsSender>,
88 receiver: Option<ewebsock::WsReceiver>,
89 tasks: ClientTaskMap,
90 task_channels: TaskChannelsMap,
91 pending_requests: PendingRequestMap,
92 state: Arc<Mutex<ConnectionState>>,
93}
94
95impl MutantClient {
96 pub fn new() -> Self {
98 Self {
99 sender: None,
100 receiver: None,
101 tasks: Arc::new(Mutex::new(HashMap::new())),
102 task_channels: Arc::new(Mutex::new(HashMap::new())),
103 pending_requests: Arc::new(Mutex::new(HashMap::new())),
104 state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
105 }
106 }
107
108 pub async fn connect(&mut self, addr: &str) -> Result<(), ClientError> {
110 if self.sender.is_some() {
111 warn!("Already connected or connecting.");
112 return Ok(());
113 }
114
115 let url = Url::parse(addr).map_err(|e| ClientError::UrlParseError(e))?;
116
117 *self.state.lock().unwrap() = ConnectionState::Connecting;
118
119 let options = ewebsock::Options::default();
120 let (sender, receiver) = ewebsock::connect(url.as_str(), options)
121 .map_err(|e| ClientError::WebSocketError(e.to_string()))?;
122
123 self.sender = Some(sender);
124 self.receiver = Some(receiver);
125
126 *self.state.lock().unwrap() = ConnectionState::Connected;
127
128 let mut client_clone = self.partial_take_receiver();
129
130 #[cfg(target_arch = "wasm32")]
131 spawn_local(async move {
132 while let Some(response) = client_clone.next_response().await {
133 if let Err(e) = response {
134 error!("Error processing response: {:?}", e);
135 }
136 }
137 });
138
139 #[cfg(not(target_arch = "wasm32"))]
140 tokio::spawn(async move {
141 while let Some(response) = client_clone.next_response().await {
142 if let Err(e) = response {
143 error!("Error processing response: {:?}", e);
144 }
145 }
146 });
147
148 Ok(())
149 }
150
151 pub async fn put<'a>(
155 &'a mut self,
156 user_key: &str,
157 source_path: &str,
158 mode: StorageMode,
159 public: bool,
160 no_verify: bool,
161 ) -> Result<
162 (
163 impl Future<Output = Result<TaskResult, ClientError>> + 'a,
164 ProgressReceiver,
165 ),
166 ClientError,
167 > {
168 long_request!(
169 self,
170 Put,
171 PutRequest {
172 user_key: user_key.to_string(),
173 source_path: source_path.to_string(),
174 mode,
175 public,
176 no_verify,
177 }
178 )
179 }
180
181 pub async fn get(
182 &mut self,
183 user_key: &str,
184 destination_path: &str,
185 public: bool,
186 ) -> Result<
187 (
188 impl Future<Output = Result<TaskResult, ClientError>> + '_,
189 ProgressReceiver,
190 ),
191 ClientError,
192 > {
193 long_request!(
194 self,
195 Get,
196 GetRequest {
197 user_key: user_key.to_string(),
198 destination_path: destination_path.to_string(),
199 public,
200 }
201 )
202 }
203
204 pub async fn sync(
205 &mut self,
206 push_force: bool,
207 ) -> Result<
208 (
209 impl Future<Output = Result<TaskResult, ClientError>> + '_,
210 ProgressReceiver,
211 ),
212 ClientError,
213 > {
214 long_request!(self, Sync, SyncRequest { push_force })
215 }
216
217 pub async fn purge(
218 &mut self,
219 aggressive: bool,
220 ) -> Result<
221 (
222 impl Future<Output = Result<TaskResult, ClientError>> + '_,
223 ProgressReceiver,
224 ),
225 ClientError,
226 > {
227 long_request!(self, Purge, PurgeRequest { aggressive })
228 }
229
230 pub async fn health_check(
231 &mut self,
232 key_name: &str,
233 recycle: bool,
234 ) -> Result<
235 (
236 impl Future<Output = Result<TaskResult, ClientError>> + '_,
237 ProgressReceiver,
238 ),
239 ClientError,
240 > {
241 long_request!(
242 self,
243 HealthCheck,
244 HealthCheckRequest {
245 key_name: key_name.to_string(),
246 recycle,
247 }
248 )
249 }
250
251 pub async fn list_keys(&mut self) -> Result<Vec<KeyDetails>, ClientError> {
253 direct_request!(self, ListKeys, ListKeysRequest)
254 }
255
256 pub async fn rm(&mut self, user_key: &str) -> Result<(), ClientError> {
257 direct_request!(
258 self,
259 Rm,
260 RmRequest {
261 user_key: user_key.to_string(),
262 }
263 )
264 }
265
266 pub async fn list_tasks(&mut self) -> Result<Vec<TaskListEntry>, ClientError> {
267 direct_request!(self, ListTasks, ListTasksRequest)
268 }
269
270 pub async fn query_task(&mut self, task_id: TaskId) -> Result<Task, ClientError> {
271 direct_request!(self, QueryTask, QueryTaskRequest { task_id })
272 }
273
274 pub async fn get_stats(&mut self) -> Result<StatsResponse, ClientError> {
275 direct_request!(self, Stats, StatsRequest {})
276 }
277
278 pub async fn import(&mut self, file_path: &str) -> Result<ImportResult, ClientError> {
279 direct_request!(
280 self,
281 Import,
282 ImportRequest {
283 file_path: file_path.to_string()
284 }
285 )
286 }
287
288 pub async fn export(&mut self, destination_path: &str) -> Result<ExportResult, ClientError> {
289 direct_request!(
290 self,
291 Export,
292 ExportRequest {
293 destination_path: destination_path.to_string()
294 }
295 )
296 }
297
298 pub async fn stop_task(&mut self, task_id: TaskId) -> Result<TaskStoppedResponse, ClientError> {
300 direct_request!(self, StopTask, StopTaskRequest { task_id })
301 }
302
303 pub fn get_task_status(&self, task_id: TaskId) -> Option<TaskStatus> {
306 self.tasks
307 .lock()
308 .unwrap()
309 .get(&task_id)
310 .map(|t| t.status.clone())
311 }
312
313 pub fn get_task_result(&self, task_id: TaskId) -> Option<TaskResult> {
314 self.tasks
315 .lock()
316 .unwrap()
317 .get(&task_id)
318 .map(|t| t.result.clone())
319 }
320}
321
322impl Clone for MutantClient {
323 fn clone(&self) -> Self {
324 Self {
325 sender: None,
326 receiver: None,
327 tasks: self.tasks.clone(),
328 task_channels: self.task_channels.clone(),
329 pending_requests: self.pending_requests.clone(), state: self.state.clone(),
331 }
332 }
333}
334
335impl MutantClient {
336 pub fn partial_take_receiver(&mut self) -> Self {
337 let mut clone = self.clone();
338
339 clone.receiver = self.receiver.take();
340
341 clone
342 }
343}
344
345pub fn set_panic_hook() {
347 console_error_panic_hook::set_once();
348}