1extern crate alloc;
2use crate::{Event, handle::SharedHandle};
3use alloc::sync::{Arc, Weak};
4use core::sync::atomic::{AtomicBool, Ordering};
5use crossfire::{MAsyncRx, mpmc};
6use fast_steal::{Executor, Handle, TaskQueue};
7use tokio::task::{AbortHandle, JoinError, JoinHandle};
8
9pub mod handle;
10pub mod mock;
11pub mod multi;
12pub mod single;
13
14pub struct DownloadResult<E, PullError, PushError>
15where
16 E: Executor,
17 PullError: Send + Unpin + 'static,
18 PushError: Send + Unpin + 'static,
19{
20 pub event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
21 handle: Arc<SharedHandle<()>>,
22 abort_handles: Option<Arc<[AbortHandle]>>,
23 task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
24 is_aborted: Arc<AtomicBool>,
25}
26
27impl<E, PullError, PushError> Clone for DownloadResult<E, PullError, PushError>
28where
29 E: Executor,
30 PullError: Send + Unpin + 'static,
31 PushError: Send + Unpin + 'static,
32{
33 fn clone(&self) -> Self {
34 Self {
35 event_chain: self.event_chain.clone(),
36 handle: self.handle.clone(),
37 abort_handles: self.abort_handles.clone(),
38 task_queue: self.task_queue.clone(),
39 is_aborted: self.is_aborted.clone(),
40 }
41 }
42}
43
44impl<E, PullError, PushError> DownloadResult<E, PullError, PushError>
45where
46 E: Executor,
47 PullError: Send + Unpin + 'static,
48 PushError: Send + Unpin + 'static,
49{
50 pub fn new(
51 event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
52 handle: JoinHandle<()>,
53 abort_handles: Option<&[AbortHandle]>,
54 task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
55 ) -> Self {
56 Self {
57 event_chain,
58 handle: Arc::new(SharedHandle::new(handle)),
59 abort_handles: abort_handles.map(Arc::from),
60 task_queue,
61 is_aborted: Arc::new(AtomicBool::new(false)),
62 }
63 }
64
65 pub async fn join(&self) -> Result<(), Arc<JoinError>> {
66 self.handle.join().await
67 }
68
69 pub fn abort(&self) {
70 if let Some(handles) = &self.abort_handles {
71 for handle in handles.iter() {
72 handle.abort();
73 }
74 }
75 if let Some((_, task_queue)) = &self.task_queue {
76 task_queue.handles(|iter| {
77 for handle in iter {
78 handle.abort();
79 }
80 });
81 }
82 self.is_aborted.store(true, Ordering::Release);
83 }
84
85 pub fn set_threads(&self, threads: usize, min_chunk_size: u64) {
86 if let Some((executor, task_queue)) = &self.task_queue {
87 let executor = executor.upgrade();
88 let res = task_queue.set_threads(
89 threads,
90 min_chunk_size,
91 executor.as_ref().map(|e| e.as_ref()),
92 );
93 if res.is_some() && threads > 0 {
94 self.is_aborted.store(false, Ordering::Release);
95 }
96 }
97 }
98
99 pub fn is_aborted(&self) -> bool {
100 self.is_aborted.load(Ordering::Acquire)
101 }
102}