Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
16/// Callback type for globally observing task progress events.
17///
18/// The callback is invoked from runtime worker context. Keep callback logic
19/// fast and non-blocking to avoid delaying event processing.
20pub type GlobalProgressListener = ProgressCb;
21
22/// Main entry point of the `rusty-cat` SDK.
23///
24/// `MeowClient` owns runtime state and provides high-level operations:
25/// enqueue, pause, resume, cancel, snapshot, and close.
26///
27/// # Usage pattern
28///
29/// 1. Create [`MeowConfig`].
30/// 2. Construct `MeowClient::new(config)`.
31/// 3. Build tasks with upload/download builders.
32/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
33/// 5. Control task lifecycle with pause/resume/cancel.
34/// 6. Call [`Self::close`] during shutdown.
35///
36/// # Lifecycle contract: you **must** call [`Self::close`]
37///
38/// The background scheduler runs on a dedicated [`std::thread`] that drives
39/// its own Tokio runtime. The clean shutdown protocol is an explicit
40/// `close().await` command which:
41///
42/// - cancels in-flight transfers,
43/// - flushes `Paused` status events to user callbacks for every known group,
44/// - drains already submitted callback jobs,
45/// - joins the scheduler thread and lets the runtime drop.
46///
47/// Forgetting to call `close` leaves the scheduler thread alive until all
48/// command senders are dropped (which does happen when `MeowClient` is
49/// dropped, but only as a fallback). When that fallback path runs, the
50/// guarantees above do **not** hold: callers may miss terminal status
51/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
52/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
53/// impossible to debug from the outside.
54///
55/// To help surface this misuse the internal executor implements a
56/// **best-effort [`Drop`]** that, when `close` was never called:
57///
58/// - emits a `Warn`-level log via the debug log listener (tag
59///   `"executor_drop"`),
60/// - performs a non-blocking `try_send` of a final `Close` command so the
61///   worker still has a chance to drain its state,
62/// - then drops the command sender, causing the worker loop to exit on its
63///   own.
64///
65/// This is a safety net, **not** a substitute for calling `close`. Treat
66/// `close().await` as a mandatory step in your shutdown sequence.
67///
68/// # Sharing across tasks / threads
69///
70/// `MeowClient` **intentionally does not implement [`Clone`]**.
71///
72/// The client owns a lazily-initialized [`Executor`] (a single background
73/// worker loop plus its task table, scheduler state and shutdown flag). A
74/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
75/// it was initialized, letting different clones each spin up their **own**
76/// executor on first use. The result would be:
77///
78/// - multiple independent task tables (tasks enqueued via one clone are
79///   invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
80/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
81///   [`MeowConfig::max_download_concurrency`]) silently multiplied by the
82///   number of clones;
83/// - [`Self::close`] only shutting down one of the worker loops, leaking the
84///   rest.
85///
86/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
87/// and clone the `Arc` instead:
88///
89/// ```no_run
90/// use std::sync::Arc;
91/// use rusty_cat::api::{MeowClient, MeowConfig};
92///
93/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
94/// let client_for_task = Arc::clone(&client);
95/// tokio::spawn(async move {
96///     let _ = client_for_task; // use the shared client here
97/// });
98/// ```
99pub struct MeowClient {
100    /// Lazily initialized task executor.
101    ///
102    /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
103    /// there is exactly one owner of this `OnceLock`. Share the whole client
104    /// via `Arc<MeowClient>` when multi-owner access is needed.
105    executor: OnceLock<Executor>,
106    /// Immutable runtime configuration.
107    config: MeowConfig,
108    /// Global listeners receiving progress records for all tasks.
109    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
110    /// Global closed flag. Once set to `true`, task control APIs reject calls.
111    closed: Arc<AtomicBool>,
112}
113
114impl std::fmt::Debug for MeowClient {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("MeowClient")
117            .field("config", &self.config)
118            .field("global_progress_listener", &"..")
119            .finish()
120    }
121}
122
123impl MeowClient {
124    /// Creates a new client with the provided configuration.
125    ///
126    /// The internal executor is initialized lazily on first task operation.
127    ///
128    /// # Examples
129    ///
130    /// ```no_run
131    /// use rusty_cat::api::{MeowClient, MeowConfig};
132    ///
133    /// let config = MeowConfig::default();
134    /// let client = MeowClient::new(config);
135    /// let _ = client;
136    /// ```
137    pub fn new(config: MeowConfig) -> Self {
138        MeowClient {
139            executor: Default::default(),
140            config,
141            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
142            closed: Arc::new(AtomicBool::new(false)),
143        }
144    }
145
146    /// Returns a `reqwest::Client` aligned with this client's configuration.
147    ///
148    /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
149    ///   injected a custom client, this
150    ///   returns its clone.
151    /// - Otherwise, this builds a new client from `http_timeout` and
152    ///   `tcp_keepalive`.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
157    /// fails.
158    ///
159    /// # Examples
160    ///
161    /// ```no_run
162    /// use rusty_cat::api::{MeowClient, MeowConfig};
163    ///
164    /// let client = MeowClient::new(MeowConfig::default());
165    /// let http = client.http_client()?;
166    /// let _ = http;
167    /// # Ok::<(), rusty_cat::api::MeowError>(())
168    /// ```
169    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
170        if let Some(c) = self.config.http_client_ref() {
171            return Ok(c.clone());
172        }
173        reqwest::Client::builder()
174            .timeout(self.config.http_timeout())
175            .tcp_keepalive(self.config.tcp_keepalive())
176            .build()
177            .map_err(|e| {
178                MeowError::from_source(
179                    InnerErrorCode::HttpClientBuildFailed,
180                    format!(
181                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
182                        self.config.http_timeout(),
183                        self.config.tcp_keepalive()
184                    ),
185                    e,
186                )
187            })
188    }
189
190    fn get_exec(&self) -> Result<&Executor, MeowError> {
191        if let Some(exec) = self.executor.get() {
192            crate::meow_flow_log!("executor", "reuse existing executor");
193            return Ok(exec);
194        }
195        let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
196            self.config.http_timeout(),
197            self.config.tcp_keepalive(),
198        )?;
199        crate::meow_flow_log!(
200            "executor",
201            "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
202            self.config.http_timeout(),
203            self.config.tcp_keepalive()
204        );
205        let exec = Executor::new(
206            self.config.clone(),
207            Arc::new(default_http_transfer),
208            self.global_progress_listener.clone(),
209        )?;
210        let _ = self.executor.set(exec);
211        self.executor.get().ok_or_else(|| {
212            crate::meow_flow_log!(
213                "executor",
214                "executor init race failed after set; returning RuntimeCreationFailedError"
215            );
216            MeowError::from_code_str(
217                InnerErrorCode::RuntimeCreationFailedError,
218                "executor init race failed",
219            )
220        })
221    }
222
223    /// Ensures the client is still open.
224    ///
225    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
226    fn ensure_open(&self) -> Result<(), MeowError> {
227        if self.closed.load(Ordering::SeqCst) {
228            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
229            Err(MeowError::from_code_str(
230                InnerErrorCode::ClientClosed,
231                "meow client is already closed",
232            ))
233        } else {
234            Ok(())
235        }
236    }
237
238    /// Registers a global progress listener for all tasks.
239    ///
240    /// # Parameters
241    ///
242    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
243    ///
244    /// # Returns
245    ///
246    /// Returns a listener ID used by
247    /// [`Self::unregister_global_progress_listener`].
248    ///
249    /// # Usage rules
250    ///
251    /// Keep callback execution short and panic-free. A heavy callback can slow
252    /// down global event delivery.
253    ///
254    /// # Errors
255    ///
256    /// Returns `LockPoisoned` when listener storage lock is poisoned.
257    ///
258    /// # Examples
259    ///
260    /// ```no_run
261    /// use rusty_cat::api::{MeowClient, MeowConfig};
262    ///
263    /// let client = MeowClient::new(MeowConfig::default());
264    /// let listener_id = client.register_global_progress_listener(|record| {
265    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
266    /// })?;
267    /// let _ = listener_id;
268    /// # Ok::<(), rusty_cat::api::MeowError>(())
269    /// ```
270    pub fn register_global_progress_listener<F>(
271        &self,
272        listener: F,
273    ) -> Result<GlobalProgressListenerId, MeowError>
274    where
275        F: Fn(FileTransferRecord) + Send + Sync + 'static,
276    {
277        let id = GlobalProgressListenerId::new();
278        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
279        let mut guard = self.global_progress_listener.write().map_err(|e| {
280            MeowError::from_code(
281                InnerErrorCode::LockPoisoned,
282                format!("register global listener lock poisoned: {}", e),
283            )
284        })?;
285        guard.push((id, Arc::new(listener)));
286        Ok(id)
287    }
288
289    /// Unregisters one previously registered global progress listener.
290    ///
291    /// Returns `Ok(false)` when the ID does not exist.
292    ///
293    /// # Errors
294    ///
295    /// Returns `LockPoisoned` when listener storage lock is poisoned.
296    ///
297    /// # Examples
298    ///
299    /// ```no_run
300    /// use rusty_cat::api::{MeowClient, MeowConfig};
301    ///
302    /// let client = MeowClient::new(MeowConfig::default());
303    /// let id = client.register_global_progress_listener(|_| {})?;
304    /// let removed = client.unregister_global_progress_listener(id)?;
305    /// assert!(removed);
306    /// # Ok::<(), rusty_cat::api::MeowError>(())
307    /// ```
308    pub fn unregister_global_progress_listener(
309        &self,
310        id: GlobalProgressListenerId,
311    ) -> Result<bool, MeowError> {
312        let mut g = self.global_progress_listener.write().map_err(|e| {
313            MeowError::from_code(
314                InnerErrorCode::LockPoisoned,
315                format!("unregister global listener lock poisoned: {}", e),
316            )
317        })?;
318        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
319            g.remove(pos);
320            crate::meow_flow_log!(
321                "listener",
322                "unregister global listener success: id={:?}",
323                id
324            );
325            Ok(true)
326        } else {
327            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
328            Ok(false)
329        }
330    }
331
332    /// Removes all registered global progress listeners.
333    ///
334    /// # Errors
335    ///
336    /// Returns `LockPoisoned` when listener storage lock is poisoned.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// use rusty_cat::api::{MeowClient, MeowConfig};
342    ///
343    /// let client = MeowClient::new(MeowConfig::default());
344    /// client.clear_global_listener()?;
345    /// # Ok::<(), rusty_cat::api::MeowError>(())
346    /// ```
347    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
348        crate::meow_flow_log!("listener", "clear all global listeners");
349        self.global_progress_listener
350            .write()
351            .map_err(|e| {
352                MeowError::from_code(
353                    InnerErrorCode::LockPoisoned,
354                    format!("clear global listeners lock poisoned: {}", e),
355                )
356            })?
357            .clear();
358        Ok(())
359    }
360
361    /// Sets or clears the global debug log listener.
362    ///
363    /// - Pass `Some(listener)` to set/replace.
364    /// - Pass `None` to clear.
365    ///
366    /// This affects all `MeowClient` instances in the current process.
367    ///
368    /// # Errors
369    ///
370    /// Returns [`DebugLogListenerError`] when the internal global listener lock
371    /// is poisoned.
372    ///
373    /// # Examples
374    ///
375    /// ```no_run
376    /// use std::sync::Arc;
377    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
378    ///
379    /// let client = MeowClient::new(MeowConfig::default());
380    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
381    ///     println!("{log}");
382    /// })))?;
383    ///
384    /// // Clear listener when no longer needed.
385    /// client.set_debug_log_listener(None)?;
386    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
387    /// ```
388    pub fn set_debug_log_listener(
389        &self,
390        listener: Option<DebugLogListener>,
391    ) -> Result<(), DebugLogListenerError> {
392        set_debug_log_listener(listener)
393    }
394}
395
396impl MeowClient {
397    /// Submits a transfer task to the internal scheduler and returns its
398    /// [`TaskId`].
399    ///
400    /// The actual upload/download execution is dispatched to an internal
401    /// worker system thread. This method only performs lightweight validation
402    /// and submission, so it does not block the caller thread waiting for full
403    /// transfer completion.
404    ///
405    /// `try_enqueue` is also the recovery entrypoint after process restart.
406    /// If the application was killed during a previous upload/download,
407    /// restart your process and call `try_enqueue` again to resume that
408    /// transfer workflow.
409    ///
410    /// # Back-pressure semantics (why the `try_` prefix)
411    ///
412    /// Internally this method uses
413    /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
414    /// to the scheduler worker, **not** `send().await`. That means:
415    ///
416    /// - The `await` point in this function is used for task normalization
417    ///   (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
418    ///   for waiting on command-queue capacity.
419    /// - If the command queue is momentarily full (bursty enqueue under
420    ///   [`MeowConfig::command_queue_capacity`]), this method returns an
421    ///   immediate `CommandSendFailed` error instead of suspending the
422    ///   caller until a slot frees up.
423    /// - Other control APIs ([`Self::pause`], [`Self::resume`],
424    ///   [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
425    ///   wait for queue capacity. Only enqueue is fail-fast.
426    ///
427    /// Callers that want to batch-enqueue under burst load should either:
428    ///
429    /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
430    /// 2. retry on `CommandSendFailed` with their own back-off, or
431    /// 3. rate-limit enqueue calls on the caller side.
432    ///
433    /// The name explicitly carries `try_` so this fail-fast behavior is
434    /// visible at the call site. If a fully-awaiting variant is introduced
435    /// later it should be named `enqueue` (without the `try_` prefix).
436    ///
437    /// # Parameters
438    ///
439    /// - `task`: Built by upload/download task builders.
440    /// - `progress_cb`: Per-task callback invoked with transfer progress.
441    /// - `complete_cb`: Callback fired once when task reaches
442    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
443    ///   argument is provider-defined payload returned by upload protocol
444    ///   `complete_upload`; download tasks usually receive `None`.
445    ///
446    /// # Usage rules
447    ///
448    /// - `task` must be non-empty (required path/name/url and valid upload size).
449    /// - Callback should be lightweight and non-blocking.
450    /// - Store returned task ID for subsequent task control operations.
451    /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
452    /// - For restart recovery, re-enqueue the same logical task (same
453    ///   upload/download target and compatible checkpoint context) so the
454    ///   runtime can continue from existing local/remote progress.
455    ///
456    /// # Errors
457    ///
458    /// Returns:
459    /// - `ClientClosed` if the client was closed.
460    /// - `ParameterEmpty` if the task is invalid/empty.
461    /// - `CommandSendFailed` if the scheduler command queue is full at the
462    ///   moment of submission (see back-pressure semantics above).
463    /// - Any runtime initialization errors from the executor.
464    ///
465    /// # Examples
466    ///
467    /// ```no_run
468    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
469    ///
470    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
471    /// let client = MeowClient::new(MeowConfig::default());
472    /// let task = DownloadPounceBuilder::new(
473    ///     "example.bin",
474    ///     "./downloads/example.bin",
475    ///     1024 * 1024,
476    ///     "https://example.com/example.bin",
477    /// )
478    /// .build();
479    ///
480    /// let task_id = client
481    ///     .try_enqueue(
482    ///         task,
483    ///         |record| {
484    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
485    ///         },
486    ///         |task_id, payload| {
487    ///             println!("task {task_id} completed, payload={payload:?}");
488    ///         },
489    ///     )
490    ///     .await?;
491    /// println!("enqueued task: {task_id}");
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn try_enqueue<PCB, CCB>(
496        &self,
497        task: PounceTask,
498        progress_cb: PCB,
499        complete_cb: CCB,
500    ) -> Result<TaskId, MeowError>
501    where
502        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
503        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
504    {
505        self.ensure_open()?;
506        if task.is_empty() {
507            crate::meow_flow_log!("try_enqueue", "reject empty task");
508            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
509        }
510
511        crate::meow_flow_log!("try_enqueue", "task={:?}", task);
512
513        let progress: ProgressCb = Arc::new(progress_cb);
514        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
515        let callbacks = TaskCallbacks::new(Some(progress), complete);
516
517        let (def_up, def_down) = default_breakpoint_arcs();
518        let inner = InnerTask::from_pounce(
519            task,
520            self.config.breakpoint_download_http().clone(),
521            self.config.http_client_ref().cloned(),
522            def_up,
523            def_down,
524        )
525        .await?;
526
527        let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
528        crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
529        Ok(task_id)
530    }
531
532    // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
533    //     todo!(arman) -
534    // }
535
536    /// Pauses a running or pending task by ID.
537    ///
538    /// This API sends a control command to the internal scheduler worker
539    /// thread. It does not execute transfer pause logic on the caller thread.
540    ///
541    /// # Usage rules
542    ///
543    /// Call this with a valid task ID returned by [`Self::enqueue`].
544    ///
545    /// # Errors
546    ///
547    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
548    ///
549    /// # Examples
550    ///
551    /// ```no_run
552    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
553    ///
554    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
555    /// let client = MeowClient::new(MeowConfig::default());
556    /// client.pause(task_id).await?;
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
561        self.ensure_open()?;
562        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
563        self.get_exec()?.pause(task_id).await
564    }
565
566    /// Resumes a previously paused task.
567    ///
568    /// The same [`TaskId`] continues to identify the task after resume.
569    /// The resume command is forwarded to the internal scheduler worker
570    /// thread, so caller thread is not responsible for running transfer logic.
571    ///
572    /// # Errors
573    ///
574    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
575    ///
576    /// # Examples
577    ///
578    /// ```no_run
579    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
580    ///
581    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
582    /// let client = MeowClient::new(MeowConfig::default());
583    /// client.resume(task_id).await?;
584    /// # Ok(())
585    /// # }
586    /// ```
587    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
588        self.ensure_open()?;
589        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
590        self.get_exec()?.resume(task_id).await
591    }
592
593    /// Cancels a task by ID.
594    ///
595    /// Cancellation is requested through the internal scheduler worker thread.
596    /// Transfer cancellation execution happens in background runtime workers.
597    ///
598    /// # Usage rules
599    ///
600    /// Cancellation is best-effort; protocol-specific cleanup may run.
601    ///
602    /// # Errors
603    ///
604    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
605    ///
606    /// # Examples
607    ///
608    /// ```no_run
609    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
610    ///
611    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
612    /// let client = MeowClient::new(MeowConfig::default());
613    /// client.cancel(task_id).await?;
614    /// # Ok(())
615    /// # }
616    /// ```
617    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
618        self.ensure_open()?;
619        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
620        self.get_exec()?.cancel(task_id).await
621    }
622
623    /// Returns a snapshot of queue and active transfer groups.
624    ///
625    /// Useful for diagnostics and external monitoring dashboards.
626    /// Snapshot collection is coordinated by internal scheduler worker state.
627    ///
628    /// # Errors
629    ///
630    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
631    /// snapshot retrieval errors.
632    ///
633    /// # Examples
634    ///
635    /// ```no_run
636    /// use rusty_cat::api::{MeowClient, MeowConfig};
637    ///
638    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
639    /// let client = MeowClient::new(MeowConfig::default());
640    /// let snap = client.snapshot().await?;
641    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
642    /// # Ok(())
643    /// # }
644    /// ```
645    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
646        self.ensure_open()?;
647        crate::meow_flow_log!("client_api", "snapshot called");
648        self.get_exec()?.snapshot().await
649    }
650
651    /// Closes this client and its underlying executor.
652    ///
653    /// `close` is the terminal lifecycle operation for a `MeowClient`. After
654    /// it succeeds, this client stays permanently closed; submit more work by
655    /// constructing a new `MeowClient` and enqueueing tasks there.
656    ///
657    /// After a successful close:
658    ///
659    /// - New task and control operations on this client are rejected with
660    ///   `ClientClosed`.
661    /// - All known unfinished task groups (queued, paused, or active) receive
662    ///   a `Paused` progress notification through their task callback and all
663    ///   registered global listeners.
664    /// - In-flight transfers are cancelled and the scheduler state is cleared.
665    /// - Already submitted callback jobs are drained before returning.
666    /// - The internal scheduler thread is joined, which drops its Tokio
667    ///   runtime and releases SDK-owned background execution resources.
668    ///
669    /// `Paused` is used for shutdown notifications rather than `Canceled` so
670    /// callers can recreate a client later and re-enqueue the same logical
671    /// transfer when they want to resume from available breakpoint state.
672    ///
673    /// # Idempotency
674    ///
675    /// Calling `close` more than once returns `ClientClosed`.
676    ///
677    /// # Retry behavior
678    ///
679    /// If executor close fails, the closed flag is rolled back so caller can
680    /// retry close. A successful close is not restartable.
681    ///
682    /// # Errors
683    ///
684    /// Returns `ClientClosed` when already closed, or underlying executor close
685    /// errors when shutdown is not completed.
686    ///
687    /// # Examples
688    ///
689    /// ```no_run
690    /// use rusty_cat::api::{MeowClient, MeowConfig};
691    ///
692    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
693    /// let client = MeowClient::new(MeowConfig::default());
694    /// client.close().await?;
695    /// # Ok(())
696    /// # }
697    /// ```
698    pub async fn close(&self) -> Result<(), MeowError> {
699        if self
700            .closed
701            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
702            .is_err()
703        {
704            crate::meow_flow_log!("client_api", "close rejected: already closed");
705            return Err(MeowError::from_code_str(
706                InnerErrorCode::ClientClosed,
707                "meow client is already closed",
708            ));
709        }
710        if let Some(exec) = self.executor.get() {
711            crate::meow_flow_log!("client_api", "close forwarding to executor");
712            if let Err(e) = exec.close().await {
713                // Roll back closed flag so caller can retry close.
714                self.closed.store(false, Ordering::SeqCst);
715                return Err(e);
716            }
717            Ok(())
718        } else {
719            crate::meow_flow_log!("client_api", "close with no executor initialized");
720            Ok(())
721        }
722    }
723
724    /// Returns whether this client is currently closed.
725    ///
726    /// # Examples
727    ///
728    /// ```no_run
729    /// use rusty_cat::api::{MeowClient, MeowConfig};
730    ///
731    /// let client = MeowClient::new(MeowConfig::default());
732    /// let _closed = client.is_closed();
733    /// ```
734    pub fn is_closed(&self) -> bool {
735        self.closed.load(Ordering::SeqCst)
736    }
737}