Skip to main content

rusty_cat/
meow_client.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use crate::dflt::default_http_client::{default_breakpoint_arcs, DefaultHttpClient};
5use crate::error::{InnerErrorCode, MeowError};
6use crate::file_transfer_record::FileTransferRecord;
7use crate::ids::{GlobalProgressListenerId, TaskId};
8use crate::inner::executor::Executor;
9use crate::inner::inner_task::InnerTask;
10use crate::inner::task_callbacks::{ProgressCb, TaskCallbacks};
11use crate::log::{set_debug_log_listener, DebugLogListener, DebugLogListenerError};
12use crate::meow_config::MeowConfig;
13use crate::pounce_task::PounceTask;
14use crate::transfer_snapshot::TransferSnapshot;
15
/// Callback type used for client-wide ("global") progress listeners.
///
/// This is a plain alias of [`ProgressCb`], the same callback type used for
/// per-task progress callbacks.
pub type GlobalProgressListener = ProgressCb;
17
18#[derive(Clone)]
19pub struct MeowClient {
20    executor: OnceLock<Executor>,
21    config: MeowConfig,
22    global_progress_listener: Arc<RwLock<Vec<(GlobalProgressListenerId, GlobalProgressListener)>>>,
23    closed: Arc<AtomicBool>,
24}
25
26impl std::fmt::Debug for MeowClient {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("MeowClient")
29            .field("config", &self.config)
30            .field("global_progress_listener", &"..")
31            .finish()
32    }
33}
34
35impl MeowClient {
36    pub fn new(config: MeowConfig) -> Self {
37        MeowClient {
38            executor: Default::default(),
39            config,
40            global_progress_listener: Arc::new(RwLock::new(Vec::new())),
41            closed: Arc::new(AtomicBool::new(false)),
42        }
43    }
44
45    fn get_exec(&self) -> Result<&Executor, MeowError> {
46        if let Some(exec) = self.executor.get() {
47            crate::meow_flow_log!("executor", "reuse existing executor");
48            return Ok(exec);
49        }
50        let default = DefaultHttpClient::try_with_http_timeouts(
51            self.config.http_timeout(),
52            self.config.tcp_keepalive(),
53        )?;
54        crate::meow_flow_log!(
55            "executor",
56            "initializing default HTTP client (timeout={:?}, tcp_keepalive={:?})",
57            self.config.http_timeout(),
58            self.config.tcp_keepalive()
59        );
60        let exec = Executor::new(
61            self.config.clone(),
62            Arc::new(default),
63            self.global_progress_listener.clone(),
64        )?;
65        let _ = self.executor.set(exec);
66        self.executor.get().ok_or_else(|| {
67            crate::meow_flow_log!(
68                "executor",
69                "executor init race failed after set; returning RuntimeCreationFailedError"
70            );
71            MeowError::from_code_str(
72                InnerErrorCode::RuntimeCreationFailedError,
73                "executor init race failed",
74            )
75        })
76    }
77
78    fn ensure_open(&self) -> Result<(), MeowError> {
79        if self.closed.load(Ordering::SeqCst) {
80            crate::meow_flow_log!("client", "ensure_open failed: client already closed");
81            Err(MeowError::from_code_str(
82                InnerErrorCode::ClientClosed,
83                "meow client is already closed",
84            ))
85        } else {
86            Ok(())
87        }
88    }
89
90    pub fn register_global_progress_listener<F>(
91        &self,
92        listener: F,
93    ) -> Result<GlobalProgressListenerId, MeowError>
94    where
95        F: Fn(FileTransferRecord) + Send + Sync + 'static,
96    {
97        let id = GlobalProgressListenerId::new_v4();
98        crate::meow_flow_log!("listener", "register global listener: id={:?}", id);
99        let mut guard = self.global_progress_listener.write().map_err(|e| {
100            MeowError::from_code(
101                InnerErrorCode::LockPoisoned,
102                format!("register global listener lock poisoned: {}", e),
103            )
104        })?;
105        guard.push((id, Arc::new(listener)));
106        Ok(id)
107    }
108
109    /// 移除此前注册的一个全局监听;若 `id` 不存在则返回 `false`。
110    pub fn unregister_global_progress_listener(
111        &self,
112        id: GlobalProgressListenerId,
113    ) -> Result<bool, MeowError> {
114        let mut g = self.global_progress_listener.write().map_err(|e| {
115            MeowError::from_code(
116                InnerErrorCode::LockPoisoned,
117                format!("unregister global listener lock poisoned: {}", e),
118            )
119        })?;
120        if let Some(pos) = g.iter().position(|(k, _)| *k == id) {
121            g.remove(pos);
122            crate::meow_flow_log!(
123                "listener",
124                "unregister global listener success: id={:?}",
125                id
126            );
127            Ok(true)
128        } else {
129            crate::meow_flow_log!("listener", "unregister global listener missed: id={:?}", id);
130            Ok(false)
131        }
132    }
133
134    pub fn clear_global_listener(&self) -> Result<(), MeowError> {
135        crate::meow_flow_log!("listener", "clear all global listeners");
136        self.global_progress_listener
137            .write()
138            .map_err(|e| {
139                MeowError::from_code(
140                    InnerErrorCode::LockPoisoned,
141                    format!("clear global listeners lock poisoned: {}", e),
142                )
143            })?
144            .clear();
145        Ok(())
146    }
147
148    /// 设置或取消流程调试日志监听器(全局生效)。
149    ///
150    /// - 传入 `Some(listener)`:设置/替换监听器
151    /// - 传入 `None`:取消注册监听器
152    pub fn set_debug_log_listener(
153        &self,
154        listener: Option<DebugLogListener>,
155    ) -> Result<(), DebugLogListenerError> {
156        set_debug_log_listener(listener)
157    }
158}
159
160impl MeowClient {
161    pub async fn enqueue<PCB>(
162        &self,
163        task: PounceTask,
164        progress_cb: PCB,
165    ) -> Result<TaskId, MeowError>
166    where
167        PCB: Fn(FileTransferRecord) + Send + Sync + 'static,
168    {
169        self.ensure_open()?;
170        if task.is_empty() {
171            crate::meow_flow_log!("enqueue", "reject empty task");
172            return Err(MeowError::from_code1(InnerErrorCode::ParameterEmpty));
173        }
174
175        crate::meow_flow_log!("enqueue", "task={:?}", task);
176
177        let progress: ProgressCb = Arc::new(progress_cb);
178        let callbacks = TaskCallbacks::new(Some(progress));
179
180        let (def_up, def_down) = default_breakpoint_arcs();
181        let inner = InnerTask::from_pounce(
182            task,
183            self.config.breakpoint_download_http().clone(),
184            self.config.http_client_ref().cloned(),
185            def_up,
186            def_down,
187        )
188        .await?;
189
190        let task_id = self.get_exec()?.enqueue(inner, callbacks)?;
191        crate::meow_flow_log!("enqueue", "enqueue success: task_id={:?}", task_id);
192        Ok(task_id)
193    }
194
    // TODO(arman): implement a per-task status query.
    // pub async fn get_task_status(&self, task_id: TaskId) -> Result<FileTransferRecord, MeowError> {
    //     todo!()
    // }
198
199    pub async fn pause(&self, task_id: TaskId) -> Result<(), MeowError> {
200        self.ensure_open()?;
201        crate::meow_flow_log!("client_api", "pause called: task_id={:?}", task_id);
202        self.get_exec()?.pause(task_id).await
203    }
204
205    /// 恢复一个此前被 pause 的任务,继续使用同一个 task_id 进行控制。
206    pub async fn resume(&self, task_id: TaskId) -> Result<(), MeowError> {
207        self.ensure_open()?;
208        crate::meow_flow_log!("client_api", "resume called: task_id={:?}", task_id);
209        self.get_exec()?.resume(task_id).await
210    }
211
212    pub async fn cancel(&self, task_id: TaskId) -> Result<(), MeowError> {
213        self.ensure_open()?;
214        crate::meow_flow_log!("client_api", "cancel called: task_id={:?}", task_id);
215        self.get_exec()?.cancel(task_id).await
216    }
217
218    pub async fn snapshot(&self) -> Result<TransferSnapshot, MeowError> {
219        self.ensure_open()?;
220        crate::meow_flow_log!("client_api", "snapshot called");
221        self.get_exec()?.snapshot().await
222    }
223
224    pub async fn close(&self) -> Result<(), MeowError> {
225        if self
226            .closed
227            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
228            .is_err()
229        {
230            crate::meow_flow_log!("client_api", "close rejected: already closed");
231            return Err(MeowError::from_code_str(
232                InnerErrorCode::ClientClosed,
233                "meow client is already closed",
234            ));
235        }
236        if let Some(exec) = self.executor.get() {
237            crate::meow_flow_log!("client_api", "close forwarding to executor");
238            if let Err(e) = exec.close().await {
239                // 关闭命令未完成时回滚 closed,允许调用方重试 close。
240                self.closed.store(false, Ordering::SeqCst);
241                return Err(e);
242            }
243            Ok(())
244        } else {
245            crate::meow_flow_log!("client_api", "close with no executor initialized");
246            Ok(())
247        }
248    }
249
250    pub async fn is_closed(&self) -> bool {
251        self.closed.load(Ordering::SeqCst)
252    }
253}