Skip to main content

fast_pull/core/
single.rs

1extern crate std;
2use crate::{DownloadResult, Event, Puller, Pusher, multi::TokioExecutor};
3use core::time::Duration;
4use crossfire::{mpmc, spsc};
5use futures::TryStreamExt;
6
/// Tuning knobs accepted by [`download_single`].
#[derive(Clone, Debug)]
pub struct DownloadOptions {
    /// Delay between retries of a failed push or flush. It is also the
    /// fallback delay after a failed pull when the puller's error does not
    /// carry its own retry gap.
    pub retry_gap: Duration,
    /// Capacity of the bounded queue that carries pulled chunks from the pull
    /// task to the push task.
    pub push_queue_cap: usize,
}
12
13pub fn download_single<R: Puller, W: Pusher>(
14    mut puller: R,
15    mut pusher: W,
16    options: DownloadOptions,
17) -> DownloadResult<TokioExecutor<R, W::Error>, R::Error, W::Error> {
18    let (tx, event_chain) = mpmc::unbounded_async();
19    let (tx_push, rx_push) = spsc::bounded_async(options.push_queue_cap);
20    let tx_clone = tx.clone();
21    const ID: usize = 0;
22    let rx_push = rx_push.into_blocking();
23    let push_handle = tokio::task::spawn_blocking(move || {
24        while let Ok((spin, mut data)) = rx_push.recv() {
25            loop {
26                match pusher.push(&spin, data) {
27                    Ok(_) => break,
28                    Err((err, bytes)) => {
29                        data = bytes;
30                        let _ = tx_clone.send(Event::PushError(ID, err));
31                    }
32                }
33                std::thread::sleep(options.retry_gap);
34            }
35            let _ = tx_clone.send(Event::PushProgress(ID, spin));
36        }
37        loop {
38            match pusher.flush() {
39                Ok(_) => break,
40                Err(err) => {
41                    let _ = tx_clone.send(Event::FlushError(err));
42                }
43            }
44            std::thread::sleep(options.retry_gap);
45        }
46    });
47    let handle = tokio::spawn(async move {
48        let _ = tx.send(Event::Pulling(ID));
49        let mut downloaded: u64 = 0;
50        let mut stream = loop {
51            match puller.pull(None).await {
52                Ok(t) => break t,
53                Err((e, retry_gap)) => {
54                    let _ = tx.send(Event::PullError(ID, e));
55                    tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
56                }
57            }
58        };
59        loop {
60            match stream.try_next().await {
61                Ok(Some(chunk)) => {
62                    let len = chunk.len() as u64;
63                    let span = downloaded..(downloaded + len);
64                    let _ = tx.send(Event::PullProgress(ID, span.clone()));
65                    tx_push.send((span, chunk)).await.unwrap();
66                    downloaded += len;
67                }
68                Ok(None) => break,
69                Err((e, retry_gap)) => {
70                    let _ = tx.send(Event::PullError(ID, e));
71                    tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await
72                }
73            }
74        }
75        let _ = tx.send(Event::Finished(ID));
76    });
77    DownloadResult::new(
78        event_chain,
79        push_handle,
80        Some(&[handle.abort_handle()]),
81        None,
82    )
83}
84
#[cfg(test)]
mod tests {
    extern crate std;
    use super::*;
    use crate::{
        Merge, ProgressEntry,
        mem::MemPusher,
        mock::{MockPuller, build_mock_data},
    };
    use std::{dbg, vec};
    use vec::Vec;

    /// Downloads 3 KiB of mock data through `download_single` and checks that
    /// the merged pull and push progress each cover the whole payload, and
    /// that the in-memory pusher ends up holding exactly the mock data.
    #[tokio::test]
    async fn test_sequential_download() {
        let expected = build_mock_data(3 * 1024);
        let source = MockPuller::new(&expected);
        let sink = MemPusher::with_capacity(expected.len());
        // A one-element vector holding the full byte range is intended here,
        // not a vector of every number in that range.
        #[allow(clippy::single_range_in_vec_init)]
        let whole_payload = vec![0..expected.len() as u64];
        let options = DownloadOptions {
            retry_gap: Duration::from_secs(1),
            push_queue_cap: 1024,
        };
        let result = download_single(source, sink.clone(), options);

        let mut pull_progress: Vec<ProgressEntry> = Vec::new();
        let mut push_progress: Vec<ProgressEntry> = Vec::new();
        // Collect events until the channel reports an error on receive.
        loop {
            let Ok(event) = result.event_chain.recv().await else {
                break;
            };
            match event {
                Event::PullProgress(_, range) => {
                    pull_progress.merge_progress(range);
                }
                Event::PushProgress(_, range) => {
                    push_progress.merge_progress(range);
                }
                _ => {}
            }
        }
        dbg!(&pull_progress);
        dbg!(&push_progress);
        assert_eq!(pull_progress, whole_payload);
        assert_eq!(push_progress, whole_payload);

        result.join().await.unwrap();
        assert_eq!(&**sink.receive.lock(), expected);
    }
}