Skip to main content

fast_pull/core/
mod.rs

1extern crate alloc;
2use crate::{Event, handle::SharedHandle};
3use alloc::sync::{Arc, Weak};
4use core::sync::atomic::{AtomicBool, Ordering};
5use crossfire::{MAsyncRx, mpmc};
6use fast_steal::{Executor, Handle, TaskQueue};
7use tokio::task::{AbortHandle, JoinError, JoinHandle};
8
9pub mod handle;
10pub mod mock;
11pub mod multi;
12pub mod single;
13
14pub struct DownloadResult<E, PullError, PushError>
15where
16    E: Executor,
17    PullError: Send + Unpin + 'static,
18    PushError: Send + Unpin + 'static,
19{
20    pub event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
21    handle: Arc<SharedHandle<()>>,
22    abort_handles: Option<Arc<[AbortHandle]>>,
23    task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
24    is_aborted: Arc<AtomicBool>,
25}
26
27impl<E, PullError, PushError> Clone for DownloadResult<E, PullError, PushError>
28where
29    E: Executor,
30    PullError: Send + Unpin + 'static,
31    PushError: Send + Unpin + 'static,
32{
33    fn clone(&self) -> Self {
34        Self {
35            event_chain: self.event_chain.clone(),
36            handle: self.handle.clone(),
37            abort_handles: self.abort_handles.clone(),
38            task_queue: self.task_queue.clone(),
39            is_aborted: self.is_aborted.clone(),
40        }
41    }
42}
43
44impl<E, PullError, PushError> DownloadResult<E, PullError, PushError>
45where
46    E: Executor,
47    PullError: Send + Unpin + 'static,
48    PushError: Send + Unpin + 'static,
49{
50    pub fn new(
51        event_chain: MAsyncRx<mpmc::List<Event<PullError, PushError>>>,
52        handle: JoinHandle<()>,
53        abort_handles: Option<&[AbortHandle]>,
54        task_queue: Option<(Weak<E>, TaskQueue<E::Handle>)>,
55    ) -> Self {
56        Self {
57            event_chain,
58            handle: Arc::new(SharedHandle::new(handle)),
59            abort_handles: abort_handles.map(Arc::from),
60            task_queue,
61            is_aborted: Arc::new(AtomicBool::new(false)),
62        }
63    }
64
65    pub async fn join(&self) -> Result<(), Arc<JoinError>> {
66        self.handle.join().await
67    }
68
69    pub fn abort(&self) {
70        if let Some(handles) = &self.abort_handles {
71            for handle in handles.iter() {
72                handle.abort();
73            }
74        }
75        if let Some((_, task_queue)) = &self.task_queue {
76            task_queue.handles(|iter| {
77                for handle in iter {
78                    handle.abort();
79                }
80            });
81        }
82        self.is_aborted.store(true, Ordering::Release);
83    }
84
85    pub fn set_threads(&self, threads: usize, min_chunk_size: u64) {
86        if let Some((executor, task_queue)) = &self.task_queue {
87            let executor = executor.upgrade();
88            let res = task_queue.set_threads(
89                threads,
90                min_chunk_size,
91                executor.as_ref().map(|e| e.as_ref()),
92            );
93            if res.is_some() && threads > 0 {
94                self.is_aborted.store(false, Ordering::Release);
95            }
96        }
97    }
98
99    pub fn is_aborted(&self) -> bool {
100        self.is_aborted.load(Ordering::Acquire)
101    }
102}