rusty_cat/meow_client.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
7use crate::error::{InnerErrorCode, MeowError};
8use crate::file_transfer_record::FileTransferRecord;
9use crate::ids::{GlobalProgressListenerId, TaskId};
10use crate::inner::executor::Executor;
11use crate::inner::inner_task::InnerTask;
12use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
13use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
14use crate::meow_config::MeowConfig;
15use crate::pounce_task::PounceTask;
16use crate::transfer_snapshot::TransferSnapshot;
17use crate::transfer_status::TransferStatus;
18
19/// Callback type for globally observing task progress events.
20///
21/// The callback is invoked from runtime worker context. Keep callback logic
22/// fast and non-blocking to avoid delaying event processing.
23pub type GlobalProgressListener = ProgressCb;
24
25/// Main entry point of the `rusty-cat` SDK.
26///
27/// `MeowClient` owns runtime state and provides high-level operations:
28/// enqueue, pause, resume, cancel, snapshot, and close.
29///
30/// # Usage pattern
31///
32/// 1. Create [`MeowConfig`].
33/// 2. Construct `MeowClient::new(config)`.
34/// 3. Build tasks with upload/download builders.
35/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
36/// 5. Control task lifecycle with pause/resume/cancel.
37/// 6. Call [`Self::close`] during shutdown.
38///
39/// # Lifecycle contract: you **must** call [`Self::close`]
40///
41/// The background scheduler runs on a dedicated [`std::thread`] that drives
42/// its own Tokio runtime. The clean shutdown protocol is an explicit
43/// `close().await` command which:
44///
45/// - cancels in-flight transfers,
46/// - flushes `Paused` status events to user callbacks for every known group,
47/// - drains already submitted callback jobs,
48/// - joins the scheduler thread and lets the runtime drop.
49///
50/// Forgetting to call `close` leaves the scheduler thread alive until all
51/// command senders are dropped (which does happen when `MeowClient` is
52/// dropped, but only as a fallback). When that fallback path runs, the
53/// guarantees above do **not** hold: callers may miss terminal status
54/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
55/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
56/// impossible to debug from the outside.
57///
58/// To help surface this misuse the internal executor implements a
59/// **best-effort [`Drop`]** that, when `close` was never called:
60///
61/// - emits a `Warn`-level log via the debug log listener (tag
62/// `"executor_drop"`),
63/// - performs a non-blocking `try_send` of a final `Close` command so the
64/// worker still has a chance to drain its state,
65/// - then drops the command sender, causing the worker loop to exit on its
66/// own.
67///
68/// This is a safety net, **not** a substitute for calling `close`. Treat
69/// `close().await` as a mandatory step in your shutdown sequence.
70///
71/// # Sharing across tasks / threads
72///
73/// `MeowClient` **intentionally does not implement [`Clone`]**.
74///
75/// The client owns a lazily-initialized [`Executor`] (a single background
76/// worker loop plus its task table, scheduler state and shutdown flag). A
77/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
78/// it was initialized, letting different clones each spin up their **own**
79/// executor on first use. The result would be:
80///
81/// - multiple independent task tables (tasks enqueued via one clone are
82/// invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
83/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
84/// [`MeowConfig::max_download_concurrency`]) silently multiplied by the
85/// number of clones;
86/// - [`Self::close`] only shutting down one of the worker loops, leaking the
87/// rest.
88///
89/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
90/// and clone the `Arc` instead:
91///
92/// ```no_run
93/// use std::sync::Arc;
94/// use rusty_cat::api::{MeowClient, MeowConfig};
95///
96/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
97/// let client_for_task = Arc::clone(&client);
98/// tokio::spawn(async move {
99/// let _ = client_for_task; // use the shared client here
100/// });
101/// ```
102/// Outcome of a task that reached [`TransferStatus::Complete`].
103///
104/// Returned by [`MeowClient::enqueue_and_wait`].
105#[derive(Debug, Clone)]
106pub struct TaskOutcome {
107 /// Task identifier returned by the underlying scheduler.
108 pub task_id: TaskId,
109 /// Provider-defined payload returned by upload protocol's `complete_upload`.
110 /// Download tasks usually receive `None`.
111 pub payload: Option<String>,
112}
113
114pub struct MeowClient {
115 /// Lazily initialized task executor.
116 ///
117 /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
118 /// there is exactly one owner of this `OnceLock`. Share the whole client
119 /// via `Arc<MeowClient>` when multi-owner access is needed.
120 executor: OnceLock<Executor>,
121 /// Immutable runtime configuration.
122 config: MeowConfig,
123 /// Global listeners receiving progress records for all tasks.
124 global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
125 /// Global closed flag. Once set to `true`, task control APIs reject calls.
126 closed: Arc<AtomicBool>,
127}
128
129impl std::fmt::Debug for MeowClient {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("MeowClient")
132 .field("config", &self.config)
133 .field("global_progress_listener", &"..")
134 .finish()
135 }
136}
137
138impl MeowClient {
139 /// Creates a new client with the provided configuration.
140 ///
141 /// The internal executor is initialized lazily on first task operation.
142 ///
143 /// # Examples
144 ///
145 /// ```no_run
146 /// use rusty_cat::api::{MeowClient, MeowConfig};
147 ///
148 /// let config = MeowConfig::default();
149 /// let client = MeowClient::new(config);
150 /// let _ = client;
151 /// ```
152 pub fn new(config: MeowConfig) -> Self {
153 MeowClient {
154 executor: Default::default(),
155 config,
156 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
157 closed: Arc::new(AtomicBool::new(false)),
158 }
159 }
160
161 /// Returns a `reqwest::Client` aligned with this client's configuration.
162 ///
163 /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
164 /// injected a custom client, this
165 /// returns its clone.
166 /// - Otherwise, this builds a new client from `http_timeout` and
167 /// `tcp_keepalive`.
168 ///
169 /// # Errors
170 ///
171 /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
172 /// fails.
173 ///
174 /// # Examples
175 ///
176 /// ```no_run
177 /// use rusty_cat::api::{MeowClient, MeowConfig};
178 ///
179 /// let client = MeowClient::new(MeowConfig::default());
180 /// let http = client.http_client()?;
181 /// let _ = http;
182 /// # Ok::<(), rusty_cat::api::MeowError>(())
183 /// ```
184 pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
185 if let Some(c) = self.config.http_client_ref() {
186 return Ok(c.clone());
187 }
188 reqwest::Client::builder()
189 .timeout(self.config.http_timeout())
190 .tcp_keepalive(self.config.tcp_keepalive())
191 .build()
192 .map_err(|e| {
193 MeowError::from_source(
194 InnerErrorCode::HttpClientBuildFailed,
195 format!(
196 "build reqwest client failed (timeout={:?}, keepalive={:?})",
197 self.config.http_timeout(),
198 self.config.tcp_keepalive()
199 ),
200 e,
201 )
202 })
203 }
204
205 fn get_exec(&self) -> Result<&Executor, MeowError> {
206 if let Some(exec) = self.executor.get() {
207 crate::meow_flow_log!("executor", "reuse existing executor");
208 return Ok(exec);
209 }
210 let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
211 self.config.http_timeout(),
212 self.config.tcp_keepalive(),
213 )?;
214 crate::meow_flow_log!(
215 "executor",
216 "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
217 self.config.http_timeout(),
218 self.config.tcp_keepalive()
219 );
220 let exec = Executor::new(
221 self.config.clone(),
222 Arc::new(default_http_transfer),
223 self.global_progress_listener.clone(),
224 )?;
225 let _ = self.executor.set(exec);
226 self.executor.get().ok_or_else(|| {
227 crate::meow_flow_log!(
228 "executor",
229 "executor init race failed after set; returning RuntimeCreationFailedError"
230 );
231 MeowError::from_code_str(
232 InnerErrorCode::RuntimeCreationFailedError,
233 "executor init race failed",
234 )
235 })
236 }
237
238 /// Ensures the client is still open.
239 ///
240 /// Returns `ClientClosed` if [`Self::close`] was called successfully.
241 fn ensure_open(&self) -> Result<(), MeowError> {
242 if self.closed.load(Ordering::SeqCst) {
243 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
244 Err(MeowError::from_code_str(
245 InnerErrorCode::ClientClosed,
246 "meow client is already closed",
247 ))
248 } else {
249 Ok(())
250 }
251 }
252
253 /// Registers a global progress listener for all tasks.
254 ///
255 /// # Parameters
256 ///
257 /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
258 ///
259 /// # Returns
260 ///
261 /// Returns a listener ID used by
262 /// [`Self::unregister_global_progress_listener`].
263 ///
264 /// # Usage rules
265 ///
266 /// Keep callback execution short and panic-free. A heavy callback can slow
267 /// down global event delivery.
268 ///
269 /// # Errors
270 ///
271 /// Returns `LockPoisoned` when listener storage lock is poisoned.
272 ///
273 /// # Examples
274 ///
275 /// ```no_run
276 /// use rusty_cat::api::{MeowClient, MeowConfig};
277 ///
278 /// let client = MeowClient::new(MeowConfig::default());
279 /// let listener_id = client.register_global_progress_listener(|record| {
280 /// println!("task={} progress={:.2}", record.task_id(), record.progress());
281 /// })?;
282 /// let _ = listener_id;
283 /// # Ok::<(), rusty_cat::api::MeowError>(())
284 /// ```
285 pub fn register_global_progress_listener<F>(
286 &self,
287 listener: F,
288 ) -> Result<GlobalProgressListenerId, MeowError>
289 where
290 F: Fn(FileTransferRecord) + Send + Sync + 'static,
291 {
292 let id = GlobalProgressListenerId::new();
293 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
294 let mut guard = self.global_progress_listener.write().map_err(|e| {
295 MeowError::from_code(
296 InnerErrorCode::LockPoisoned,
297 format!("register global listener lock poisoned: {}", e),
298 )
299 })?;
300 guard.push((id, Arc::new(listener)));
301 Ok(id)
302 }
303
304 /// Unregisters one previously registered global progress listener.
305 ///
306 /// Returns `Ok(false)` when the ID does not exist.
307 ///
308 /// # Errors
309 ///
310 /// Returns `LockPoisoned` when listener storage lock is poisoned.
311 ///
312 /// # Examples
313 ///
314 /// ```no_run
315 /// use rusty_cat::api::{MeowClient, MeowConfig};
316 ///
317 /// let client = MeowClient::new(MeowConfig::default());
318 /// let id = client.register_global_progress_listener(|_| {})?;
319 /// let removed = client.unregister_global_progress_listener(id)?;
320 /// assert!(removed);
321 /// # Ok::<(), rusty_cat::api::MeowError>(())
322 /// ```
323 pub fn unregister_global_progress_listener(
324 &self,
325 id: GlobalProgressListenerId,
326 ) -> Result<bool, MeowError> {
327 let mut g = self.global_progress_listener.write().map_err(|e| {
328 MeowError::from_code(
329 InnerErrorCode::LockPoisoned,
330 format!("unregister global listener lock poisoned: {}", e),
331 )
332 })?;
333 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
334 g.remove(pos);
335 crate::meow_flow_log!(
336 "listener",
337 "unregister global listener success: id={:?}",
338 id
339 );
340 Ok(true)
341 } else {
342 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
343 Ok(false)
344 }
345 }
346
347 /// Removes all registered global progress listeners.
348 ///
349 /// # Errors
350 ///
351 /// Returns `LockPoisoned` when listener storage lock is poisoned.
352 ///
353 /// # Examples
354 ///
355 /// ```no_run
356 /// use rusty_cat::api::{MeowClient, MeowConfig};
357 ///
358 /// let client = MeowClient::new(MeowConfig::default());
359 /// client.clear_global_listener()?;
360 /// # Ok::<(), rusty_cat::api::MeowError>(())
361 /// ```
362 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
363 crate::meow_flow_log!("listener", "clear all global listeners");
364 self.global_progress_listener
365 .write()
366 .map_err(|e| {
367 MeowError::from_code(
368 InnerErrorCode::LockPoisoned,
369 format!("clear global listeners lock poisoned: {}", e),
370 )
371 })?
372 .clear();
373 Ok(())
374 }
375
376 /// Sets or clears the global debug log listener.
377 ///
378 /// - Pass `Some(listener)` to set/replace.
379 /// - Pass `None` to clear.
380 ///
381 /// This affects all `MeowClient` instances in the current process.
382 ///
383 /// # Errors
384 ///
385 /// Returns [`DebugLogListenerError`] when the internal global listener lock
386 /// is poisoned.
387 ///
388 /// # Examples
389 ///
390 /// ```no_run
391 /// use std::sync::Arc;
392 /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
393 ///
394 /// let client = MeowClient::new(MeowConfig::default());
395 /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
396 /// println!("{log}");
397 /// })))?;
398 ///
399 /// // Clear listener when no longer needed.
400 /// client.set_debug_log_listener(None)?;
401 /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
402 /// ```
403 pub fn set_debug_log_listener(
404 &self,
405 listener: Option<DebugLogListener>,
406 ) -> Result<(), DebugLogListenerError> {
407 set_debug_log_listener(listener)
408 }
409}
410
411impl MeowClient {
412 /// Submits a transfer task to the internal scheduler and returns its
413 /// [`TaskId`].
414 ///
415 /// The actual upload/download execution is dispatched to an internal
416 /// worker system thread. This method only performs lightweight validation
417 /// and submission, so it does not block the caller thread waiting for full
418 /// transfer completion.
419 ///
420 /// `try_enqueue` is also the recovery entrypoint after process restart.
421 /// If the application was killed during a previous upload/download,
422 /// restart your process and call `try_enqueue` again to resume that
423 /// transfer workflow.
424 ///
425 /// # Back-pressure semantics (why the `try_` prefix)
426 ///
427 /// Internally this method uses
428 /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
429 /// to the scheduler worker, **not** `send().await`. That means:
430 ///
431 /// - The `await` point in this function is used for task normalization
432 /// (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
433 /// for waiting on command-queue capacity.
434 /// - If the command queue is momentarily full (bursty enqueue under
435 /// [`MeowConfig::command_queue_capacity`]), this method returns an
436 /// immediate `CommandSendFailed` error instead of suspending the
437 /// caller until a slot frees up.
438 /// - Other control APIs ([`Self::pause`], [`Self::resume`],
439 /// [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
440 /// wait for queue capacity. Only enqueue is fail-fast.
441 ///
442 /// Callers that want to batch-enqueue under burst load should either:
443 ///
444 /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
445 /// 2. retry on `CommandSendFailed` with their own back-off, or
446 /// 3. rate-limit enqueue calls on the caller side.
447 ///
448 /// The name explicitly carries `try_` so this fail-fast behavior is
449 /// visible at the call site. If a fully-awaiting variant is introduced
450 /// later it should be named `enqueue` (without the `try_` prefix).
451 ///
452 /// # Parameters
453 ///
454 /// - `task`: Built by upload/download task builders.
455 /// - `progress_cb`: Per-task callback invoked with transfer progress.
456 /// - `complete_cb`: Callback fired once when task reaches
457 /// [`crate::transfer_status::TransferStatus::Complete`]. The second
458 /// argument is provider-defined payload returned by upload protocol
459 /// `complete_upload`; download tasks usually receive `None`.
460 ///
461 /// # Usage rules
462 ///
463 /// - `task` must be non-empty (required path/name/url and valid upload size).
464 /// - Callback should be lightweight and non-blocking.
465 /// - Store returned task ID for subsequent task control operations.
466 /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
467 /// - For restart recovery, re-enqueue the same logical task (same
468 /// upload/download target and compatible checkpoint context) so the
469 /// runtime can continue from existing local/remote progress.
470 ///
471 /// # Errors
472 ///
473 /// Returns:
474 /// - `ClientClosed` if the client was closed.
475 /// - `ParameterEmpty` if the task is invalid/empty.
476 /// - `CommandSendFailed` if the scheduler command queue is full at the
477 /// moment of submission (see back-pressure semantics above).
478 /// - Any runtime initialization errors from the executor.
479 ///
480 /// # Examples
481 ///
482 /// ```no_run
483 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
484 ///
485 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
486 /// let client = MeowClient::new(MeowConfig::default());
487 /// let task = DownloadPounceBuilder::new(
488 /// "example.bin",
489 /// "./downloads/example.bin",
490 /// 1024 * 1024,
491 /// "https://example.com/example.bin",
492 /// )
493 /// .build();
494 ///
495 /// let task_id = client
496 /// .try_enqueue(
497 /// task,
498 /// |record| {
499 /// println!("status={:?} progress={:.2}", record.status(), record.progress());
500 /// },
501 /// |task_id, payload| {
502 /// println!("task {task_id} completed, payload={payload:?}");
503 /// },
504 /// )
505 /// .await?;
506 /// println!("enqueued task: {task_id}");
507 /// # Ok(())
508 /// # }
509 /// ```
510 pub async fn try_enqueue<PCB, CCB>(
511 &self,
512 task: PounceTask,
513 progress_cb: PCB,
514 complete_cb: CCB,
515 ) -> Result<TaskId, MeowError>
516 where
517 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
518 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
519 {
520 self.ensure_open()?;
521 if task.is_empty() {
522 crate::meow_flow_log!("try_enqueue", "reject empty task");
523 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
524 }
525
526 crate::meow_flow_log!("try_enqueue", "task={:?}", task);
527
528 let progress: ProgressCb = Arc::new(progress_cb);
529 let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
530 let callbacks = TaskCallbacks::new(Some(progress), complete);
531
532 let (def_up, def_down) = default_breakpoint_arcs();
533 let inner = InnerTask::from_pounce(
534 task,
535 self.config.breakpoint_download_http().clone(),
536 self.config.http_client_ref().cloned(),
537 def_up,
538 def_down,
539 )
540 .await?;
541
542 let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
543 crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
544 Ok(task_id)
545 }
546
547 /// Enqueues a task and `await`s until it reaches a terminal status.
548 ///
549 /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
550 /// do not have to write the channel + double-callback + single-send-guard
551 /// boilerplate themselves.
552 ///
553 /// # Returns
554 ///
555 /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
556 /// - `Err(MeowError)` carrying the underlying failure for
557 /// [`TransferStatus::Failed`].
558 /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
559 /// [`TransferStatus::Canceled`].
560 ///
561 /// # Progress
562 ///
563 /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
564 /// the per-task progress callback in [`Self::try_enqueue`].
565 ///
566 /// # Cancellation / timeout
567 ///
568 /// Dropping the returned future does **not** cancel the underlying transfer;
569 /// the task continues running in the executor. Use [`Self::cancel`] with
570 /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
571 /// abort an in-flight transfer.
572 ///
573 /// To cap wall-clock waiting time, wrap this future:
574 ///
575 /// ```ignore
576 /// let outcome = tokio::time::timeout(
577 /// std::time::Duration::from_secs(60),
578 /// client.enqueue_and_wait(task, |_| {}),
579 /// )
580 /// .await??;
581 /// ```
582 ///
583 /// # Errors
584 ///
585 /// In addition to the terminal-status errors above, propagates any error
586 /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
587 /// `CommandSendFailed`).
588 ///
589 /// # Examples
590 ///
591 /// ```no_run
592 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
593 ///
594 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
595 /// let client = MeowClient::new(MeowConfig::default());
596 /// let task = DownloadPounceBuilder::new(
597 /// "example.bin",
598 /// "./downloads/example.bin",
599 /// 1024 * 1024,
600 /// "https://example.com/example.bin",
601 /// )
602 /// .build();
603 ///
604 /// let outcome = client
605 /// .enqueue_and_wait(task, |record| {
606 /// println!(
607 /// "task={} progress={:.2}",
608 /// record.task_id(),
609 /// record.progress()
610 /// );
611 /// })
612 /// .await?;
613 /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
614 /// # Ok(())
615 /// # }
616 /// ```
617 pub async fn enqueue_and_wait<PCB>(
618 &self,
619 task: PounceTask,
620 progress_cb: PCB,
621 ) -> Result<TaskOutcome, MeowError>
622 where
623 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
624 {
625 type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
626 let (tx, rx) = oneshot::channel::<TerminalMsg>();
627 let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
628 Arc::new(StdMutex::new(Some(tx)));
629 let progress_slot = Arc::clone(&tx_slot);
630 let complete_slot = tx_slot;
631
632 self.try_enqueue(
633 task,
634 move |record: FileTransferRecord| {
635 progress_cb(record.clone());
636 match record.status() {
637 TransferStatus::Failed(err) => {
638 send_terminal_once(&progress_slot, Err(err.clone()));
639 }
640 TransferStatus::Canceled => {
641 send_terminal_once(
642 &progress_slot,
643 Err(MeowError::from_code_str(
644 InnerErrorCode::TaskCanceled,
645 "task was canceled",
646 )),
647 );
648 }
649 _ => {}
650 }
651 },
652 move |task_id, payload| {
653 send_terminal_once(&complete_slot, Ok((task_id, payload)));
654 },
655 )
656 .await?;
657
658 match rx.await {
659 Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
660 Ok(Err(err)) => Err(err),
661 Err(_) => Err(MeowError::from_code_str(
662 InnerErrorCode::CommandResponseFailed,
663 "transfer terminal channel closed without notification",
664 )),
665 }
666 }
667
// TODO(arman): implement
// `pub async fn get_task_status(&self, task_id: TaskId) -> Result<FileTransferRecord, MeowError>`.
671
672 /// Pauses a running or pending task by ID.
673 ///
674 /// This API sends a control command to the internal scheduler worker
675 /// thread. It does not execute transfer pause logic on the caller thread.
676 ///
677 /// # Usage rules
678 ///
679 /// Call this with a valid task ID returned by [`Self::enqueue`].
680 ///
681 /// # Errors
682 ///
683 /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
684 ///
685 /// # Examples
686 ///
687 /// ```no_run
688 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
689 ///
690 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
691 /// let client = MeowClient::new(MeowConfig::default());
692 /// client.pause(task_id).await?;
693 /// # Ok(())
694 /// # }
695 /// ```
696 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
697 self.ensure_open()?;
698 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
699 self.get_exec()?.pause(task_id).await
700 }
701
702 /// Resumes a previously paused task.
703 ///
704 /// The same [`TaskId`] continues to identify the task after resume.
705 /// The resume command is forwarded to the internal scheduler worker
706 /// thread, so caller thread is not responsible for running transfer logic.
707 ///
708 /// # Errors
709 ///
710 /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
711 ///
712 /// # Examples
713 ///
714 /// ```no_run
715 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
716 ///
717 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
718 /// let client = MeowClient::new(MeowConfig::default());
719 /// client.resume(task_id).await?;
720 /// # Ok(())
721 /// # }
722 /// ```
723 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
724 self.ensure_open()?;
725 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
726 self.get_exec()?.resume(task_id).await
727 }
728
729 /// Cancels a task by ID.
730 ///
731 /// Cancellation is requested through the internal scheduler worker thread.
732 /// Transfer cancellation execution happens in background runtime workers.
733 ///
734 /// # Usage rules
735 ///
736 /// Cancellation is best-effort; protocol-specific cleanup may run.
737 ///
738 /// # Errors
739 ///
740 /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
741 ///
742 /// # Examples
743 ///
744 /// ```no_run
745 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
746 ///
747 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
748 /// let client = MeowClient::new(MeowConfig::default());
749 /// client.cancel(task_id).await?;
750 /// # Ok(())
751 /// # }
752 /// ```
753 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
754 self.ensure_open()?;
755 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
756 self.get_exec()?.cancel(task_id).await
757 }
758
759 /// Returns a snapshot of queue and active transfer groups.
760 ///
761 /// Useful for diagnostics and external monitoring dashboards.
762 /// Snapshot collection is coordinated by internal scheduler worker state.
763 ///
764 /// # Errors
765 ///
766 /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
767 /// snapshot retrieval errors.
768 ///
769 /// # Examples
770 ///
771 /// ```no_run
772 /// use rusty_cat::api::{MeowClient, MeowConfig};
773 ///
774 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
775 /// let client = MeowClient::new(MeowConfig::default());
776 /// let snap = client.snapshot().await?;
777 /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
778 /// # Ok(())
779 /// # }
780 /// ```
781 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
782 self.ensure_open()?;
783 crate::meow_flow_log!("client_api", "snapshot called");
784 self.get_exec()?.snapshot().await
785 }
786
787 /// Closes this client and its underlying executor.
788 ///
789 /// `close` is the terminal lifecycle operation for a `MeowClient`. After
790 /// it succeeds, this client stays permanently closed; submit more work by
791 /// constructing a new `MeowClient` and enqueueing tasks there.
792 ///
793 /// After a successful close:
794 ///
795 /// - New task and control operations on this client are rejected with
796 /// `ClientClosed`.
797 /// - All known unfinished task groups (queued, paused, or active) receive
798 /// a `Paused` progress notification through their task callback and all
799 /// registered global listeners.
800 /// - In-flight transfers are cancelled and the scheduler state is cleared.
801 /// - Already submitted callback jobs are drained before returning.
802 /// - The internal scheduler thread is joined, which drops its Tokio
803 /// runtime and releases SDK-owned background execution resources.
804 ///
805 /// `Paused` is used for shutdown notifications rather than `Canceled` so
806 /// callers can recreate a client later and re-enqueue the same logical
807 /// transfer when they want to resume from available breakpoint state.
808 ///
809 /// # Idempotency
810 ///
811 /// Calling `close` more than once returns `ClientClosed`.
812 ///
813 /// # Retry behavior
814 ///
815 /// If executor close fails, the closed flag is rolled back so caller can
816 /// retry close. A successful close is not restartable.
817 ///
818 /// # Errors
819 ///
820 /// Returns `ClientClosed` when already closed, or underlying executor close
821 /// errors when shutdown is not completed.
822 ///
823 /// # Examples
824 ///
825 /// ```no_run
826 /// use rusty_cat::api::{MeowClient, MeowConfig};
827 ///
828 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
829 /// let client = MeowClient::new(MeowConfig::default());
830 /// client.close().await?;
831 /// # Ok(())
832 /// # }
833 /// ```
834 pub async fn close(&self) -> Result<(), MeowError> {
835 if self
836 .closed
837 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
838 .is_err()
839 {
840 crate::meow_flow_log!("client_api", "close rejected: already closed");
841 return Err(MeowError::from_code_str(
842 InnerErrorCode::ClientClosed,
843 "meow client is already closed",
844 ));
845 }
846 if let Some(exec) = self.executor.get() {
847 crate::meow_flow_log!("client_api", "close forwarding to executor");
848 if let Err(e) = exec.close().await {
849 // Roll back closed flag so caller can retry close.
850 self.closed.store(false, Ordering::SeqCst);
851 return Err(e);
852 }
853 Ok(())
854 } else {
855 crate::meow_flow_log!("client_api", "close with no executor initialized");
856 Ok(())
857 }
858 }
859
860 /// Returns whether this client is currently closed.
861 ///
862 /// # Examples
863 ///
864 /// ```no_run
865 /// use rusty_cat::api::{MeowClient, MeowConfig};
866 ///
867 /// let client = MeowClient::new(MeowConfig::default());
868 /// let _closed = client.is_closed();
869 /// ```
870 pub fn is_closed(&self) -> bool {
871 self.closed.load(Ordering::SeqCst)
872 }
873}
874
875fn send_terminal_once(
876 slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
877 msg: Result<(TaskId, Option<String>), MeowError>,
878) {
879 if let Ok(mut guard) = slot.lock() {
880 if let Some(sender) = guard.take() {
881 let _ = sender.send(msg);
882 }
883 }
884}