1extern crate std;
2use crate::{DownloadResult, Event, Puller, PullerError, Pusher, multi::TokioExecutor};
3use core::time::Duration;
4use crossfire::{mpmc, spsc};
5use futures::TryStreamExt;
6
7#[derive(Debug, Clone)]
8pub struct DownloadOptions {
9 pub retry_gap: Duration,
10 pub push_queue_cap: usize,
11}
12
13pub fn download_single<R: Puller, W: Pusher>(
14 mut puller: R,
15 mut pusher: W,
16 options: DownloadOptions,
17) -> DownloadResult<TokioExecutor<R, W::Error>, R::Error, W::Error> {
18 let (tx, event_chain) = mpmc::unbounded_async();
19 let (tx_push, rx_push) = spsc::bounded_async(options.push_queue_cap);
20 let tx_clone = tx.clone();
21 const ID: usize = 0;
22 let rx_push = rx_push.into_blocking();
23 let push_handle = tokio::task::spawn_blocking(move || {
24 while let Ok((spin, mut data)) = rx_push.recv() {
25 loop {
26 match pusher.push(&spin, data) {
27 Ok(_) => break,
28 Err((err, bytes)) => {
29 data = bytes;
30 let _ = tx_clone.send(Event::PushError(ID, err));
31 }
32 }
33 std::thread::sleep(options.retry_gap);
34 }
35 let _ = tx_clone.send(Event::PushProgress(ID, spin));
36 }
37 loop {
38 match pusher.flush() {
39 Ok(_) => break,
40 Err(err) => {
41 let _ = tx_clone.send(Event::FlushError(err));
42 }
43 }
44 std::thread::sleep(options.retry_gap);
45 }
46 });
47 let handle = tokio::spawn(async move {
48 'redownload: loop {
49 let _ = tx.send(Event::Pulling(ID));
50 let mut downloaded: u64 = 0;
51 let mut stream = loop {
52 match puller.pull(None).await {
53 Ok(t) => break t,
54 Err((e, retry_gap)) => {
55 let _ = tx.send(Event::PullError(ID, e));
56 tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
57 }
58 }
59 };
60 loop {
61 match stream.try_next().await {
62 Ok(Some(chunk)) => {
63 let len = chunk.len() as u64;
64 let span = downloaded..(downloaded + len);
65 let _ = tx.send(Event::PullProgress(ID, span.clone()));
66 tx_push.send((span, chunk)).await.unwrap();
67 downloaded += len;
68 }
69 Ok(None) => break 'redownload,
70 Err((e, retry_gap)) => {
71 let is_irrecoverable = e.is_irrecoverable();
72 let _ = tx.send(Event::PullError(ID, e));
73 tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
74 if is_irrecoverable {
75 continue 'redownload;
76 }
77 }
78 }
79 }
80 }
81 let _ = tx.send(Event::Finished(ID));
82 });
83 DownloadResult::new(
84 event_chain,
85 push_handle,
86 Some(&[handle.abort_handle()]),
87 None,
88 )
89}
90
91#[cfg(test)]
92mod tests {
93 extern crate std;
94 use super::*;
95 use crate::{
96 Merge, ProgressEntry,
97 mem::MemPusher,
98 mock::{MockPuller, build_mock_data},
99 };
100 use std::{dbg, vec};
101 use vec::Vec;
102
103 #[tokio::test]
104 async fn test_sequential_download() {
105 let mock_data = build_mock_data(3 * 1024);
106 let puller = MockPuller::new(&mock_data);
107 let pusher = MemPusher::with_capacity(mock_data.len());
108 #[allow(clippy::single_range_in_vec_init)]
109 let download_chunks = vec![0..mock_data.len() as u64];
110 let result = download_single(
111 puller,
112 pusher.clone(),
113 DownloadOptions {
114 retry_gap: Duration::from_secs(1),
115 push_queue_cap: 1024,
116 },
117 );
118
119 let mut pull_progress: Vec<ProgressEntry> = Vec::new();
120 let mut push_progress: Vec<ProgressEntry> = Vec::new();
121 while let Ok(e) = result.event_chain.recv().await {
122 match e {
123 Event::PullProgress(_, p) => {
124 pull_progress.merge_progress(p);
125 }
126 Event::PushProgress(_, p) => {
127 push_progress.merge_progress(p);
128 }
129 _ => {}
130 }
131 }
132 dbg!(&pull_progress);
133 dbg!(&push_progress);
134 assert_eq!(pull_progress, download_chunks);
135 assert_eq!(push_progress, download_chunks);
136
137 result.join().await.unwrap();
138 assert_eq!(&**pusher.receive.lock(), mock_data);
139 }
140}