1extern crate std;
2use crate::{DownloadResult, Event, Puller, Pusher, multi::TokioExecutor};
3use core::time::Duration;
4use crossfire::{mpmc, spsc};
5use futures::TryStreamExt;
6
/// Tuning knobs for [`download_single`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadOptions {
    /// Pause between retries of a failed push or flush. It is also the fallback pause after a
    /// failed pull when the pull error does not carry its own retry gap.
    pub retry_gap: Duration,
    /// Capacity of the bounded queue that carries pulled chunks to the push task.
    pub push_queue_cap: usize,
}
12
13pub fn download_single<R: Puller, W: Pusher>(
14 mut puller: R,
15 mut pusher: W,
16 options: DownloadOptions,
17) -> DownloadResult<TokioExecutor<R, W::Error>, R::Error, W::Error> {
18 let (tx, event_chain) = mpmc::unbounded_async();
19 let (tx_push, rx_push) = spsc::bounded_async(options.push_queue_cap);
20 let tx_clone = tx.clone();
21 const ID: usize = 0;
22 let rx_push = rx_push.into_blocking();
23 let push_handle = tokio::task::spawn_blocking(move || {
24 while let Ok((spin, mut data)) = rx_push.recv() {
25 loop {
26 match pusher.push(&spin, data) {
27 Ok(_) => break,
28 Err((err, bytes)) => {
29 data = bytes;
30 let _ = tx_clone.send(Event::PushError(ID, err));
31 }
32 }
33 std::thread::sleep(options.retry_gap);
34 }
35 let _ = tx_clone.send(Event::PushProgress(ID, spin));
36 }
37 loop {
38 match pusher.flush() {
39 Ok(_) => break,
40 Err(err) => {
41 let _ = tx_clone.send(Event::FlushError(err));
42 }
43 }
44 std::thread::sleep(options.retry_gap);
45 }
46 });
47 let handle = tokio::spawn(async move {
48 let _ = tx.send(Event::Pulling(ID));
49 let mut downloaded: u64 = 0;
50 let mut stream = loop {
51 match puller.pull(None).await {
52 Ok(t) => break t,
53 Err((e, retry_gap)) => {
54 let _ = tx.send(Event::PullError(ID, e));
55 tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await;
56 }
57 }
58 };
59 loop {
60 match stream.try_next().await {
61 Ok(Some(chunk)) => {
62 let len = chunk.len() as u64;
63 let span = downloaded..(downloaded + len);
64 let _ = tx.send(Event::PullProgress(ID, span.clone()));
65 tx_push.send((span, chunk)).await.unwrap();
66 downloaded += len;
67 }
68 Ok(None) => break,
69 Err((e, retry_gap)) => {
70 let _ = tx.send(Event::PullError(ID, e));
71 tokio::time::sleep(retry_gap.unwrap_or(options.retry_gap)).await
72 }
73 }
74 }
75 let _ = tx.send(Event::Finished(ID));
76 });
77 DownloadResult::new(
78 event_chain,
79 push_handle,
80 Some(&[handle.abort_handle()]),
81 None,
82 )
83}
84
#[cfg(test)]
mod tests {
    extern crate std;
    use super::*;
    use crate::{
        Merge, ProgressEntry,
        mem::MemPusher,
        mock::{MockPuller, build_mock_data},
    };
    use std::{dbg, vec, vec::Vec};

    // Runs a full single-stream download of mock data into an in-memory pusher and checks
    // that both the reported progress and the stored bytes cover the whole payload.
    #[tokio::test]
    async fn test_sequential_download() {
        let mock_data = build_mock_data(3 * 1024);
        let total_len = mock_data.len();
        let pusher = MemPusher::with_capacity(total_len);
        // One range spanning the whole payload is what the merged progress must collapse to.
        #[allow(clippy::single_range_in_vec_init)]
        let download_chunks = vec![0..total_len as u64];

        let options = DownloadOptions {
            retry_gap: Duration::from_secs(1),
            push_queue_cap: 1024,
        };
        let result = download_single(MockPuller::new(&mock_data), pusher.clone(), options);

        let mut pull_progress: Vec<ProgressEntry> = Vec::new();
        let mut push_progress: Vec<ProgressEntry> = Vec::new();
        // The event channel closes once both tasks have dropped their senders.
        while let Ok(event) = result.event_chain.recv().await {
            match event {
                Event::PullProgress(_, range) => pull_progress.merge_progress(range),
                Event::PushProgress(_, range) => push_progress.merge_progress(range),
                _ => {}
            }
        }
        dbg!(&pull_progress);
        dbg!(&push_progress);
        assert_eq!(pull_progress, download_chunks);
        assert_eq!(push_progress, download_chunks);

        result.join().await.unwrap();
        assert_eq!(&**pusher.receive.lock(), mock_data);
    }
}