Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
7use crate::error::{InnerErrorCode, MeowError};
8use crate::file_transfer_record::FileTransferRecord;
9use crate::ids::{GlobalProgressListenerId, TaskId};
10use crate::inner::executor::Executor;
11use crate::inner::inner_task::InnerTask;
12use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
13use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
14use crate::meow_config::MeowConfig;
15use crate::pounce_task::PounceTask;
16use crate::transfer_snapshot::TransferSnapshot;
17use crate::transfer_status::TransferStatus;
18
/// Callback type for globally observing task progress events.
///
/// This is a plain alias of the per-task `ProgressCb` type, so a global
/// listener has exactly the same shape as a per-task progress callback.
///
/// The callback is invoked from runtime worker context. Keep callback logic
/// fast and non-blocking to avoid delaying event processing.
pub type GlobalProgressListener = ProgressCb;
24
25/// Main entry point of the `rusty-cat` SDK.
26///
27/// `MeowClient` owns runtime state and provides high-level operations:
28/// enqueue, pause, resume, cancel, snapshot, and close.
29///
30/// # Usage pattern
31///
32/// 1. Create [`MeowConfig`].
33/// 2. Construct `MeowClient::new(config)`.
34/// 3. Build tasks with upload/download builders.
35/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
36/// 5. Control task lifecycle with pause/resume/cancel.
37/// 6. Call [`Self::close`] during shutdown.
38///
39/// # Lifecycle contract: you **must** call [`Self::close`]
40///
41/// The background scheduler runs on a dedicated [`std::thread`] that drives
42/// its own Tokio runtime. The clean shutdown protocol is an explicit
43/// `close().await` command which:
44///
45/// - cancels in-flight transfers,
46/// - flushes `Paused` status events to user callbacks for every known group,
47/// - drains already submitted callback jobs,
48/// - joins the scheduler thread and lets the runtime drop.
49///
50/// Forgetting to call `close` leaves the scheduler thread alive until all
51/// command senders are dropped (which does happen when `MeowClient` is
52/// dropped, but only as a fallback). When that fallback path runs, the
53/// guarantees above do **not** hold: callers may miss terminal status
54/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
55/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
56/// impossible to debug from the outside.
57///
58/// To help surface this misuse the internal executor implements a
59/// **best-effort [`Drop`]** that, when `close` was never called:
60///
61/// - emits a `Warn`-level log via the debug log listener (tag
62///   `"executor_drop"`),
63/// - performs a non-blocking `try_send` of a final `Close` command so the
64///   worker still has a chance to drain its state,
65/// - then drops the command sender, causing the worker loop to exit on its
66///   own.
67///
68/// This is a safety net, **not** a substitute for calling `close`. Treat
69/// `close().await` as a mandatory step in your shutdown sequence.
70///
71/// # Sharing across tasks / threads
72///
73/// `MeowClient` **intentionally does not implement [`Clone`]**.
74///
75/// The client owns a lazily-initialized [`Executor`] (a single background
76/// worker loop plus its task table, scheduler state and shutdown flag). A
77/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
78/// it was initialized, letting different clones each spin up their **own**
79/// executor on first use. The result would be:
80///
81/// - multiple independent task tables (tasks enqueued via one clone are
82///   invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
83/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
84///   [`MeowConfig::max_download_concurrency`]) silently multiplied by the
85///   number of clones;
86/// - [`Self::close`] only shutting down one of the worker loops, leaking the
87///   rest.
88///
89/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
90/// and clone the `Arc` instead:
91///
92/// ```no_run
93/// use std::sync::Arc;
94/// use rusty_cat::api::{MeowClient, MeowConfig};
95///
96/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
97/// let client_for_task = Arc::clone(&client);
98/// tokio::spawn(async move {
99///     let _ = client_for_task; // use the shared client here
100/// });
101/// ```
102/// Outcome of a task that reached [`TransferStatus::Complete`].
103///
104/// Returned by [`MeowClient::enqueue_and_wait`].
105#[derive(Debug, Clone)]
106pub struct TaskOutcome {
107    /// Task identifier returned by the underlying scheduler.
108    pub task_id: TaskId,
109    /// Provider-defined payload returned by upload protocol's `complete_upload`.
110    /// Download tasks usually receive `None`.
111    pub payload: Option<String>,
112}
113
114pub struct MeowClient {
115    /// Lazily initialized task executor.
116    ///
117    /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
118    /// there is exactly one owner of this `OnceLock`. Share the whole client
119    /// via `Arc<MeowClient>` when multi-owner access is needed.
120    executor: OnceLock<Executor>,
121    executor_init: StdMutex<()>,
122    /// Immutable runtime configuration.
123    config: MeowConfig,
124    /// Global listeners receiving progress records for all tasks.
125    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
126    /// Global closed flag. Once set to `true`, task control APIs reject calls.
127    closed: Arc<AtomicBool>,
128}
129
130impl std::fmt::Debug for MeowClient {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("MeowClient")
133            .field("config", &self.config)
134            .field("global_progress_listener", &"..")
135            .finish()
136    }
137}
138
139impl MeowClient {
140    /// Creates a new client with the provided configuration.
141    ///
142    /// The internal executor is initialized lazily on first task operation.
143    ///
144    /// # Examples
145    ///
146    /// ```no_run
147    /// use rusty_cat::api::{MeowClient, MeowConfig};
148    ///
149    /// let config = MeowConfig::default();
150    /// let client = MeowClient::new(config);
151    /// let _ = client;
152    /// ```
153    pub fn new(config: MeowConfig) -> Self {
154        MeowClient {
155            executor: Default::default(),
156            executor_init: StdMutex::new(()),
157            config,
158            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
159            closed: Arc::new(AtomicBool::new(false)),
160        }
161    }
162
163    /// Returns a `reqwest::Client` aligned with this client's configuration.
164    ///
165    /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
166    ///   injected a custom client, this
167    ///   returns its clone.
168    /// - Otherwise, this builds a new client from `http_timeout` and
169    ///   `tcp_keepalive`.
170    ///
171    /// # Errors
172    ///
173    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
174    /// fails.
175    ///
176    /// # Examples
177    ///
178    /// ```no_run
179    /// use rusty_cat::api::{MeowClient, MeowConfig};
180    ///
181    /// let client = MeowClient::new(MeowConfig::default());
182    /// let http = client.http_client()?;
183    /// let _ = http;
184    /// # Ok::<(), rusty_cat::api::MeowError>(())
185    /// ```
186    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
187        if let Some(c) = self.config.http_client_ref() {
188            return Ok(c.clone());
189        }
190        reqwest::Client::builder()
191            .timeout(self.config.http_timeout())
192            .tcp_keepalive(self.config.tcp_keepalive())
193            .build()
194            .map_err(|e| {
195                MeowError::from_source(
196                    InnerErrorCode::HttpClientBuildFailed,
197                    format!(
198                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
199                        self.config.http_timeout(),
200                        self.config.tcp_keepalive()
201                    ),
202                    e,
203                )
204            })
205    }
206
207    fn get_exec(&self) -> Result<&Executor, MeowError> {
208        if let Some(exec) = self.executor.get() {
209            crate::meow_flow_log!("executor", "reuse existing executor");
210            return Ok(exec);
211        }
212
213        let _init_guard = self.executor_init.lock().map_err(|e| {
214            MeowError::from_code(
215                InnerErrorCode::LockPoisoned,
216                format!("executor init lock poisoned: {}", e),
217            )
218        })?;
219        if let Some(exec) = self.executor.get() {
220            crate::meow_flow_log!(
221                "executor",
222                "reuse executor initialized by concurrent caller"
223            );
224            return Ok(exec);
225        }
226
227        let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
228            self.config.http_timeout(),
229            self.config.tcp_keepalive(),
230        )?;
231        crate::meow_flow_log!(
232            "executor",
233            "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
234            self.config.http_timeout(),
235            self.config.tcp_keepalive()
236        );
237        let exec = Executor::new(
238            self.config.clone(),
239            Arc::new(default_http_transfer),
240            self.global_progress_listener.clone(),
241        )?;
242        self.executor.set(exec).map_err(|_| {
243            crate::meow_flow_log!(
244                "executor",
245                "executor init race failed while holding init lock"
246            );
247            MeowError::from_code_str(
248                InnerErrorCode::RuntimeCreationFailedError,
249                "executor init race failed",
250            )
251        })?;
252        self.executor.get().ok_or_else(|| {
253            crate::meow_flow_log!(
254                "executor",
255                "executor init race failed after set; returning RuntimeCreationFailedError"
256            );
257            MeowError::from_code_str(
258                InnerErrorCode::RuntimeCreationFailedError,
259                "executor init race failed",
260            )
261        })
262    }
263
264    /// Ensures the client is still open.
265    ///
266    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
267    fn ensure_open(&self) -> Result<(), MeowError> {
268        if self.closed.load(Ordering::SeqCst) {
269            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
270            Err(MeowError::from_code_str(
271                InnerErrorCode::ClientClosed,
272                "meow client is already closed",
273            ))
274        } else {
275            Ok(())
276        }
277    }
278
279    /// Registers a global progress listener for all tasks.
280    ///
281    /// # Parameters
282    ///
283    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
284    ///
285    /// # Returns
286    ///
287    /// Returns a listener ID used by
288    /// [`Self::unregister_global_progress_listener`].
289    ///
290    /// # Usage rules
291    ///
292    /// Keep callback execution short and panic-free. A heavy callback can slow
293    /// down global event delivery.
294    ///
295    /// # Errors
296    ///
297    /// Returns `LockPoisoned` when listener storage lock is poisoned.
298    ///
299    /// # Examples
300    ///
301    /// ```no_run
302    /// use rusty_cat::api::{MeowClient, MeowConfig};
303    ///
304    /// let client = MeowClient::new(MeowConfig::default());
305    /// let listener_id = client.register_global_progress_listener(|record| {
306    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
307    /// })?;
308    /// let _ = listener_id;
309    /// # Ok::<(), rusty_cat::api::MeowError>(())
310    /// ```
311    pub fn register_global_progress_listener<F>(
312        &self,
313        listener: F,
314    ) -> Result<GlobalProgressListenerId, MeowError>
315    where
316        F: Fn(FileTransferRecord) + Send + Sync + 'static,
317    {
318        let id = GlobalProgressListenerId::new();
319        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
320        let mut guard = self.global_progress_listener.write().map_err(|e| {
321            MeowError::from_code(
322                InnerErrorCode::LockPoisoned,
323                format!("register global listener lock poisoned: {}", e),
324            )
325        })?;
326        guard.push((id, Arc::new(listener)));
327        Ok(id)
328    }
329
330    /// Unregisters one previously registered global progress listener.
331    ///
332    /// Returns `Ok(false)` when the ID does not exist.
333    ///
334    /// # Errors
335    ///
336    /// Returns `LockPoisoned` when listener storage lock is poisoned.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// use rusty_cat::api::{MeowClient, MeowConfig};
342    ///
343    /// let client = MeowClient::new(MeowConfig::default());
344    /// let id = client.register_global_progress_listener(|_| {})?;
345    /// let removed = client.unregister_global_progress_listener(id)?;
346    /// assert!(removed);
347    /// # Ok::<(), rusty_cat::api::MeowError>(())
348    /// ```
349    pub fn unregister_global_progress_listener(
350        &self,
351        id: GlobalProgressListenerId,
352    ) -> Result<bool, MeowError> {
353        let mut g = self.global_progress_listener.write().map_err(|e| {
354            MeowError::from_code(
355                InnerErrorCode::LockPoisoned,
356                format!("unregister global listener lock poisoned: {}", e),
357            )
358        })?;
359        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
360            g.remove(pos);
361            crate::meow_flow_log!(
362                "listener",
363                "unregister global listener success: id={:?}",
364                id
365            );
366            Ok(true)
367        } else {
368            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
369            Ok(false)
370        }
371    }
372
373    /// Removes all registered global progress listeners.
374    ///
375    /// # Errors
376    ///
377    /// Returns `LockPoisoned` when listener storage lock is poisoned.
378    ///
379    /// # Examples
380    ///
381    /// ```no_run
382    /// use rusty_cat::api::{MeowClient, MeowConfig};
383    ///
384    /// let client = MeowClient::new(MeowConfig::default());
385    /// client.clear_global_listener()?;
386    /// # Ok::<(), rusty_cat::api::MeowError>(())
387    /// ```
388    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
389        crate::meow_flow_log!("listener", "clear all global listeners");
390        self.global_progress_listener
391            .write()
392            .map_err(|e| {
393                MeowError::from_code(
394                    InnerErrorCode::LockPoisoned,
395                    format!("clear global listeners lock poisoned: {}", e),
396                )
397            })?
398            .clear();
399        Ok(())
400    }
401
402    /// Sets or clears the global debug log listener.
403    ///
404    /// - Pass `Some(listener)` to set/replace.
405    /// - Pass `None` to clear.
406    ///
407    /// This affects all `MeowClient` instances in the current process.
408    ///
409    /// # Errors
410    ///
411    /// Returns [`DebugLogListenerError`] when the internal global listener lock
412    /// is poisoned.
413    ///
414    /// # Examples
415    ///
416    /// ```no_run
417    /// use std::sync::Arc;
418    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
419    ///
420    /// let client = MeowClient::new(MeowConfig::default());
421    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
422    ///     println!("{log}");
423    /// })))?;
424    ///
425    /// // Clear listener when no longer needed.
426    /// client.set_debug_log_listener(None)?;
427    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
428    /// ```
429    pub fn set_debug_log_listener(
430        &self,
431        listener: Option<DebugLogListener>,
432    ) -> Result<(), DebugLogListenerError> {
433        set_debug_log_listener(listener)
434    }
435}
436
437impl MeowClient {
438    /// Submits a transfer task to the internal scheduler and returns its
439    /// [`TaskId`].
440    ///
441    /// The actual upload/download execution is dispatched to an internal
442    /// worker system thread. This method only performs lightweight validation
443    /// and submission, so it does not block the caller thread waiting for full
444    /// transfer completion.
445    ///
446    /// `try_enqueue` is also the recovery entrypoint after process restart.
447    /// If the application was killed during a previous upload/download,
448    /// restart your process and call `try_enqueue` again to resume that
449    /// transfer workflow.
450    ///
451    /// # Back-pressure semantics (why the `try_` prefix)
452    ///
453    /// Internally this method uses
454    /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
455    /// to the scheduler worker, **not** `send().await`. That means:
456    ///
457    /// - The `await` point in this function is used for task normalization
458    ///   (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
459    ///   for waiting on command-queue capacity.
460    /// - If the command queue is momentarily full (bursty enqueue under
461    ///   [`MeowConfig::command_queue_capacity`]), this method returns an
462    ///   immediate `CommandSendFailed` error instead of suspending the
463    ///   caller until a slot frees up.
464    /// - Other control APIs ([`Self::pause`], [`Self::resume`],
465    ///   [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
466    ///   wait for queue capacity. Only enqueue is fail-fast.
467    ///
468    /// Callers that want to batch-enqueue under burst load should either:
469    ///
470    /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
471    /// 2. retry on `CommandSendFailed` with their own back-off, or
472    /// 3. rate-limit enqueue calls on the caller side.
473    ///
474    /// The name explicitly carries `try_` so this fail-fast behavior is
475    /// visible at the call site. If a fully-awaiting variant is introduced
476    /// later it should be named `enqueue` (without the `try_` prefix).
477    ///
478    /// # Parameters
479    ///
480    /// - `task`: Built by upload/download task builders.
481    /// - `progress_cb`: Per-task callback invoked with transfer progress.
482    /// - `complete_cb`: Callback fired once when task reaches
483    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
484    ///   argument is provider-defined payload returned by upload protocol
485    ///   `complete_upload`; download tasks usually receive `None`.
486    ///
487    /// # Usage rules
488    ///
489    /// - `task` must be non-empty (required path/name/url and valid upload size).
490    /// - Callback should be lightweight and non-blocking.
491    /// - Store returned task ID for subsequent task control operations.
492    /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
493    /// - For restart recovery, re-enqueue the same logical task (same
494    ///   upload/download target and compatible checkpoint context) so the
495    ///   runtime can continue from existing local/remote progress.
496    ///
497    /// # Errors
498    ///
499    /// Returns:
500    /// - `ClientClosed` if the client was closed.
501    /// - `ParameterEmpty` if the task is invalid/empty.
502    /// - `CommandSendFailed` if the scheduler command queue is full at the
503    ///   moment of submission (see back-pressure semantics above).
504    /// - Any runtime initialization errors from the executor.
505    ///
506    /// # Examples
507    ///
508    /// ```no_run
509    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
510    ///
511    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
512    /// let client = MeowClient::new(MeowConfig::default());
513    /// let task = DownloadPounceBuilder::new(
514    ///     "example.bin",
515    ///     "./downloads/example.bin",
516    ///     1024 * 1024,
517    ///     "https://example.com/example.bin",
518    /// )
519    /// .build();
520    ///
521    /// let task_id = client
522    ///     .try_enqueue(
523    ///         task,
524    ///         |record| {
525    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
526    ///         },
527    ///         |task_id, payload| {
528    ///             println!("task {task_id} completed, payload={payload:?}");
529    ///         },
530    ///     )
531    ///     .await?;
532    /// println!("enqueued task: {task_id}");
533    /// # Ok(())
534    /// # }
535    /// ```
536    pub async fn try_enqueue<PCB, CCB>(
537        &self,
538        task: PounceTask,
539        progress_cb: PCB,
540        complete_cb: CCB,
541    ) -> Result<TaskId, MeowError>
542    where
543        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
544        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
545    {
546        self.ensure_open()?;
547        if task.is_empty() {
548            crate::meow_flow_log!("try_enqueue", "reject empty task");
549            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
550        }
551
552        crate::meow_flow_log!("try_enqueue", "task={:?}", task);
553
554        let progress: ProgressCb = Arc::new(progress_cb);
555        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
556        let callbacks = TaskCallbacks::new(Some(progress), complete);
557
558        let (def_up, def_down) = default_breakpoint_arcs();
559        let inner = InnerTask::from_pounce(
560            task,
561            self.config.breakpoint_download_http().clone(),
562            self.config.http_client_ref().cloned(),
563            def_up,
564            def_down,
565        )
566        .await?;
567
568        let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
569        crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
570        Ok(task_id)
571    }
572
573    /// Imports a transfer task in the **paused** state without scheduling it.
574    ///
575    /// This is the restart/restore entry point for callers that persist their
576    /// own transfer records: rebuild a [`PounceTask`] from your database, import
577    /// it here, and the task is registered into the scheduler as
578    /// [`TransferStatus::Paused`] **without** queueing, so it performs **zero
579    /// network or file I/O** until you explicitly start it.
580    ///
581    /// To start a previously imported task, call [`Self::resume`] with the
582    /// returned [`TaskId`]. A typical "restore N, start a user-selected subset"
583    /// flow imports every task with `try_enqueue_paused` and then calls
584    /// [`Self::resume`] only for the ids the user chose; the rest stay paused.
585    ///
586    /// # Difference from [`Self::try_enqueue`]
587    ///
588    /// - `try_enqueue` schedules immediately (the task becomes `Pending` and may
589    ///   start transferring as soon as a concurrency slot is free).
590    /// - `try_enqueue_paused` registers the task as `Paused` and never queues it
591    ///   until [`Self::resume`] is called.
592    ///
593    /// Back-pressure is identical: this method uses
594    /// [`tokio::sync::mpsc::Sender::try_send`] and fails fast with
595    /// `CommandSendFailed` if the command queue is full (see
596    /// [`Self::try_enqueue`] for the rationale behind the `try_` prefix).
597    ///
598    /// # Resume semantics after import
599    ///
600    /// When the imported task is later resumed, the resume point is recomputed
601    /// by the executor, **not** taken from any value passed here:
602    ///
603    /// - **Download**: resumes from the on-disk partial file length, so the
604    ///   partial file must still exist at the task's `file_path`.
605    /// - **Upload**: resumes from the server-reported `next_byte` during the
606    ///   upload `prepare` stage.
607    ///
608    /// # Progress reporting while paused
609    ///
610    /// The single `Paused` [`FileTransferRecord`] emitted on import reports
611    /// progress `0.0` because no `prepare` has run yet. Render the imported
612    /// task's real progress from your own persisted record; the SDK corrects it
613    /// after the first resume.
614    ///
615    /// # Parameters
616    ///
617    /// Same as [`Self::try_enqueue`]: a built `task`, a per-task `progress_cb`,
618    /// and a `complete_cb` fired once on terminal `Complete`.
619    ///
620    /// # Errors
621    ///
622    /// - `ClientClosed` if the client was closed.
623    /// - `ParameterEmpty` if the task is invalid/empty.
624    /// - `CommandSendFailed` if the scheduler command queue is full.
625    /// - Any runtime initialization errors from the executor.
626    ///
627    /// # Examples
628    ///
629    /// ```no_run
630    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
631    ///
632    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
633    /// let client = MeowClient::new(MeowConfig::default());
634    /// let task = DownloadPounceBuilder::new(
635    ///     "example.bin",
636    ///     "./downloads/example.bin",
637    ///     1024 * 1024,
638    ///     "https://example.com/example.bin",
639    /// )
640    /// .build();
641    ///
642    /// // Import without starting it (no HTTP request, no file open).
643    /// let task_id = client
644    ///     .try_enqueue_paused(task, |_record| {}, |_id, _payload| {})
645    ///     .await?;
646    ///
647    /// // Later, when the user chooses to start this one:
648    /// client.resume(task_id).await?;
649    /// # Ok(())
650    /// # }
651    /// ```
652    pub async fn try_enqueue_paused<PCB, CCB>(
653        &self,
654        task: PounceTask,
655        progress_cb: PCB,
656        complete_cb: CCB,
657    ) -> Result<TaskId, MeowError>
658    where
659        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
660        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
661    {
662        self.ensure_open()?;
663        if task.is_empty() {
664            crate::meow_flow_log!("try_enqueue_paused", "reject empty task");
665            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
666        }
667
668        crate::meow_flow_log!("try_enqueue_paused", "task={:?}", task);
669
670        let progress: ProgressCb = Arc::new(progress_cb);
671        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
672        let callbacks = TaskCallbacks::new(Some(progress), complete);
673
674        let (def_up, def_down) = default_breakpoint_arcs();
675        let inner = InnerTask::from_pounce(
676            task,
677            self.config.breakpoint_download_http().clone(),
678            self.config.http_client_ref().cloned(),
679            def_up,
680            def_down,
681        )
682        .await?;
683
684        let task_id = self.get_exec()?.try_enqueue_paused(inner, callbacks)?;
685        crate::meow_flow_log!(
686            "try_enqueue_paused",
687            "try_enqueue_paused success: task_id={:?}",
688            task_id
689        );
690        Ok(task_id)
691    }
692
693    /// Enqueues a task and `await`s until it reaches a terminal status.
694    ///
695    /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
696    /// do not have to write the channel + double-callback + single-send-guard
697    /// boilerplate themselves.
698    ///
699    /// # Returns
700    ///
701    /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
702    /// - `Err(MeowError)` carrying the underlying failure for
703    ///   [`TransferStatus::Failed`].
704    /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
705    ///   [`TransferStatus::Canceled`].
706    ///
707    /// # Progress
708    ///
709    /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
710    /// the per-task progress callback in [`Self::try_enqueue`].
711    ///
712    /// # Cancellation / timeout
713    ///
714    /// Dropping the returned future does **not** cancel the underlying transfer;
715    /// the task continues running in the executor. Use [`Self::cancel`] with
716    /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
717    /// abort an in-flight transfer.
718    ///
719    /// To cap wall-clock waiting time, wrap this future:
720    ///
721    /// ```ignore
722    /// let outcome = tokio::time::timeout(
723    ///     std::time::Duration::from_secs(60),
724    ///     client.enqueue_and_wait(task, |_| {}),
725    /// )
726    /// .await??;
727    /// ```
728    ///
729    /// # Errors
730    ///
731    /// In addition to the terminal-status errors above, propagates any error
732    /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
733    /// `CommandSendFailed`).
734    ///
735    /// # Examples
736    ///
737    /// ```no_run
738    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
739    ///
740    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
741    /// let client = MeowClient::new(MeowConfig::default());
742    /// let task = DownloadPounceBuilder::new(
743    ///     "example.bin",
744    ///     "./downloads/example.bin",
745    ///     1024 * 1024,
746    ///     "https://example.com/example.bin",
747    /// )
748    /// .build();
749    ///
750    /// let outcome = client
751    ///     .enqueue_and_wait(task, |record| {
752    ///         println!(
753    ///             "task={} progress={:.2}",
754    ///             record.task_id(),
755    ///             record.progress()
756    ///         );
757    ///     })
758    ///     .await?;
759    /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
760    /// # Ok(())
761    /// # }
762    /// ```
763    pub async fn enqueue_and_wait<PCB>(
764        &self,
765        task: PounceTask,
766        progress_cb: PCB,
767    ) -> Result<TaskOutcome, MeowError>
768    where
769        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
770    {
771        type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
772        let (tx, rx) = oneshot::channel::<TerminalMsg>();
773        let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
774            Arc::new(StdMutex::new(Some(tx)));
775        let progress_slot = Arc::clone(&tx_slot);
776        let complete_slot = tx_slot;
777
778        self.try_enqueue(
779            task,
780            move |record: FileTransferRecord| {
781                progress_cb(record.clone());
782                match record.status() {
783                    TransferStatus::Failed(err) => {
784                        send_terminal_once(&progress_slot, Err(err.clone()));
785                    }
786                    TransferStatus::Canceled => {
787                        send_terminal_once(
788                            &progress_slot,
789                            Err(MeowError::from_code_str(
790                                InnerErrorCode::TaskCanceled,
791                                "task was canceled",
792                            )),
793                        );
794                    }
795                    _ => {}
796                }
797            },
798            move |task_id, payload| {
799                send_terminal_once(&complete_slot, Ok((task_id, payload)));
800            },
801        )
802        .await?;
803
804        match rx.await {
805            Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
806            Ok(Err(err)) => Err(err),
807            Err(_) => Err(MeowError::from_code_str(
808                InnerErrorCode::CommandResponseFailed,
809                "transfer terminal channel closed without notification",
810            )),
811        }
812    }
813
    // TODO(arman): add
    // `get_task_status(&self, task_id: TaskId) -> Result<FileTransferRecord, MeowError>`.
817
818    /// Pauses a running or pending task by ID.
819    ///
820    /// This API sends a control command to the internal scheduler worker
821    /// thread. It does not execute transfer pause logic on the caller thread.
822    ///
823    /// # Usage rules
824    ///
825    /// Call this with a valid task ID returned by [`Self::enqueue`].
826    ///
827    /// # Errors
828    ///
829    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
830    ///
831    /// # Examples
832    ///
833    /// ```no_run
834    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
835    ///
836    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
837    /// let client = MeowClient::new(MeowConfig::default());
838    /// client.pause(task_id).await?;
839    /// # Ok(())
840    /// # }
841    /// ```
842    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
843        self.ensure_open()?;
844        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
845        self.get_exec()?.pause(task_id).await
846    }
847
848    /// Resumes a previously paused task.
849    ///
850    /// The same [`TaskId`] continues to identify the task after resume.
851    /// The resume command is forwarded to the internal scheduler worker
852    /// thread, so caller thread is not responsible for running transfer logic.
853    ///
854    /// # Errors
855    ///
856    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
857    ///
858    /// # Examples
859    ///
860    /// ```no_run
861    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
862    ///
863    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
864    /// let client = MeowClient::new(MeowConfig::default());
865    /// client.resume(task_id).await?;
866    /// # Ok(())
867    /// # }
868    /// ```
869    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
870        self.ensure_open()?;
871        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
872        self.get_exec()?.resume(task_id).await
873    }
874
875    /// Cancels a task by ID.
876    ///
877    /// Cancellation is requested through the internal scheduler worker thread.
878    /// Transfer cancellation execution happens in background runtime workers.
879    ///
880    /// # Usage rules
881    ///
882    /// Cancellation is best-effort; protocol-specific cleanup may run.
883    ///
884    /// # Errors
885    ///
886    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
887    ///
888    /// # Examples
889    ///
890    /// ```no_run
891    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
892    ///
893    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
894    /// let client = MeowClient::new(MeowConfig::default());
895    /// client.cancel(task_id).await?;
896    /// # Ok(())
897    /// # }
898    /// ```
899    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
900        self.ensure_open()?;
901        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
902        self.get_exec()?.cancel(task_id).await
903    }
904
905    /// Returns a snapshot of queue and active transfer groups.
906    ///
907    /// Useful for diagnostics and external monitoring dashboards.
908    /// Snapshot collection is coordinated by internal scheduler worker state.
909    ///
910    /// # Errors
911    ///
912    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
913    /// snapshot retrieval errors.
914    ///
915    /// # Examples
916    ///
917    /// ```no_run
918    /// use rusty_cat::api::{MeowClient, MeowConfig};
919    ///
920    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
921    /// let client = MeowClient::new(MeowConfig::default());
922    /// let snap = client.snapshot().await?;
923    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
924    /// # Ok(())
925    /// # }
926    /// ```
927    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
928        self.ensure_open()?;
929        crate::meow_flow_log!("client_api", "snapshot called");
930        self.get_exec()?.snapshot().await
931    }
932
933    /// Closes this client and its underlying executor.
934    ///
935    /// `close` is the terminal lifecycle operation for a `MeowClient`. After
936    /// it succeeds, this client stays permanently closed; submit more work by
937    /// constructing a new `MeowClient` and enqueueing tasks there.
938    ///
939    /// After a successful close:
940    ///
941    /// - New task and control operations on this client are rejected with
942    ///   `ClientClosed`.
943    /// - All known unfinished task groups (queued, paused, or active) receive
944    ///   a `Paused` progress notification through their task callback and all
945    ///   registered global listeners.
946    /// - In-flight transfers are cancelled and the scheduler state is cleared.
947    /// - Already submitted callback jobs are drained before returning.
948    /// - The internal scheduler thread is joined, which drops its Tokio
949    ///   runtime and releases SDK-owned background execution resources.
950    ///
951    /// `Paused` is used for shutdown notifications rather than `Canceled` so
952    /// callers can recreate a client later and re-enqueue the same logical
953    /// transfer when they want to resume from available breakpoint state.
954    ///
955    /// # Idempotency
956    ///
957    /// Calling `close` more than once returns `ClientClosed`.
958    ///
959    /// # Retry behavior
960    ///
961    /// If executor close fails, the closed flag is rolled back so caller can
962    /// retry close. A successful close is not restartable.
963    ///
964    /// # Errors
965    ///
966    /// Returns `ClientClosed` when already closed, or underlying executor close
967    /// errors when shutdown is not completed.
968    ///
969    /// # Examples
970    ///
971    /// ```no_run
972    /// use rusty_cat::api::{MeowClient, MeowConfig};
973    ///
974    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
975    /// let client = MeowClient::new(MeowConfig::default());
976    /// client.close().await?;
977    /// # Ok(())
978    /// # }
979    /// ```
980    pub async fn close(&self) -> Result<(), MeowError> {
981        if self
982            .closed
983            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
984            .is_err()
985        {
986            crate::meow_flow_log!("client_api", "close rejected: already closed");
987            return Err(MeowError::from_code_str(
988                InnerErrorCode::ClientClosed,
989                "meow client is already closed",
990            ));
991        }
992        if let Some(exec) = self.executor.get() {
993            crate::meow_flow_log!("client_api", "close forwarding to executor");
994            if let Err(e) = exec.close().await {
995                // Roll back closed flag so caller can retry close.
996                self.closed.store(false, Ordering::SeqCst);
997                return Err(e);
998            }
999            Ok(())
1000        } else {
1001            crate::meow_flow_log!("client_api", "close with no executor initialized");
1002            Ok(())
1003        }
1004    }
1005
1006    /// Returns whether this client is currently closed.
1007    ///
1008    /// # Examples
1009    ///
1010    /// ```no_run
1011    /// use rusty_cat::api::{MeowClient, MeowConfig};
1012    ///
1013    /// let client = MeowClient::new(MeowConfig::default());
1014    /// let _closed = client.is_closed();
1015    /// ```
1016    pub fn is_closed(&self) -> bool {
1017        self.closed.load(Ordering::SeqCst)
1018    }
1019}
1020
1021fn send_terminal_once(
1022    slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
1023    msg: Result<(TaskId, Option<String>), MeowError>,
1024) {
1025    if let Ok(mut guard) = slot.lock() {
1026        if let Some(sender) = guard.take() {
1027            let _ = sender.send(msg);
1028        }
1029    }
1030}