1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
16pub type GlobalProgressListener = ProgressCb;
17
18#[derive(Clone)]
19pub struct MeowClient {
20 executor: OnceLock<Executor>,
21 config: MeowConfig,
22 global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
23 closed: Arc<AtomicBool>,
24}
25
26impl std::fmt::Debug for MeowClient {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("MeowClient")
29 .field("config", &self.config)
30 .field("global_progress_listener", &"..")
31 .finish()
32 }
33}
34
35impl MeowClient {
36 pub fn new(config: MeowConfig) -> Self {
37 MeowClient {
38 executor: Default::default(),
39 config,
40 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
41 closed: Arc::new(AtomicBool::new(false)),
42 }
43 }
44
45 fn get_exec(&self) -> Result<&Executor, MeowError> {
46 if let Some(exec) = self.executor.get() {
47 crate::meow_flow_log!("executor", "reuse existing executor");
48 return Ok(exec);
49 }
50 let default = DefaultHttpClient::try_with_http_timeouts(
51 self.config.http_timeout(),
52 self.config.tcp_keepalive(),
53 )?;
54 crate::meow_flow_log!(
55 "executor",
56 "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
57 self.config.http_timeout(),
58 self.config.tcp_keepalive()
59 );
60 let exec = Executor::new(
61 self.config.clone(),
62 Arc::new(default),
63 self.global_progress_listener.clone(),
64 )?;
65 let _ = self.executor.set(exec);
66 self.executor.get().ok_or_else(|| {
67 crate::meow_flow_log!(
68 "executor",
69 "executor init race failed after set; returning RuntimeCreationFailedError"
70 );
71 MeowError::from_code_str(
72 InnerErrorCode::RuntimeCreationFailedError,
73 "executor init race failed",
74 )
75 })
76 }
77
78 fn ensure_open(&self) -> Result<(), MeowError> {
79 if self.closed.load(Ordering::SeqCst) {
80 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
81 Err(MeowError::from_code_str(
82 InnerErrorCode::ClientClosed,
83 "meow client is already closed",
84 ))
85 } else {
86 Ok(())
87 }
88 }
89
90 pub fn register_global_progress_listener<F>(
91 &self,
92 listener: F,
93 ) -> Result<GlobalProgressListenerId, MeowError>
94 where
95 F: Fn(FileTransferRecord) + Send + Sync + 'static,
96 {
97 let id = GlobalProgressListenerId::new_v4();
98 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
99 let mut guard = self.global_progress_listener.write().map_err(|e| {
100 MeowError::from_code(
101 InnerErrorCode::LockPoisoned,
102 format!("register global listener lock poisoned: {}", e),
103 )
104 })?;
105 guard.push((id, Arc::new(listener)));
106 Ok(id)
107 }
108
109 pub fn unregister_global_progress_listener(
111 &self,
112 id: GlobalProgressListenerId,
113 ) -> Result<bool, MeowError> {
114 let mut g = self.global_progress_listener.write().map_err(|e| {
115 MeowError::from_code(
116 InnerErrorCode::LockPoisoned,
117 format!("unregister global listener lock poisoned: {}", e),
118 )
119 })?;
120 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
121 g.remove(pos);
122 crate::meow_flow_log!(
123 "listener",
124 "unregister global listener success: id={:?}",
125 id
126 );
127 Ok(true)
128 } else {
129 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
130 Ok(false)
131 }
132 }
133
134 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
135 crate::meow_flow_log!("listener", "clear all global listeners");
136 self.global_progress_listener
137 .write()
138 .map_err(|e| {
139 MeowError::from_code(
140 InnerErrorCode::LockPoisoned,
141 format!("clear global listeners lock poisoned: {}", e),
142 )
143 })?
144 .clear();
145 Ok(())
146 }
147
148 pub fn set_debug_log_listener(
153 &self,
154 listener: Option<DebugLogListener>,
155 ) -> Result<(), DebugLogListenerError> {
156 set_debug_log_listener(listener)
157 }
158}
159
160impl MeowClient {
161 pub async fn enqueue<PCB>(
162 &self,
163 task: PounceTask,
164 progress_cb: PCB,
165 ) -> Result<TaskId, MeowError>
166 where
167 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
168 {
169 self.ensure_open()?;
170 if task.is_empty() {
171 crate::meow_flow_log!("enqueue", "reject empty task");
172 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
173 }
174
175 crate::meow_flow_log!("enqueue", "task={:?}", task);
176
177 let progress: ProgressCb = Arc::new(progress_cb);
178 let callbacks = TaskCallbacks::new(Some(progress));
179
180 let (def_up, def_down) = default_breakpoint_arcs();
181 let inner = InnerTask::from_pounce(
182 task,
183 self.config.breakpoint_download_http().clone(),
184 self.config.http_client_ref().cloned(),
185 def_up,
186 def_down,
187 )
188 .await?;
189
190 let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
191 crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
192 Ok(task_id)
193 }
194
195 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
200 self.ensure_open()?;
201 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
202 self.get_exec()?.pause(task_id).await
203 }
204
205 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
207 self.ensure_open()?;
208 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
209 self.get_exec()?.resume(task_id).await
210 }
211
212 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
213 self.ensure_open()?;
214 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
215 self.get_exec()?.cancel(task_id).await
216 }
217
218 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
219 self.ensure_open()?;
220 crate::meow_flow_log!("client_api", "snapshot called");
221 self.get_exec()?.snapshot().await
222 }
223
224 pub async fn close(&self) -> Result<(), MeowError> {
225 if self
226 .closed
227 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
228 .is_err()
229 {
230 crate::meow_flow_log!("client_api", "close rejected: already closed");
231 return Err(MeowError::from_code_str(
232 InnerErrorCode::ClientClosed,
233 "meow client is already closed",
234 ));
235 }
236 if let Some(exec) = self.executor.get() {
237 crate::meow_flow_log!("client_api", "close forwarding to executor");
238 if let Err(e) = exec.close().await {
239 self.closed.store(false, Ordering::SeqCst);
241 return Err(e);
242 }
243 Ok(())
244 } else {
245 crate::meow_flow_log!("client_api", "close with no executor initialized");
246 Ok(())
247 }
248 }
249
250 pub async fn is_closed(&self) -> bool {
251 self.closed.load(Ordering::SeqCst)
252 }
253}