// rusty_cat/meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{
7    build_internal_client, default_breakpoint_arcs, DefaultHttpTransfer,
8};
9use crate::error::{InnerErrorCode, MeowError};
10use crate::file_transfer_record::FileTransferRecord;
11use crate::ids::{GlobalProgressListenerId, TaskId};
12use crate::inner::executor::Executor;
13use crate::inner::inner_task::InnerTask;
14use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
15use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
16use crate::meow_config::MeowConfig;
17use crate::pounce_task::PounceTask;
18use crate::transfer_snapshot::TransferSnapshot;
19use crate::transfer_status::TransferStatus;
20
21/// Callback type for globally observing task progress events.
22///
23/// The callback is invoked from runtime worker context. Keep callback logic
24/// fast and non-blocking to avoid delaying event processing.
25pub type GlobalProgressListener = ProgressCb;
26
27/// Main entry point of the `rusty-cat` SDK.
28///
29/// `MeowClient` owns runtime state and provides high-level operations:
30/// enqueue, pause, resume, cancel, snapshot, and close.
31///
32/// # Usage pattern
33///
34/// 1. Create [`MeowConfig`].
35/// 2. Construct `MeowClient::new(config)`.
36/// 3. Build tasks with upload/download builders.
37/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
38/// 5. Control task lifecycle with pause/resume/cancel.
39/// 6. Call [`Self::close`] during shutdown.
40///
41/// # Lifecycle contract: you **must** call [`Self::close`]
42///
43/// The background scheduler runs on a dedicated [`std::thread`] that drives
44/// its own Tokio runtime. The clean shutdown protocol is an explicit
45/// `close().await` command which:
46///
47/// - cancels in-flight transfers,
48/// - flushes `Paused` status events to user callbacks for every known group,
49/// - drains already submitted callback jobs,
50/// - joins the scheduler thread and lets the runtime drop.
51///
52/// Forgetting to call `close` leaves the scheduler thread alive until all
53/// command senders are dropped (which does happen when `MeowClient` is
54/// dropped, but only as a fallback). When that fallback path runs, the
55/// guarantees above do **not** hold: callers may miss terminal status
56/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
57/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
58/// impossible to debug from the outside.
59///
60/// To help surface this misuse the internal executor implements a
61/// **best-effort [`Drop`]** that, when `close` was never called:
62///
63/// - emits a `Warn`-level log via the debug log listener (tag
64///   `"executor_drop"`),
65/// - performs a non-blocking `try_send` of a final `Close` command so the
66///   worker still has a chance to drain its state,
67/// - then drops the command sender, causing the worker loop to exit on its
68///   own.
69///
70/// This is a safety net, **not** a substitute for calling `close`. Treat
71/// `close().await` as a mandatory step in your shutdown sequence.
72///
73/// # Sharing across tasks / threads
74///
75/// `MeowClient` **intentionally does not implement [`Clone`]**.
76///
77/// The client owns a lazily-initialized [`Executor`] (a single background
78/// worker loop plus its task table, scheduler state and shutdown flag). A
79/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
80/// it was initialized, letting different clones each spin up their **own**
81/// executor on first use. The result would be:
82///
83/// - multiple independent task tables (tasks enqueued via one clone are
84///   invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
85/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
86///   [`MeowConfig::max_download_concurrency`]) silently multiplied by the
87///   number of clones;
88/// - [`Self::close`] only shutting down one of the worker loops, leaking the
89///   rest.
90///
91/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
92/// and clone the `Arc` instead:
93///
94/// ```no_run
95/// use std::sync::Arc;
96/// use rusty_cat::api::{MeowClient, MeowConfig};
97///
98/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
99/// let client_for_task = Arc::clone(&client);
100/// tokio::spawn(async move {
101///     let _ = client_for_task; // use the shared client here
102/// });
103/// ```
104/// Outcome of a task that reached [`TransferStatus::Complete`].
105///
106/// Returned by [`MeowClient::enqueue_and_wait`].
107#[derive(Debug, Clone)]
108pub struct TaskOutcome {
109    /// Task identifier returned by the underlying scheduler.
110    pub task_id: TaskId,
111    /// Provider-defined payload returned by upload protocol's `complete_upload`.
112    /// Download tasks usually receive `None`.
113    pub payload: Option<String>,
114}
115
116pub struct MeowClient {
117    /// Lazily initialized task executor.
118    ///
119    /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
120    /// there is exactly one owner of this `OnceLock`. Share the whole client
121    /// via `Arc<MeowClient>` when multi-owner access is needed.
122    executor: OnceLock<Executor>,
123    executor_init: StdMutex<()>,
124    /// Immutable runtime configuration.
125    config: MeowConfig,
126    /// Global listeners receiving progress records for all tasks.
127    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
128    /// Global closed flag. Once set to `true`, task control APIs reject calls.
129    closed: Arc<AtomicBool>,
130}
131
132impl std::fmt::Debug for MeowClient {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct("MeowClient")
135            .field("config", &self.config)
136            .field("global_progress_listener", &"..")
137            .finish()
138    }
139}
140
141impl MeowClient {
142    /// Creates a new client with the provided configuration.
143    ///
144    /// The internal executor is initialized lazily on first task operation.
145    ///
146    /// # Examples
147    ///
148    /// ```no_run
149    /// use rusty_cat::api::{MeowClient, MeowConfig};
150    ///
151    /// let config = MeowConfig::default();
152    /// let client = MeowClient::new(config);
153    /// let _ = client;
154    /// ```
155    pub fn new(config: MeowConfig) -> Self {
156        MeowClient {
157            executor: Default::default(),
158            executor_init: StdMutex::new(()),
159            config,
160            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
161            closed: Arc::new(AtomicBool::new(false)),
162        }
163    }
164
165    /// Returns a `reqwest::Client` aligned with this client's configuration.
166    ///
167    /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
168    ///   injected a custom client, this
169    ///   returns its clone.
170    /// - Otherwise, this builds a new client from `http_timeout` and
171    ///   `tcp_keepalive`.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
176    /// fails.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// use rusty_cat::api::{MeowClient, MeowConfig};
182    ///
183    /// let client = MeowClient::new(MeowConfig::default());
184    /// let http = client.http_client()?;
185    /// let _ = http;
186    /// # Ok::<(), rusty_cat::api::MeowError>(())
187    /// ```
188    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
189        if let Some(c) = self.config.http_client_ref() {
190            return Ok(c.clone());
191        }
192        // Build through the shared helper so this client carries the exact same
193        // transport policy (connect timeout + idle connection pool) as the
194        // transfer backend, rather than reqwest's bare defaults.
195        build_internal_client(self.config.http_timeout(), self.config.tcp_keepalive())
196            .map_err(|e| {
197                MeowError::from_source(
198                    InnerErrorCode::HttpClientBuildFailed,
199                    format!(
200                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
201                        self.config.http_timeout(),
202                        self.config.tcp_keepalive()
203                    ),
204                    e,
205                )
206            })
207    }
208
209    fn get_exec(&self) -> Result<&Executor, MeowError> {
210        if let Some(exec) = self.executor.get() {
211            crate::meow_flow_log!("executor", "reuse existing executor");
212            return Ok(exec);
213        }
214
215        let _init_guard = self.executor_init.lock().map_err(|e| {
216            MeowError::from_code(
217                InnerErrorCode::LockPoisoned,
218                format!("executor init lock poisoned: {}", e),
219            )
220        })?;
221        if let Some(exec) = self.executor.get() {
222            crate::meow_flow_log!(
223                "executor",
224                "reuse executor initialized by concurrent caller"
225            );
226            return Ok(exec);
227        }
228
229        let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
230            self.config.http_timeout(),
231            self.config.tcp_keepalive(),
232        )?;
233        crate::meow_flow_log!(
234            "executor",
235            "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
236            self.config.http_timeout(),
237            self.config.tcp_keepalive()
238        );
239        let exec = Executor::new(
240            self.config.clone(),
241            Arc::new(default_http_transfer),
242            self.global_progress_listener.clone(),
243        )?;
244        self.executor.set(exec).map_err(|_| {
245            crate::meow_flow_log!(
246                "executor",
247                "executor init race failed while holding init lock"
248            );
249            MeowError::from_code_str(
250                InnerErrorCode::RuntimeCreationFailedError,
251                "executor init race failed",
252            )
253        })?;
254        self.executor.get().ok_or_else(|| {
255            crate::meow_flow_log!(
256                "executor",
257                "executor init race failed after set; returning RuntimeCreationFailedError"
258            );
259            MeowError::from_code_str(
260                InnerErrorCode::RuntimeCreationFailedError,
261                "executor init race failed",
262            )
263        })
264    }
265
266    /// Ensures the client is still open.
267    ///
268    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
269    fn ensure_open(&self) -> Result<(), MeowError> {
270        if self.closed.load(Ordering::SeqCst) {
271            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
272            Err(MeowError::from_code_str(
273                InnerErrorCode::ClientClosed,
274                "meow client is already closed",
275            ))
276        } else {
277            Ok(())
278        }
279    }
280
281    /// Registers a global progress listener for all tasks.
282    ///
283    /// # Parameters
284    ///
285    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
286    ///
287    /// # Returns
288    ///
289    /// Returns a listener ID used by
290    /// [`Self::unregister_global_progress_listener`].
291    ///
292    /// # Usage rules
293    ///
294    /// Keep callback execution short and panic-free. A heavy callback can slow
295    /// down global event delivery.
296    ///
297    /// # Errors
298    ///
299    /// Returns `LockPoisoned` when listener storage lock is poisoned.
300    ///
301    /// # Examples
302    ///
303    /// ```no_run
304    /// use rusty_cat::api::{MeowClient, MeowConfig};
305    ///
306    /// let client = MeowClient::new(MeowConfig::default());
307    /// let listener_id = client.register_global_progress_listener(|record| {
308    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
309    /// })?;
310    /// let _ = listener_id;
311    /// # Ok::<(), rusty_cat::api::MeowError>(())
312    /// ```
313    pub fn register_global_progress_listener<F>(
314        &self,
315        listener: F,
316    ) -> Result<GlobalProgressListenerId, MeowError>
317    where
318        F: Fn(FileTransferRecord) + Send + Sync + 'static,
319    {
320        let id = GlobalProgressListenerId::new();
321        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
322        let mut guard = self.global_progress_listener.write().map_err(|e| {
323            MeowError::from_code(
324                InnerErrorCode::LockPoisoned,
325                format!("register global listener lock poisoned: {}", e),
326            )
327        })?;
328        guard.push((id, Arc::new(listener)));
329        Ok(id)
330    }
331
332    /// Unregisters one previously registered global progress listener.
333    ///
334    /// Returns `Ok(false)` when the ID does not exist.
335    ///
336    /// # Errors
337    ///
338    /// Returns `LockPoisoned` when listener storage lock is poisoned.
339    ///
340    /// # Examples
341    ///
342    /// ```no_run
343    /// use rusty_cat::api::{MeowClient, MeowConfig};
344    ///
345    /// let client = MeowClient::new(MeowConfig::default());
346    /// let id = client.register_global_progress_listener(|_| {})?;
347    /// let removed = client.unregister_global_progress_listener(id)?;
348    /// assert!(removed);
349    /// # Ok::<(), rusty_cat::api::MeowError>(())
350    /// ```
351    pub fn unregister_global_progress_listener(
352        &self,
353        id: GlobalProgressListenerId,
354    ) -> Result<bool, MeowError> {
355        let mut g = self.global_progress_listener.write().map_err(|e| {
356            MeowError::from_code(
357                InnerErrorCode::LockPoisoned,
358                format!("unregister global listener lock poisoned: {}", e),
359            )
360        })?;
361        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
362            g.remove(pos);
363            crate::meow_flow_log!(
364                "listener",
365                "unregister global listener success: id={:?}",
366                id
367            );
368            Ok(true)
369        } else {
370            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
371            Ok(false)
372        }
373    }
374
375    /// Removes all registered global progress listeners.
376    ///
377    /// # Errors
378    ///
379    /// Returns `LockPoisoned` when listener storage lock is poisoned.
380    ///
381    /// # Examples
382    ///
383    /// ```no_run
384    /// use rusty_cat::api::{MeowClient, MeowConfig};
385    ///
386    /// let client = MeowClient::new(MeowConfig::default());
387    /// client.clear_global_listener()?;
388    /// # Ok::<(), rusty_cat::api::MeowError>(())
389    /// ```
390    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
391        crate::meow_flow_log!("listener", "clear all global listeners");
392        self.global_progress_listener
393            .write()
394            .map_err(|e| {
395                MeowError::from_code(
396                    InnerErrorCode::LockPoisoned,
397                    format!("clear global listeners lock poisoned: {}", e),
398                )
399            })?
400            .clear();
401        Ok(())
402    }
403
404    /// Sets or clears the global debug log listener.
405    ///
406    /// - Pass `Some(listener)` to set/replace.
407    /// - Pass `None` to clear.
408    ///
409    /// This affects all `MeowClient` instances in the current process.
410    ///
411    /// # Errors
412    ///
413    /// Returns [`DebugLogListenerError`] when the internal global listener lock
414    /// is poisoned.
415    ///
416    /// # Examples
417    ///
418    /// ```no_run
419    /// use std::sync::Arc;
420    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
421    ///
422    /// let client = MeowClient::new(MeowConfig::default());
423    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
424    ///     println!("{log}");
425    /// })))?;
426    ///
427    /// // Clear listener when no longer needed.
428    /// client.set_debug_log_listener(None)?;
429    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
430    /// ```
431    pub fn set_debug_log_listener(
432        &self,
433        listener: Option<DebugLogListener>,
434    ) -> Result<(), DebugLogListenerError> {
435        set_debug_log_listener(listener)
436    }
437}
438
439impl MeowClient {
440    /// Submits a transfer task to the internal scheduler and returns its
441    /// [`TaskId`].
442    ///
443    /// The actual upload/download execution is dispatched to an internal
444    /// worker system thread. This method only performs lightweight validation
445    /// and submission, so it does not block the caller thread waiting for full
446    /// transfer completion.
447    ///
448    /// `try_enqueue` is also the recovery entrypoint after process restart.
449    /// If the application was killed during a previous upload/download,
450    /// restart your process and call `try_enqueue` again to resume that
451    /// transfer workflow.
452    ///
453    /// # Back-pressure semantics (why the `try_` prefix)
454    ///
455    /// Internally this method uses
456    /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
457    /// to the scheduler worker, **not** `send().await`. That means:
458    ///
459    /// - The `await` point in this function is used for task normalization
460    ///   (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
461    ///   for waiting on command-queue capacity.
462    /// - If the command queue is momentarily full (bursty enqueue under
463    ///   [`MeowConfig::command_queue_capacity`]), this method returns an
464    ///   immediate `CommandSendFailed` error instead of suspending the
465    ///   caller until a slot frees up.
466    /// - Other control APIs ([`Self::pause`], [`Self::resume`],
467    ///   [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
468    ///   wait for queue capacity. Only enqueue is fail-fast.
469    ///
470    /// Callers that want to batch-enqueue under burst load should either:
471    ///
472    /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
473    /// 2. retry on `CommandSendFailed` with their own back-off, or
474    /// 3. rate-limit enqueue calls on the caller side.
475    ///
476    /// The name explicitly carries `try_` so this fail-fast behavior is
477    /// visible at the call site. If a fully-awaiting variant is introduced
478    /// later it should be named `enqueue` (without the `try_` prefix).
479    ///
480    /// # Parameters
481    ///
482    /// - `task`: Built by upload/download task builders.
483    /// - `progress_cb`: Per-task callback invoked with transfer progress.
484    /// - `complete_cb`: Callback fired once when task reaches
485    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
486    ///   argument is provider-defined payload returned by upload protocol
487    ///   `complete_upload`; download tasks usually receive `None`.
488    ///
489    /// # Usage rules
490    ///
491    /// - `task` must be non-empty (required path/name/url and valid upload size).
492    /// - Callback should be lightweight and non-blocking.
493    /// - Store returned task ID for subsequent task control operations.
494    /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
495    /// - For restart recovery, re-enqueue the same logical task (same
496    ///   upload/download target and compatible checkpoint context) so the
497    ///   runtime can continue from existing local/remote progress.
498    ///
499    /// # Errors
500    ///
501    /// Returns:
502    /// - `ClientClosed` if the client was closed.
503    /// - `ParameterEmpty` if the task is invalid/empty.
504    /// - `CommandSendFailed` if the scheduler command queue is full at the
505    ///   moment of submission (see back-pressure semantics above).
506    /// - Any runtime initialization errors from the executor.
507    ///
508    /// # Examples
509    ///
510    /// ```no_run
511    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
512    ///
513    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
514    /// let client = MeowClient::new(MeowConfig::default());
515    /// let task = DownloadPounceBuilder::new(
516    ///     "example.bin",
517    ///     "./downloads/example.bin",
518    ///     1024 * 1024,
519    ///     "https://example.com/example.bin",
520    /// )
521    /// .build();
522    ///
523    /// let task_id = client
524    ///     .try_enqueue(
525    ///         task,
526    ///         |record| {
527    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
528    ///         },
529    ///         |task_id, payload| {
530    ///             println!("task {task_id} completed, payload={payload:?}");
531    ///         },
532    ///     )
533    ///     .await?;
534    /// println!("enqueued task: {task_id}");
535    /// # Ok(())
536    /// # }
537    /// ```
538    pub async fn try_enqueue<PCB, CCB>(
539        &self,
540        task: PounceTask,
541        progress_cb: PCB,
542        complete_cb: CCB,
543    ) -> Result<TaskId, MeowError>
544    where
545        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
546        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
547    {
548        self.ensure_open()?;
549        if task.is_empty() {
550            crate::meow_flow_log!("try_enqueue", "reject empty task");
551            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
552        }
553
554        crate::meow_flow_log!("try_enqueue", "task={:?}", task);
555
556        let progress: ProgressCb = Arc::new(progress_cb);
557        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
558        let callbacks = TaskCallbacks::new(Some(progress), complete);
559
560        let (def_up, def_down) = default_breakpoint_arcs();
561        let inner = InnerTask::from_pounce(
562            task,
563            self.config.breakpoint_download_http().clone(),
564            self.config.http_client_ref().cloned(),
565            def_up,
566            def_down,
567        )
568        .await?;
569
570        let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
571        crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
572        Ok(task_id)
573    }
574
575    /// Imports a transfer task in the **paused** state without scheduling it.
576    ///
577    /// This is the restart/restore entry point for callers that persist their
578    /// own transfer records: rebuild a [`PounceTask`] from your database, import
579    /// it here, and the task is registered into the scheduler as
580    /// [`TransferStatus::Paused`] **without** queueing, so it performs **zero
581    /// network or file I/O** until you explicitly start it.
582    ///
583    /// To start a previously imported task, call [`Self::resume`] with the
584    /// returned [`TaskId`]. A typical "restore N, start a user-selected subset"
585    /// flow imports every task with `try_enqueue_paused` and then calls
586    /// [`Self::resume`] only for the ids the user chose; the rest stay paused.
587    ///
588    /// # Difference from [`Self::try_enqueue`]
589    ///
590    /// - `try_enqueue` schedules immediately (the task becomes `Pending` and may
591    ///   start transferring as soon as a concurrency slot is free).
592    /// - `try_enqueue_paused` registers the task as `Paused` and never queues it
593    ///   until [`Self::resume`] is called.
594    ///
595    /// Back-pressure is identical: this method uses
596    /// [`tokio::sync::mpsc::Sender::try_send`] and fails fast with
597    /// `CommandSendFailed` if the command queue is full (see
598    /// [`Self::try_enqueue`] for the rationale behind the `try_` prefix).
599    ///
600    /// # Resume semantics after import
601    ///
602    /// When the imported task is later resumed, the resume point is recomputed
603    /// by the executor, **not** taken from any value passed here:
604    ///
605    /// - **Download**: resumes from the on-disk partial file length, so the
606    ///   partial file must still exist at the task's `file_path`.
607    /// - **Upload**: resumes from the server-reported `next_byte` during the
608    ///   upload `prepare` stage.
609    ///
610    /// # Progress reporting while paused
611    ///
612    /// The single `Paused` [`FileTransferRecord`] emitted on import reports
613    /// progress `0.0` because no `prepare` has run yet. Render the imported
614    /// task's real progress from your own persisted record; the SDK corrects it
615    /// after the first resume.
616    ///
617    /// # Parameters
618    ///
619    /// Same as [`Self::try_enqueue`]: a built `task`, a per-task `progress_cb`,
620    /// and a `complete_cb` fired once on terminal `Complete`.
621    ///
622    /// # Errors
623    ///
624    /// - `ClientClosed` if the client was closed.
625    /// - `ParameterEmpty` if the task is invalid/empty.
626    /// - `CommandSendFailed` if the scheduler command queue is full.
627    /// - Any runtime initialization errors from the executor.
628    ///
629    /// # Examples
630    ///
631    /// ```no_run
632    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
633    ///
634    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
635    /// let client = MeowClient::new(MeowConfig::default());
636    /// let task = DownloadPounceBuilder::new(
637    ///     "example.bin",
638    ///     "./downloads/example.bin",
639    ///     1024 * 1024,
640    ///     "https://example.com/example.bin",
641    /// )
642    /// .build();
643    ///
644    /// // Import without starting it (no HTTP request, no file open).
645    /// let task_id = client
646    ///     .try_enqueue_paused(task, |_record| {}, |_id, _payload| {})
647    ///     .await?;
648    ///
649    /// // Later, when the user chooses to start this one:
650    /// client.resume(task_id).await?;
651    /// # Ok(())
652    /// # }
653    /// ```
654    pub async fn try_enqueue_paused<PCB, CCB>(
655        &self,
656        task: PounceTask,
657        progress_cb: PCB,
658        complete_cb: CCB,
659    ) -> Result<TaskId, MeowError>
660    where
661        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
662        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
663    {
664        self.ensure_open()?;
665        if task.is_empty() {
666            crate::meow_flow_log!("try_enqueue_paused", "reject empty task");
667            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
668        }
669
670        crate::meow_flow_log!("try_enqueue_paused", "task={:?}", task);
671
672        let progress: ProgressCb = Arc::new(progress_cb);
673        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
674        let callbacks = TaskCallbacks::new(Some(progress), complete);
675
676        let (def_up, def_down) = default_breakpoint_arcs();
677        let inner = InnerTask::from_pounce(
678            task,
679            self.config.breakpoint_download_http().clone(),
680            self.config.http_client_ref().cloned(),
681            def_up,
682            def_down,
683        )
684        .await?;
685
686        let task_id = self.get_exec()?.try_enqueue_paused(inner, callbacks)?;
687        crate::meow_flow_log!(
688            "try_enqueue_paused",
689            "try_enqueue_paused success: task_id={:?}",
690            task_id
691        );
692        Ok(task_id)
693    }
694
695    /// Enqueues a task and `await`s until it reaches a terminal status.
696    ///
697    /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
698    /// do not have to write the channel + double-callback + single-send-guard
699    /// boilerplate themselves.
700    ///
701    /// # Returns
702    ///
703    /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
704    /// - `Err(MeowError)` carrying the underlying failure for
705    ///   [`TransferStatus::Failed`].
706    /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
707    ///   [`TransferStatus::Canceled`].
708    ///
709    /// # Progress
710    ///
711    /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
712    /// the per-task progress callback in [`Self::try_enqueue`].
713    ///
714    /// # Cancellation / timeout
715    ///
716    /// Dropping the returned future does **not** cancel the underlying transfer;
717    /// the task continues running in the executor. Use [`Self::cancel`] with
718    /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
719    /// abort an in-flight transfer.
720    ///
721    /// To cap wall-clock waiting time, wrap this future:
722    ///
723    /// ```ignore
724    /// let outcome = tokio::time::timeout(
725    ///     std::time::Duration::from_secs(60),
726    ///     client.enqueue_and_wait(task, |_| {}),
727    /// )
728    /// .await??;
729    /// ```
730    ///
731    /// # Errors
732    ///
733    /// In addition to the terminal-status errors above, propagates any error
734    /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
735    /// `CommandSendFailed`).
736    ///
737    /// # Examples
738    ///
739    /// ```no_run
740    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
741    ///
742    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
743    /// let client = MeowClient::new(MeowConfig::default());
744    /// let task = DownloadPounceBuilder::new(
745    ///     "example.bin",
746    ///     "./downloads/example.bin",
747    ///     1024 * 1024,
748    ///     "https://example.com/example.bin",
749    /// )
750    /// .build();
751    ///
752    /// let outcome = client
753    ///     .enqueue_and_wait(task, |record| {
754    ///         println!(
755    ///             "task={} progress={:.2}",
756    ///             record.task_id(),
757    ///             record.progress()
758    ///         );
759    ///     })
760    ///     .await?;
761    /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
762    /// # Ok(())
763    /// # }
764    /// ```
765    pub async fn enqueue_and_wait<PCB>(
766        &self,
767        task: PounceTask,
768        progress_cb: PCB,
769    ) -> Result<TaskOutcome, MeowError>
770    where
771        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
772    {
773        type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
774        let (tx, rx) = oneshot::channel::<TerminalMsg>();
775        let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
776            Arc::new(StdMutex::new(Some(tx)));
777        let progress_slot = Arc::clone(&tx_slot);
778        let complete_slot = tx_slot;
779
780        self.try_enqueue(
781            task,
782            move |record: FileTransferRecord| {
783                progress_cb(record.clone());
784                match record.status() {
785                    TransferStatus::Failed(err) => {
786                        send_terminal_once(&progress_slot, Err(err.clone()));
787                    }
788                    TransferStatus::Canceled => {
789                        send_terminal_once(
790                            &progress_slot,
791                            Err(MeowError::from_code_str(
792                                InnerErrorCode::TaskCanceled,
793                                "task was canceled",
794                            )),
795                        );
796                    }
797                    _ => {}
798                }
799            },
800            move |task_id, payload| {
801                send_terminal_once(&complete_slot, Ok((task_id, payload)));
802            },
803        )
804        .await?;
805
806        match rx.await {
807            Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
808            Ok(Err(err)) => Err(err),
809            Err(_) => Err(MeowError::from_code_str(
810                InnerErrorCode::CommandResponseFailed,
811                "transfer terminal channel closed without notification",
812            )),
813        }
814    }
815
    // TODO(arman): add a task status query, planned signature:
    //     pub async fn get_task_status(&self, task_id: TaskId) -> Result<FileTransferRecord, MeowError>
819
820    /// Pauses a running or pending task by ID.
821    ///
822    /// This API sends a control command to the internal scheduler worker
823    /// thread. It does not execute transfer pause logic on the caller thread.
824    ///
825    /// # Usage rules
826    ///
827    /// Call this with a valid task ID returned by [`Self::enqueue`].
828    ///
829    /// # Errors
830    ///
831    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
832    ///
833    /// # Examples
834    ///
835    /// ```no_run
836    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
837    ///
838    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
839    /// let client = MeowClient::new(MeowConfig::default());
840    /// client.pause(task_id).await?;
841    /// # Ok(())
842    /// # }
843    /// ```
844    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
845        self.ensure_open()?;
846        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
847        self.get_exec()?.pause(task_id).await
848    }
849
850    /// Resumes a previously paused task.
851    ///
852    /// The same [`TaskId`] continues to identify the task after resume.
853    /// The resume command is forwarded to the internal scheduler worker
854    /// thread, so caller thread is not responsible for running transfer logic.
855    ///
856    /// # Errors
857    ///
858    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
859    ///
860    /// # Examples
861    ///
862    /// ```no_run
863    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
864    ///
865    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
866    /// let client = MeowClient::new(MeowConfig::default());
867    /// client.resume(task_id).await?;
868    /// # Ok(())
869    /// # }
870    /// ```
871    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
872        self.ensure_open()?;
873        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
874        self.get_exec()?.resume(task_id).await
875    }
876
877    /// Cancels a task by ID.
878    ///
879    /// Cancellation is requested through the internal scheduler worker thread.
880    /// Transfer cancellation execution happens in background runtime workers.
881    ///
882    /// # Usage rules
883    ///
884    /// Cancellation is best-effort; protocol-specific cleanup may run.
885    ///
886    /// # Errors
887    ///
888    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
889    ///
890    /// # Examples
891    ///
892    /// ```no_run
893    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
894    ///
895    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
896    /// let client = MeowClient::new(MeowConfig::default());
897    /// client.cancel(task_id).await?;
898    /// # Ok(())
899    /// # }
900    /// ```
901    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
902        self.ensure_open()?;
903        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
904        self.get_exec()?.cancel(task_id).await
905    }
906
907    /// Returns a snapshot of queue and active transfer groups.
908    ///
909    /// Useful for diagnostics and external monitoring dashboards.
910    /// Snapshot collection is coordinated by internal scheduler worker state.
911    ///
912    /// # Errors
913    ///
914    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
915    /// snapshot retrieval errors.
916    ///
917    /// # Examples
918    ///
919    /// ```no_run
920    /// use rusty_cat::api::{MeowClient, MeowConfig};
921    ///
922    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
923    /// let client = MeowClient::new(MeowConfig::default());
924    /// let snap = client.snapshot().await?;
925    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
926    /// # Ok(())
927    /// # }
928    /// ```
929    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
930        self.ensure_open()?;
931        crate::meow_flow_log!("client_api", "snapshot called");
932        self.get_exec()?.snapshot().await
933    }
934
935    /// Closes this client and its underlying executor.
936    ///
937    /// `close` is the terminal lifecycle operation for a `MeowClient`. After
938    /// it succeeds, this client stays permanently closed; submit more work by
939    /// constructing a new `MeowClient` and enqueueing tasks there.
940    ///
941    /// After a successful close:
942    ///
943    /// - New task and control operations on this client are rejected with
944    ///   `ClientClosed`.
945    /// - All known unfinished task groups (queued, paused, or active) receive
946    ///   a `Paused` progress notification through their task callback and all
947    ///   registered global listeners.
948    /// - In-flight transfers are cancelled and the scheduler state is cleared.
949    /// - Already submitted callback jobs are drained before returning.
950    /// - The internal scheduler thread is joined, which drops its Tokio
951    ///   runtime and releases SDK-owned background execution resources.
952    ///
953    /// `Paused` is used for shutdown notifications rather than `Canceled` so
954    /// callers can recreate a client later and re-enqueue the same logical
955    /// transfer when they want to resume from available breakpoint state.
956    ///
957    /// # Idempotency
958    ///
959    /// Calling `close` more than once returns `ClientClosed`.
960    ///
961    /// # Retry behavior
962    ///
963    /// If executor close fails, the closed flag is rolled back so caller can
964    /// retry close. A successful close is not restartable.
965    ///
966    /// # Errors
967    ///
968    /// Returns `ClientClosed` when already closed, or underlying executor close
969    /// errors when shutdown is not completed.
970    ///
971    /// # Examples
972    ///
973    /// ```no_run
974    /// use rusty_cat::api::{MeowClient, MeowConfig};
975    ///
976    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
977    /// let client = MeowClient::new(MeowConfig::default());
978    /// client.close().await?;
979    /// # Ok(())
980    /// # }
981    /// ```
982    pub async fn close(&self) -> Result<(), MeowError> {
983        if self
984            .closed
985            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
986            .is_err()
987        {
988            crate::meow_flow_log!("client_api", "close rejected: already closed");
989            return Err(MeowError::from_code_str(
990                InnerErrorCode::ClientClosed,
991                "meow client is already closed",
992            ));
993        }
994        if let Some(exec) = self.executor.get() {
995            crate::meow_flow_log!("client_api", "close forwarding to executor");
996            if let Err(e) = exec.close().await {
997                // Roll back closed flag so caller can retry close.
998                self.closed.store(false, Ordering::SeqCst);
999                return Err(e);
1000            }
1001            Ok(())
1002        } else {
1003            crate::meow_flow_log!("client_api", "close with no executor initialized");
1004            Ok(())
1005        }
1006    }
1007
1008    /// Returns whether this client is currently closed.
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```no_run
1013    /// use rusty_cat::api::{MeowClient, MeowConfig};
1014    ///
1015    /// let client = MeowClient::new(MeowConfig::default());
1016    /// let _closed = client.is_closed();
1017    /// ```
1018    pub fn is_closed(&self) -> bool {
1019        self.closed.load(Ordering::SeqCst)
1020    }
1021}
1022
1023fn send_terminal_once(
1024    slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
1025    msg: Result<(TaskId, Option<String>), MeowError>,
1026) {
1027    if let Ok(mut guard) = slot.lock() {
1028        if let Some(sender) = guard.take() {
1029            let _ = sender.send(msg);
1030        }
1031    }
1032}