Skip to main content

fast_pull/core/
single.rs

1extern crate std;
2use crate::{DownloadResult, Event, Puller, PullerError, Pusher, multi::TokioExecutor};
3use core::time::Duration;
4use crossfire::{mpmc, spsc};
5use futures::TryStreamExt;
6
/// Tuning knobs for [`download_single`].
#[derive(Debug, Clone)]
pub struct DownloadOptions {
    /// Pause between retries.
    ///
    /// Used as-is between failed push and flush attempts, and as the fallback
    /// delay after a pull error when the puller does not supply its own gap.
    pub retry_gap: Duration,
    /// Capacity of the bounded queue that carries pulled chunks from the pull
    /// task to the push worker.
    pub push_queue_cap: usize,
}
12
13pub fn download_single<R: Puller, W: Pusher>(
14    mut puller: R,
15    mut pusher: W,
16    options: DownloadOptions,
17) -> DownloadResult<TokioExecutor<R, W::Error>, R::Error, W::Error> {
18    let (tx, event_chain) = mpmc::unbounded_async();
19    let (tx_push, rx_push) = spsc::bounded_async(options.push_queue_cap);
20    let tx_clone = tx.clone();
21    const ID: usize = 0;
22    let rx_push = rx_push.into_blocking();
23    let push_handle = tokio::task::spawn_blocking(move || {
24        while let Ok((spin, mut data)) = rx_push.recv() {
25            loop {
26                match pusher.push(&spin, data) {
27                    Ok(_) => break,
28                    Err((err, bytes)) => {
29                        data = bytes;
30                        let _ = tx_clone.send(Event::PushError(ID, err));
31                    }
32                }
33                std::thread::sleep(options.retry_gap);
34            }
35            let _ = tx_clone.send(Event::PushProgress(ID, spin));
36        }
37        loop {
38            match pusher.flush() {
39                Ok(_) => break,
40                Err(err) => {
41                    let _ = tx_clone.send(Event::FlushError(err));
42                }
43            }
44            std::thread::sleep(options.retry_gap);
45        }
46    });
47    let handle = tokio::spawn(async move {
48        'redownload: loop {
49            let _ = tx.send(Event::Pulling(ID));
50            let mut downloaded: u64 = 0;
51            let mut stream = loop {
52                match puller.pull(None).await {
53                    Ok(t) => break t,
54                    Err((e, retry_gap)) => {
55                        let _ = tx.send(Event::PullError(ID, e));
56                        tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
57                    }
58                }
59            };
60            loop {
61                match stream.try_next().await {
62                    Ok(Some(chunk)) => {
63                        let len = chunk.len() as u64;
64                        let span = downloaded..(downloaded + len);
65                        let _ = tx.send(Event::PullProgress(ID, span.clone()));
66                        tx_push.send((span, chunk)).await.unwrap();
67                        downloaded += len;
68                    }
69                    Ok(None) => break 'redownload,
70                    Err((e, retry_gap)) => {
71                        let is_irrecoverable = e.is_irrecoverable();
72                        let _ = tx.send(Event::PullError(ID, e));
73                        tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
74                        if is_irrecoverable {
75                            continue 'redownload;
76                        }
77                    }
78                }
79            }
80        }
81        let _ = tx.send(Event::Finished(ID));
82    });
83    DownloadResult::new(
84        event_chain,
85        push_handle,
86        Some(&[handle.abort_handle()]),
87        None,
88    )
89}
90
#[cfg(test)]
mod tests {
    extern crate std;
    use super::*;
    use crate::{
        Merge, ProgressEntry,
        mem::MemPusher,
        mock::{MockPuller, build_mock_data},
    };
    use std::vec;
    use vec::Vec;

    /// Downloads 3 KiB of mock data end to end and checks that both the pull
    /// and push progress events cover the whole range, and that the pusher
    /// ends up holding exactly the mock data.
    #[tokio::test]
    async fn test_sequential_download() {
        let mock_data = build_mock_data(3 * 1024);
        let puller = MockPuller::new(&mock_data);
        let pusher = MemPusher::with_capacity(mock_data.len());
        // A one-element vec holding a range is intentional here: it is the
        // expected merged progress list.
        #[allow(clippy::single_range_in_vec_init)]
        let download_chunks = vec![0..mock_data.len() as u64];
        let result = download_single(
            puller,
            pusher.clone(),
            DownloadOptions {
                retry_gap: Duration::from_secs(1),
                push_queue_cap: 1024,
            },
        );

        let mut pull_progress: Vec<ProgressEntry> = Vec::new();
        let mut push_progress: Vec<ProgressEntry> = Vec::new();
        // The loop ends when `recv` reports an error, i.e. the event channel
        // has been closed.
        while let Ok(e) = result.event_chain.recv().await {
            match e {
                Event::PullProgress(_, p) => {
                    pull_progress.merge_progress(p);
                }
                Event::PushProgress(_, p) => {
                    push_progress.merge_progress(p);
                }
                _ => {}
            }
        }
        // `assert_eq!` prints both sides on failure, so no extra debug output
        // is needed.
        assert_eq!(pull_progress, download_chunks);
        assert_eq!(push_progress, download_chunks);

        result.join().await.unwrap();
        assert_eq!(&**pusher.receive.lock(), mock_data);
    }
}