mutant_client/
lib.rs

1use std::future::Future;
2use std::sync::Mutex;
3use std::{collections::HashMap, sync::Arc};
4
5use futures::channel::oneshot;
6use log::{debug, error, info, warn};
7use tokio::sync::mpsc;
8use url::Url;
9
10#[cfg(target_arch = "wasm32")]
11use wasm_bindgen_futures::spawn_local;
12
13use mutant_protocol::{
14    ExportResult, HealthCheckResult, ImportResult, KeyDetails, PurgeResult, Request, StatsResponse,
15    StorageMode, SyncResult, Task, TaskId, TaskListEntry, TaskProgress, TaskResult, TaskStatus,
16    TaskStoppedResponse, TaskType,
17};
18
19pub mod error;
20mod macros;
21mod request;
22mod response;
23
24use crate::error::ClientError;
25
26// Shared state for tasks managed by the client (using Arc<Mutex> for thread safety)
27type ClientTaskMap = Arc<Mutex<HashMap<TaskId, Task>>>;
28
29// Key for the pending requests map
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub enum PendingRequestKey {
32    TaskCreation,
33    ListTasks,
34    QueryTask,
35    Rm,
36    ListKeys,
37    Stats,
38    Sync,
39    Purge,
40    Import,
41    Export,
42    HealthCheck,
43    StopTask,
44}
45
46// Enum to hold the different sender types for the pending requests map
47pub enum PendingSender {
48    TaskCreation(
49        oneshot::Sender<Result<TaskId, ClientError>>,
50        TaskChannels,
51        TaskType,
52    ),
53    ListTasks(oneshot::Sender<Result<Vec<TaskListEntry>, ClientError>>),
54    QueryTask(oneshot::Sender<Result<Task, ClientError>>),
55    Rm(oneshot::Sender<Result<(), ClientError>>),
56    ListKeys(oneshot::Sender<Result<Vec<KeyDetails>, ClientError>>),
57    Stats(oneshot::Sender<Result<StatsResponse, ClientError>>),
58    Sync(oneshot::Sender<Result<SyncResult, ClientError>>),
59    Purge(oneshot::Sender<Result<PurgeResult, ClientError>>),
60    Import(oneshot::Sender<Result<ImportResult, ClientError>>),
61    Export(oneshot::Sender<Result<ExportResult, ClientError>>),
62    HealthCheck(oneshot::Sender<Result<HealthCheckResult, ClientError>>),
63    StopTask(oneshot::Sender<Result<TaskStoppedResponse, ClientError>>),
64}
65
66// The new map type for pending requests
67type PendingRequestMap = Arc<Mutex<HashMap<PendingRequestKey, PendingSender>>>;
68
69pub type CompletionReceiver = oneshot::Receiver<Result<TaskResult, ClientError>>;
70pub type ProgressReceiver = mpsc::UnboundedReceiver<Result<TaskProgress, ClientError>>;
71
72type CompletionSender = oneshot::Sender<Result<TaskResult, ClientError>>;
73type ProgressSender = mpsc::UnboundedSender<Result<TaskProgress, ClientError>>;
74
75type TaskChannels = (CompletionSender, ProgressSender);
76type TaskChannelsMap = Arc<Mutex<HashMap<TaskId, TaskChannels>>>;
77
78#[derive(Debug, Clone, PartialEq)]
79enum ConnectionState {
80    Disconnected,
81    Connecting,
82    Connected,
83}
84
85/// A client for interacting with the Mutant Daemon over WebSocket (Cross-platform implementation).
86pub struct MutantClient {
87    sender: Option<ewebsock::WsSender>,
88    receiver: Option<ewebsock::WsReceiver>,
89    tasks: ClientTaskMap,
90    task_channels: TaskChannelsMap,
91    pending_requests: PendingRequestMap,
92    state: Arc<Mutex<ConnectionState>>,
93}
94
95impl MutantClient {
96    /// Creates a new client instance but does not connect yet.
97    pub fn new() -> Self {
98        Self {
99            sender: None,
100            receiver: None,
101            tasks: Arc::new(Mutex::new(HashMap::new())),
102            task_channels: Arc::new(Mutex::new(HashMap::new())),
103            pending_requests: Arc::new(Mutex::new(HashMap::new())),
104            state: Arc::new(Mutex::new(ConnectionState::Disconnected)),
105        }
106    }
107
108    /// Establishes a WebSocket connection to the Mutant Daemon.
109    pub async fn connect(&mut self, addr: &str) -> Result<(), ClientError> {
110        if self.sender.is_some() {
111            warn!("Already connected or connecting.");
112            return Ok(());
113        }
114
115        let url = Url::parse(addr).map_err(|e| ClientError::UrlParseError(e))?;
116
117        *self.state.lock().unwrap() = ConnectionState::Connecting;
118
119        let options = ewebsock::Options::default();
120        let (sender, receiver) = ewebsock::connect(url.as_str(), options)
121            .map_err(|e| ClientError::WebSocketError(e.to_string()))?;
122
123        self.sender = Some(sender);
124        self.receiver = Some(receiver);
125
126        *self.state.lock().unwrap() = ConnectionState::Connected;
127
128        let mut client_clone = self.partial_take_receiver();
129
130        #[cfg(target_arch = "wasm32")]
131        spawn_local(async move {
132            while let Some(response) = client_clone.next_response().await {
133                if let Err(e) = response {
134                    error!("Error processing response: {:?}", e);
135                }
136            }
137        });
138
139        #[cfg(not(target_arch = "wasm32"))]
140        tokio::spawn(async move {
141            while let Some(response) = client_clone.next_response().await {
142                if let Err(e) = response {
143                    error!("Error processing response: {:?}", e);
144                }
145            }
146        });
147
148        Ok(())
149    }
150
151    // --- Public API Methods ---
152    // A simple request/response map or channels might be needed.
153
154    pub async fn put<'a>(
155        &'a mut self,
156        user_key: &str,
157        source_path: &str,
158        mode: StorageMode,
159        public: bool,
160        no_verify: bool,
161    ) -> Result<
162        (
163            impl Future<Output = Result<TaskResult, ClientError>> + 'a,
164            ProgressReceiver,
165        ),
166        ClientError,
167    > {
168        long_request!(
169            self,
170            Put,
171            PutRequest {
172                user_key: user_key.to_string(),
173                source_path: source_path.to_string(),
174                mode,
175                public,
176                no_verify,
177            }
178        )
179    }
180
181    pub async fn get(
182        &mut self,
183        user_key: &str,
184        destination_path: &str,
185        public: bool,
186    ) -> Result<
187        (
188            impl Future<Output = Result<TaskResult, ClientError>> + '_,
189            ProgressReceiver,
190        ),
191        ClientError,
192    > {
193        long_request!(
194            self,
195            Get,
196            GetRequest {
197                user_key: user_key.to_string(),
198                destination_path: destination_path.to_string(),
199                public,
200            }
201        )
202    }
203
204    pub async fn sync(
205        &mut self,
206        push_force: bool,
207    ) -> Result<
208        (
209            impl Future<Output = Result<TaskResult, ClientError>> + '_,
210            ProgressReceiver,
211        ),
212        ClientError,
213    > {
214        long_request!(self, Sync, SyncRequest { push_force })
215    }
216
217    pub async fn purge(
218        &mut self,
219        aggressive: bool,
220    ) -> Result<
221        (
222            impl Future<Output = Result<TaskResult, ClientError>> + '_,
223            ProgressReceiver,
224        ),
225        ClientError,
226    > {
227        long_request!(self, Purge, PurgeRequest { aggressive })
228    }
229
230    pub async fn health_check(
231        &mut self,
232        key_name: &str,
233        recycle: bool,
234    ) -> Result<
235        (
236            impl Future<Output = Result<TaskResult, ClientError>> + '_,
237            ProgressReceiver,
238        ),
239        ClientError,
240    > {
241        long_request!(
242            self,
243            HealthCheck,
244            HealthCheckRequest {
245                key_name: key_name.to_string(),
246                recycle,
247            }
248        )
249    }
250
251    /// Retrieves a list of all stored keys from the daemon.
252    pub async fn list_keys(&mut self) -> Result<Vec<KeyDetails>, ClientError> {
253        direct_request!(self, ListKeys, ListKeysRequest)
254    }
255
256    pub async fn rm(&mut self, user_key: &str) -> Result<(), ClientError> {
257        direct_request!(
258            self,
259            Rm,
260            RmRequest {
261                user_key: user_key.to_string(),
262            }
263        )
264    }
265
266    pub async fn list_tasks(&mut self) -> Result<Vec<TaskListEntry>, ClientError> {
267        direct_request!(self, ListTasks, ListTasksRequest)
268    }
269
270    pub async fn query_task(&mut self, task_id: TaskId) -> Result<Task, ClientError> {
271        direct_request!(self, QueryTask, QueryTaskRequest { task_id })
272    }
273
274    pub async fn get_stats(&mut self) -> Result<StatsResponse, ClientError> {
275        direct_request!(self, Stats, StatsRequest {})
276    }
277
278    pub async fn import(&mut self, file_path: &str) -> Result<ImportResult, ClientError> {
279        direct_request!(
280            self,
281            Import,
282            ImportRequest {
283                file_path: file_path.to_string()
284            }
285        )
286    }
287
288    pub async fn export(&mut self, destination_path: &str) -> Result<ExportResult, ClientError> {
289        direct_request!(
290            self,
291            Export,
292            ExportRequest {
293                destination_path: destination_path.to_string()
294            }
295        )
296    }
297
298    /// Stops a running task on the daemon.
299    pub async fn stop_task(&mut self, task_id: TaskId) -> Result<TaskStoppedResponse, ClientError> {
300        direct_request!(self, StopTask, StopTaskRequest { task_id })
301    }
302
303    // --- Accessor methods for internal state ---
304
305    pub fn get_task_status(&self, task_id: TaskId) -> Option<TaskStatus> {
306        self.tasks
307            .lock()
308            .unwrap()
309            .get(&task_id)
310            .map(|t| t.status.clone())
311    }
312
313    pub fn get_task_result(&self, task_id: TaskId) -> Option<TaskResult> {
314        self.tasks
315            .lock()
316            .unwrap()
317            .get(&task_id)
318            .map(|t| t.result.clone())
319    }
320}
321
322impl Clone for MutantClient {
323    fn clone(&self) -> Self {
324        Self {
325            sender: None,
326            receiver: None,
327            tasks: self.tasks.clone(),
328            task_channels: self.task_channels.clone(),
329            pending_requests: self.pending_requests.clone(), // Clone the new map
330            state: self.state.clone(),
331        }
332    }
333}
334
335impl MutantClient {
336    pub fn partial_take_receiver(&mut self) -> Self {
337        let mut clone = self.clone();
338
339        clone.receiver = self.receiver.take();
340
341        clone
342    }
343}
344
345// Need to run this once at the start of the WASM application
346pub fn set_panic_hook() {
347    console_error_panic_hook::set_once();
348}