Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
16/// Callback type for globally observing task progress events.
17///
18/// The callback is invoked from runtime worker context. Keep callback logic
19/// fast and non-blocking to avoid delaying event processing.
20pub type GlobalProgressListener = ProgressCb;
21
22/// Main entry point of the `rusty-cat` SDK.
23///
24/// `MeowClient` owns runtime state and provides high-level operations:
25/// enqueue, pause, resume, cancel, snapshot, and close.
26///
27/// # Usage pattern
28///
29/// 1. Create [`MeowConfig`].
30/// 2. Construct `MeowClient::new(config)`.
31/// 3. Build tasks with upload/download builders.
32/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
33/// 5. Control task lifecycle with pause/resume/cancel.
34/// 6. Call [`Self::close`] during shutdown.
35#[derive(Clone)]
36pub struct MeowClient {
37    /// Lazily initialized task executor.
38    executor: OnceLock<Executor>,
39    /// Immutable runtime configuration.
40    config: MeowConfig,
41    /// Global listeners receiving progress records for all tasks.
42    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
43    /// Global closed flag. Once set to `true`, task control APIs reject calls.
44    closed: Arc<AtomicBool>,
45}
46
47impl std::fmt::Debug for MeowClient {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("MeowClient")
50            .field("config", &self.config)
51            .field("global_progress_listener", &"..")
52            .finish()
53    }
54}
55
56impl MeowClient {
57    /// Creates a new client with the provided configuration.
58    ///
59    /// The internal executor is initialized lazily on first task operation.
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// use rusty_cat::api::{MeowClient, MeowConfig};
65    ///
66    /// let config = MeowConfig::default();
67    /// let client = MeowClient::new(config);
68    /// let _ = client;
69    /// ```
70    pub fn new(config: MeowConfig) -> Self {
71        MeowClient {
72            executor: Default::default(),
73            config,
74            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
75            closed: Arc::new(AtomicBool::new(false)),
76        }
77    }
78
79    /// Returns a `reqwest::Client` aligned with this client's configuration.
80    ///
81    /// - If [`MeowConfig::with_http_client`] injected a custom client, this
82    ///   returns its clone.
83    /// - Otherwise, this builds a new client from `http_timeout` and
84    ///   `tcp_keepalive`.
85    ///
86    /// # Errors
87    ///
88    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
89    /// fails.
90    ///
91    /// # Examples
92    ///
93    /// ```no_run
94    /// use rusty_cat::api::{MeowClient, MeowConfig};
95    ///
96    /// let client = MeowClient::new(MeowConfig::default());
97    /// let http = client.http_client()?;
98    /// let _ = http;
99    /// # Ok::<(), rusty_cat::api::MeowError>(())
100    /// ```
101    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
102        if let Some(c) = self.config.http_client_ref() {
103            return Ok(c.clone());
104        }
105        reqwest::Client::builder()
106            .timeout(self.config.http_timeout())
107            .tcp_keepalive(self.config.tcp_keepalive())
108            .build()
109            .map_err(|e| {
110                MeowError::from_source(
111                    InnerErrorCode::HttpClientBuildFailed,
112                    format!(
113                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
114                        self.config.http_timeout(),
115                        self.config.tcp_keepalive()
116                    ),
117                    e,
118                )
119            })
120    }
121
122    fn get_exec(&self) -> Result<&Executor, MeowError> {
123        if let Some(exec) = self.executor.get() {
124            crate::meow_flow_log!("executor", "reuse existing executor");
125            return Ok(exec);
126        }
127        let default = DefaultHttpClient::try_with_http_timeouts(
128            self.config.http_timeout(),
129            self.config.tcp_keepalive(),
130        )?;
131        crate::meow_flow_log!(
132            "executor",
133            "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
134            self.config.http_timeout(),
135            self.config.tcp_keepalive()
136        );
137        let exec = Executor::new(
138            self.config.clone(),
139            Arc::new(default),
140            self.global_progress_listener.clone(),
141        )?;
142        let _ = self.executor.set(exec);
143        self.executor.get().ok_or_else(|| {
144            crate::meow_flow_log!(
145                "executor",
146                "executor init race failed after set; returning RuntimeCreationFailedError"
147            );
148            MeowError::from_code_str(
149                InnerErrorCode::RuntimeCreationFailedError,
150                "executor init race failed",
151            )
152        })
153    }
154
155    /// Ensures the client is still open.
156    ///
157    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
158    fn ensure_open(&self) -> Result<(), MeowError> {
159        if self.closed.load(Ordering::SeqCst) {
160            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
161            Err(MeowError::from_code_str(
162                InnerErrorCode::ClientClosed,
163                "meow client is already closed",
164            ))
165        } else {
166            Ok(())
167        }
168    }
169
170    /// Registers a global progress listener for all tasks.
171    ///
172    /// # Parameters
173    ///
174    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
175    ///
176    /// # Returns
177    ///
178    /// Returns a listener ID used by
179    /// [`Self::unregister_global_progress_listener`].
180    ///
181    /// # Usage rules
182    ///
183    /// Keep callback execution short and panic-free. A heavy callback can slow
184    /// down global event delivery.
185    ///
186    /// # Errors
187    ///
188    /// Returns `LockPoisoned` when listener storage lock is poisoned.
189    ///
190    /// # Examples
191    ///
192    /// ```no_run
193    /// use rusty_cat::api::{MeowClient, MeowConfig};
194    ///
195    /// let client = MeowClient::new(MeowConfig::default());
196    /// let listener_id = client.register_global_progress_listener(|record| {
197    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
198    /// })?;
199    /// let _ = listener_id;
200    /// # Ok::<(), rusty_cat::api::MeowError>(())
201    /// ```
202    pub fn register_global_progress_listener<F>(
203        &self,
204        listener: F,
205    ) -> Result<GlobalProgressListenerId, MeowError>
206    where
207        F: Fn(FileTransferRecord) + Send + Sync + 'static,
208    {
209        let id = GlobalProgressListenerId::new_v4();
210        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
211        let mut guard = self.global_progress_listener.write().map_err(|e| {
212            MeowError::from_code(
213                InnerErrorCode::LockPoisoned,
214                format!("register global listener lock poisoned: {}", e),
215            )
216        })?;
217        guard.push((id, Arc::new(listener)));
218        Ok(id)
219    }
220
221    /// Unregisters one previously registered global progress listener.
222    ///
223    /// Returns `Ok(false)` when the ID does not exist.
224    ///
225    /// # Errors
226    ///
227    /// Returns `LockPoisoned` when listener storage lock is poisoned.
228    ///
229    /// # Examples
230    ///
231    /// ```no_run
232    /// use rusty_cat::api::{MeowClient, MeowConfig};
233    ///
234    /// let client = MeowClient::new(MeowConfig::default());
235    /// let id = client.register_global_progress_listener(|_| {})?;
236    /// let removed = client.unregister_global_progress_listener(id)?;
237    /// assert!(removed);
238    /// # Ok::<(), rusty_cat::api::MeowError>(())
239    /// ```
240    pub fn unregister_global_progress_listener(
241        &self,
242        id: GlobalProgressListenerId,
243    ) -> Result<bool, MeowError> {
244        let mut g = self.global_progress_listener.write().map_err(|e| {
245            MeowError::from_code(
246                InnerErrorCode::LockPoisoned,
247                format!("unregister global listener lock poisoned: {}", e),
248            )
249        })?;
250        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
251            g.remove(pos);
252            crate::meow_flow_log!(
253                "listener",
254                "unregister global listener success: id={:?}",
255                id
256            );
257            Ok(true)
258        } else {
259            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
260            Ok(false)
261        }
262    }
263
264    /// Removes all registered global progress listeners.
265    ///
266    /// # Errors
267    ///
268    /// Returns `LockPoisoned` when listener storage lock is poisoned.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// use rusty_cat::api::{MeowClient, MeowConfig};
274    ///
275    /// let client = MeowClient::new(MeowConfig::default());
276    /// client.clear_global_listener()?;
277    /// # Ok::<(), rusty_cat::api::MeowError>(())
278    /// ```
279    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
280        crate::meow_flow_log!("listener", "clear all global listeners");
281        self.global_progress_listener
282            .write()
283            .map_err(|e| {
284                MeowError::from_code(
285                    InnerErrorCode::LockPoisoned,
286                    format!("clear global listeners lock poisoned: {}", e),
287                )
288            })?
289            .clear();
290        Ok(())
291    }
292
293    /// Sets or clears the global debug log listener.
294    ///
295    /// - Pass `Some(listener)` to set/replace.
296    /// - Pass `None` to clear.
297    ///
298    /// This affects all `MeowClient` instances in the current process.
299    ///
300    /// # Errors
301    ///
302    /// Returns [`DebugLogListenerError`] when the internal global listener lock
303    /// is poisoned.
304    ///
305    /// # Examples
306    ///
307    /// ```no_run
308    /// use std::sync::Arc;
309    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
310    ///
311    /// let client = MeowClient::new(MeowConfig::default());
312    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
313    ///     println!("{log}");
314    /// })))?;
315    ///
316    /// // Clear listener when no longer needed.
317    /// client.set_debug_log_listener(None)?;
318    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
319    /// ```
320    pub fn set_debug_log_listener(
321        &self,
322        listener: Option<DebugLogListener>,
323    ) -> Result<(), DebugLogListenerError> {
324        set_debug_log_listener(listener)
325    }
326}
327
328impl MeowClient {
329    /// Enqueues a transfer task and returns its [`TaskId`].
330    ///
331    /// The actual upload/download execution is dispatched to an internal
332    /// worker system thread. This method only performs lightweight validation
333    /// and submission, so it does not block the caller thread waiting for full
334    /// transfer completion.
335    ///
336    /// `enqueue` is also the recovery entrypoint after process restart. If the
337    /// application was killed during a previous upload/download, restart your
338    /// process and call `enqueue` again to resume that transfer workflow.
339    ///
340    /// # Parameters
341    ///
342    /// - `task`: Built by upload/download task builders.
343    /// - `progress_cb`: Per-task callback invoked with transfer progress.
344    /// - `complete_cb`: Optional callback fired once when task reaches
345    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
346    ///   argument is provider-defined payload returned by upload protocol
347    ///   `complete_upload`; download tasks usually receive `None`.
348    ///
349    /// # Usage rules
350    ///
351    /// - `task` must be non-empty (required path/name/url and valid upload size).
352    /// - Callback should be lightweight and non-blocking.
353    /// - Store returned task ID for subsequent task control operations.
354    /// - `enqueue` is asynchronous task submission, not synchronous transfer.
355    /// - For restart recovery, re-enqueue the same logical task (same
356    ///   upload/download target and compatible checkpoint context) so the
357    ///   runtime can continue from existing local/remote progress.
358    ///
359    /// # Errors
360    ///
361    /// Returns:
362    /// - `ClientClosed` if the client was closed.
363    /// - `ParameterEmpty` if the task is invalid/empty.
364    /// - Any runtime initialization or enqueue errors from the executor.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use reqwest::Method;
370    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
371    ///
372    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
373    /// let client = MeowClient::new(MeowConfig::default());
374    /// let task = DownloadPounceBuilder::new(
375    ///     "example.bin",
376    ///     "./downloads/example.bin",
377    ///     1024 * 1024,
378    ///     "https://example.com/example.bin",
379    ///     Method::GET,
380    /// )
381    /// .build();
382    ///
383    /// let task_id = client
384    ///     .enqueue(
385    ///         task,
386    ///         |record| {
387    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
388    ///         },
389    ///         Some(|task_id, payload| {
390    ///             println!("task {task_id} completed, payload={payload:?}");
391    ///         }),
392    ///     )
393    ///     .await?;
394    /// println!("enqueued task: {task_id}");
395    /// # Ok(())
396    /// # }
397    /// ```
398    pub async fn enqueue<PCB,CCB>(
399        &self,
400        task: PounceTask,
401        progress_cb: PCB,
402        complete_cb: Option<CCB>,
403    ) -> Result<TaskId, MeowError>
404    where
405        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
406        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
407    {
408        self.ensure_open()?;
409        if task.is_empty() {
410            crate::meow_flow_log!("enqueue", "reject empty task");
411            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
412        }
413
414        crate::meow_flow_log!("enqueue", "task={:?}", task);
415
416        let progress: ProgressCb = Arc::new(progress_cb);
417        let complete: Option<CompleteCb> = complete_cb.map(|cb| Arc::new(cb) as CompleteCb);
418        let callbacks = TaskCallbacks::new(Some(progress), complete);
419
420        let (def_up, def_down) = default_breakpoint_arcs();
421        let inner = InnerTask::from_pounce(
422            task,
423            self.config.breakpoint_download_http().clone(),
424            self.config.http_client_ref().cloned(),
425            def_up,
426            def_down,
427        )
428        .await?;
429
430        let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
431        crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
432        Ok(task_id)
433    }
434
435    // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
436    //     todo!(arman) -
437    // }
438
439    /// Pauses a running or pending task by ID.
440    ///
441    /// This API sends a control command to the internal scheduler worker
442    /// thread. It does not execute transfer pause logic on the caller thread.
443    ///
444    /// # Usage rules
445    ///
446    /// Call this with a valid task ID returned by [`Self::enqueue`].
447    ///
448    /// # Errors
449    ///
450    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
456    ///
457    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
458    /// let client = MeowClient::new(MeowConfig::default());
459    /// client.pause(task_id).await?;
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
464        self.ensure_open()?;
465        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
466        self.get_exec()?.pause(task_id).await
467    }
468
469    /// Resumes a previously paused task.
470    ///
471    /// The same [`TaskId`] continues to identify the task after resume.
472    /// The resume command is forwarded to the internal scheduler worker
473    /// thread, so caller thread is not responsible for running transfer logic.
474    ///
475    /// # Errors
476    ///
477    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
478    ///
479    /// # Examples
480    ///
481    /// ```no_run
482    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
483    ///
484    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
485    /// let client = MeowClient::new(MeowConfig::default());
486    /// client.resume(task_id).await?;
487    /// # Ok(())
488    /// # }
489    /// ```
490    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
491        self.ensure_open()?;
492        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
493        self.get_exec()?.resume(task_id).await
494    }
495
496    /// Cancels a task by ID.
497    ///
498    /// Cancellation is requested through the internal scheduler worker thread.
499    /// Transfer cancellation execution happens in background runtime workers.
500    ///
501    /// # Usage rules
502    ///
503    /// Cancellation is best-effort; protocol-specific cleanup may run.
504    ///
505    /// # Errors
506    ///
507    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
508    ///
509    /// # Examples
510    ///
511    /// ```no_run
512    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
513    ///
514    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
515    /// let client = MeowClient::new(MeowConfig::default());
516    /// client.cancel(task_id).await?;
517    /// # Ok(())
518    /// # }
519    /// ```
520    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
521        self.ensure_open()?;
522        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
523        self.get_exec()?.cancel(task_id).await
524    }
525
526    /// Returns a snapshot of queue and active transfer groups.
527    ///
528    /// Useful for diagnostics and external monitoring dashboards.
529    /// Snapshot collection is coordinated by internal scheduler worker state.
530    ///
531    /// # Errors
532    ///
533    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
534    /// snapshot retrieval errors.
535    ///
536    /// # Examples
537    ///
538    /// ```no_run
539    /// use rusty_cat::api::{MeowClient, MeowConfig};
540    ///
541    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
542    /// let client = MeowClient::new(MeowConfig::default());
543    /// let snap = client.snapshot().await?;
544    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
545    /// # Ok(())
546    /// # }
547    /// ```
548    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
549        self.ensure_open()?;
550        crate::meow_flow_log!("client_api", "snapshot called");
551        self.get_exec()?.snapshot().await
552    }
553
554    /// Closes this client and its underlying executor.
555    ///
556    /// After a successful close:
557    /// - New task operations are rejected.
558    /// - Existing runtime resources are released.
559    ///
560    /// # Idempotency
561    ///
562    /// Calling `close` more than once returns `ClientClosed`.
563    ///
564    /// # Retry behavior
565    ///
566    /// If executor close fails, the closed flag is rolled back so caller can
567    /// retry close.
568    ///
569    /// # Errors
570    ///
571    /// Returns `ClientClosed` when already closed, or underlying executor close
572    /// errors when shutdown is not completed.
573    ///
574    /// # Examples
575    ///
576    /// ```no_run
577    /// use rusty_cat::api::{MeowClient, MeowConfig};
578    ///
579    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
580    /// let client = MeowClient::new(MeowConfig::default());
581    /// client.close().await?;
582    /// # Ok(())
583    /// # }
584    /// ```
585    pub async fn close(&self) -> Result<(), MeowError> {
586        if self
587            .closed
588            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
589            .is_err()
590        {
591            crate::meow_flow_log!("client_api", "close rejected: already closed");
592            return Err(MeowError::from_code_str(
593                InnerErrorCode::ClientClosed,
594                "meow client is already closed",
595            ));
596        }
597        if let Some(exec) = self.executor.get() {
598            crate::meow_flow_log!("client_api", "close forwarding to executor");
599            if let Err(e) = exec.close().await {
600                // Roll back closed flag so caller can retry close.
601                self.closed.store(false, Ordering::SeqCst);
602                return Err(e);
603            }
604            Ok(())
605        } else {
606            crate::meow_flow_log!("client_api", "close with no executor initialized");
607            Ok(())
608        }
609    }
610
611    /// Returns whether this client is currently closed.
612    ///
613    /// # Examples
614    ///
615    /// ```no_run
616    /// use rusty_cat::api::{MeowClient, MeowConfig};
617    ///
618    /// # async fn run() {
619    /// let client = MeowClient::new(MeowConfig::default());
620    /// let _closed = client.is_closed().await;
621    /// # }
622    /// ```
623    pub async fn is_closed(&self) -> bool {
624        self.closed.load(Ordering::SeqCst)
625    }
626}