Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
7use crate::error::{InnerErrorCode, MeowError};
8use crate::file_transfer_record::FileTransferRecord;
9use crate::ids::{GlobalProgressListenerId, TaskId};
10use crate::inner::executor::Executor;
11use crate::inner::inner_task::InnerTask;
12use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
13use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
14use crate::meow_config::MeowConfig;
15use crate::pounce_task::PounceTask;
16use crate::transfer_snapshot::TransferSnapshot;
17use crate::transfer_status::TransferStatus;
18
/// Callback type for globally observing task progress events.
///
/// This is an alias of `ProgressCb`, the same callback type that per-task
/// progress callbacks are converted into. Listeners of this type are stored
/// by [`MeowClient::register_global_progress_listener`], which wraps the
/// supplied closure in an [`Arc`].
///
/// The callback is invoked from runtime worker context. Keep callback logic
/// fast and non-blocking to avoid delaying event processing.
pub type GlobalProgressListener = ProgressCb;
24
25/// Main entry point of the `rusty-cat` SDK.
26///
27/// `MeowClient` owns runtime state and provides high-level operations:
28/// enqueue, pause, resume, cancel, snapshot, and close.
29///
30/// # Usage pattern
31///
32/// 1. Create [`MeowConfig`].
33/// 2. Construct `MeowClient::new(config)`.
34/// 3. Build tasks with upload/download builders.
35/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
36/// 5. Control task lifecycle with pause/resume/cancel.
37/// 6. Call [`Self::close`] during shutdown.
38///
39/// # Lifecycle contract: you **must** call [`Self::close`]
40///
41/// The background scheduler runs on a dedicated [`std::thread`] that drives
42/// its own Tokio runtime. The clean shutdown protocol is an explicit
43/// `close().await` command which:
44///
45/// - cancels in-flight transfers,
46/// - flushes `Paused` status events to user callbacks for every known group,
47/// - drains already submitted callback jobs,
48/// - joins the scheduler thread and lets the runtime drop.
49///
50/// Forgetting to call `close` leaves the scheduler thread alive until all
51/// command senders are dropped (which does happen when `MeowClient` is
52/// dropped, but only as a fallback). When that fallback path runs, the
53/// guarantees above do **not** hold: callers may miss terminal status
54/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
55/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
56/// impossible to debug from the outside.
57///
58/// To help surface this misuse the internal executor implements a
59/// **best-effort [`Drop`]** that, when `close` was never called:
60///
61/// - emits a `Warn`-level log via the debug log listener (tag
62///   `"executor_drop"`),
63/// - performs a non-blocking `try_send` of a final `Close` command so the
64///   worker still has a chance to drain its state,
65/// - then drops the command sender, causing the worker loop to exit on its
66///   own.
67///
68/// This is a safety net, **not** a substitute for calling `close`. Treat
69/// `close().await` as a mandatory step in your shutdown sequence.
70///
71/// # Sharing across tasks / threads
72///
73/// `MeowClient` **intentionally does not implement [`Clone`]**.
74///
75/// The client owns a lazily-initialized [`Executor`] (a single background
76/// worker loop plus its task table, scheduler state and shutdown flag). A
77/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
78/// it was initialized, letting different clones each spin up their **own**
79/// executor on first use. The result would be:
80///
81/// - multiple independent task tables (tasks enqueued via one clone are
82///   invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
83/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
84///   [`MeowConfig::max_download_concurrency`]) silently multiplied by the
85///   number of clones;
86/// - [`Self::close`] only shutting down one of the worker loops, leaking the
87///   rest.
88///
89/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
90/// and clone the `Arc` instead:
91///
92/// ```no_run
93/// use std::sync::Arc;
94/// use rusty_cat::api::{MeowClient, MeowConfig};
95///
96/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
97/// let client_for_task = Arc::clone(&client);
98/// tokio::spawn(async move {
99///     let _ = client_for_task; // use the shared client here
100/// });
101/// ```
102/// Outcome of a task that reached [`TransferStatus::Complete`].
103///
104/// Returned by [`MeowClient::enqueue_and_wait`].
105#[derive(Debug, Clone)]
106pub struct TaskOutcome {
107    /// Task identifier returned by the underlying scheduler.
108    pub task_id: TaskId,
109    /// Provider-defined payload returned by upload protocol's `complete_upload`.
110    /// Download tasks usually receive `None`.
111    pub payload: Option<String>,
112}
113
114pub struct MeowClient {
115    /// Lazily initialized task executor.
116    ///
117    /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
118    /// there is exactly one owner of this `OnceLock`. Share the whole client
119    /// via `Arc<MeowClient>` when multi-owner access is needed.
120    executor: OnceLock<Executor>,
121    /// Immutable runtime configuration.
122    config: MeowConfig,
123    /// Global listeners receiving progress records for all tasks.
124    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
125    /// Global closed flag. Once set to `true`, task control APIs reject calls.
126    closed: Arc<AtomicBool>,
127}
128
129impl std::fmt::Debug for MeowClient {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("MeowClient")
132            .field("config", &self.config)
133            .field("global_progress_listener", &"..")
134            .finish()
135    }
136}
137
138impl MeowClient {
139    /// Creates a new client with the provided configuration.
140    ///
141    /// The internal executor is initialized lazily on first task operation.
142    ///
143    /// # Examples
144    ///
145    /// ```no_run
146    /// use rusty_cat::api::{MeowClient, MeowConfig};
147    ///
148    /// let config = MeowConfig::default();
149    /// let client = MeowClient::new(config);
150    /// let _ = client;
151    /// ```
152    pub fn new(config: MeowConfig) -> Self {
153        MeowClient {
154            executor: Default::default(),
155            config,
156            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
157            closed: Arc::new(AtomicBool::new(false)),
158        }
159    }
160
161    /// Returns a `reqwest::Client` aligned with this client's configuration.
162    ///
163    /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
164    ///   injected a custom client, this
165    ///   returns its clone.
166    /// - Otherwise, this builds a new client from `http_timeout` and
167    ///   `tcp_keepalive`.
168    ///
169    /// # Errors
170    ///
171    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
172    /// fails.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// use rusty_cat::api::{MeowClient, MeowConfig};
178    ///
179    /// let client = MeowClient::new(MeowConfig::default());
180    /// let http = client.http_client()?;
181    /// let _ = http;
182    /// # Ok::<(), rusty_cat::api::MeowError>(())
183    /// ```
184    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
185        if let Some(c) = self.config.http_client_ref() {
186            return Ok(c.clone());
187        }
188        reqwest::Client::builder()
189            .timeout(self.config.http_timeout())
190            .tcp_keepalive(self.config.tcp_keepalive())
191            .build()
192            .map_err(|e| {
193                MeowError::from_source(
194                    InnerErrorCode::HttpClientBuildFailed,
195                    format!(
196                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
197                        self.config.http_timeout(),
198                        self.config.tcp_keepalive()
199                    ),
200                    e,
201                )
202            })
203    }
204
205    fn get_exec(&self) -> Result<&Executor, MeowError> {
206        if let Some(exec) = self.executor.get() {
207            crate::meow_flow_log!("executor", "reuse existing executor");
208            return Ok(exec);
209        }
210        let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
211            self.config.http_timeout(),
212            self.config.tcp_keepalive(),
213        )?;
214        crate::meow_flow_log!(
215            "executor",
216            "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
217            self.config.http_timeout(),
218            self.config.tcp_keepalive()
219        );
220        let exec = Executor::new(
221            self.config.clone(),
222            Arc::new(default_http_transfer),
223            self.global_progress_listener.clone(),
224        )?;
225        let _ = self.executor.set(exec);
226        self.executor.get().ok_or_else(|| {
227            crate::meow_flow_log!(
228                "executor",
229                "executor init race failed after set; returning RuntimeCreationFailedError"
230            );
231            MeowError::from_code_str(
232                InnerErrorCode::RuntimeCreationFailedError,
233                "executor init race failed",
234            )
235        })
236    }
237
238    /// Ensures the client is still open.
239    ///
240    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
241    fn ensure_open(&self) -> Result<(), MeowError> {
242        if self.closed.load(Ordering::SeqCst) {
243            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
244            Err(MeowError::from_code_str(
245                InnerErrorCode::ClientClosed,
246                "meow client is already closed",
247            ))
248        } else {
249            Ok(())
250        }
251    }
252
253    /// Registers a global progress listener for all tasks.
254    ///
255    /// # Parameters
256    ///
257    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
258    ///
259    /// # Returns
260    ///
261    /// Returns a listener ID used by
262    /// [`Self::unregister_global_progress_listener`].
263    ///
264    /// # Usage rules
265    ///
266    /// Keep callback execution short and panic-free. A heavy callback can slow
267    /// down global event delivery.
268    ///
269    /// # Errors
270    ///
271    /// Returns `LockPoisoned` when listener storage lock is poisoned.
272    ///
273    /// # Examples
274    ///
275    /// ```no_run
276    /// use rusty_cat::api::{MeowClient, MeowConfig};
277    ///
278    /// let client = MeowClient::new(MeowConfig::default());
279    /// let listener_id = client.register_global_progress_listener(|record| {
280    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
281    /// })?;
282    /// let _ = listener_id;
283    /// # Ok::<(), rusty_cat::api::MeowError>(())
284    /// ```
285    pub fn register_global_progress_listener<F>(
286        &self,
287        listener: F,
288    ) -> Result<GlobalProgressListenerId, MeowError>
289    where
290        F: Fn(FileTransferRecord) + Send + Sync + 'static,
291    {
292        let id = GlobalProgressListenerId::new();
293        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
294        let mut guard = self.global_progress_listener.write().map_err(|e| {
295            MeowError::from_code(
296                InnerErrorCode::LockPoisoned,
297                format!("register global listener lock poisoned: {}", e),
298            )
299        })?;
300        guard.push((id, Arc::new(listener)));
301        Ok(id)
302    }
303
304    /// Unregisters one previously registered global progress listener.
305    ///
306    /// Returns `Ok(false)` when the ID does not exist.
307    ///
308    /// # Errors
309    ///
310    /// Returns `LockPoisoned` when listener storage lock is poisoned.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// use rusty_cat::api::{MeowClient, MeowConfig};
316    ///
317    /// let client = MeowClient::new(MeowConfig::default());
318    /// let id = client.register_global_progress_listener(|_| {})?;
319    /// let removed = client.unregister_global_progress_listener(id)?;
320    /// assert!(removed);
321    /// # Ok::<(), rusty_cat::api::MeowError>(())
322    /// ```
323    pub fn unregister_global_progress_listener(
324        &self,
325        id: GlobalProgressListenerId,
326    ) -> Result<bool, MeowError> {
327        let mut g = self.global_progress_listener.write().map_err(|e| {
328            MeowError::from_code(
329                InnerErrorCode::LockPoisoned,
330                format!("unregister global listener lock poisoned: {}", e),
331            )
332        })?;
333        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
334            g.remove(pos);
335            crate::meow_flow_log!(
336                "listener",
337                "unregister global listener success: id={:?}",
338                id
339            );
340            Ok(true)
341        } else {
342            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
343            Ok(false)
344        }
345    }
346
347    /// Removes all registered global progress listeners.
348    ///
349    /// # Errors
350    ///
351    /// Returns `LockPoisoned` when listener storage lock is poisoned.
352    ///
353    /// # Examples
354    ///
355    /// ```no_run
356    /// use rusty_cat::api::{MeowClient, MeowConfig};
357    ///
358    /// let client = MeowClient::new(MeowConfig::default());
359    /// client.clear_global_listener()?;
360    /// # Ok::<(), rusty_cat::api::MeowError>(())
361    /// ```
362    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
363        crate::meow_flow_log!("listener", "clear all global listeners");
364        self.global_progress_listener
365            .write()
366            .map_err(|e| {
367                MeowError::from_code(
368                    InnerErrorCode::LockPoisoned,
369                    format!("clear global listeners lock poisoned: {}", e),
370                )
371            })?
372            .clear();
373        Ok(())
374    }
375
376    /// Sets or clears the global debug log listener.
377    ///
378    /// - Pass `Some(listener)` to set/replace.
379    /// - Pass `None` to clear.
380    ///
381    /// This affects all `MeowClient` instances in the current process.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`DebugLogListenerError`] when the internal global listener lock
386    /// is poisoned.
387    ///
388    /// # Examples
389    ///
390    /// ```no_run
391    /// use std::sync::Arc;
392    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
393    ///
394    /// let client = MeowClient::new(MeowConfig::default());
395    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
396    ///     println!("{log}");
397    /// })))?;
398    ///
399    /// // Clear listener when no longer needed.
400    /// client.set_debug_log_listener(None)?;
401    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
402    /// ```
403    pub fn set_debug_log_listener(
404        &self,
405        listener: Option<DebugLogListener>,
406    ) -> Result<(), DebugLogListenerError> {
407        set_debug_log_listener(listener)
408    }
409}
410
411impl MeowClient {
412    /// Submits a transfer task to the internal scheduler and returns its
413    /// [`TaskId`].
414    ///
415    /// The actual upload/download execution is dispatched to an internal
416    /// worker system thread. This method only performs lightweight validation
417    /// and submission, so it does not block the caller thread waiting for full
418    /// transfer completion.
419    ///
420    /// `try_enqueue` is also the recovery entrypoint after process restart.
421    /// If the application was killed during a previous upload/download,
422    /// restart your process and call `try_enqueue` again to resume that
423    /// transfer workflow.
424    ///
425    /// # Back-pressure semantics (why the `try_` prefix)
426    ///
427    /// Internally this method uses
428    /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
429    /// to the scheduler worker, **not** `send().await`. That means:
430    ///
431    /// - The `await` point in this function is used for task normalization
432    ///   (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
433    ///   for waiting on command-queue capacity.
434    /// - If the command queue is momentarily full (bursty enqueue under
435    ///   [`MeowConfig::command_queue_capacity`]), this method returns an
436    ///   immediate `CommandSendFailed` error instead of suspending the
437    ///   caller until a slot frees up.
438    /// - Other control APIs ([`Self::pause`], [`Self::resume`],
439    ///   [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
440    ///   wait for queue capacity. Only enqueue is fail-fast.
441    ///
442    /// Callers that want to batch-enqueue under burst load should either:
443    ///
444    /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
445    /// 2. retry on `CommandSendFailed` with their own back-off, or
446    /// 3. rate-limit enqueue calls on the caller side.
447    ///
448    /// The name explicitly carries `try_` so this fail-fast behavior is
449    /// visible at the call site. If a fully-awaiting variant is introduced
450    /// later it should be named `enqueue` (without the `try_` prefix).
451    ///
452    /// # Parameters
453    ///
454    /// - `task`: Built by upload/download task builders.
455    /// - `progress_cb`: Per-task callback invoked with transfer progress.
456    /// - `complete_cb`: Callback fired once when task reaches
457    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
458    ///   argument is provider-defined payload returned by upload protocol
459    ///   `complete_upload`; download tasks usually receive `None`.
460    ///
461    /// # Usage rules
462    ///
463    /// - `task` must be non-empty (required path/name/url and valid upload size).
464    /// - Callback should be lightweight and non-blocking.
465    /// - Store returned task ID for subsequent task control operations.
466    /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
467    /// - For restart recovery, re-enqueue the same logical task (same
468    ///   upload/download target and compatible checkpoint context) so the
469    ///   runtime can continue from existing local/remote progress.
470    ///
471    /// # Errors
472    ///
473    /// Returns:
474    /// - `ClientClosed` if the client was closed.
475    /// - `ParameterEmpty` if the task is invalid/empty.
476    /// - `CommandSendFailed` if the scheduler command queue is full at the
477    ///   moment of submission (see back-pressure semantics above).
478    /// - Any runtime initialization errors from the executor.
479    ///
480    /// # Examples
481    ///
482    /// ```no_run
483    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
484    ///
485    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
486    /// let client = MeowClient::new(MeowConfig::default());
487    /// let task = DownloadPounceBuilder::new(
488    ///     "example.bin",
489    ///     "./downloads/example.bin",
490    ///     1024 * 1024,
491    ///     "https://example.com/example.bin",
492    /// )
493    /// .build();
494    ///
495    /// let task_id = client
496    ///     .try_enqueue(
497    ///         task,
498    ///         |record| {
499    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
500    ///         },
501    ///         |task_id, payload| {
502    ///             println!("task {task_id} completed, payload={payload:?}");
503    ///         },
504    ///     )
505    ///     .await?;
506    /// println!("enqueued task: {task_id}");
507    /// # Ok(())
508    /// # }
509    /// ```
510    pub async fn try_enqueue<PCB, CCB>(
511        &self,
512        task: PounceTask,
513        progress_cb: PCB,
514        complete_cb: CCB,
515    ) -> Result<TaskId, MeowError>
516    where
517        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
518        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
519    {
520        self.ensure_open()?;
521        if task.is_empty() {
522            crate::meow_flow_log!("try_enqueue", "reject empty task");
523            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
524        }
525
526        crate::meow_flow_log!("try_enqueue", "task={:?}", task);
527
528        let progress: ProgressCb = Arc::new(progress_cb);
529        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
530        let callbacks = TaskCallbacks::new(Some(progress), complete);
531
532        let (def_up, def_down) = default_breakpoint_arcs();
533        let inner = InnerTask::from_pounce(
534            task,
535            self.config.breakpoint_download_http().clone(),
536            self.config.http_client_ref().cloned(),
537            def_up,
538            def_down,
539        )
540        .await?;
541
542        let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
543        crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
544        Ok(task_id)
545    }
546
547    /// Enqueues a task and `await`s until it reaches a terminal status.
548    ///
549    /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
550    /// do not have to write the channel + double-callback + single-send-guard
551    /// boilerplate themselves.
552    ///
553    /// # Returns
554    ///
555    /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
556    /// - `Err(MeowError)` carrying the underlying failure for
557    ///   [`TransferStatus::Failed`].
558    /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
559    ///   [`TransferStatus::Canceled`].
560    ///
561    /// # Progress
562    ///
563    /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
564    /// the per-task progress callback in [`Self::try_enqueue`].
565    ///
566    /// # Cancellation / timeout
567    ///
568    /// Dropping the returned future does **not** cancel the underlying transfer;
569    /// the task continues running in the executor. Use [`Self::cancel`] with
570    /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
571    /// abort an in-flight transfer.
572    ///
573    /// To cap wall-clock waiting time, wrap this future:
574    ///
575    /// ```ignore
576    /// let outcome = tokio::time::timeout(
577    ///     std::time::Duration::from_secs(60),
578    ///     client.enqueue_and_wait(task, |_| {}),
579    /// )
580    /// .await??;
581    /// ```
582    ///
583    /// # Errors
584    ///
585    /// In addition to the terminal-status errors above, propagates any error
586    /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
587    /// `CommandSendFailed`).
588    ///
589    /// # Examples
590    ///
591    /// ```no_run
592    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
593    ///
594    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
595    /// let client = MeowClient::new(MeowConfig::default());
596    /// let task = DownloadPounceBuilder::new(
597    ///     "example.bin",
598    ///     "./downloads/example.bin",
599    ///     1024 * 1024,
600    ///     "https://example.com/example.bin",
601    /// )
602    /// .build();
603    ///
604    /// let outcome = client
605    ///     .enqueue_and_wait(task, |record| {
606    ///         println!(
607    ///             "task={} progress={:.2}",
608    ///             record.task_id(),
609    ///             record.progress()
610    ///         );
611    ///     })
612    ///     .await?;
613    /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
614    /// # Ok(())
615    /// # }
616    /// ```
617    pub async fn enqueue_and_wait<PCB>(
618        &self,
619        task: PounceTask,
620        progress_cb: PCB,
621    ) -> Result<TaskOutcome, MeowError>
622    where
623        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
624    {
625        type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
626        let (tx, rx) = oneshot::channel::<TerminalMsg>();
627        let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
628            Arc::new(StdMutex::new(Some(tx)));
629        let progress_slot = Arc::clone(&tx_slot);
630        let complete_slot = tx_slot;
631
632        self.try_enqueue(
633            task,
634            move |record: FileTransferRecord| {
635                progress_cb(record.clone());
636                match record.status() {
637                    TransferStatus::Failed(err) => {
638                        send_terminal_once(&progress_slot, Err(err.clone()));
639                    }
640                    TransferStatus::Canceled => {
641                        send_terminal_once(
642                            &progress_slot,
643                            Err(MeowError::from_code_str(
644                                InnerErrorCode::TaskCanceled,
645                                "task was canceled",
646                            )),
647                        );
648                    }
649                    _ => {}
650                }
651            },
652            move |task_id, payload| {
653                send_terminal_once(&complete_slot, Ok((task_id, payload)));
654            },
655        )
656        .await?;
657
658        match rx.await {
659            Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
660            Ok(Err(err)) => Err(err),
661            Err(_) => Err(MeowError::from_code_str(
662                InnerErrorCode::CommandResponseFailed,
663                "transfer terminal channel closed without notification",
664            )),
665        }
666    }
667
    // TODO(arman): add a per-task status query. Planned signature:
    //     pub async fn get_task_status(&self, task_id: TaskId) -> Result<FileTransferRecord, MeowError>
671
672    /// Pauses a running or pending task by ID.
673    ///
674    /// This API sends a control command to the internal scheduler worker
675    /// thread. It does not execute transfer pause logic on the caller thread.
676    ///
677    /// # Usage rules
678    ///
679    /// Call this with a valid task ID returned by [`Self::enqueue`].
680    ///
681    /// # Errors
682    ///
683    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
684    ///
685    /// # Examples
686    ///
687    /// ```no_run
688    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
689    ///
690    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
691    /// let client = MeowClient::new(MeowConfig::default());
692    /// client.pause(task_id).await?;
693    /// # Ok(())
694    /// # }
695    /// ```
696    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
697        self.ensure_open()?;
698        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
699        self.get_exec()?.pause(task_id).await
700    }
701
702    /// Resumes a previously paused task.
703    ///
704    /// The same [`TaskId`] continues to identify the task after resume.
705    /// The resume command is forwarded to the internal scheduler worker
706    /// thread, so caller thread is not responsible for running transfer logic.
707    ///
708    /// # Errors
709    ///
710    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
711    ///
712    /// # Examples
713    ///
714    /// ```no_run
715    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
716    ///
717    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
718    /// let client = MeowClient::new(MeowConfig::default());
719    /// client.resume(task_id).await?;
720    /// # Ok(())
721    /// # }
722    /// ```
723    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
724        self.ensure_open()?;
725        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
726        self.get_exec()?.resume(task_id).await
727    }
728
729    /// Cancels a task by ID.
730    ///
731    /// Cancellation is requested through the internal scheduler worker thread.
732    /// Transfer cancellation execution happens in background runtime workers.
733    ///
734    /// # Usage rules
735    ///
736    /// Cancellation is best-effort; protocol-specific cleanup may run.
737    ///
738    /// # Errors
739    ///
740    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
741    ///
742    /// # Examples
743    ///
744    /// ```no_run
745    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
746    ///
747    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
748    /// let client = MeowClient::new(MeowConfig::default());
749    /// client.cancel(task_id).await?;
750    /// # Ok(())
751    /// # }
752    /// ```
753    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
754        self.ensure_open()?;
755        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
756        self.get_exec()?.cancel(task_id).await
757    }
758
759    /// Returns a snapshot of queue and active transfer groups.
760    ///
761    /// Useful for diagnostics and external monitoring dashboards.
762    /// Snapshot collection is coordinated by internal scheduler worker state.
763    ///
764    /// # Errors
765    ///
766    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
767    /// snapshot retrieval errors.
768    ///
769    /// # Examples
770    ///
771    /// ```no_run
772    /// use rusty_cat::api::{MeowClient, MeowConfig};
773    ///
774    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
775    /// let client = MeowClient::new(MeowConfig::default());
776    /// let snap = client.snapshot().await?;
777    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
778    /// # Ok(())
779    /// # }
780    /// ```
781    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
782        self.ensure_open()?;
783        crate::meow_flow_log!("client_api", "snapshot called");
784        self.get_exec()?.snapshot().await
785    }
786
787    /// Closes this client and its underlying executor.
788    ///
789    /// `close` is the terminal lifecycle operation for a `MeowClient`. After
790    /// it succeeds, this client stays permanently closed; submit more work by
791    /// constructing a new `MeowClient` and enqueueing tasks there.
792    ///
793    /// After a successful close:
794    ///
795    /// - New task and control operations on this client are rejected with
796    ///   `ClientClosed`.
797    /// - All known unfinished task groups (queued, paused, or active) receive
798    ///   a `Paused` progress notification through their task callback and all
799    ///   registered global listeners.
800    /// - In-flight transfers are cancelled and the scheduler state is cleared.
801    /// - Already submitted callback jobs are drained before returning.
802    /// - The internal scheduler thread is joined, which drops its Tokio
803    ///   runtime and releases SDK-owned background execution resources.
804    ///
805    /// `Paused` is used for shutdown notifications rather than `Canceled` so
806    /// callers can recreate a client later and re-enqueue the same logical
807    /// transfer when they want to resume from available breakpoint state.
808    ///
809    /// # Idempotency
810    ///
811    /// Calling `close` more than once returns `ClientClosed`.
812    ///
813    /// # Retry behavior
814    ///
815    /// If executor close fails, the closed flag is rolled back so caller can
816    /// retry close. A successful close is not restartable.
817    ///
818    /// # Errors
819    ///
820    /// Returns `ClientClosed` when already closed, or underlying executor close
821    /// errors when shutdown is not completed.
822    ///
823    /// # Examples
824    ///
825    /// ```no_run
826    /// use rusty_cat::api::{MeowClient, MeowConfig};
827    ///
828    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
829    /// let client = MeowClient::new(MeowConfig::default());
830    /// client.close().await?;
831    /// # Ok(())
832    /// # }
833    /// ```
834    pub async fn close(&self) -> Result<(), MeowError> {
835        if self
836            .closed
837            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
838            .is_err()
839        {
840            crate::meow_flow_log!("client_api", "close rejected: already closed");
841            return Err(MeowError::from_code_str(
842                InnerErrorCode::ClientClosed,
843                "meow client is already closed",
844            ));
845        }
846        if let Some(exec) = self.executor.get() {
847            crate::meow_flow_log!("client_api", "close forwarding to executor");
848            if let Err(e) = exec.close().await {
849                // Roll back closed flag so caller can retry close.
850                self.closed.store(false, Ordering::SeqCst);
851                return Err(e);
852            }
853            Ok(())
854        } else {
855            crate::meow_flow_log!("client_api", "close with no executor initialized");
856            Ok(())
857        }
858    }
859
860    /// Returns whether this client is currently closed.
861    ///
862    /// # Examples
863    ///
864    /// ```no_run
865    /// use rusty_cat::api::{MeowClient, MeowConfig};
866    ///
867    /// let client = MeowClient::new(MeowConfig::default());
868    /// let _closed = client.is_closed();
869    /// ```
870    pub fn is_closed(&self) -> bool {
871        self.closed.load(Ordering::SeqCst)
872    }
873}
874
875fn send_terminal_once(
876    slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
877    msg: Result<(TaskId, Option<String>), MeowError>,
878) {
879    if let Ok(mut guard) = slot.lock() {
880        if let Some(sender) = guard.take() {
881            let _ = sender.send(msg);
882        }
883    }
884}