rusty-cat 0.2.2

Async HTTP client for resumable file upload and download.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};

use tokio::sync::oneshot;

use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
use crate::error::{InnerErrorCode, MeowError};
use crate::file_transfer_record::FileTransferRecord;
use crate::ids::{GlobalProgressListenerId, TaskId};
use crate::inner::executor::Executor;
use crate::inner::inner_task::InnerTask;
use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
use crate::meow_config::MeowConfig;
use crate::pounce_task::PounceTask;
use crate::transfer_snapshot::TransferSnapshot;
use crate::transfer_status::TransferStatus;

/// Callback type for globally observing task progress events.
///
/// The callback is invoked from runtime worker context. Keep callback logic
/// fast and non-blocking to avoid delaying event processing.
pub type GlobalProgressListener = ProgressCb;

/// Main entry point of the `rusty-cat` SDK.
///
/// `MeowClient` owns runtime state and provides high-level operations:
/// enqueue, pause, resume, cancel, snapshot, and close.
///
/// # Usage pattern
///
/// 1. Create [`MeowConfig`].
/// 2. Construct `MeowClient::new(config)`.
/// 3. Build tasks with upload/download builders.
/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
/// 5. Control task lifecycle with pause/resume/cancel.
/// 6. Call [`Self::close`] during shutdown.
///
/// # Lifecycle contract: you **must** call [`Self::close`]
///
/// The background scheduler runs on a dedicated [`std::thread`] that drives
/// its own Tokio runtime. The clean shutdown protocol is an explicit
/// `close().await` command which:
///
/// - cancels in-flight transfers,
/// - flushes `Paused` status events to user callbacks for every known group,
/// - drains already submitted callback jobs,
/// - joins the scheduler thread and lets the runtime drop.
///
/// Forgetting to call `close` leaves the scheduler thread alive until all
/// command senders are dropped (which does happen when `MeowClient` is
/// dropped, but only as a fallback). When that fallback path runs, the
/// guarantees above do **not** hold: callers may miss terminal status
/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
/// impossible to debug from the outside.
///
/// To help surface this misuse the internal executor implements a
/// **best-effort [`Drop`]** that, when `close` was never called:
///
/// - emits a `Warn`-level log via the debug log listener (tag
///   `"executor_drop"`),
/// - performs a non-blocking `try_send` of a final `Close` command so the
///   worker still has a chance to drain its state,
/// - then drops the command sender, causing the worker loop to exit on its
///   own.
///
/// This is a safety net, **not** a substitute for calling `close`. Treat
/// `close().await` as a mandatory step in your shutdown sequence.
///
/// # Sharing across tasks / threads
///
/// `MeowClient` **intentionally does not implement [`Clone`]**.
///
/// The client owns a lazily-initialized [`Executor`] (a single background
/// worker loop plus its task table, scheduler state and shutdown flag). A
/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
/// it was initialized, letting different clones each spin up their **own**
/// executor on first use. The result would be:
///
/// - multiple independent task tables (tasks enqueued via one clone are
///   invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
///   [`MeowConfig::max_download_concurrency`]) silently multiplied by the
///   number of clones;
/// - [`Self::close`] only shutting down one of the worker loops, leaking the
///   rest.
///
/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
/// and clone the `Arc` instead:
///
/// ```no_run
/// use std::sync::Arc;
/// use rusty_cat::api::{MeowClient, MeowConfig};
///
/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
/// let client_for_task = Arc::clone(&client);
/// tokio::spawn(async move {
///     let _ = client_for_task; // use the shared client here
/// });
/// ```
/// Outcome of a task that reached [`TransferStatus::Complete`].
///
/// Returned by [`MeowClient::enqueue_and_wait`].
#[derive(Debug, Clone)]
pub struct TaskOutcome {
    /// Task identifier returned by the underlying scheduler.
    pub task_id: TaskId,
    /// Provider-defined payload returned by upload protocol's `complete_upload`.
    /// Download tasks usually receive `None`.
    pub payload: Option<String>,
}

pub struct MeowClient {
    /// Lazily initialized task executor.
    ///
    /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
    /// there is exactly one owner of this `OnceLock`. Share the whole client
    /// via `Arc<MeowClient>` when multi-owner access is needed.
    executor: OnceLock<Executor>,
    /// Immutable runtime configuration.
    config: MeowConfig,
    /// Global listeners receiving progress records for all tasks.
    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
    /// Global closed flag. Once set to `true`, task control APIs reject calls.
    closed: Arc<AtomicBool>,
}

impl std::fmt::Debug for MeowClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MeowClient")
            .field("config", &self.config)
            .field("global_progress_listener", &"..")
            .finish()
    }
}

impl MeowClient {
    /// Creates a new client with the provided configuration.
    ///
    /// The internal executor is initialized lazily on first task operation.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let config = MeowConfig::default();
    /// let client = MeowClient::new(config);
    /// let _ = client;
    /// ```
    pub fn new(config: MeowConfig) -> Self {
        MeowClient {
            executor: Default::default(),
            config,
            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
            closed: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns a `reqwest::Client` aligned with this client's configuration.
    ///
    /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
    ///   injected a custom client, this
    ///   returns its clone.
    /// - Otherwise, this builds a new client from `http_timeout` and
    ///   `tcp_keepalive`.
    ///
    /// # Errors
    ///
    /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
    /// fails.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// let http = client.http_client()?;
    /// let _ = http;
    /// # Ok::<(), rusty_cat::api::MeowError>(())
    /// ```
    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
        if let Some(c) = self.config.http_client_ref() {
            return Ok(c.clone());
        }
        reqwest::Client::builder()
            .timeout(self.config.http_timeout())
            .tcp_keepalive(self.config.tcp_keepalive())
            .build()
            .map_err(|e| {
                MeowError::from_source(
                    InnerErrorCode::HttpClientBuildFailed,
                    format!(
                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
                        self.config.http_timeout(),
                        self.config.tcp_keepalive()
                    ),
                    e,
                )
            })
    }

    fn get_exec(&self) -> Result<&Executor, MeowError> {
        if let Some(exec) = self.executor.get() {
            crate::meow_flow_log!("executor", "reuse existing executor");
            return Ok(exec);
        }
        let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
            self.config.http_timeout(),
            self.config.tcp_keepalive(),
        )?;
        crate::meow_flow_log!(
            "executor",
            "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
            self.config.http_timeout(),
            self.config.tcp_keepalive()
        );
        let exec = Executor::new(
            self.config.clone(),
            Arc::new(default_http_transfer),
            self.global_progress_listener.clone(),
        )?;
        let _ = self.executor.set(exec);
        self.executor.get().ok_or_else(|| {
            crate::meow_flow_log!(
                "executor",
                "executor init race failed after set; returning RuntimeCreationFailedError"
            );
            MeowError::from_code_str(
                InnerErrorCode::RuntimeCreationFailedError,
                "executor init race failed",
            )
        })
    }

    /// Ensures the client is still open.
    ///
    /// Returns `ClientClosed` if [`Self::close`] was called successfully.
    fn ensure_open(&self) -> Result<(), MeowError> {
        if self.closed.load(Ordering::SeqCst) {
            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
            Err(MeowError::from_code_str(
                InnerErrorCode::ClientClosed,
                "meow client is already closed",
            ))
        } else {
            Ok(())
        }
    }

    /// Registers a global progress listener for all tasks.
    ///
    /// # Parameters
    ///
    /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
    ///
    /// # Returns
    ///
    /// Returns a listener ID used by
    /// [`Self::unregister_global_progress_listener`].
    ///
    /// # Usage rules
    ///
    /// Keep callback execution short and panic-free. A heavy callback can slow
    /// down global event delivery.
    ///
    /// # Errors
    ///
    /// Returns `LockPoisoned` when listener storage lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// let listener_id = client.register_global_progress_listener(|record| {
    ///     println!("task={} progress={:.2}", record.task_id(), record.progress());
    /// })?;
    /// let _ = listener_id;
    /// # Ok::<(), rusty_cat::api::MeowError>(())
    /// ```
    pub fn register_global_progress_listener<F>(
        &self,
        listener: F,
    ) -> Result<GlobalProgressListenerId, MeowError>
    where
        F: Fn(FileTransferRecord) + Send + Sync + 'static,
    {
        let id = GlobalProgressListenerId::new();
        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
        let mut guard = self.global_progress_listener.write().map_err(|e| {
            MeowError::from_code(
                InnerErrorCode::LockPoisoned,
                format!("register global listener lock poisoned: {}", e),
            )
        })?;
        guard.push((id, Arc::new(listener)));
        Ok(id)
    }

    /// Unregisters one previously registered global progress listener.
    ///
    /// Returns `Ok(false)` when the ID does not exist.
    ///
    /// # Errors
    ///
    /// Returns `LockPoisoned` when listener storage lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// let id = client.register_global_progress_listener(|_| {})?;
    /// let removed = client.unregister_global_progress_listener(id)?;
    /// assert!(removed);
    /// # Ok::<(), rusty_cat::api::MeowError>(())
    /// ```
    pub fn unregister_global_progress_listener(
        &self,
        id: GlobalProgressListenerId,
    ) -> Result<bool, MeowError> {
        let mut g = self.global_progress_listener.write().map_err(|e| {
            MeowError::from_code(
                InnerErrorCode::LockPoisoned,
                format!("unregister global listener lock poisoned: {}", e),
            )
        })?;
        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
            g.remove(pos);
            crate::meow_flow_log!(
                "listener",
                "unregister global listener success: id={:?}",
                id
            );
            Ok(true)
        } else {
            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
            Ok(false)
        }
    }

    /// Removes all registered global progress listeners.
    ///
    /// # Errors
    ///
    /// Returns `LockPoisoned` when listener storage lock is poisoned.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.clear_global_listener()?;
    /// # Ok::<(), rusty_cat::api::MeowError>(())
    /// ```
    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
        crate::meow_flow_log!("listener", "clear all global listeners");
        self.global_progress_listener
            .write()
            .map_err(|e| {
                MeowError::from_code(
                    InnerErrorCode::LockPoisoned,
                    format!("clear global listeners lock poisoned: {}", e),
                )
            })?
            .clear();
        Ok(())
    }

    /// Sets or clears the global debug log listener.
    ///
    /// - Pass `Some(listener)` to set/replace.
    /// - Pass `None` to clear.
    ///
    /// This affects all `MeowClient` instances in the current process.
    ///
    /// # Errors
    ///
    /// Returns [`DebugLogListenerError`] when the internal global listener lock
    /// is poisoned.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
    ///     println!("{log}");
    /// })))?;
    ///
    /// // Clear listener when no longer needed.
    /// client.set_debug_log_listener(None)?;
    /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
    /// ```
    pub fn set_debug_log_listener(
        &self,
        listener: Option<DebugLogListener>,
    ) -> Result<(), DebugLogListenerError> {
        set_debug_log_listener(listener)
    }
}

impl MeowClient {
    /// Submits a transfer task to the internal scheduler and returns its
    /// [`TaskId`].
    ///
    /// The actual upload/download execution is dispatched to an internal
    /// worker system thread. This method only performs lightweight validation
    /// and submission, so it does not block the caller thread waiting for full
    /// transfer completion.
    ///
    /// `try_enqueue` is also the recovery entrypoint after process restart.
    /// If the application was killed during a previous upload/download,
    /// restart your process and call `try_enqueue` again to resume that
    /// transfer workflow.
    ///
    /// # Back-pressure semantics (why the `try_` prefix)
    ///
    /// Internally this method uses
    /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
    /// to the scheduler worker, **not** `send().await`. That means:
    ///
    /// - The `await` point in this function is used for task normalization
    ///   (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
    ///   for waiting on command-queue capacity.
    /// - If the command queue is momentarily full (bursty enqueue under
    ///   [`MeowConfig::command_queue_capacity`]), this method returns an
    ///   immediate `CommandSendFailed` error instead of suspending the
    ///   caller until a slot frees up.
    /// - Other control APIs ([`Self::pause`], [`Self::resume`],
    ///   [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
    ///   wait for queue capacity. Only enqueue is fail-fast.
    ///
    /// Callers that want to batch-enqueue under burst load should either:
    ///
    /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
    /// 2. retry on `CommandSendFailed` with their own back-off, or
    /// 3. rate-limit enqueue calls on the caller side.
    ///
    /// The name explicitly carries `try_` so this fail-fast behavior is
    /// visible at the call site. If a fully-awaiting variant is introduced
    /// later it should be named `enqueue` (without the `try_` prefix).
    ///
    /// # Parameters
    ///
    /// - `task`: Built by upload/download task builders.
    /// - `progress_cb`: Per-task callback invoked with transfer progress.
    /// - `complete_cb`: Callback fired once when task reaches
    ///   [`crate::transfer_status::TransferStatus::Complete`]. The second
    ///   argument is provider-defined payload returned by upload protocol
    ///   `complete_upload`; download tasks usually receive `None`.
    ///
    /// # Usage rules
    ///
    /// - `task` must be non-empty (required path/name/url and valid upload size).
    /// - Callback should be lightweight and non-blocking.
    /// - Store returned task ID for subsequent task control operations.
    /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
    /// - For restart recovery, re-enqueue the same logical task (same
    ///   upload/download target and compatible checkpoint context) so the
    ///   runtime can continue from existing local/remote progress.
    ///
    /// # Errors
    ///
    /// Returns:
    /// - `ClientClosed` if the client was closed.
    /// - `ParameterEmpty` if the task is invalid/empty.
    /// - `CommandSendFailed` if the scheduler command queue is full at the
    ///   moment of submission (see back-pressure semantics above).
    /// - Any runtime initialization errors from the executor.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
    ///
    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// let task = DownloadPounceBuilder::new(
    ///     "example.bin",
    ///     "./downloads/example.bin",
    ///     1024 * 1024,
    ///     "https://example.com/example.bin",
    /// )
    /// .build();
    ///
    /// let task_id = client
    ///     .try_enqueue(
    ///         task,
    ///         |record| {
    ///             println!("status={:?} progress={:.2}", record.status(), record.progress());
    ///         },
    ///         |task_id, payload| {
    ///             println!("task {task_id} completed, payload={payload:?}");
    ///         },
    ///     )
    ///     .await?;
    /// println!("enqueued task: {task_id}");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn try_enqueue<PCB, CCB>(
        &self,
        task: PounceTask,
        progress_cb: PCB,
        complete_cb: CCB,
    ) -> Result<TaskId, MeowError>
    where
        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
        CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
    {
        self.ensure_open()?;
        if task.is_empty() {
            crate::meow_flow_log!("try_enqueue", "reject empty task");
            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
        }

        crate::meow_flow_log!("try_enqueue", "task={:?}", task);

        let progress: ProgressCb = Arc::new(progress_cb);
        let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
        let callbacks = TaskCallbacks::new(Some(progress), complete);

        let (def_up, def_down) = default_breakpoint_arcs();
        let inner = InnerTask::from_pounce(
            task,
            self.config.breakpoint_download_http().clone(),
            self.config.http_client_ref().cloned(),
            def_up,
            def_down,
        )
        .await?;

        let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
        crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
        Ok(task_id)
    }

    /// Enqueues a task and `await`s until it reaches a terminal status.
    ///
    /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
    /// do not have to write the channel + double-callback + single-send-guard
    /// boilerplate themselves.
    ///
    /// # Returns
    ///
    /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
    /// - `Err(MeowError)` carrying the underlying failure for
    ///   [`TransferStatus::Failed`].
    /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
    ///   [`TransferStatus::Canceled`].
    ///
    /// # Progress
    ///
    /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
    /// the per-task progress callback in [`Self::try_enqueue`].
    ///
    /// # Cancellation / timeout
    ///
    /// Dropping the returned future does **not** cancel the underlying transfer;
    /// the task continues running in the executor. Use [`Self::cancel`] with
    /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
    /// abort an in-flight transfer.
    ///
    /// To cap wall-clock waiting time, wrap this future:
    ///
    /// ```ignore
    /// let outcome = tokio::time::timeout(
    ///     std::time::Duration::from_secs(60),
    ///     client.enqueue_and_wait(task, |_| {}),
    /// )
    /// .await??;
    /// ```
    ///
    /// # Errors
    ///
    /// In addition to the terminal-status errors above, propagates any error
    /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
    /// `CommandSendFailed`).
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
    ///
    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// let task = DownloadPounceBuilder::new(
    ///     "example.bin",
    ///     "./downloads/example.bin",
    ///     1024 * 1024,
    ///     "https://example.com/example.bin",
    /// )
    /// .build();
    ///
    /// let outcome = client
    ///     .enqueue_and_wait(task, |record| {
    ///         println!(
    ///             "task={} progress={:.2}",
    ///             record.task_id(),
    ///             record.progress()
    ///         );
    ///     })
    ///     .await?;
    /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn enqueue_and_wait<PCB>(
        &self,
        task: PounceTask,
        progress_cb: PCB,
    ) -> Result<TaskOutcome, MeowError>
    where
        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
    {
        type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
        let (tx, rx) = oneshot::channel::<TerminalMsg>();
        let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
            Arc::new(StdMutex::new(Some(tx)));
        let progress_slot = Arc::clone(&tx_slot);
        let complete_slot = tx_slot;

        self.try_enqueue(
            task,
            move |record: FileTransferRecord| {
                progress_cb(record.clone());
                match record.status() {
                    TransferStatus::Failed(err) => {
                        send_terminal_once(&progress_slot, Err(err.clone()));
                    }
                    TransferStatus::Canceled => {
                        send_terminal_once(
                            &progress_slot,
                            Err(MeowError::from_code_str(
                                InnerErrorCode::TaskCanceled,
                                "task was canceled",
                            )),
                        );
                    }
                    _ => {}
                }
            },
            move |task_id, payload| {
                send_terminal_once(&complete_slot, Ok((task_id, payload)));
            },
        )
        .await?;

        match rx.await {
            Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
            Ok(Err(err)) => Err(err),
            Err(_) => Err(MeowError::from_code_str(
                InnerErrorCode::CommandResponseFailed,
                "transfer terminal channel closed without notification",
            )),
        }
    }

    // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
    //     todo!(arman) -
    // }

    /// Pauses a running or pending task by ID.
    ///
    /// This API sends a control command to the internal scheduler worker
    /// thread. It does not execute transfer pause logic on the caller thread.
    ///
    /// # Usage rules
    ///
    /// Call this with a valid task ID returned by [`Self::enqueue`].
    ///
    /// # Errors
    ///
    /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
    ///
    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.pause(task_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
        self.ensure_open()?;
        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
        self.get_exec()?.pause(task_id).await
    }

    /// Resumes a previously paused task.
    ///
    /// The same [`TaskId`] continues to identify the task after resume.
    /// The resume command is forwarded to the internal scheduler worker
    /// thread, so caller thread is not responsible for running transfer logic.
    ///
    /// # Errors
    ///
    /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
    ///
    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.resume(task_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
        self.ensure_open()?;
        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
        self.get_exec()?.resume(task_id).await
    }

    /// Cancels a task by ID.
    ///
    /// Cancellation is requested through the internal scheduler worker thread.
    /// Transfer cancellation execution happens in background runtime workers.
    ///
    /// # Usage rules
    ///
    /// Cancellation is best-effort; protocol-specific cleanup may run.
    ///
    /// # Errors
    ///
    /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
    ///
    /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.cancel(task_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
        self.ensure_open()?;
        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
        self.get_exec()?.cancel(task_id).await
    }

    /// Returns a snapshot of queue and active transfer groups.
    ///
    /// Useful for diagnostics and external monitoring dashboards.
    /// Snapshot collection is coordinated by internal scheduler worker state.
    ///
    /// # Errors
    ///
    /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
    /// snapshot retrieval errors.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// let snap = client.snapshot().await?;
    /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
        self.ensure_open()?;
        crate::meow_flow_log!("client_api", "snapshot called");
        self.get_exec()?.snapshot().await
    }

    /// Closes this client and its underlying executor.
    ///
    /// `close` is the terminal lifecycle operation for a `MeowClient`. After
    /// it succeeds, this client stays permanently closed; submit more work by
    /// constructing a new `MeowClient` and enqueueing tasks there.
    ///
    /// After a successful close:
    ///
    /// - New task and control operations on this client are rejected with
    ///   `ClientClosed`.
    /// - All known unfinished task groups (queued, paused, or active) receive
    ///   a `Paused` progress notification through their task callback and all
    ///   registered global listeners.
    /// - In-flight transfers are cancelled and the scheduler state is cleared.
    /// - Already submitted callback jobs are drained before returning.
    /// - The internal scheduler thread is joined, which drops its Tokio
    ///   runtime and releases SDK-owned background execution resources.
    ///
    /// `Paused` is used for shutdown notifications rather than `Canceled` so
    /// callers can recreate a client later and re-enqueue the same logical
    /// transfer when they want to resume from available breakpoint state.
    ///
    /// # Idempotency
    ///
    /// Calling `close` more than once returns `ClientClosed`.
    ///
    /// # Retry behavior
    ///
    /// If executor close fails, the closed flag is rolled back so caller can
    /// retry close. A successful close is not restartable.
    ///
    /// # Errors
    ///
    /// Returns `ClientClosed` when already closed, or underlying executor close
    /// errors when shutdown is not completed.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
    /// let client = MeowClient::new(MeowConfig::default());
    /// client.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn close(&self) -> Result<(), MeowError> {
        if self
            .closed
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            crate::meow_flow_log!("client_api", "close rejected: already closed");
            return Err(MeowError::from_code_str(
                InnerErrorCode::ClientClosed,
                "meow client is already closed",
            ));
        }
        if let Some(exec) = self.executor.get() {
            crate::meow_flow_log!("client_api", "close forwarding to executor");
            if let Err(e) = exec.close().await {
                // Roll back closed flag so caller can retry close.
                self.closed.store(false, Ordering::SeqCst);
                return Err(e);
            }
            Ok(())
        } else {
            crate::meow_flow_log!("client_api", "close with no executor initialized");
            Ok(())
        }
    }

    /// Returns whether this client is currently closed.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rusty_cat::api::{MeowClient, MeowConfig};
    ///
    /// let client = MeowClient::new(MeowConfig::default());
    /// let _closed = client.is_closed();
    /// ```
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }
}

fn send_terminal_once(
    slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
    msg: Result<(TaskId, Option<String>), MeowError>,
) {
    if let Ok(mut guard) = slot.lock() {
        if let Some(sender) = guard.take() {
            let _ = sender.send(msg);
        }
    }
}