Skip to main content

fast_pull/core/
mod.rs

1extern crate alloc;
2use crate::{Event, handle::SharedHandle};
3use alloc::sync::{Arc, Weak};
4use crossfire::{MAsyncRx, mpmc};
5use fast_steal::{Executor, Handle, TaskQueue};
6use tokio::task::{AbortHandle, JoinError, JoinHandle};
7
8pub mod handle;
9pub mod mock;
10pub mod multi;
11pub mod single;
12
13pub struct DownloadResult<E, PullError, PushError>
14where
15    E: Executor,
16    PullError: Send + Unpin + 'static,
17    PushError: Send + Unpin + 'static,
18{
19    pub event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
20    handle: Arc<SharedHandle<()>>,
21    abort_handles: Option<Arc<[AbortHandle]>>,
22    task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
23}
24
25impl<E, PullError, PushError> Clone for DownloadResult<E, PullError, PushError>
26where
27    E: Executor,
28    PullError: Send + Unpin + 'static,
29    PushError: Send + Unpin + 'static,
30{
31    fn clone(&self) -> Self {
32        Self {
33            event_chain: self.event_chain.clone(),
34            handle: self.handle.clone(),
35            abort_handles: self.abort_handles.clone(),
36            task_queue: self.task_queue.clone(),
37        }
38    }
39}
40
41impl<E, PullError, PushError> DownloadResult<E, PullError, PushError>
42where
43    E: Executor,
44    PullError: Send + Unpin + 'static,
45    PushError: Send + Unpin + 'static,
46{
47    pub fn new(
48        event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
49        handle: JoinHandle<()>,
50        abort_handles: Option<&[AbortHandle]>,
51        task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
52    ) -> Self {
53        Self {
54            event_chain,
55            handle: Arc::new(SharedHandle::new(handle)),
56            abort_handles: abort_handles.map(Arc::from),
57            task_queue,
58        }
59    }
60
61    pub async fn join(&self) -> Result<(), Arc<JoinError>> {
62        self.handle.join().await
63    }
64
65    pub fn abort(&self) {
66        if let Some(handles) = &self.abort_handles {
67            for handle in handles.iter() {
68                handle.abort();
69            }
70        }
71        if let Some((_, task_queue)) = &self.task_queue {
72            task_queue.handles(|iter| {
73                for handle in iter {
74                    handle.abort();
75                }
76            });
77        }
78    }
79
80    pub fn set_threads(&self, threads: usize, min_chunk_size: u64) {
81        if let Some((executor, task_queue)) = &self.task_queue {
82            let executor = executor.upgrade();
83            task_queue.set_threads(
84                threads,
85                min_chunk_size,
86                executor.as_ref().map(|e| e.as_ref()),
87            );
88        }
89    }
90}