Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
/// Callback type used for client-wide progress listeners.
///
/// This is an alias of [`ProgressCb`]. `MeowClient::register_global_progress_listener`
/// wraps each registered closure in an `Arc` and stores it as a value of this type.
pub type GlobalProgressListener = ProgressCb;
17
18#[derive(Clone)]
19pub struct MeowClient {
20    executor: OnceLock<Executor>,
21    config: MeowConfig,
22    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
23    closed: Arc<AtomicBool>,
24}
25
26impl std::fmt::Debug for MeowClient {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("MeowClient")
29            .field("config", &self.config)
30            .field("global_progress_listener", &"..")
31            .finish()
32    }
33}
34
35impl MeowClient {
36    pub fn new(config: MeowConfig) -> Self {
37        MeowClient {
38            executor: Default::default(),
39            config,
40            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
41            closed: Arc::new(AtomicBool::new(false)),
42        }
43    }
44
45    /// 获取与当前 [`MeowClient`] 配置一致的 `reqwest::Client`(可用于复用同一套 timeout/keepalive 配置)。
46    ///
47    /// - 若 [`MeowConfig::with_http_client`] 曾注入自定义 client,则返回其 clone
48    /// - 否则按 `http_timeout/tcp_keepalive` 构建一个新 client
49    pub fn http_client(&self) -> Result<reqwest::Client, MeowError> {
50        if let Some(c) = self.config.http_client_ref() {
51            return Ok(c.clone());
52        }
53        reqwest::Client::builder()
54            .timeout(self.config.http_timeout())
55            .tcp_keepalive(self.config.tcp_keepalive())
56            .build()
57            .map_err(|e| {
58                MeowError::from_source(
59                    InnerErrorCode::HttpClientBuildFailed,
60                    format!(
61                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
62                        self.config.http_timeout(),
63                        self.config.tcp_keepalive()
64                    ),
65                    e,
66                )
67            })
68    }
69
70    fn get_exec(&self) -> Result<&Executor, MeowError> {
71        if let Some(exec) = self.executor.get() {
72            crate::meow_flow_log!("executor", "reuse existing executor");
73            return Ok(exec);
74        }
75        let default = DefaultHttpClient::try_with_http_timeouts(
76            self.config.http_timeout(),
77            self.config.tcp_keepalive(),
78        )?;
79        crate::meow_flow_log!(
80            "executor",
81            "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
82            self.config.http_timeout(),
83            self.config.tcp_keepalive()
84        );
85        let exec = Executor::new(
86            self.config.clone(),
87            Arc::new(default),
88            self.global_progress_listener.clone(),
89        )?;
90        let _ = self.executor.set(exec);
91        self.executor.get().ok_or_else(|| {
92            crate::meow_flow_log!(
93                "executor",
94                "executor init race failed after set; returning RuntimeCreationFailedError"
95            );
96            MeowError::from_code_str(
97                InnerErrorCode::RuntimeCreationFailedError,
98                "executor init race failed",
99            )
100        })
101    }
102
103    fn ensure_open(&self) -> Result<(), MeowError> {
104        if self.closed.load(Ordering::SeqCst) {
105            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
106            Err(MeowError::from_code_str(
107                InnerErrorCode::ClientClosed,
108                "meow client is already closed",
109            ))
110        } else {
111            Ok(())
112        }
113    }
114
115    pub fn register_global_progress_listener<F>(
116        &self,
117        listener: F,
118    ) -> Result<GlobalProgressListenerId, MeowError>
119    where
120        F: Fn(FileTransferRecord) + Send + Sync + 'static,
121    {
122        let id = GlobalProgressListenerId::new_v4();
123        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
124        let mut guard = self.global_progress_listener.write().map_err(|e| {
125            MeowError::from_code(
126                InnerErrorCode::LockPoisoned,
127                format!("register global listener lock poisoned: {}", e),
128            )
129        })?;
130        guard.push((id, Arc::new(listener)));
131        Ok(id)
132    }
133
134    /// 移除此前注册的一个全局监听;若 `id` 不存在则返回 `false`。
135    pub fn unregister_global_progress_listener(
136        &self,
137        id: GlobalProgressListenerId,
138    ) -> Result<bool, MeowError> {
139        let mut g = self.global_progress_listener.write().map_err(|e| {
140            MeowError::from_code(
141                InnerErrorCode::LockPoisoned,
142                format!("unregister global listener lock poisoned: {}", e),
143            )
144        })?;
145        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
146            g.remove(pos);
147            crate::meow_flow_log!(
148                "listener",
149                "unregister global listener success: id={:?}",
150                id
151            );
152            Ok(true)
153        } else {
154            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
155            Ok(false)
156        }
157    }
158
159    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
160        crate::meow_flow_log!("listener", "clear all global listeners");
161        self.global_progress_listener
162            .write()
163            .map_err(|e| {
164                MeowError::from_code(
165                    InnerErrorCode::LockPoisoned,
166                    format!("clear global listeners lock poisoned: {}", e),
167                )
168            })?
169            .clear();
170        Ok(())
171    }
172
173    /// 设置或取消流程调试日志监听器(全局生效)。
174    ///
175    /// - 传入 `Some(listener)`:设置/替换监听器
176    /// - 传入 `None`:取消注册监听器
177    pub fn set_debug_log_listener(
178        &self,
179        listener: Option<DebugLogListener>,
180    ) -> Result<(), DebugLogListenerError> {
181        set_debug_log_listener(listener)
182    }
183}
184
185impl MeowClient {
186    pub async fn enqueue<PCB>(
187        &self,
188        task: PounceTask,
189        progress_cb: PCB,
190    ) -> Result<TaskId, MeowError>
191    where
192        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
193    {
194        self.ensure_open()?;
195        if task.is_empty() {
196            crate::meow_flow_log!("enqueue", "reject empty task");
197            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
198        }
199
200        crate::meow_flow_log!("enqueue", "task={:?}", task);
201
202        let progress: ProgressCb = Arc::new(progress_cb);
203        let callbacks = TaskCallbacks::new(Some(progress));
204
205        let (def_up, def_down) = default_breakpoint_arcs();
206        let inner = InnerTask::from_pounce(
207            task,
208            self.config.breakpoint_download_http().clone(),
209            self.config.http_client_ref().cloned(),
210            def_up,
211            def_down,
212        )
213        .await?;
214
215        let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
216        crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
217        Ok(task_id)
218    }
219
220    // pub async fn get_task_status(&self, task_id: TaskId)-> Result<FileTransferRecord, MeowError> {
221    //     todo!(arman) -
222    // }
223
224    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
225        self.ensure_open()?;
226        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
227        self.get_exec()?.pause(task_id).await
228    }
229
230    /// 恢复一个此前被 pause 的任务,继续使用同一个 task_id 进行控制。
231    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
232        self.ensure_open()?;
233        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
234        self.get_exec()?.resume(task_id).await
235    }
236
237    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
238        self.ensure_open()?;
239        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
240        self.get_exec()?.cancel(task_id).await
241    }
242
243    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
244        self.ensure_open()?;
245        crate::meow_flow_log!("client_api", "snapshot called");
246        self.get_exec()?.snapshot().await
247    }
248
249    pub async fn close(&self) -> Result<(), MeowError> {
250        if self
251            .closed
252            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
253            .is_err()
254        {
255            crate::meow_flow_log!("client_api", "close rejected: already closed");
256            return Err(MeowError::from_code_str(
257                InnerErrorCode::ClientClosed,
258                "meow client is already closed",
259            ));
260        }
261        if let Some(exec) = self.executor.get() {
262            crate::meow_flow_log!("client_api", "close forwarding to executor");
263            if let Err(e) = exec.close().await {
264                // 关闭命令未完成时回滚 closed,允许调用方重试 close。
265                self.closed.store(false, Ordering::SeqCst);
266                return Err(e);
267            }
268            Ok(())
269        } else {
270            crate::meow_flow_log!("client_api", "close with no executor initialized");
271            Ok(())
272        }
273    }
274
275    pub async fn is_closed(&self) -> bool {
276        self.closed.load(Ordering::SeqCst)
277    }
278}