1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
/// Callback type stored for client-wide progress listeners; an alias of [`ProgressCb`].
pub type GlobalProgressListener = ProgressCb;
17
/// Client handle that owns the transfer configuration and lazily creates the task
/// [`Executor`] the first time it is needed.
///
/// `global_progress_listener` and `closed` are held behind `Arc`, so values produced by
/// the derived `Clone` share them with the original.
// NOTE(review): `executor` is a plain `OnceLock`, not an `Arc`. A clone taken before the
// executor is initialized gets its own empty cell and could build a separate executor —
// confirm this is intended.
#[derive(Clone)]
pub struct MeowClient {
    /// Executor built on demand by `get_exec`; empty until first needed.
    executor: OnceLock<Executor>,
    /// Configuration supplied to `new`.
    config: MeowConfig,
    /// Registered global progress listeners, each paired with the id handed out at registration.
    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
    /// Set to `true` by `close`; read by `ensure_open` and `is_closed`.
    closed: Arc<AtomicBool>,
}
25
26impl std::fmt::Debug for MeowClient {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("MeowClient")
29 .field("config", &self.config)
30 .field("global_progress_listener", &"..")
31 .finish()
32 }
33}
34
35impl MeowClient {
36 pub fn new(config: MeowConfig) -> Self {
37 MeowClient {
38 executor: Default::default(),
39 config,
40 global_progress_listener: Arc::new(RwLock::new(Vec::new())),
41 closed: Arc::new(AtomicBool::new(false)),
42 }
43 }
44
45 pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
50 if let Some(c) = self.config.http_client_ref() {
51 return Ok(c.clone());
52 }
53 reqwest::Client::builder()
54 .timeout(self.config.http_timeout())
55 .tcp_keepalive(self.config.tcp_keepalive())
56 .build()
57 .map_err(|e| {
58 MeowError::from_source(
59 InnerErrorCode::HttpClientBuildFailed,
60 format!(
61 "build reqwest client failed (timeout={:?}, keepalive={:?})",
62 self.config.http_timeout(),
63 self.config.tcp_keepalive()
64 ),
65 e,
66 )
67 })
68 }
69
70 fn get_exec(&self) -> Result<&Executor, MeowError> {
71 if let Some(exec) = self.executor.get() {
72 crate::meow_flow_log!("executor", "reuse existing executor");
73 return Ok(exec);
74 }
75 let default = DefaultHttpClient::try_with_http_timeouts(
76 self.config.http_timeout(),
77 self.config.tcp_keepalive(),
78 )?;
79 crate::meow_flow_log!(
80 "executor",
81 "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
82 self.config.http_timeout(),
83 self.config.tcp_keepalive()
84 );
85 let exec = Executor::new(
86 self.config.clone(),
87 Arc::new(default),
88 self.global_progress_listener.clone(),
89 )?;
90 let _ = self.executor.set(exec);
91 self.executor.get().ok_or_else(|| {
92 crate::meow_flow_log!(
93 "executor",
94 "executor init race failed after set; returning RuntimeCreationFailedError"
95 );
96 MeowError::from_code_str(
97 InnerErrorCode::RuntimeCreationFailedError,
98 "executor init race failed",
99 )
100 })
101 }
102
103 fn ensure_open(&self) -> Result<(), MeowError> {
104 if self.closed.load(Ordering::SeqCst) {
105 crate::meow_flow_log!("client", "ensure_open failed: client already closed");
106 Err(MeowError::from_code_str(
107 InnerErrorCode::ClientClosed,
108 "meow client is already closed",
109 ))
110 } else {
111 Ok(())
112 }
113 }
114
115 pub fn register_global_progress_listener<F>(
116 &self,
117 listener: F,
118 ) -> Result<GlobalProgressListenerId, MeowError>
119 where
120 F: Fn(FileTransferRecord) + Send + Sync + 'static,
121 {
122 let id = GlobalProgressListenerId::new_v4();
123 crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
124 let mut guard = self.global_progress_listener.write().map_err(|e| {
125 MeowError::from_code(
126 InnerErrorCode::LockPoisoned,
127 format!("register global listener lock poisoned: {}", e),
128 )
129 })?;
130 guard.push((id, Arc::new(listener)));
131 Ok(id)
132 }
133
134 pub fn unregister_global_progress_listener(
136 &self,
137 id: GlobalProgressListenerId,
138 ) -> Result<bool, MeowError> {
139 let mut g = self.global_progress_listener.write().map_err(|e| {
140 MeowError::from_code(
141 InnerErrorCode::LockPoisoned,
142 format!("unregister global listener lock poisoned: {}", e),
143 )
144 })?;
145 if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
146 g.remove(pos);
147 crate::meow_flow_log!(
148 "listener",
149 "unregister global listener success: id={:?}",
150 id
151 );
152 Ok(true)
153 } else {
154 crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
155 Ok(false)
156 }
157 }
158
159 pub fn clear_global_listener(&self) -> Result<(), MeowError> {
160 crate::meow_flow_log!("listener", "clear all global listeners");
161 self.global_progress_listener
162 .write()
163 .map_err(|e| {
164 MeowError::from_code(
165 InnerErrorCode::LockPoisoned,
166 format!("clear global listeners lock poisoned: {}", e),
167 )
168 })?
169 .clear();
170 Ok(())
171 }
172
/// Forwards `listener` to the crate-level `set_debug_log_listener` function and returns
/// its result unchanged.
///
/// `self` is not read by this method.
// NOTE(review): presumably `None` removes a previously installed listener — confirm
// against `crate::log::set_debug_log_listener`.
pub fn set_debug_log_listener(
    &self,
    listener: Option<DebugLogListener>,
) -> Result<(), DebugLogListenerError> {
    set_debug_log_listener(listener)
}
183}
184
185impl MeowClient {
186 pub async fn enqueue<PCB>(
187 &self,
188 task: PounceTask,
189 progress_cb: PCB,
190 ) -> Result<TaskId, MeowError>
191 where
192 PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
193 {
194 self.ensure_open()?;
195 if task.is_empty() {
196 crate::meow_flow_log!("enqueue", "reject empty task");
197 return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
198 }
199
200 crate::meow_flow_log!("enqueue", "task={:?}", task);
201
202 let progress: ProgressCb = Arc::new(progress_cb);
203 let callbacks = TaskCallbacks::new(Some(progress));
204
205 let (def_up, def_down) = default_breakpoint_arcs();
206 let inner = InnerTask::from_pounce(
207 task,
208 self.config.breakpoint_download_http().clone(),
209 self.config.http_client_ref().cloned(),
210 def_up,
211 def_down,
212 )
213 .await?;
214
215 let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
216 crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
217 Ok(task_id)
218 }
219
220 pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
225 self.ensure_open()?;
226 crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
227 self.get_exec()?.pause(task_id).await
228 }
229
230 pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
232 self.ensure_open()?;
233 crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
234 self.get_exec()?.resume(task_id).await
235 }
236
237 pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
238 self.ensure_open()?;
239 crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
240 self.get_exec()?.cancel(task_id).await
241 }
242
243 pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
244 self.ensure_open()?;
245 crate::meow_flow_log!("client_api", "snapshot called");
246 self.get_exec()?.snapshot().await
247 }
248
249 pub async fn close(&self) -> Result<(), MeowError> {
250 if self
251 .closed
252 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
253 .is_err()
254 {
255 crate::meow_flow_log!("client_api", "close rejected: already closed");
256 return Err(MeowError::from_code_str(
257 InnerErrorCode::ClientClosed,
258 "meow client is already closed",
259 ));
260 }
261 if let Some(exec) = self.executor.get() {
262 crate::meow_flow_log!("client_api", "close forwarding to executor");
263 if let Err(e) = exec.close().await {
264 self.closed.store(false, Ordering::SeqCst);
266 return Err(e);
267 }
268 Ok(())
269 } else {
270 crate::meow_flow_log!("client_api", "close with no executor initialized");
271 Ok(())
272 }
273 }
274
/// Returns the current value of the `closed` flag.
///
/// The function is declared `async` but contains no await points.
pub async fn is_closed(&self) -> bool {
    self.closed.load(Ordering::SeqCst)
}
278}