rusty_cat/meow_client.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{default_breakpoint_arcs, DefaultHttpTransfer};
7use crate::error::{InnerErrorCode, MeowError};
8use crate::file_transfer_record::FileTransferRecord;
9use crate::ids::{GlobalProgressListenerId, TaskId};
10use crate::inner::executor::Executor;
11use crate::inner::inner_task::InnerTask;
12use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
13use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
14use crate::meow_config::MeowConfig;
15use crate::pounce_task::PounceTask;
16use crate::transfer_snapshot::TransferSnapshot;
17use crate::transfer_status::TransferStatus;
18
19/// Callback type for globally observing task progress events.
20///
21/// The callback is invoked from runtime worker context. Keep callback logic
22/// fast and non-blocking to avoid delaying event processing.
23pub type GlobalProgressListener = ProgressCb;
24
25/// Main entry point of the `rusty-cat` SDK.
26///
27/// `MeowClient` owns runtime state and provides high-level operations:
28/// enqueue, pause, resume, cancel, snapshot, and close.
29///
30/// # Usage pattern
31///
32/// 1. Create [`MeowConfig`].
33/// 2. Construct `MeowClient::new(config)`.
34/// 3. Build tasks with upload/download builders.
35/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
36/// 5. Control task lifecycle with pause/resume/cancel.
37/// 6. Call [`Self::close`] during shutdown.
38///
39/// # Lifecycle contract: you **must** call [`Self::close`]
40///
41/// The background scheduler runs on a dedicated [`std::thread`] that drives
42/// its own Tokio runtime. The clean shutdown protocol is an explicit
43/// `close().await` command which:
44///
45/// - cancels in-flight transfers,
46/// - flushes `Paused` status events to user callbacks for every known group,
47/// - drains already submitted callback jobs,
48/// - joins the scheduler thread and lets the runtime drop.
49///
50/// Forgetting to call `close` leaves the scheduler thread alive until all
51/// command senders are dropped (which does happen when `MeowClient` is
52/// dropped, but only as a fallback). When that fallback path runs, the
53/// guarantees above do **not** hold: callers may miss terminal status
54/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
55/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
56/// impossible to debug from the outside.
57///
58/// To help surface this misuse the internal executor implements a
59/// **best-effort [`Drop`]** that, when `close` was never called:
60///
61/// - emits a `Warn`-level log via the debug log listener (tag
62/// `"executor_drop"`),
63/// - performs a non-blocking `try_send` of a final `Close` command so the
64/// worker still has a chance to drain its state,
65/// - then drops the command sender, causing the worker loop to exit on its
66/// own.
67///
68/// This is a safety net, **not** a substitute for calling `close`. Treat
69/// `close().await` as a mandatory step in your shutdown sequence.
70///
71/// # Sharing across tasks / threads
72///
73/// `MeowClient` **intentionally does not implement [`Clone`]**.
74///
75/// The client owns a lazily-initialized [`Executor`] (a single background
76/// worker loop plus its task table, scheduler state and shutdown flag). A
77/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
78/// it was initialized, letting different clones each spin up their **own**
79/// executor on first use. The result would be:
80///
81/// - multiple independent task tables (tasks enqueued via one clone are
82/// invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
83/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
84/// [`MeowConfig::max_download_concurrency`]) silently multiplied by the
85/// number of clones;
86/// - [`Self::close`] only shutting down one of the worker loops, leaking the
87/// rest.
88///
89/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
90/// and clone the `Arc` instead:
91///
92/// ```no_run
93/// use std::sync::Arc;
94/// use rusty_cat::api::{MeowClient, MeowConfig};
95///
96/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
97/// let client_for_task = Arc::clone(&client);
98/// tokio::spawn(async move {
99/// let _ = client_for_task; // use the shared client here
100/// });
101/// ```
102/// Outcome of a task that reached [`TransferStatus::Complete`].
103///
104/// Returned by [`MeowClient::enqueue_and_wait`].
105#[derive(Debug, Clone)]
106pub struct TaskOutcome {
107 /// Task identifier returned by the underlying scheduler.
108 pub task_id: TaskId,
109 /// Provider-defined payload returned by upload protocol's `complete_upload`.
110 /// Download tasks usually receive `None`.
111 pub payload: Option<String>,
112}
113
114pub struct MeowClient {
115 /// Lazily initialized task executor.
116 ///
117 /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
118 /// there is exactly one owner of this `OnceLock`. Share the whole client
119 /// via `Arc<MeowClient>` when multi-owner access is needed.
120 executor: OnceLock<Executor>,
121 executor_init: StdMutex<()>,
122 /// Immutable runtime configuration.
123 config: MeowConfig,
124 /// Global listeners receiving progress records for all tasks.
125 global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
126 /// Global closed flag. Once set to `true`, task control APIs reject calls.
127 closed: Arc<AtomicBool>,
128}
129
130impl std::fmt::Debug for MeowClient {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 f.debug_struct("MeowClient")
133 .field("config", &self.config)
134 .field("global_progress_listener", &"..")
135 .finish()
136 }
137}
138
139impl MeowClient {
140 /// Creates a new client with the provided configuration.
141 ///
142 /// The internal executor is initialized lazily on first task operation.
143 ///
144 /// # Examples
145 ///
146 /// ```no_run
147 /// use rusty_cat::api::{MeowClient, MeowConfig};
148 ///
149 /// let config = MeowConfig::default();
150 /// let client = MeowClient::new(config);
151 /// let _ = client;
152 /// ```
153 pub fn new(config: MeowConfig) -> Self {
154 MeowClient {
155 executor: Default::default(),
156 executor_init: StdMutex::new(()),
157 config,
158 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
159 closed: Arc::new(AtomicBool::new(false)),
160 }
161 }
162
163 /// Returns a `reqwest::Client` aligned with this client's configuration.
164 ///
165 /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
166 /// injected a custom client, this
167 /// returns its clone.
168 /// - Otherwise, this builds a new client from `http_timeout` and
169 /// `tcp_keepalive`.
170 ///
171 /// # Errors
172 ///
173 /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
174 /// fails.
175 ///
176 /// # Examples
177 ///
178 /// ```no_run
179 /// use rusty_cat::api::{MeowClient, MeowConfig};
180 ///
181 /// let client = MeowClient::new(MeowConfig::default());
182 /// let http = client.http_client()?;
183 /// let _ = http;
184 /// # Ok::<(), rusty_cat::api::MeowError>(())
185 /// ```
186 pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
187 if let Some(c) = self.config.http_client_ref() {
188 return Ok(c.clone());
189 }
190 reqwest::Client::builder()
191 .timeout(self.config.http_timeout())
192 .tcp_keepalive(self.config.tcp_keepalive())
193 .build()
194 .map_err(|e| {
195 MeowError::from_source(
196 InnerErrorCode::HttpClientBuildFailed,
197 format!(
198 "build reqwest client failed (timeout={:?}, keepalive={:?})",
199 self.config.http_timeout(),
200 self.config.tcp_keepalive()
201 ),
202 e,
203 )
204 })
205 }
206
207 fn get_exec(&self) -> Result<&Executor, MeowError> {
208 if let Some(exec) = self.executor.get() {
209 crate::meow_flow_log!("executor", "reuse existing executor");
210 return Ok(exec);
211 }
212
213 let _init_guard = self.executor_init.lock().map_err(|e| {
214 MeowError::from_code(
215 InnerErrorCode::LockPoisoned,
216 format!("executor init lock poisoned: {}", e),
217 )
218 })?;
219 if let Some(exec) = self.executor.get() {
220 crate::meow_flow_log!(
221 "executor",
222 "reuse executor initialized by concurrent caller"
223 );
224 return Ok(exec);
225 }
226
227 let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
228 self.config.http_timeout(),
229 self.config.tcp_keepalive(),
230 )?;
231 crate::meow_flow_log!(
232 "executor",
233 "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
234 self.config.http_timeout(),
235 self.config.tcp_keepalive()
236 );
237 let exec = Executor::new(
238 self.config.clone(),
239 Arc::new(default_http_transfer),
240 self.global_progress_listener.clone(),
241 )?;
242 self.executor.set(exec).map_err(|_| {
243 crate::meow_flow_log!(
244 "executor",
245 "executor init race failed while holding init lock"
246 );
247 MeowError::from_code_str(
248 InnerErrorCode::RuntimeCreationFailedError,
249 "executor init race failed",
250 )
251 })?;
252 self.executor.get().ok_or_else(|| {
253 crate::meow_flow_log!(
254 "executor",
255 "executor init race failed after set; returning RuntimeCreationFailedError"
256 );
257 MeowError::from_code_str(
258 InnerErrorCode::RuntimeCreationFailedError,
259 "executor init race failed",
260 )
261 })
262 }
263
264 /// Ensures the client is still open.
265 ///
266 /// Returns `ClientClosed` if [`Self::close`] was called successfully.
267 fn ensure_open(&self) -> Result<(), MeowError> {
268 if self.closed.load(Ordering::SeqCst) {
269 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
270 Err(MeowError::from_code_str(
271 InnerErrorCode::ClientClosed,
272 "meow client is already closed",
273 ))
274 } else {
275 Ok(())
276 }
277 }
278
279 /// Registers a global progress listener for all tasks.
280 ///
281 /// # Parameters
282 ///
283 /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
284 ///
285 /// # Returns
286 ///
287 /// Returns a listener ID used by
288 /// [`Self::unregister_global_progress_listener`].
289 ///
290 /// # Usage rules
291 ///
292 /// Keep callback execution short and panic-free. A heavy callback can slow
293 /// down global event delivery.
294 ///
295 /// # Errors
296 ///
297 /// Returns `LockPoisoned` when listener storage lock is poisoned.
298 ///
299 /// # Examples
300 ///
301 /// ```no_run
302 /// use rusty_cat::api::{MeowClient, MeowConfig};
303 ///
304 /// let client = MeowClient::new(MeowConfig::default());
305 /// let listener_id = client.register_global_progress_listener(|record| {
306 /// println!("task={} progress={:.2}", record.task_id(), record.progress());
307 /// })?;
308 /// let _ = listener_id;
309 /// # Ok::<(), rusty_cat::api::MeowError>(())
310 /// ```
311 pub fn register_global_progress_listener<F>(
312 &self,
313 listener: F,
314 ) -> Result<GlobalProgressListenerId, MeowError>
315 where
316 F: Fn(FileTransferRecord) + Send + Sync + 'static,
317 {
318 let id = GlobalProgressListenerId::new();
319 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
320 let mut guard = self.global_progress_listener.write().map_err(|e| {
321 MeowError::from_code(
322 InnerErrorCode::LockPoisoned,
323 format!("register global listener lock poisoned: {}", e),
324 )
325 })?;
326 guard.push((id, Arc::new(listener)));
327 Ok(id)
328 }
329
330 /// Unregisters one previously registered global progress listener.
331 ///
332 /// Returns `Ok(false)` when the ID does not exist.
333 ///
334 /// # Errors
335 ///
336 /// Returns `LockPoisoned` when listener storage lock is poisoned.
337 ///
338 /// # Examples
339 ///
340 /// ```no_run
341 /// use rusty_cat::api::{MeowClient, MeowConfig};
342 ///
343 /// let client = MeowClient::new(MeowConfig::default());
344 /// let id = client.register_global_progress_listener(|_| {})?;
345 /// let removed = client.unregister_global_progress_listener(id)?;
346 /// assert!(removed);
347 /// # Ok::<(), rusty_cat::api::MeowError>(())
348 /// ```
349 pub fn unregister_global_progress_listener(
350 &self,
351 id: GlobalProgressListenerId,
352 ) -> Result<bool, MeowError> {
353 let mut g = self.global_progress_listener.write().map_err(|e| {
354 MeowError::from_code(
355 InnerErrorCode::LockPoisoned,
356 format!("unregister global listener lock poisoned: {}", e),
357 )
358 })?;
359 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
360 g.remove(pos);
361 crate::meow_flow_log!(
362 "listener",
363 "unregister global listener success: id={:?}",
364 id
365 );
366 Ok(true)
367 } else {
368 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
369 Ok(false)
370 }
371 }
372
373 /// Removes all registered global progress listeners.
374 ///
375 /// # Errors
376 ///
377 /// Returns `LockPoisoned` when listener storage lock is poisoned.
378 ///
379 /// # Examples
380 ///
381 /// ```no_run
382 /// use rusty_cat::api::{MeowClient, MeowConfig};
383 ///
384 /// let client = MeowClient::new(MeowConfig::default());
385 /// client.clear_global_listener()?;
386 /// # Ok::<(), rusty_cat::api::MeowError>(())
387 /// ```
388 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
389 crate::meow_flow_log!("listener", "clear all global listeners");
390 self.global_progress_listener
391 .write()
392 .map_err(|e| {
393 MeowError::from_code(
394 InnerErrorCode::LockPoisoned,
395 format!("clear global listeners lock poisoned: {}", e),
396 )
397 })?
398 .clear();
399 Ok(())
400 }
401
402 /// Sets or clears the global debug log listener.
403 ///
404 /// - Pass `Some(listener)` to set/replace.
405 /// - Pass `None` to clear.
406 ///
407 /// This affects all `MeowClient` instances in the current process.
408 ///
409 /// # Errors
410 ///
411 /// Returns [`DebugLogListenerError`] when the internal global listener lock
412 /// is poisoned.
413 ///
414 /// # Examples
415 ///
416 /// ```no_run
417 /// use std::sync::Arc;
418 /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
419 ///
420 /// let client = MeowClient::new(MeowConfig::default());
421 /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
422 /// println!("{log}");
423 /// })))?;
424 ///
425 /// // Clear listener when no longer needed.
426 /// client.set_debug_log_listener(None)?;
427 /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
428 /// ```
429 pub fn set_debug_log_listener(
430 &self,
431 listener: Option<DebugLogListener>,
432 ) -> Result<(), DebugLogListenerError> {
433 set_debug_log_listener(listener)
434 }
435}
436
437impl MeowClient {
438 /// Submits a transfer task to the internal scheduler and returns its
439 /// [`TaskId`].
440 ///
441 /// The actual upload/download execution is dispatched to an internal
442 /// worker system thread. This method only performs lightweight validation
443 /// and submission, so it does not block the caller thread waiting for full
444 /// transfer completion.
445 ///
446 /// `try_enqueue` is also the recovery entrypoint after process restart.
447 /// If the application was killed during a previous upload/download,
448 /// restart your process and call `try_enqueue` again to resume that
449 /// transfer workflow.
450 ///
451 /// # Back-pressure semantics (why the `try_` prefix)
452 ///
453 /// Internally this method uses
454 /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
455 /// to the scheduler worker, **not** `send().await`. That means:
456 ///
457 /// - The `await` point in this function is used for task normalization
458 /// (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
459 /// for waiting on command-queue capacity.
460 /// - If the command queue is momentarily full (bursty enqueue under
461 /// [`MeowConfig::command_queue_capacity`]), this method returns an
462 /// immediate `CommandSendFailed` error instead of suspending the
463 /// caller until a slot frees up.
464 /// - Other control APIs ([`Self::pause`], [`Self::resume`],
465 /// [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
466 /// wait for queue capacity. Only enqueue is fail-fast.
467 ///
468 /// Callers that want to batch-enqueue under burst load should either:
469 ///
470 /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
471 /// 2. retry on `CommandSendFailed` with their own back-off, or
472 /// 3. rate-limit enqueue calls on the caller side.
473 ///
474 /// The name explicitly carries `try_` so this fail-fast behavior is
475 /// visible at the call site. If a fully-awaiting variant is introduced
476 /// later it should be named `enqueue` (without the `try_` prefix).
477 ///
478 /// # Parameters
479 ///
480 /// - `task`: Built by upload/download task builders.
481 /// - `progress_cb`: Per-task callback invoked with transfer progress.
482 /// - `complete_cb`: Callback fired once when task reaches
483 /// [`crate::transfer_status::TransferStatus::Complete`]. The second
484 /// argument is provider-defined payload returned by upload protocol
485 /// `complete_upload`; download tasks usually receive `None`.
486 ///
487 /// # Usage rules
488 ///
489 /// - `task` must be non-empty (required path/name/url and valid upload size).
490 /// - Callback should be lightweight and non-blocking.
491 /// - Store returned task ID for subsequent task control operations.
492 /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
493 /// - For restart recovery, re-enqueue the same logical task (same
494 /// upload/download target and compatible checkpoint context) so the
495 /// runtime can continue from existing local/remote progress.
496 ///
497 /// # Errors
498 ///
499 /// Returns:
500 /// - `ClientClosed` if the client was closed.
501 /// - `ParameterEmpty` if the task is invalid/empty.
502 /// - `CommandSendFailed` if the scheduler command queue is full at the
503 /// moment of submission (see back-pressure semantics above).
504 /// - Any runtime initialization errors from the executor.
505 ///
506 /// # Examples
507 ///
508 /// ```no_run
509 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
510 ///
511 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
512 /// let client = MeowClient::new(MeowConfig::default());
513 /// let task = DownloadPounceBuilder::new(
514 /// "example.bin",
515 /// "./downloads/example.bin",
516 /// 1024 * 1024,
517 /// "https://example.com/example.bin",
518 /// )
519 /// .build();
520 ///
521 /// let task_id = client
522 /// .try_enqueue(
523 /// task,
524 /// |record| {
525 /// println!("status={:?} progress={:.2}", record.status(), record.progress());
526 /// },
527 /// |task_id, payload| {
528 /// println!("task {task_id} completed, payload={payload:?}");
529 /// },
530 /// )
531 /// .await?;
532 /// println!("enqueued task: {task_id}");
533 /// # Ok(())
534 /// # }
535 /// ```
536 pub async fn try_enqueue<PCB, CCB>(
537 &self,
538 task: PounceTask,
539 progress_cb: PCB,
540 complete_cb: CCB,
541 ) -> Result<TaskId, MeowError>
542 where
543 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
544 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
545 {
546 self.ensure_open()?;
547 if task.is_empty() {
548 crate::meow_flow_log!("try_enqueue", "reject empty task");
549 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
550 }
551
552 crate::meow_flow_log!("try_enqueue", "task={:?}", task);
553
554 let progress: ProgressCb = Arc::new(progress_cb);
555 let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
556 let callbacks = TaskCallbacks::new(Some(progress), complete);
557
558 let (def_up, def_down) = default_breakpoint_arcs();
559 let inner = InnerTask::from_pounce(
560 task,
561 self.config.breakpoint_download_http().clone(),
562 self.config.http_client_ref().cloned(),
563 def_up,
564 def_down,
565 )
566 .await?;
567
568 let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
569 crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
570 Ok(task_id)
571 }
572
573 /// Imports a transfer task in the **paused** state without scheduling it.
574 ///
575 /// This is the restart/restore entry point for callers that persist their
576 /// own transfer records: rebuild a [`PounceTask`] from your database, import
577 /// it here, and the task is registered into the scheduler as
578 /// [`TransferStatus::Paused`] **without** queueing, so it performs **zero
579 /// network or file I/O** until you explicitly start it.
580 ///
581 /// To start a previously imported task, call [`Self::resume`] with the
582 /// returned [`TaskId`]. A typical "restore N, start a user-selected subset"
583 /// flow imports every task with `try_enqueue_paused` and then calls
584 /// [`Self::resume`] only for the ids the user chose; the rest stay paused.
585 ///
586 /// # Difference from [`Self::try_enqueue`]
587 ///
588 /// - `try_enqueue` schedules immediately (the task becomes `Pending` and may
589 /// start transferring as soon as a concurrency slot is free).
590 /// - `try_enqueue_paused` registers the task as `Paused` and never queues it
591 /// until [`Self::resume`] is called.
592 ///
593 /// Back-pressure is identical: this method uses
594 /// [`tokio::sync::mpsc::Sender::try_send`] and fails fast with
595 /// `CommandSendFailed` if the command queue is full (see
596 /// [`Self::try_enqueue`] for the rationale behind the `try_` prefix).
597 ///
598 /// # Resume semantics after import
599 ///
600 /// When the imported task is later resumed, the resume point is recomputed
601 /// by the executor, **not** taken from any value passed here:
602 ///
603 /// - **Download**: resumes from the on-disk partial file length, so the
604 /// partial file must still exist at the task's `file_path`.
605 /// - **Upload**: resumes from the server-reported `next_byte` during the
606 /// upload `prepare` stage.
607 ///
608 /// # Progress reporting while paused
609 ///
610 /// The single `Paused` [`FileTransferRecord`] emitted on import reports
611 /// progress `0.0` because no `prepare` has run yet. Render the imported
612 /// task's real progress from your own persisted record; the SDK corrects it
613 /// after the first resume.
614 ///
615 /// # Parameters
616 ///
617 /// Same as [`Self::try_enqueue`]: a built `task`, a per-task `progress_cb`,
618 /// and a `complete_cb` fired once on terminal `Complete`.
619 ///
620 /// # Errors
621 ///
622 /// - `ClientClosed` if the client was closed.
623 /// - `ParameterEmpty` if the task is invalid/empty.
624 /// - `CommandSendFailed` if the scheduler command queue is full.
625 /// - Any runtime initialization errors from the executor.
626 ///
627 /// # Examples
628 ///
629 /// ```no_run
630 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
631 ///
632 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
633 /// let client = MeowClient::new(MeowConfig::default());
634 /// let task = DownloadPounceBuilder::new(
635 /// "example.bin",
636 /// "./downloads/example.bin",
637 /// 1024 * 1024,
638 /// "https://example.com/example.bin",
639 /// )
640 /// .build();
641 ///
642 /// // Import without starting it (no HTTP request, no file open).
643 /// let task_id = client
644 /// .try_enqueue_paused(task, |_record| {}, |_id, _payload| {})
645 /// .await?;
646 ///
647 /// // Later, when the user chooses to start this one:
648 /// client.resume(task_id).await?;
649 /// # Ok(())
650 /// # }
651 /// ```
652 pub async fn try_enqueue_paused<PCB, CCB>(
653 &self,
654 task: PounceTask,
655 progress_cb: PCB,
656 complete_cb: CCB,
657 ) -> Result<TaskId, MeowError>
658 where
659 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
660 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
661 {
662 self.ensure_open()?;
663 if task.is_empty() {
664 crate::meow_flow_log!("try_enqueue_paused", "reject empty task");
665 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
666 }
667
668 crate::meow_flow_log!("try_enqueue_paused", "task={:?}", task);
669
670 let progress: ProgressCb = Arc::new(progress_cb);
671 let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
672 let callbacks = TaskCallbacks::new(Some(progress), complete);
673
674 let (def_up, def_down) = default_breakpoint_arcs();
675 let inner = InnerTask::from_pounce(
676 task,
677 self.config.breakpoint_download_http().clone(),
678 self.config.http_client_ref().cloned(),
679 def_up,
680 def_down,
681 )
682 .await?;
683
684 let task_id = self.get_exec()?.try_enqueue_paused(inner, callbacks)?;
685 crate::meow_flow_log!(
686 "try_enqueue_paused",
687 "try_enqueue_paused success: task_id={:?}",
688 task_id
689 );
690 Ok(task_id)
691 }
692
693 /// Enqueues a task and `await`s until it reaches a terminal status.
694 ///
695 /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
696 /// do not have to write the channel + double-callback + single-send-guard
697 /// boilerplate themselves.
698 ///
699 /// # Returns
700 ///
701 /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
702 /// - `Err(MeowError)` carrying the underlying failure for
703 /// [`TransferStatus::Failed`].
704 /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
705 /// [`TransferStatus::Canceled`].
706 ///
707 /// # Progress
708 ///
709 /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
710 /// the per-task progress callback in [`Self::try_enqueue`].
711 ///
712 /// # Cancellation / timeout
713 ///
714 /// Dropping the returned future does **not** cancel the underlying transfer;
715 /// the task continues running in the executor. Use [`Self::cancel`] with
716 /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
717 /// abort an in-flight transfer.
718 ///
719 /// To cap wall-clock waiting time, wrap this future:
720 ///
721 /// ```ignore
722 /// let outcome = tokio::time::timeout(
723 /// std::time::Duration::from_secs(60),
724 /// client.enqueue_and_wait(task, |_| {}),
725 /// )
726 /// .await??;
727 /// ```
728 ///
729 /// # Errors
730 ///
731 /// In addition to the terminal-status errors above, propagates any error
732 /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
733 /// `CommandSendFailed`).
734 ///
735 /// # Examples
736 ///
737 /// ```no_run
738 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
739 ///
740 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
741 /// let client = MeowClient::new(MeowConfig::default());
742 /// let task = DownloadPounceBuilder::new(
743 /// "example.bin",
744 /// "./downloads/example.bin",
745 /// 1024 * 1024,
746 /// "https://example.com/example.bin",
747 /// )
748 /// .build();
749 ///
750 /// let outcome = client
751 /// .enqueue_and_wait(task, |record| {
752 /// println!(
753 /// "task={} progress={:.2}",
754 /// record.task_id(),
755 /// record.progress()
756 /// );
757 /// })
758 /// .await?;
759 /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
760 /// # Ok(())
761 /// # }
762 /// ```
763 pub async fn enqueue_and_wait<PCB>(
764 &self,
765 task: PounceTask,
766 progress_cb: PCB,
767 ) -> Result<TaskOutcome, MeowError>
768 where
769 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
770 {
771 type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
772 let (tx, rx) = oneshot::channel::<TerminalMsg>();
773 let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
774 Arc::new(StdMutex::new(Some(tx)));
775 let progress_slot = Arc::clone(&tx_slot);
776 let complete_slot = tx_slot;
777
778 self.try_enqueue(
779 task,
780 move |record: FileTransferRecord| {
781 progress_cb(record.clone());
782 match record.status() {
783 TransferStatus::Failed(err) => {
784 send_terminal_once(&progress_slot, Err(err.clone()));
785 }
786 TransferStatus::Canceled => {
787 send_terminal_once(
788 &progress_slot,
789 Err(MeowError::from_code_str(
790 InnerErrorCode::TaskCanceled,
791 "task was canceled",
792 )),
793 );
794 }
795 _ => {}
796 }
797 },
798 move |task_id, payload| {
799 send_terminal_once(&complete_slot, Ok((task_id, payload)));
800 },
801 )
802 .await?;
803
804 match rx.await {
805 Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
806 Ok(Err(err)) => Err(err),
807 Err(_) => Err(MeowError::from_code_str(
808 InnerErrorCode::CommandResponseFailed,
809 "transfer terminal channel closed without notification",
810 )),
811 }
812 }
813
814 // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
815 // todo!(arman) -
816 // }
817
818 /// Pauses a running or pending task by ID.
819 ///
820 /// This API sends a control command to the internal scheduler worker
821 /// thread. It does not execute transfer pause logic on the caller thread.
822 ///
823 /// # Usage rules
824 ///
825 /// Call this with a valid task ID returned by [`Self::enqueue`].
826 ///
827 /// # Errors
828 ///
829 /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
830 ///
831 /// # Examples
832 ///
833 /// ```no_run
834 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
835 ///
836 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
837 /// let client = MeowClient::new(MeowConfig::default());
838 /// client.pause(task_id).await?;
839 /// # Ok(())
840 /// # }
841 /// ```
842 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
843 self.ensure_open()?;
844 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
845 self.get_exec()?.pause(task_id).await
846 }
847
848 /// Resumes a previously paused task.
849 ///
850 /// The same [`TaskId`] continues to identify the task after resume.
851 /// The resume command is forwarded to the internal scheduler worker
852 /// thread, so caller thread is not responsible for running transfer logic.
853 ///
854 /// # Errors
855 ///
856 /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
857 ///
858 /// # Examples
859 ///
860 /// ```no_run
861 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
862 ///
863 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
864 /// let client = MeowClient::new(MeowConfig::default());
865 /// client.resume(task_id).await?;
866 /// # Ok(())
867 /// # }
868 /// ```
869 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
870 self.ensure_open()?;
871 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
872 self.get_exec()?.resume(task_id).await
873 }
874
875 /// Cancels a task by ID.
876 ///
877 /// Cancellation is requested through the internal scheduler worker thread.
878 /// Transfer cancellation execution happens in background runtime workers.
879 ///
880 /// # Usage rules
881 ///
882 /// Cancellation is best-effort; protocol-specific cleanup may run.
883 ///
884 /// # Errors
885 ///
886 /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
887 ///
888 /// # Examples
889 ///
890 /// ```no_run
891 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
892 ///
893 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
894 /// let client = MeowClient::new(MeowConfig::default());
895 /// client.cancel(task_id).await?;
896 /// # Ok(())
897 /// # }
898 /// ```
899 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
900 self.ensure_open()?;
901 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
902 self.get_exec()?.cancel(task_id).await
903 }
904
905 /// Returns a snapshot of queue and active transfer groups.
906 ///
907 /// Useful for diagnostics and external monitoring dashboards.
908 /// Snapshot collection is coordinated by internal scheduler worker state.
909 ///
910 /// # Errors
911 ///
912 /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
913 /// snapshot retrieval errors.
914 ///
915 /// # Examples
916 ///
917 /// ```no_run
918 /// use rusty_cat::api::{MeowClient, MeowConfig};
919 ///
920 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
921 /// let client = MeowClient::new(MeowConfig::default());
922 /// let snap = client.snapshot().await?;
923 /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
924 /// # Ok(())
925 /// # }
926 /// ```
927 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
928 self.ensure_open()?;
929 crate::meow_flow_log!("client_api", "snapshot called");
930 self.get_exec()?.snapshot().await
931 }
932
933 /// Closes this client and its underlying executor.
934 ///
935 /// `close` is the terminal lifecycle operation for a `MeowClient`. After
936 /// it succeeds, this client stays permanently closed; submit more work by
937 /// constructing a new `MeowClient` and enqueueing tasks there.
938 ///
939 /// After a successful close:
940 ///
941 /// - New task and control operations on this client are rejected with
942 /// `ClientClosed`.
943 /// - All known unfinished task groups (queued, paused, or active) receive
944 /// a `Paused` progress notification through their task callback and all
945 /// registered global listeners.
946 /// - In-flight transfers are cancelled and the scheduler state is cleared.
947 /// - Already submitted callback jobs are drained before returning.
948 /// - The internal scheduler thread is joined, which drops its Tokio
949 /// runtime and releases SDK-owned background execution resources.
950 ///
951 /// `Paused` is used for shutdown notifications rather than `Canceled` so
952 /// callers can recreate a client later and re-enqueue the same logical
953 /// transfer when they want to resume from available breakpoint state.
954 ///
955 /// # Idempotency
956 ///
957 /// Calling `close` more than once returns `ClientClosed`.
958 ///
959 /// # Retry behavior
960 ///
961 /// If executor close fails, the closed flag is rolled back so caller can
962 /// retry close. A successful close is not restartable.
963 ///
964 /// # Errors
965 ///
966 /// Returns `ClientClosed` when already closed, or underlying executor close
967 /// errors when shutdown is not completed.
968 ///
969 /// # Examples
970 ///
971 /// ```no_run
972 /// use rusty_cat::api::{MeowClient, MeowConfig};
973 ///
974 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
975 /// let client = MeowClient::new(MeowConfig::default());
976 /// client.close().await?;
977 /// # Ok(())
978 /// # }
979 /// ```
980 pub async fn close(&self) -> Result<(), MeowError> {
981 if self
982 .closed
983 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
984 .is_err()
985 {
986 crate::meow_flow_log!("client_api", "close rejected: already closed");
987 return Err(MeowError::from_code_str(
988 InnerErrorCode::ClientClosed,
989 "meow client is already closed",
990 ));
991 }
992 if let Some(exec) = self.executor.get() {
993 crate::meow_flow_log!("client_api", "close forwarding to executor");
994 if let Err(e) = exec.close().await {
995 // Roll back closed flag so caller can retry close.
996 self.closed.store(false, Ordering::SeqCst);
997 return Err(e);
998 }
999 Ok(())
1000 } else {
1001 crate::meow_flow_log!("client_api", "close with no executor initialized");
1002 Ok(())
1003 }
1004 }
1005
1006 /// Returns whether this client is currently closed.
1007 ///
1008 /// # Examples
1009 ///
1010 /// ```no_run
1011 /// use rusty_cat::api::{MeowClient, MeowConfig};
1012 ///
1013 /// let client = MeowClient::new(MeowConfig::default());
1014 /// let _closed = client.is_closed();
1015 /// ```
1016 pub fn is_closed(&self) -> bool {
1017 self.closed.load(Ordering::SeqCst)
1018 }
1019}
1020
1021fn send_terminal_once(
1022 slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
1023 msg: Result<(TaskId, Option<String>), MeowError>,
1024) {
1025 if let Ok(mut guard) = slot.lock() {
1026 if let Some(sender) = guard.take() {
1027 let _ = sender.send(msg);
1028 }
1029 }
1030}