rusty_cat/meow_client.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{CompleteCb, ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
16/// Callback type for globally observing task progress events.
17///
18/// The callback is invoked from runtime worker context. Keep callback logic
19/// fast and non-blocking to avoid delaying event processing.
20pub type GlobalProgressListener = ProgressCb;
21
22/// Main entry point of the `rusty-cat` SDK.
23///
24/// `MeowClient` owns runtime state and provides high-level operations:
25/// enqueue, pause, resume, cancel, snapshot, and close.
26///
27/// # Usage pattern
28///
29/// 1. Create [`MeowConfig`].
30/// 2. Construct `MeowClient::new(config)`.
31/// 3. Build tasks with upload/download builders.
32/// 4. Call [`Self::enqueue`] and store returned [`TaskId`].
33/// 5. Control task lifecycle with pause/resume/cancel.
34/// 6. Call [`Self::close`] during shutdown.
35#[derive(Clone)]
36pub struct MeowClient {
37 /// Lazily initialized task executor.
38 executor: OnceLock<Executor>,
39 /// Immutable runtime configuration.
40 config: MeowConfig,
41 /// Global listeners receiving progress records for all tasks.
42 global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
43 /// Global closed flag. Once set to `true`, task control APIs reject calls.
44 closed: Arc<AtomicBool>,
45}
46
47impl std::fmt::Debug for MeowClient {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("MeowClient")
50 .field("config", &self.config)
51 .field("global_progress_listener", &"..")
52 .finish()
53 }
54}
55
56impl MeowClient {
57 /// Creates a new client with the provided configuration.
58 ///
59 /// The internal executor is initialized lazily on first task operation.
60 ///
61 /// # Examples
62 ///
63 /// ```no_run
64 /// use rusty_cat::api::{MeowClient, MeowConfig};
65 ///
66 /// let config = MeowConfig::default();
67 /// let client = MeowClient::new(config);
68 /// let _ = client;
69 /// ```
70 pub fn new(config: MeowConfig) -> Self {
71 MeowClient {
72 executor: Default::default(),
73 config,
74 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
75 closed: Arc::new(AtomicBool::new(false)),
76 }
77 }
78
79 /// Returns a `reqwest::Client` aligned with this client's configuration.
80 ///
81 /// - If [`MeowConfig::with_http_client`] injected a custom client, this
82 /// returns its clone.
83 /// - Otherwise, this builds a new client from `http_timeout` and
84 /// `tcp_keepalive`.
85 ///
86 /// # Errors
87 ///
88 /// Returns [`MeowError`] with `HttpClientBuildFailed` when client creation
89 /// fails.
90 ///
91 /// # Examples
92 ///
93 /// ```no_run
94 /// use rusty_cat::api::{MeowClient, MeowConfig};
95 ///
96 /// let client = MeowClient::new(MeowConfig::default());
97 /// let http = client.http_client()?;
98 /// let _ = http;
99 /// # Ok::<(), rusty_cat::api::MeowError>(())
100 /// ```
101 pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
102 if let Some(c) = self.config.http_client_ref() {
103 return Ok(c.clone());
104 }
105 reqwest::Client::builder()
106 .timeout(self.config.http_timeout())
107 .tcp_keepalive(self.config.tcp_keepalive())
108 .build()
109 .map_err(|e| {
110 MeowError::from_source(
111 InnerErrorCode::HttpClientBuildFailed,
112 format!(
113 "build reqwest client failed (timeout={:?}, keepalive={:?})",
114 self.config.http_timeout(),
115 self.config.tcp_keepalive()
116 ),
117 e,
118 )
119 })
120 }
121
122 fn get_exec(&self) -> Result<&Executor, MeowError> {
123 if let Some(exec) = self.executor.get() {
124 crate::meow_flow_log!("executor", "reuse existing executor");
125 return Ok(exec);
126 }
127 let default = DefaultHttpClient::try_with_http_timeouts(
128 self.config.http_timeout(),
129 self.config.tcp_keepalive(),
130 )?;
131 crate::meow_flow_log!(
132 "executor",
133 "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
134 self.config.http_timeout(),
135 self.config.tcp_keepalive()
136 );
137 let exec = Executor::new(
138 self.config.clone(),
139 Arc::new(default),
140 self.global_progress_listener.clone(),
141 )?;
142 let _ = self.executor.set(exec);
143 self.executor.get().ok_or_else(|| {
144 crate::meow_flow_log!(
145 "executor",
146 "executor init race failed after set; returning RuntimeCreationFailedError"
147 );
148 MeowError::from_code_str(
149 InnerErrorCode::RuntimeCreationFailedError,
150 "executor init race failed",
151 )
152 })
153 }
154
155 /// Ensures the client is still open.
156 ///
157 /// Returns `ClientClosed` if [`Self::close`] was called successfully.
158 fn ensure_open(&self) -> Result<(), MeowError> {
159 if self.closed.load(Ordering::SeqCst) {
160 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
161 Err(MeowError::from_code_str(
162 InnerErrorCode::ClientClosed,
163 "meow client is already closed",
164 ))
165 } else {
166 Ok(())
167 }
168 }
169
170 /// Registers a global progress listener for all tasks.
171 ///
172 /// # Parameters
173 ///
174 /// - `listener`: Callback receiving [`FileTransferRecord`] updates.
175 ///
176 /// # Returns
177 ///
178 /// Returns a listener ID used by
179 /// [`Self::unregister_global_progress_listener`].
180 ///
181 /// # Usage rules
182 ///
183 /// Keep callback execution short and panic-free. A heavy callback can slow
184 /// down global event delivery.
185 ///
186 /// # Errors
187 ///
188 /// Returns `LockPoisoned` when listener storage lock is poisoned.
189 ///
190 /// # Examples
191 ///
192 /// ```no_run
193 /// use rusty_cat::api::{MeowClient, MeowConfig};
194 ///
195 /// let client = MeowClient::new(MeowConfig::default());
196 /// let listener_id = client.register_global_progress_listener(|record| {
197 /// println!("task={} progress={:.2}", record.task_id(), record.progress());
198 /// })?;
199 /// let _ = listener_id;
200 /// # Ok::<(), rusty_cat::api::MeowError>(())
201 /// ```
202 pub fn register_global_progress_listener<F>(
203 &self,
204 listener: F,
205 ) -> Result<GlobalProgressListenerId, MeowError>
206 where
207 F: Fn(FileTransferRecord) + Send + Sync + 'static,
208 {
209 let id = GlobalProgressListenerId::new_v4();
210 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
211 let mut guard = self.global_progress_listener.write().map_err(|e| {
212 MeowError::from_code(
213 InnerErrorCode::LockPoisoned,
214 format!("register global listener lock poisoned: {}", e),
215 )
216 })?;
217 guard.push((id, Arc::new(listener)));
218 Ok(id)
219 }
220
221 /// Unregisters one previously registered global progress listener.
222 ///
223 /// Returns `Ok(false)` when the ID does not exist.
224 ///
225 /// # Errors
226 ///
227 /// Returns `LockPoisoned` when listener storage lock is poisoned.
228 ///
229 /// # Examples
230 ///
231 /// ```no_run
232 /// use rusty_cat::api::{MeowClient, MeowConfig};
233 ///
234 /// let client = MeowClient::new(MeowConfig::default());
235 /// let id = client.register_global_progress_listener(|_| {})?;
236 /// let removed = client.unregister_global_progress_listener(id)?;
237 /// assert!(removed);
238 /// # Ok::<(), rusty_cat::api::MeowError>(())
239 /// ```
240 pub fn unregister_global_progress_listener(
241 &self,
242 id: GlobalProgressListenerId,
243 ) -> Result<bool, MeowError> {
244 let mut g = self.global_progress_listener.write().map_err(|e| {
245 MeowError::from_code(
246 InnerErrorCode::LockPoisoned,
247 format!("unregister global listener lock poisoned: {}", e),
248 )
249 })?;
250 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
251 g.remove(pos);
252 crate::meow_flow_log!(
253 "listener",
254 "unregister global listener success: id={:?}",
255 id
256 );
257 Ok(true)
258 } else {
259 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
260 Ok(false)
261 }
262 }
263
264 /// Removes all registered global progress listeners.
265 ///
266 /// # Errors
267 ///
268 /// Returns `LockPoisoned` when listener storage lock is poisoned.
269 ///
270 /// # Examples
271 ///
272 /// ```no_run
273 /// use rusty_cat::api::{MeowClient, MeowConfig};
274 ///
275 /// let client = MeowClient::new(MeowConfig::default());
276 /// client.clear_global_listener()?;
277 /// # Ok::<(), rusty_cat::api::MeowError>(())
278 /// ```
279 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
280 crate::meow_flow_log!("listener", "clear all global listeners");
281 self.global_progress_listener
282 .write()
283 .map_err(|e| {
284 MeowError::from_code(
285 InnerErrorCode::LockPoisoned,
286 format!("clear global listeners lock poisoned: {}", e),
287 )
288 })?
289 .clear();
290 Ok(())
291 }
292
293 /// Sets or clears the global debug log listener.
294 ///
295 /// - Pass `Some(listener)` to set/replace.
296 /// - Pass `None` to clear.
297 ///
298 /// This affects all `MeowClient` instances in the current process.
299 ///
300 /// # Errors
301 ///
302 /// Returns [`DebugLogListenerError`] when the internal global listener lock
303 /// is poisoned.
304 ///
305 /// # Examples
306 ///
307 /// ```no_run
308 /// use std::sync::Arc;
309 /// use rusty_cat::api::{Log, MeowClient, MeowConfig};
310 ///
311 /// let client = MeowClient::new(MeowConfig::default());
312 /// client.set_debug_log_listener(Some(Arc::new(|log: Log| {
313 /// println!("{log}");
314 /// })))?;
315 ///
316 /// // Clear listener when no longer needed.
317 /// client.set_debug_log_listener(None)?;
318 /// # Ok::<(), rusty_cat::api::DebugLogListenerError>(())
319 /// ```
320 pub fn set_debug_log_listener(
321 &self,
322 listener: Option<DebugLogListener>,
323 ) -> Result<(), DebugLogListenerError> {
324 set_debug_log_listener(listener)
325 }
326}
327
328impl MeowClient {
329 /// Enqueues a transfer task and returns its [`TaskId`].
330 ///
331 /// The actual upload/download execution is dispatched to an internal
332 /// worker system thread. This method only performs lightweight validation
333 /// and submission, so it does not block the caller thread waiting for full
334 /// transfer completion.
335 ///
336 /// `enqueue` is also the recovery entrypoint after process restart. If the
337 /// application was killed during a previous upload/download, restart your
338 /// process and call `enqueue` again to resume that transfer workflow.
339 ///
340 /// # Parameters
341 ///
342 /// - `task`: Built by upload/download task builders.
343 /// - `progress_cb`: Per-task callback invoked with transfer progress.
344 /// - `complete_cb`: Optional callback fired once when task reaches
345 /// [`crate::transfer_status::TransferStatus::Complete`]. The second
346 /// argument is provider-defined payload returned by upload protocol
347 /// `complete_upload`; download tasks usually receive `None`.
348 ///
349 /// # Usage rules
350 ///
351 /// - `task` must be non-empty (required path/name/url and valid upload size).
352 /// - Callback should be lightweight and non-blocking.
353 /// - Store returned task ID for subsequent task control operations.
354 /// - `enqueue` is asynchronous task submission, not synchronous transfer.
355 /// - For restart recovery, re-enqueue the same logical task (same
356 /// upload/download target and compatible checkpoint context) so the
357 /// runtime can continue from existing local/remote progress.
358 ///
359 /// # Errors
360 ///
361 /// Returns:
362 /// - `ClientClosed` if the client was closed.
363 /// - `ParameterEmpty` if the task is invalid/empty.
364 /// - Any runtime initialization or enqueue errors from the executor.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use reqwest::Method;
370 /// use rusty_cat::api::{DownloadPounceBuilder, MeowClient, MeowConfig};
371 ///
372 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
373 /// let client = MeowClient::new(MeowConfig::default());
374 /// let task = DownloadPounceBuilder::new(
375 /// "example.bin",
376 /// "./downloads/example.bin",
377 /// 1024 * 1024,
378 /// "https://example.com/example.bin",
379 /// Method::GET,
380 /// )
381 /// .build();
382 ///
383 /// let task_id = client
384 /// .enqueue(
385 /// task,
386 /// |record| {
387 /// println!("status={:?} progress={:.2}", record.status(), record.progress());
388 /// },
389 /// Some(|task_id, payload| {
390 /// println!("task {task_id} completed, payload={payload:?}");
391 /// }),
392 /// )
393 /// .await?;
394 /// println!("enqueued task: {task_id}");
395 /// # Ok(())
396 /// # }
397 /// ```
398 pub async fn enqueue<PCB,CCB>(
399 &self,
400 task: PounceTask,
401 progress_cb: PCB,
402 complete_cb: Option<CCB>,
403 ) -> Result<TaskId, MeowError>
404 where
405 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
406 CCB: Fn(TaskId, Option<String>) + Send + Sync + 'static,
407 {
408 self.ensure_open()?;
409 if task.is_empty() {
410 crate::meow_flow_log!("enqueue", "reject empty task");
411 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
412 }
413
414 crate::meow_flow_log!("enqueue", "task={:?}", task);
415
416 let progress: ProgressCb = Arc::new(progress_cb);
417 let complete: Option<CompleteCb> = complete_cb.map(|cb| Arc::new(cb) as CompleteCb);
418 let callbacks = TaskCallbacks::new(Some(progress), complete);
419
420 let (def_up, def_down) = default_breakpoint_arcs();
421 let inner = InnerTask::from_pounce(
422 task,
423 self.config.breakpoint_download_http().clone(),
424 self.config.http_client_ref().cloned(),
425 def_up,
426 def_down,
427 )
428 .await?;
429
430 let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
431 crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
432 Ok(task_id)
433 }
434
435 // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
436 // todo!(arman) -
437 // }
438
439 /// Pauses a running or pending task by ID.
440 ///
441 /// This API sends a control command to the internal scheduler worker
442 /// thread. It does not execute transfer pause logic on the caller thread.
443 ///
444 /// # Usage rules
445 ///
446 /// Call this with a valid task ID returned by [`Self::enqueue`].
447 ///
448 /// # Errors
449 ///
450 /// Returns `ClientClosed`, `TaskNotFound`, or state-transition errors.
451 ///
452 /// # Examples
453 ///
454 /// ```no_run
455 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
456 ///
457 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
458 /// let client = MeowClient::new(MeowConfig::default());
459 /// client.pause(task_id).await?;
460 /// # Ok(())
461 /// # }
462 /// ```
463 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
464 self.ensure_open()?;
465 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
466 self.get_exec()?.pause(task_id).await
467 }
468
469 /// Resumes a previously paused task.
470 ///
471 /// The same [`TaskId`] continues to identify the task after resume.
472 /// The resume command is forwarded to the internal scheduler worker
473 /// thread, so caller thread is not responsible for running transfer logic.
474 ///
475 /// # Errors
476 ///
477 /// Returns `ClientClosed`, `TaskNotFound`, or `InvalidTaskState`.
478 ///
479 /// # Examples
480 ///
481 /// ```no_run
482 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
483 ///
484 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
485 /// let client = MeowClient::new(MeowConfig::default());
486 /// client.resume(task_id).await?;
487 /// # Ok(())
488 /// # }
489 /// ```
490 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
491 self.ensure_open()?;
492 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
493 self.get_exec()?.resume(task_id).await
494 }
495
496 /// Cancels a task by ID.
497 ///
498 /// Cancellation is requested through the internal scheduler worker thread.
499 /// Transfer cancellation execution happens in background runtime workers.
500 ///
501 /// # Usage rules
502 ///
503 /// Cancellation is best-effort; protocol-specific cleanup may run.
504 ///
505 /// # Errors
506 ///
507 /// Returns `ClientClosed`, `TaskNotFound`, or runtime cancellation errors.
508 ///
509 /// # Examples
510 ///
511 /// ```no_run
512 /// use rusty_cat::api::{MeowClient, MeowConfig, TaskId};
513 ///
514 /// # async fn run(task_id: TaskId) -> Result<(), rusty_cat::api::MeowError> {
515 /// let client = MeowClient::new(MeowConfig::default());
516 /// client.cancel(task_id).await?;
517 /// # Ok(())
518 /// # }
519 /// ```
520 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
521 self.ensure_open()?;
522 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
523 self.get_exec()?.cancel(task_id).await
524 }
525
526 /// Returns a snapshot of queue and active transfer groups.
527 ///
528 /// Useful for diagnostics and external monitoring dashboards.
529 /// Snapshot collection is coordinated by internal scheduler worker state.
530 ///
531 /// # Errors
532 ///
533 /// Returns `ClientClosed`, runtime command delivery errors, or scheduler
534 /// snapshot retrieval errors.
535 ///
536 /// # Examples
537 ///
538 /// ```no_run
539 /// use rusty_cat::api::{MeowClient, MeowConfig};
540 ///
541 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
542 /// let client = MeowClient::new(MeowConfig::default());
543 /// let snap = client.snapshot().await?;
544 /// println!("queued={}, active={}", snap.queued_groups, snap.active_groups);
545 /// # Ok(())
546 /// # }
547 /// ```
548 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
549 self.ensure_open()?;
550 crate::meow_flow_log!("client_api", "snapshot called");
551 self.get_exec()?.snapshot().await
552 }
553
554 /// Closes this client and its underlying executor.
555 ///
556 /// After a successful close:
557 /// - New task operations are rejected.
558 /// - Existing runtime resources are released.
559 ///
560 /// # Idempotency
561 ///
562 /// Calling `close` more than once returns `ClientClosed`.
563 ///
564 /// # Retry behavior
565 ///
566 /// If executor close fails, the closed flag is rolled back so caller can
567 /// retry close.
568 ///
569 /// # Errors
570 ///
571 /// Returns `ClientClosed` when already closed, or underlying executor close
572 /// errors when shutdown is not completed.
573 ///
574 /// # Examples
575 ///
576 /// ```no_run
577 /// use rusty_cat::api::{MeowClient, MeowConfig};
578 ///
579 /// # async fn run() -> Result<(), rusty_cat::api::MeowError> {
580 /// let client = MeowClient::new(MeowConfig::default());
581 /// client.close().await?;
582 /// # Ok(())
583 /// # }
584 /// ```
585 pub async fn close(&self) -> Result<(), MeowError> {
586 if self
587 .closed
588 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
589 .is_err()
590 {
591 crate::meow_flow_log!("client_api", "close rejected: already closed");
592 return Err(MeowError::from_code_str(
593 InnerErrorCode::ClientClosed,
594 "meow client is already closed",
595 ));
596 }
597 if let Some(exec) = self.executor.get() {
598 crate::meow_flow_log!("client_api", "close forwarding to executor");
599 if let Err(e) = exec.close().await {
600 // Roll back closed flag so caller can retry close.
601 self.closed.store(false, Ordering::SeqCst);
602 return Err(e);
603 }
604 Ok(())
605 } else {
606 crate::meow_flow_log!("client_api", "close with no executor initialized");
607 Ok(())
608 }
609 }
610
611 /// Returns whether this client is currently closed.
612 ///
613 /// # Examples
614 ///
615 /// ```no_run
616 /// use rusty_cat::api::{MeowClient, MeowConfig};
617 ///
618 /// # async fn run() {
619 /// let client = MeowClient::new(MeowConfig::default());
620 /// let _closed = client.is_closed().await;
621 /// # }
622 /// ```
623 pub async fn is_closed(&self) -> bool {
624 self.closed.load(Ordering::SeqCst)
625 }
626}