fast_pull/core/
mod.rs

1extern crate alloc;
2use crate::{Event, handle::SharedHandle};
3use alloc::sync::{Arc, Weak};
4use core::num::{NonZeroU64, NonZeroUsize};
5use fast_steal::{Executor, Handle, TaskList};
6use kanal::AsyncReceiver;
7use tokio::task::{AbortHandle, JoinError, JoinHandle};
8
9pub mod handle;
10mod macros;
11pub mod mock;
12pub mod multi;
13pub mod single;
14
15#[derive(Debug)]
16pub struct DownloadResult<E: Executor, PullError, PushError> {
17    pub event_chain: AsyncReceiver<Event<PullError, PushError>>,
18    handle: Arc<SharedHandle<()>>,
19    abort_handles: Option<Arc<[AbortHandle]>>,
20    task_list: Option<Weak<TaskList<E>>>,
21}
22
23impl<E: Executor, PullError, PushError> Clone for DownloadResult<E, PullError, PushError> {
24    fn clone(&self) -> Self {
25        Self {
26            event_chain: self.event_chain.clone(),
27            handle: self.handle.clone(),
28            abort_handles: self.abort_handles.clone(),
29            task_list: self.task_list.clone(),
30        }
31    }
32}
33
34impl<E: Executor, PullError, PushError> DownloadResult<E, PullError, PushError> {
35    pub fn new(
36        event_chain: AsyncReceiver<Event<PullError, PushError>>,
37        handle: JoinHandle<()>,
38        abort_handles: Option<&[AbortHandle]>,
39        task_list: Option<Weak<TaskList<E>>>,
40    ) -> Self {
41        Self {
42            event_chain,
43            abort_handles: abort_handles.map(Arc::from),
44            handle: Arc::new(SharedHandle::new(handle)),
45            task_list,
46        }
47    }
48
49    pub async fn join(&self) -> Result<(), Arc<JoinError>> {
50        self.handle.join().await
51    }
52
53    pub fn abort(&self) {
54        if let Some(handles) = &self.abort_handles {
55            for handle in handles.iter() {
56                handle.abort();
57            }
58        }
59        if let Some(task_list) = &self.task_list
60            && let Some(task_list) = task_list.upgrade()
61        {
62            task_list.handles(|iter| {
63                for handle in iter {
64                    handle.abort();
65                }
66            });
67        }
68    }
69
70    pub fn set_threads(&self, threads: NonZeroUsize, min_chunk_size: NonZeroU64) {
71        if let Some(task_list) = &self.task_list
72            && let Some(task_list) = task_list.upgrade()
73        {
74            task_list.set_threads(threads, min_chunk_size);
75        }
76    }
77}