fast_pull/core/
mod.rs

1extern crate alloc;
2use crate::Event;
3use alloc::sync::{Arc, Weak};
4use core::num::{NonZeroU64, NonZeroUsize};
5use fast_steal::{Executor, TaskList};
6use kanal::AsyncReceiver;
7use tokio::{
8    sync::Mutex,
9    task::{AbortHandle, JoinError, JoinHandle},
10};
11
12mod macros;
13pub mod mock;
14pub mod multi;
15pub mod single;
16
17pub struct DownloadResult<E: Executor, PullError, PushError> {
18    pub event_chain: AsyncReceiver<Event<PullError, PushError>>,
19    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
20    abort_handles: Arc<[AbortHandle]>,
21    task_list: Option<Weak<TaskList<E>>>,
22}
23impl<E: Executor, PullError, PushError> Clone for DownloadResult<E, PullError, PushError> {
24    fn clone(&self) -> Self {
25        Self {
26            event_chain: self.event_chain.clone(),
27            handle: self.handle.clone(),
28            abort_handles: self.abort_handles.clone(),
29            task_list: self.task_list.clone(),
30        }
31    }
32}
33
34impl<E: Executor, PullError, PushError> DownloadResult<E, PullError, PushError> {
35    pub fn new(
36        event_chain: AsyncReceiver<Event<PullError, PushError>>,
37        handle: JoinHandle<()>,
38        abort_handles: &[AbortHandle],
39        task_list: Option<Weak<TaskList<E>>>,
40    ) -> Self {
41        Self {
42            event_chain,
43            abort_handles: Arc::from(abort_handles),
44            handle: Arc::new(Mutex::new(Some(handle))),
45            task_list,
46        }
47    }
48
49    /// 只有第一次调用有效
50    pub async fn join(&self) -> Result<(), JoinError> {
51        if let Some(handle) = self.handle.lock().await.take() {
52            handle.await?
53        }
54        Ok(())
55    }
56
57    pub fn abort(&self) {
58        for abort_handle in self.abort_handles.iter() {
59            abort_handle.abort();
60        }
61    }
62
63    pub fn set_threads(&self, threads: NonZeroUsize, min_chunk_size: NonZeroU64) {
64        if let Some(task_list) = &self.task_list
65            && let Some(task_list) = task_list.upgrade()
66        {
67            task_list.set_threads(threads, min_chunk_size);
68        }
69    }
70}