rusty_cat/meow_client.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex as StdMutex, OnceLock, RwLock};
3
4use tokio::sync::oneshot;
5
6use crate::dflt::default_http_transfer::{
7 build_internal_client, default_breakpoint_arcs, DefaultHttpTransfer,
8};
9use crate::error::{InnerErrorCode, MeowError};
10use crate::file_transfer_record::FileTransferRecord;
11use crate::ids::{GlobalProgressListenerId, TaskId};
12use crate::inner::executor::Executor;
13use crate::inner::inner_task::InnerTask;
14use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
15use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
16use crate::meow_config::MeowConfig;
17use crate::pounce_task::PounceTask;
18use crate::transfer_snapshot::TransferSnapshot;
19use crate::transfer_status::TransferStatus;
20
21/// Callback type for globally observing task progress events.
22///
23/// The callback is invoked from runtime worker context. Keep callback logic
24/// fast and non-blocking to avoid delaying event processing.
25pub type GlobalProgressListener = ProgressCb;
26
27/// Main entry point of the `rusty-cat` SDK.
28///
29/// `MeowClient` owns runtime state and provides high-level operations:
30/// enqueue, pause, resume, cancel, snapshot, and close.
31///
32/// # Usage pattern
33///
34/// 1. Create [`MeowConfig`].
35/// 2. Construct `MeowClient::new(config)`.
36/// 3. Build tasks with upload/download builders.
37/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
38/// 5. Control task lifecycle with pause/resume/cancel.
39/// 6. Call [`Self::close`] during shutdown.
40///
41/// # Lifecycle contract: you **must** call [`Self::close`]
42///
43/// The background scheduler runs on a dedicated [`std::thread`] that drives
44/// its own Tokio runtime. The clean shutdown protocol is an explicit
45/// `close().await` command which:
46///
47/// - cancels in-flight transfers,
48/// - flushes `Paused` status events to user callbacks for every known group,
49/// - drains already submitted callback jobs,
50/// - joins the scheduler thread and lets the runtime drop.
51///
52/// Forgetting to call `close` leaves the scheduler thread alive until all
53/// command senders are dropped (which does happen when `MeowClient` is
54/// dropped, but only as a fallback). When that fallback path runs, the
55/// guarantees above do **not** hold: callers may miss terminal status
56/// events, in-flight HTTP transfers are aborted abruptly, and for long-lived
57/// SDK hosts (servers, mobile runtimes, etc.) the misuse is nearly
58/// impossible to debug from the outside.
59///
60/// To help surface this misuse the internal executor implements a
61/// **best-effort [`Drop`]** that, when `close` was never called:
62///
63/// - emits a `Warn`-level log via the debug log listener (tag
64/// `"executor_drop"`),
65/// - performs a non-blocking `try_send` of a final `Close` command so the
66/// worker still has a chance to drain its state,
67/// - then drops the command sender, causing the worker loop to exit on its
68/// own.
69///
70/// This is a safety net, **not** a substitute for calling `close`. Treat
71/// `close().await` as a mandatory step in your shutdown sequence.
72///
73/// # Sharing across tasks / threads
74///
75/// `MeowClient` **intentionally does not implement [`Clone`]**.
76///
77/// The client owns a lazily-initialized [`Executor`] (a single background
78/// worker loop plus its task table, scheduler state and shutdown flag). A
79/// naive field-by-field `Clone` would copy the `OnceLock<Executor>` *before*
80/// it was initialized, letting different clones each spin up their **own**
81/// executor on first use. The result would be:
82///
83/// - multiple independent task tables (tasks enqueued via one clone are
84/// invisible to `pause` / `resume` / `cancel` / `snapshot` on another);
85/// - concurrency limits ([`MeowConfig::max_upload_concurrency`] /
86/// [`MeowConfig::max_download_concurrency`]) silently multiplied by the
87/// number of clones;
88/// - [`Self::close`] only shutting down one of the worker loops, leaking the
89/// rest.
90///
91/// To share a client across tasks or threads, wrap it in [`std::sync::Arc`]
92/// and clone the `Arc` instead:
93///
94/// ```no_run
95/// use std::sync::Arc;
96/// use rusty_cat::api::{MeowClient, MeowConfig};
97///
98/// let client = Arc::new(MeowClient::new(MeowConfig::default()));
99/// let client_for_task = Arc::clone(&client);
100/// tokio::spawn(async move {
101/// let _ = client_for_task; // use the shared client here
102/// });
103/// ```
104/// Outcome of a task that reached [`TransferStatus::Complete`].
105///
106/// Returned by [`MeowClient::enqueue_and_wait`].
107#[derive(Debug, Clone)]
108pub struct TaskOutcome {
109 /// Task identifier returned by the underlying scheduler.
110 pub task_id: TaskId,
111 /// Provider-defined payload returned by upload protocol's `complete_upload`.
112 /// Download tasks usually receive `None`.
113 pub payload: Option<String>,
114}
115
116pub struct MeowClient {
117 /// Lazily initialized task executor.
118 ///
119 /// Deliberately **not** wrapped in `Arc`: `MeowClient` is not `Clone`, so
120 /// there is exactly one owner of this `OnceLock`. Share the whole client
121 /// via `Arc<MeowClient>` when multi-owner access is needed.
122 executor: OnceLock<Executor>,
123 executor_init: StdMutex<()>,
124 /// Immutable runtime configuration.
125 config: MeowConfig,
126 /// Global listeners receiving progress records for all tasks.
127 global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
128 /// Global closed flag. Once set to `true`, task control APIs reject calls.
129 closed: Arc<AtomicBool>,
130}
131
132impl std::fmt::Debug for MeowClient {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("MeowClient")
135 .field("config", &self.config)
136 .field("global_progress_listener", &"..")
137 .finish()
138 }
139}
140
141impl MeowClient {
142 /// Creates a new client with the provided configuration.
143 ///
144 /// The internal executor is initialized lazily on first task operation.
145 ///
146 /// # Examples
147 ///
148 /// ```no_run
149 /// use rusty_cat::api::{MeowClient, MeowConfig};
150 ///
151 /// let config = MeowConfig::default();
152 /// let client = MeowClient::new(config);
153 /// let _ = client;
154 /// ```
155 pub fn new(config: MeowConfig) -> Self {
156 MeowClient {
157 executor: Default::default(),
158 executor_init: StdMutex::new(()),
159 config,
160 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
161 closed: Arc::new(AtomicBool::new(false)),
162 }
163 }
164
165 /// Returns a `reqwest::Client` aligned with this client's configuration.
166 ///
167 /// - If [`MeowConfigBuilder::http_client`](crate::api::MeowConfigBuilder::http_client)
168 /// injected a custom client, this
169 /// returns its clone.
170 /// - Otherwise, this builds a new client from `http_timeout` and
171 /// `tcp_keepalive`.
172 ///
173 /// # Errors
174 ///
175 /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
176 /// fails.
177 ///
178 /// # Examples
179 ///
180 /// ```no_run
181 /// use rusty_cat::api::{MeowClient, MeowConfig};
182 ///
183 /// let client = MeowClient::new(MeowConfig::default());
184 /// let http = client.http_client()?;
185 /// let _ = http;
186 /// # Ok::<(), rusty_cat::api::MeowError>(())
187 /// ```
188 pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
189 if let Some(c) = self.config.http_client_ref() {
190 return Ok(c.clone());
191 }
192 // Build through the shared helper so this client carries the exact same
193 // transport policy (connect timeout + idle connection pool) as the
194 // transfer backend, rather than reqwest's bare defaults.
195 build_internal_client(self.config.http_timeout(), self.config.tcp_keepalive())
196 .map_err(|e| {
197 MeowError::from_source(
198 InnerErrorCode::HttpClientBuildFailed,
199 format!(
200 "build reqwest client failed (timeout={:?}, keepalive={:?})",
201 self.config.http_timeout(),
202 self.config.tcp_keepalive()
203 ),
204 e,
205 )
206 })
207 }
208
209 fn get_exec(&self) -> Result<&Executor, MeowError> {
210 if let Some(exec) = self.executor.get() {
211 crate::meow_flow_log!("executor", "reuse existing executor");
212 return Ok(exec);
213 }
214
215 let _init_guard = self.executor_init.lock().map_err(|e| {
216 MeowError::from_code(
217 InnerErrorCode::LockPoisoned,
218 format!("executor init lock poisoned: {}", e),
219 )
220 })?;
221 if let Some(exec) = self.executor.get() {
222 crate::meow_flow_log!(
223 "executor",
224 "reuse executor initialized by concurrent caller"
225 );
226 return Ok(exec);
227 }
228
229 let default_http_transfer = DefaultHttpTransfer::try_with_http_timeouts(
230 self.config.http_timeout(),
231 self.config.tcp_keepalive(),
232 )?;
233 crate::meow_flow_log!(
234 "executor",
235 "initializing DefaultHttpTransfer (timeout={:?}, tcp_keepalive={:?})",
236 self.config.http_timeout(),
237 self.config.tcp_keepalive()
238 );
239 let exec = Executor::new(
240 self.config.clone(),
241 Arc::new(default_http_transfer),
242 self.global_progress_listener.clone(),
243 )?;
244 self.executor.set(exec).map_err(|_| {
245 crate::meow_flow_log!(
246 "executor",
247 "executor init race failed while holding init lock"
248 );
249 MeowError::from_code_str(
250 InnerErrorCode::RuntimeCreationFailedError,
251 "executor init race failed",
252 )
253 })?;
254 self.executor.get().ok_or_else(|| {
255 crate::meow_flow_log!(
256 "executor",
257 "executor init race failed after set; returning RuntimeCreationFailedError"
258 );
259 MeowError::from_code_str(
260 InnerErrorCode::RuntimeCreationFailedError,
261 "executor init race failed",
262 )
263 })
264 }
265
266 /// Ensures the client is still open.
267 ///
268 /// Returns `ClientClosed` if [`Self::close`] was called successfully.
269 fn ensure_open(&self) -> Result<(), MeowError> {
270 if self.closed.load(Ordering::SeqCst) {
271 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
272 Err(MeowError::from_code_str(
273 InnerErrorCode::ClientClosed,
274 "meow client is already closed",
275 ))
276 } else {
277 Ok(())
278 }
279 }
280
281 /// Registers a global progress listener for all tasks.
282 ///
283 /// # Parameters
284 ///
285 /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
286 ///
287 /// # Returns
288 ///
289 /// Returns a listener ID used by
290 /// [`Self::unregister_global_progress_listener`].
291 ///
292 /// # Usage rules
293 ///
294 /// Keep callback execution short and panic-free. A heavy callback can slow
295 /// down global event delivery.
296 ///
297 /// # Errors
298 ///
299 /// Returns `LockPoisoned` when listener storage lock is poisoned.
300 ///
301 /// # Examples
302 ///
303 /// ```no_run
304 /// use rusty_cat::api::{MeowClient, MeowConfig};
305 ///
306 /// let client = MeowClient::new(MeowConfig::default());
307 /// let listener_id = client.register_global_progress_listener(|record| {
308 /// println!("task={} progress={:.2}", record.task_id(), record.progress());
309 /// })?;
310 /// let _ = listener_id;
311 /// # Ok::<(), rusty_cat::api::MeowError>(())
312 /// ```
313 pub fn register_global_progress_listener<F>(
314 &self,
315 listener: F,
316 ) -> Result<GlobalProgressListenerId, MeowError>
317 where
318 F: Fn(FileTransferRecord) + Send + Sync + 'static,
319 {
320 let id = GlobalProgressListenerId::new();
321 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
322 let mut guard = self.global_progress_listener.write().map_err(|e| {
323 MeowError::from_code(
324 InnerErrorCode::LockPoisoned,
325 format!("register global listener lock poisoned: {}", e),
326 )
327 })?;
328 guard.push((id, Arc::new(listener)));
329 Ok(id)
330 }
331
332 /// Unregisters one previously registered global progress listener.
333 ///
334 /// Returns `Ok(false)` when the ID does not exist.
335 ///
336 /// # Errors
337 ///
338 /// Returns `LockPoisoned` when listener storage lock is poisoned.
339 ///
340 /// # Examples
341 ///
342 /// ```no_run
343 /// use rusty_cat::api::{MeowClient, MeowConfig};
344 ///
345 /// let client = MeowClient::new(MeowConfig::default());
346 /// let id = client.register_global_progress_listener(|_| {})?;
347 /// let removed = client.unregister_global_progress_listener(id)?;
348 /// assert!(removed);
349 /// # Ok::<(), rusty_cat::api::MeowError>(())
350 /// ```
351 pub fn unregister_global_progress_listener(
352 &self,
353 id: GlobalProgressListenerId,
354 ) -> Result<bool, MeowError> {
355 let mut g = self.global_progress_listener.write().map_err(|e| {
356 MeowError::from_code(
357 InnerErrorCode::LockPoisoned,
358 format!("unregister global listener lock poisoned: {}", e),
359 )
360 })?;
361 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
362 g.remove(pos);
363 crate::meow_flow_log!(
364 "listener",
365 "unregister global listener success: id={:?}",
366 id
367 );
368 Ok(true)
369 } else {
370 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
371 Ok(false)
372 }
373 }
374
375 /// Removes all registered global progress listeners.
376 ///
377 /// # Errors
378 ///
379 /// Returns `LockPoisoned` when listener storage lock is poisoned.
380 ///
381 /// # Examples
382 ///
383 /// ```no_run
384 /// use rusty_cat::api::{MeowClient, MeowConfig};
385 ///
386 /// let client = MeowClient::new(MeowConfig::default());
387 /// client.clear_global_listener()?;
388 /// # Ok::<(), rusty_cat::api::MeowError>(())
389 /// ```
390 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
391 crate::meow_flow_log!("listener", "clear all global listeners");
392 self.global_progress_listener
393 .write()
394 .map_err(|e| {
395 MeowError::from_code(
396 InnerErrorCode::LockPoisoned,
397 format!("clear global listeners lock poisoned: {}", e),
398 )
399 })?
400 .clear();
401 Ok(())
402 }
403
404 /// Sets or clears the global debug log listener.
405 ///
406 /// - Pass `Some(listener)` to set/replace.
407 /// - Pass `None` to clear.
408 ///
409 /// This affects all `MeowClient` instances in the current process.
410 ///
411 /// # Errors
412 ///
413 /// Returns [`DebugLogListenerError`] when the internal global listener lock
414 /// is poisoned.
415 ///
416 /// # Examples
417 ///
418 /// ```no_run
419 /// use std::sync::Arc;
420 /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
421 ///
422 /// let client = MeowClient::new(MeowConfig::default());
423 /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
424 /// println!("{log}");
425 /// })))?;
426 ///
427 /// // Clear listener when no longer needed.
428 /// client.set_debug_log_listener(None)?;
429 /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
430 /// ```
431 pub fn set_debug_log_listener(
432 &self,
433 listener: Option<DebugLogListener>,
434 ) -> Result<(), DebugLogListenerError> {
435 set_debug_log_listener(listener)
436 }
437}
438
439impl MeowClient {
440 /// Submits a transfer task to the internal scheduler and returns its
441 /// [`TaskId`].
442 ///
443 /// The actual upload/download execution is dispatched to an internal
444 /// worker system thread. This method only performs lightweight validation
445 /// and submission, so it does not block the caller thread waiting for full
446 /// transfer completion.
447 ///
448 /// `try_enqueue` is also the recovery entrypoint after process restart.
449 /// If the application was killed during a previous upload/download,
450 /// restart your process and call `try_enqueue` again to resume that
451 /// transfer workflow.
452 ///
453 /// # Back-pressure semantics (why the `try_` prefix)
454 ///
455 /// Internally this method uses
456 /// [`tokio::sync::mpsc::Sender::try_send`] to hand the `Enqueue` command
457 /// to the scheduler worker, **not** `send().await`. That means:
458 ///
459 /// - The `await` point in this function is used for task normalization
460 /// (e.g. resolving upload breakpoints, building [`InnerTask`]), **not**
461 /// for waiting on command-queue capacity.
462 /// - If the command queue is momentarily full (bursty enqueue under
463 /// [`MeowConfig::command_queue_capacity`]), this method returns an
464 /// immediate `CommandSendFailed` error instead of suspending the
465 /// caller until a slot frees up.
466 /// - Other control APIs ([`Self::pause`], [`Self::resume`],
467 /// [`Self::cancel`], [`Self::snapshot`]) use `send().await` and **do**
468 /// wait for queue capacity. Only enqueue is fail-fast.
469 ///
470 /// Callers that want to batch-enqueue under burst load should either:
471 ///
472 /// 1. size [`MeowConfig::command_queue_capacity`] appropriately, or
473 /// 2. retry on `CommandSendFailed` with their own back-off, or
474 /// 3. rate-limit enqueue calls on the caller side.
475 ///
476 /// The name explicitly carries `try_` so this fail-fast behavior is
477 /// visible at the call site. If a fully-awaiting variant is introduced
478 /// later it should be named `enqueue` (without the `try_` prefix).
479 ///
480 /// # Parameters
481 ///
482 /// - `task`: Built by upload/download task builders.
483 /// - `progress_cb`: Per-task callback invoked with transfer progress.
484 /// - `complete_cb`: Callback fired once when task reaches
485 /// [`crate::transfer_status::TransferStatus::Complete`]. The second
486 /// argument is provider-defined payload returned by upload protocol
487 /// `complete_upload`; download tasks usually receive `None`.
488 ///
489 /// # Usage rules
490 ///
491 /// - `task` must be non-empty (required path/name/url and valid upload size).
492 /// - Callback should be lightweight and non-blocking.
493 /// - Store returned task ID for subsequent task control operations.
494 /// - `try_enqueue` is asynchronous task submission, not synchronous transfer.
495 /// - For restart recovery, re-enqueue the same logical task (same
496 /// upload/download target and compatible checkpoint context) so the
497 /// runtime can continue from existing local/remote progress.
498 ///
499 /// # Errors
500 ///
501 /// Returns:
502 /// - `ClientClosed` if the client was closed.
503 /// - `ParameterEmpty` if the task is invalid/empty.
504 /// - `CommandSendFailed` if the scheduler command queue is full at the
505 /// moment of submission (see back-pressure semantics above).
506 /// - Any runtime initialization errors from the executor.
507 ///
508 /// # Examples
509 ///
510 /// ```no_run
511 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
512 ///
513 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
514 /// let client = MeowClient::new(MeowConfig::default());
515 /// let task = DownloadPounceBuilder::new(
516 /// "example.bin",
517 /// "./downloads/example.bin",
518 /// 1024 * 1024,
519 /// "https://example.com/example.bin",
520 /// )
521 /// .build();
522 ///
523 /// let task_id = client
524 /// .try_enqueue(
525 /// task,
526 /// |record| {
527 /// println!("status={:?} progress={:.2}", record.status(), record.progress());
528 /// },
529 /// |task_id, payload| {
530 /// println!("task {task_id} completed, payload={payload:?}");
531 /// },
532 /// )
533 /// .await?;
534 /// println!("enqueued task: {task_id}");
535 /// # Ok(())
536 /// # }
537 /// ```
538 pub async fn try_enqueue<PCB, CCB>(
539 &self,
540 task: PounceTask,
541 progress_cb: PCB,
542 complete_cb: CCB,
543 ) -> Result<TaskId, MeowError>
544 where
545 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
546 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
547 {
548 self.ensure_open()?;
549 if task.is_empty() {
550 crate::meow_flow_log!("try_enqueue", "reject empty task");
551 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
552 }
553
554 crate::meow_flow_log!("try_enqueue", "task={:?}", task);
555
556 let progress: ProgressCb = Arc::new(progress_cb);
557 let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
558 let callbacks = TaskCallbacks::new(Some(progress), complete);
559
560 let (def_up, def_down) = default_breakpoint_arcs();
561 let inner = InnerTask::from_pounce(
562 task,
563 self.config.breakpoint_download_http().clone(),
564 self.config.http_client_ref().cloned(),
565 def_up,
566 def_down,
567 )
568 .await?;
569
570 let task_id = self.get_exec()?.try_enqueue(inner, callbacks)?;
571 crate::meow_flow_log!("try_enqueue", "try_enqueue success: task_id={:?}", task_id);
572 Ok(task_id)
573 }
574
575 /// Imports a transfer task in the **paused** state without scheduling it.
576 ///
577 /// This is the restart/restore entry point for callers that persist their
578 /// own transfer records: rebuild a [`PounceTask`] from your database, import
579 /// it here, and the task is registered into the scheduler as
580 /// [`TransferStatus::Paused`] **without** queueing, so it performs **zero
581 /// network or file I/O** until you explicitly start it.
582 ///
583 /// To start a previously imported task, call [`Self::resume`] with the
584 /// returned [`TaskId`]. A typical "restore N, start a user-selected subset"
585 /// flow imports every task with `try_enqueue_paused` and then calls
586 /// [`Self::resume`] only for the ids the user chose; the rest stay paused.
587 ///
588 /// # Difference from [`Self::try_enqueue`]
589 ///
590 /// - `try_enqueue` schedules immediately (the task becomes `Pending` and may
591 /// start transferring as soon as a concurrency slot is free).
592 /// - `try_enqueue_paused` registers the task as `Paused` and never queues it
593 /// until [`Self::resume`] is called.
594 ///
595 /// Back-pressure is identical: this method uses
596 /// [`tokio::sync::mpsc::Sender::try_send`] and fails fast with
597 /// `CommandSendFailed` if the command queue is full (see
598 /// [`Self::try_enqueue`] for the rationale behind the `try_` prefix).
599 ///
600 /// # Resume semantics after import
601 ///
602 /// When the imported task is later resumed, the resume point is recomputed
603 /// by the executor, **not** taken from any value passed here:
604 ///
605 /// - **Download**: resumes from the on-disk partial file length, so the
606 /// partial file must still exist at the task's `file_path`.
607 /// - **Upload**: resumes from the server-reported `next_byte` during the
608 /// upload `prepare` stage.
609 ///
610 /// # Progress reporting while paused
611 ///
612 /// The single `Paused` [`FileTransferRecord`] emitted on import reports
613 /// progress `0.0` because no `prepare` has run yet. Render the imported
614 /// task's real progress from your own persisted record; the SDK corrects it
615 /// after the first resume.
616 ///
617 /// # Parameters
618 ///
619 /// Same as [`Self::try_enqueue`]: a built `task`, a per-task `progress_cb`,
620 /// and a `complete_cb` fired once on terminal `Complete`.
621 ///
622 /// # Errors
623 ///
624 /// - `ClientClosed` if the client was closed.
625 /// - `ParameterEmpty` if the task is invalid/empty.
626 /// - `CommandSendFailed` if the scheduler command queue is full.
627 /// - Any runtime initialization errors from the executor.
628 ///
629 /// # Examples
630 ///
631 /// ```no_run
632 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
633 ///
634 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
635 /// let client = MeowClient::new(MeowConfig::default());
636 /// let task = DownloadPounceBuilder::new(
637 /// "example.bin",
638 /// "./downloads/example.bin",
639 /// 1024 * 1024,
640 /// "https://example.com/example.bin",
641 /// )
642 /// .build();
643 ///
644 /// // Import without starting it (no HTTP request, no file open).
645 /// let task_id = client
646 /// .try_enqueue_paused(task, |_record| {}, |_id, _payload| {})
647 /// .await?;
648 ///
649 /// // Later, when the user chooses to start this one:
650 /// client.resume(task_id).await?;
651 /// # Ok(())
652 /// # }
653 /// ```
654 pub async fn try_enqueue_paused<PCB, CCB>(
655 &self,
656 task: PounceTask,
657 progress_cb: PCB,
658 complete_cb: CCB,
659 ) -> Result<TaskId, MeowError>
660 where
661 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
662 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
663 {
664 self.ensure_open()?;
665 if task.is_empty() {
666 crate::meow_flow_log!("try_enqueue_paused", "reject empty task");
667 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
668 }
669
670 crate::meow_flow_log!("try_enqueue_paused", "task={:?}", task);
671
672 let progress: ProgressCb = Arc::new(progress_cb);
673 let complete: Option<CompleteCb> = Some(Arc::new(complete_cb) as CompleteCb);
674 let callbacks = TaskCallbacks::new(Some(progress), complete);
675
676 let (def_up, def_down) = default_breakpoint_arcs();
677 let inner = InnerTask::from_pounce(
678 task,
679 self.config.breakpoint_download_http().clone(),
680 self.config.http_client_ref().cloned(),
681 def_up,
682 def_down,
683 )
684 .await?;
685
686 let task_id = self.get_exec()?.try_enqueue_paused(inner, callbacks)?;
687 crate::meow_flow_log!(
688 "try_enqueue_paused",
689 "try_enqueue_paused success: task_id={:?}",
690 task_id
691 );
692 Ok(task_id)
693 }
694
695 /// Enqueues a task and `await`s until it reaches a terminal status.
696 ///
697 /// Wraps [`Self::try_enqueue`] with an internal oneshot channel so callers
698 /// do not have to write the channel + double-callback + single-send-guard
699 /// boilerplate themselves.
700 ///
701 /// # Returns
702 ///
703 /// - `Ok(TaskOutcome)` when the task reaches [`TransferStatus::Complete`].
704 /// - `Err(MeowError)` carrying the underlying failure for
705 /// [`TransferStatus::Failed`].
706 /// - `Err(MeowError)` with code [`InnerErrorCode::TaskCanceled`] for
707 /// [`TransferStatus::Canceled`].
708 ///
709 /// # Progress
710 ///
711 /// `progress_cb` receives every [`FileTransferRecord`] update, identical to
712 /// the per-task progress callback in [`Self::try_enqueue`].
713 ///
714 /// # Cancellation / timeout
715 ///
716 /// Dropping the returned future does **not** cancel the underlying transfer;
717 /// the task continues running in the executor. Use [`Self::cancel`] with
718 /// the task id (obtainable from `progress_cb`'s `record.task_id()`) to
719 /// abort an in-flight transfer.
720 ///
721 /// To cap wall-clock waiting time, wrap this future:
722 ///
723 /// ```ignore
724 /// let outcome = tokio::time::timeout(
725 /// std::time::Duration::from_secs(60),
726 /// client.enqueue_and_wait(task, |_| {}),
727 /// )
728 /// .await??;
729 /// ```
730 ///
731 /// # Errors
732 ///
733 /// In addition to the terminal-status errors above, propagates any error
734 /// from [`Self::try_enqueue`] (e.g. `ClientClosed`, `ParameterEmpty`,
735 /// `CommandSendFailed`).
736 ///
737 /// # Examples
738 ///
739 /// ```no_run
740 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
741 ///
742 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
743 /// let client = MeowClient::new(MeowConfig::default());
744 /// let task = DownloadPounceBuilder::new(
745 /// "example.bin",
746 /// "./downloads/example.bin",
747 /// 1024 * 1024,
748 /// "https://example.com/example.bin",
749 /// )
750 /// .build();
751 ///
752 /// let outcome = client
753 /// .enqueue_and_wait(task, |record| {
754 /// println!(
755 /// "task={} progress={:.2}",
756 /// record.task_id(),
757 /// record.progress()
758 /// );
759 /// })
760 /// .await?;
761 /// println!("task {} complete, payload={:?}", outcome.task_id, outcome.payload);
762 /// # Ok(())
763 /// # }
764 /// ```
765 pub async fn enqueue_and_wait<PCB>(
766 &self,
767 task: PounceTask,
768 progress_cb: PCB,
769 ) -> Result<TaskOutcome, MeowError>
770 where
771 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
772 {
773 type TerminalMsg = Result<(TaskId, Option<String>), MeowError>;
774 let (tx, rx) = oneshot::channel::<TerminalMsg>();
775 let tx_slot: Arc<StdMutex<Option<oneshot::Sender<TerminalMsg>>>> =
776 Arc::new(StdMutex::new(Some(tx)));
777 let progress_slot = Arc::clone(&tx_slot);
778 let complete_slot = tx_slot;
779
780 self.try_enqueue(
781 task,
782 move |record: FileTransferRecord| {
783 progress_cb(record.clone());
784 match record.status() {
785 TransferStatus::Failed(err) => {
786 send_terminal_once(&progress_slot, Err(err.clone()));
787 }
788 TransferStatus::Canceled => {
789 send_terminal_once(
790 &progress_slot,
791 Err(MeowError::from_code_str(
792 InnerErrorCode::TaskCanceled,
793 "task was canceled",
794 )),
795 );
796 }
797 _ => {}
798 }
799 },
800 move |task_id, payload| {
801 send_terminal_once(&complete_slot, Ok((task_id, payload)));
802 },
803 )
804 .await?;
805
806 match rx.await {
807 Ok(Ok((task_id, payload))) => Ok(TaskOutcome { task_id, payload }),
808 Ok(Err(err)) => Err(err),
809 Err(_) => Err(MeowError::from_code_str(
810 InnerErrorCode::CommandResponseFailed,
811 "transfer terminal channel closed without notification",
812 )),
813 }
814 }
815
816 // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
817 // todo!(arman) -
818 // }
819
820 /// Pauses a running or pending task by ID.
821 ///
822 /// This API sends a control command to the internal scheduler worker
823 /// thread. It does not execute transfer pause logic on the caller thread.
824 ///
825 /// # Usage rules
826 ///
827 /// Call this with a valid task ID returned by [`Self::enqueue`].
828 ///
829 /// # Errors
830 ///
831 /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
832 ///
833 /// # Examples
834 ///
835 /// ```no_run
836 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
837 ///
838 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
839 /// let client = MeowClient::new(MeowConfig::default());
840 /// client.pause(task_id).await?;
841 /// # Ok(())
842 /// # }
843 /// ```
844 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
845 self.ensure_open()?;
846 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
847 self.get_exec()?.pause(task_id).await
848 }
849
850 /// Resumes a previously paused task.
851 ///
852 /// The same [`TaskId`] continues to identify the task after resume.
853 /// The resume command is forwarded to the internal scheduler worker
854 /// thread, so caller thread is not responsible for running transfer logic.
855 ///
856 /// # Errors
857 ///
858 /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
859 ///
860 /// # Examples
861 ///
862 /// ```no_run
863 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
864 ///
865 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
866 /// let client = MeowClient::new(MeowConfig::default());
867 /// client.resume(task_id).await?;
868 /// # Ok(())
869 /// # }
870 /// ```
871 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
872 self.ensure_open()?;
873 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
874 self.get_exec()?.resume(task_id).await
875 }
876
877 /// Cancels a task by ID.
878 ///
879 /// Cancellation is requested through the internal scheduler worker thread.
880 /// Transfer cancellation execution happens in background runtime workers.
881 ///
882 /// # Usage rules
883 ///
884 /// Cancellation is best-effort; protocol-specific cleanup may run.
885 ///
886 /// # Errors
887 ///
888 /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
889 ///
890 /// # Examples
891 ///
892 /// ```no_run
893 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
894 ///
895 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
896 /// let client = MeowClient::new(MeowConfig::default());
897 /// client.cancel(task_id).await?;
898 /// # Ok(())
899 /// # }
900 /// ```
901 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
902 self.ensure_open()?;
903 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
904 self.get_exec()?.cancel(task_id).await
905 }
906
907 /// Returns a snapshot of queue and active transfer groups.
908 ///
909 /// Useful for diagnostics and external monitoring dashboards.
910 /// Snapshot collection is coordinated by internal scheduler worker state.
911 ///
912 /// # Errors
913 ///
914 /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
915 /// snapshot retrieval errors.
916 ///
917 /// # Examples
918 ///
919 /// ```no_run
920 /// use rusty_cat::api::{MeowClient, MeowConfig};
921 ///
922 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
923 /// let client = MeowClient::new(MeowConfig::default());
924 /// let snap = client.snapshot().await?;
925 /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
926 /// # Ok(())
927 /// # }
928 /// ```
929 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
930 self.ensure_open()?;
931 crate::meow_flow_log!("client_api", "snapshot called");
932 self.get_exec()?.snapshot().await
933 }
934
935 /// Closes this client and its underlying executor.
936 ///
937 /// `close` is the terminal lifecycle operation for a `MeowClient`. After
938 /// it succeeds, this client stays permanently closed; submit more work by
939 /// constructing a new `MeowClient` and enqueueing tasks there.
940 ///
941 /// After a successful close:
942 ///
943 /// - New task and control operations on this client are rejected with
944 /// `ClientClosed`.
945 /// - All known unfinished task groups (queued, paused, or active) receive
946 /// a `Paused` progress notification through their task callback and all
947 /// registered global listeners.
948 /// - In-flight transfers are cancelled and the scheduler state is cleared.
949 /// - Already submitted callback jobs are drained before returning.
950 /// - The internal scheduler thread is joined, which drops its Tokio
951 /// runtime and releases SDK-owned background execution resources.
952 ///
953 /// `Paused` is used for shutdown notifications rather than `Canceled` so
954 /// callers can recreate a client later and re-enqueue the same logical
955 /// transfer when they want to resume from available breakpoint state.
956 ///
957 /// # Idempotency
958 ///
959 /// Calling `close` more than once returns `ClientClosed`.
960 ///
961 /// # Retry behavior
962 ///
963 /// If executor close fails, the closed flag is rolled back so caller can
964 /// retry close. A successful close is not restartable.
965 ///
966 /// # Errors
967 ///
968 /// Returns `ClientClosed` when already closed, or underlying executor close
969 /// errors when shutdown is not completed.
970 ///
971 /// # Examples
972 ///
973 /// ```no_run
974 /// use rusty_cat::api::{MeowClient, MeowConfig};
975 ///
976 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
977 /// let client = MeowClient::new(MeowConfig::default());
978 /// client.close().await?;
979 /// # Ok(())
980 /// # }
981 /// ```
982 pub async fn close(&self) -> Result<(), MeowError> {
983 if self
984 .closed
985 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
986 .is_err()
987 {
988 crate::meow_flow_log!("client_api", "close rejected: already closed");
989 return Err(MeowError::from_code_str(
990 InnerErrorCode::ClientClosed,
991 "meow client is already closed",
992 ));
993 }
994 if let Some(exec) = self.executor.get() {
995 crate::meow_flow_log!("client_api", "close forwarding to executor");
996 if let Err(e) = exec.close().await {
997 // Roll back closed flag so caller can retry close.
998 self.closed.store(false, Ordering::SeqCst);
999 return Err(e);
1000 }
1001 Ok(())
1002 } else {
1003 crate::meow_flow_log!("client_api", "close with no executor initialized");
1004 Ok(())
1005 }
1006 }
1007
1008 /// Returns whether this client is currently closed.
1009 ///
1010 /// # Examples
1011 ///
1012 /// ```no_run
1013 /// use rusty_cat::api::{MeowClient, MeowConfig};
1014 ///
1015 /// let client = MeowClient::new(MeowConfig::default());
1016 /// let _closed = client.is_closed();
1017 /// ```
1018 pub fn is_closed(&self) -> bool {
1019 self.closed.load(Ordering::SeqCst)
1020 }
1021}
1022
1023fn send_terminal_once(
1024 slot: &Arc<StdMutex<Option<oneshot::Sender<Result<(TaskId, Option<String>), MeowError>>>>>,
1025 msg: Result<(TaskId, Option<String>), MeowError>,
1026) {
1027 if let Ok(mut guard) = slot.lock() {
1028 if let Some(sender) = guard.take() {
1029 let _ = sender.send(msg);
1030 }
1031 }
1032}