1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use super::FetchedState;
use chrono::{DateTime, Utc};
use failure::{Fail, ResultExt};
use filetime::FileTime;
use futures::{future::ok as OkFuture, Future, Stream};
use reqwest::{self, async::Response, header::CONTENT_LENGTH};
use std::{
    io::Write,
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{fs::File, io::flush};
use {FetchError, FetchErrorKind, FetchEvent};

pub trait RequestFuture:
    Future<Item = Option<(Response, Option<DateTime<Utc>>)>, Error = reqwest::Error> + Send
{
}

impl<
        T: Future<Item = Option<(Response, Option<DateTime<Utc>>)>, Error = reqwest::Error> + Send,
    > RequestFuture for T
{}

/// This state manages downloading a response into the temporary location.
pub struct ResponseState<T: RequestFuture + 'static> {
    pub future: T,
    pub path: PathBuf,
    pub(crate) progress: Option<Arc<dyn Fn(FetchEvent) + Send + Sync>>
}

impl<T: RequestFuture + 'static> ResponseState<T> {
    /// If the file is to be downloaded, this will construct a future that does just that.
    pub fn then_download(self, download_location: PathBuf) -> FetchedState {
        let final_destination = self.path;
        let future = self.future;
        let cb = self.progress.clone();

        // Fetch the file to the download location.
        let download_location_: Arc<Path> = Arc::from(download_location.clone());
        let dl1 = download_location_.clone();
        let dl2 = download_location_.clone();
        let download_future = future
            .map_err(move |why| {
                FetchError::from(why.context(FetchErrorKind::Fetch(dl1.to_path_buf())))
            })
            .and_then(move |resp| {
                let future: Box<
                    dyn Future<Item = Option<Option<FileTime>>, Error = FetchError> + Send,
                > = match resp {
                    None => Box::new(OkFuture(None)),
                    Some((resp, date)) => {
                        let length = resp
                            .headers()
                            .get(CONTENT_LENGTH)
                            .and_then(|h| h.to_str().ok())
                            .and_then(|h| h.parse::<u64>().ok())
                            .unwrap_or(0);

                        // Signal the caller that we are about to fetch a file that is this size.
                        if let Some(cb) = cb.as_ref() {
                            cb(FetchEvent::Total(length));
                        }

                        let cb2 = cb.clone();

                        let future = File::create(download_location_.clone())
                            .map_err(move |why| {
                                FetchError::from(why.context(FetchErrorKind::Create(dl2.to_path_buf())))
                            })
                            // Set the length of the file to the length we fetched from the header.
                            .and_then(move |file| {
                                let file = file.into_std();
                                file.set_len(length).context(FetchErrorKind::LengthSet)?;
                                let copy = file.try_clone().context(FetchErrorKind::FdCopy)?;
                                Ok((File::from_std(file), File::from_std(copy)))
                            })
                            // Download the file to the given download location.
                            .and_then(move |(mut file, copy)| {
                                debug!("downloading to {}", download_location_.display());
                                resp.into_body()
                                        .map_err(|why| {
                                            FetchError::from(why.context(FetchErrorKind::ChunkRequest))
                                        })
                                        // Attempt to write each chunk to our file.
                                        .for_each(move |chunk| {
                                            let chunk: &[u8] = chunk.as_ref();
                                            file.write_all(chunk)
                                                .map(|_| ())
                                                .context(FetchErrorKind::ChunkWrite)
                                                .map_err(FetchError::from)?;

                                            // Signal the caller that we just fetched this many bytes.
                                            if let Some(cb) = cb.as_ref() {
                                                cb(FetchEvent::Progress(chunk.len() as u64));
                                            }

                                            Ok(())
                                        })
                                        // Return the file on success.
                                        .map(move |_| {
                                            // Signal the caller that we just finished fetching many bytes.
                                            if let Some(cb) = cb2.as_ref() {
                                                cb(FetchEvent::DownloadComplete);
                                            }

                                            copy
                                        })
                            })
                            // Ensure that the file is fully written to the disk.
                            .and_then(|file| {
                                flush(file).map_err(|why| {
                                    FetchError::from(why.context(FetchErrorKind::Flush))
                                })
                            })
                            // On success, we will return the filetime to assign to the destionation.
                            .map(move |_| Some(date.map(|date| FileTime::from_unix_time(date.timestamp(), 0))));

                        Box::new(future)
                    }
                };

                future
            });

        FetchedState {
            future: Box::new(download_future),
            download_location: Arc::from(download_location),
            final_destination: Arc::from(final_destination),
            progress: self.progress
        }
    }

    /// Convert this state into the future that it owns.
    pub fn into_future(self) -> T {
        self.future
    }
}