fast_pull/core/
mod.rs

1extern crate alloc;
2use crate::Event;
3use alloc::sync::{Arc, Weak};
4use core::num::{NonZeroU64, NonZeroUsize};
5use fast_steal::{Executor, Handle, TaskList};
6use kanal::AsyncReceiver;
7use tokio::{
8    sync::Mutex,
9    task::{AbortHandle, JoinError, JoinHandle},
10};
11
12mod macros;
13pub mod mock;
14pub mod multi;
15pub mod single;
16
17#[derive(Debug)]
18pub struct DownloadResult<E: Executor, PullError, PushError> {
19    pub event_chain: AsyncReceiver<Event<PullError, PushError>>,
20    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
21    abort_handles: Option<Arc<[AbortHandle]>>,
22    task_list: Option<Weak<TaskList<E>>>,
23}
24
25impl<E: Executor, PullError, PushError> Clone for DownloadResult<E, PullError, PushError> {
26    fn clone(&self) -> Self {
27        Self {
28            event_chain: self.event_chain.clone(),
29            handle: self.handle.clone(),
30            abort_handles: self.abort_handles.clone(),
31            task_list: self.task_list.clone(),
32        }
33    }
34}
35
36impl<E: Executor, PullError, PushError> DownloadResult<E, PullError, PushError> {
37    pub fn new(
38        event_chain: AsyncReceiver<Event<PullError, PushError>>,
39        handle: JoinHandle<()>,
40        abort_handles: Option<&[AbortHandle]>,
41        task_list: Option<Weak<TaskList<E>>>,
42    ) -> Self {
43        Self {
44            event_chain,
45            abort_handles: abort_handles.map(Arc::from),
46            handle: Arc::new(Mutex::new(Some(handle))),
47            task_list,
48        }
49    }
50
51    /// 只有第一次调用有效
52    pub async fn join(&self) -> Result<(), JoinError> {
53        if let Some(handle) = self.handle.lock().await.take() {
54            handle.await?
55        }
56        Ok(())
57    }
58
59    pub fn abort(&self) {
60        if let Some(handles) = &self.abort_handles {
61            for handle in handles.iter() {
62                handle.abort();
63            }
64        }
65        if let Some(task_list) = &self.task_list
66            && let Some(task_list) = task_list.upgrade()
67        {
68            task_list.handles(|iter| {
69                for mut handle in iter {
70                    handle.abort();
71                }
72            });
73        }
74    }
75
76    pub fn set_threads(&self, threads: NonZeroUsize, min_chunk_size: NonZeroU64) {
77        if let Some(task_list) = &self.task_list
78            && let Some(task_list) = task_list.upgrade()
79        {
80            task_list.set_threads(threads, min_chunk_size);
81        }
82    }
83}