Skip to main content

http_ferry/
local.rs

1use std::ffi::OsStr;
2use std::path::{Component, Path, PathBuf};
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use crate::{Checksum, Error, Hasher, Prepared, Sink, SinkFactory, Target};
7
/// Collection destination: writes each file to `dir/<name>`.
///
/// `dir` must already exist before driving — destination preflight is not a
/// per-item concern (a bad output path should fail the stream once, not once
/// per file). Use [`LocalDir::create_all`] to create it up front.
#[derive(Debug, Clone)]
pub struct LocalDir {
    /// Directory that receives every file of the collection.
    pub dir: PathBuf,
}

impl LocalDir {
    /// Build a `LocalDir`, creating `dir` (and any missing parents) first.
    ///
    /// A cheap one-time synchronous preflight (a single
    /// `std::fs::create_dir_all`); fine to call from async setup code before
    /// entering the transfer loop.
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` reported by `std::fs::create_dir_all` when
    /// the directory cannot be created.
    pub fn create_all(dir: impl Into<PathBuf>) -> std::io::Result<Self> {
        let dir = dir.into();
        std::fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }
}
26
27impl SinkFactory for LocalDir {
28    type Sink = LocalSink;
29    type Location = PathBuf;
30
31    async fn make(&mut self, target: Target<'_>) -> Result<LocalSink, Error> {
32        // `name` comes from the source listing, not the caller, so it is
33        // untrusted. `Path::join` silently escapes the directory on an absolute
34        // path or `..`, so require a single, plain filename component.
35        let name = single_component(target.name).ok_or_else(|| Error::InvalidDownloadPath {
36            path: PathBuf::from(target.name),
37        })?;
38        LocalSink::new(self.dir.join(name))
39    }
40}
41
/// Returns the lone normal filename component of `name`, or `None` if `name` is
/// empty, absolute, contains a separator, or includes `.`/`..` — anything that
/// could place the output outside the destination directory.
///
/// The accepted component must be the *entire* input: `Path::components`
/// normalizes away trailing separators and trailing `.` segments, so without
/// that check names such as `foo/` or `foo/.` would quietly be accepted as
/// `foo`.
fn single_component(name: &str) -> Option<&OsStr> {
    let mut components = Path::new(name).components();
    match (components.next(), components.next()) {
        // Exactly one normal component, and nothing was normalized away.
        (Some(Component::Normal(only)), None) if only == OsStr::new(name) => Some(only),
        _ => None,
    }
}
52
/// Singular destination: writes one file to a fixed, caller-chosen path.
#[derive(Debug, Clone)]
pub struct LocalPath {
    /// Full path of the file to write, used as-is.
    pub path: PathBuf,
}
57
58impl SinkFactory for LocalPath {
59    type Sink = LocalSink;
60    type Location = PathBuf;
61
62    async fn make(&mut self, _target: Target<'_>) -> Result<LocalSink, Error> {
63        LocalSink::new(self.path.clone())
64    }
65}
66
67pub struct LocalSink {
68    final_path: PathBuf,
69    partial_path: PathBuf,
70    out: Option<tokio::fs::File>,
71}
72
73impl LocalSink {
74    pub(crate) fn new(final_path: PathBuf) -> Result<Self, Error> {
75        let partial_path = partial_path(&final_path)?;
76        Ok(Self {
77            final_path,
78            partial_path,
79            out: None,
80        })
81    }
82}
83
84impl Sink for LocalSink {
85    type Location = PathBuf;
86
87    async fn prepare(&mut self, target: Target<'_>) -> Result<Prepared<Self::Location>, Error> {
88        if existing_matches_destination(&self.final_path, target.checksum, target.size).await? {
89            return Ok(Prepared::Skip {
90                location: self.final_path.clone(),
91            });
92        }
93
94        let (received, partial) =
95            examine_partial(&self.partial_path, target.size, target.checksum).await?;
96        let out = if received > 0 {
97            tokio::fs::OpenOptions::new()
98                .append(true)
99                .open(&self.partial_path)
100                .await?
101        } else {
102            tokio::fs::File::create(&self.partial_path).await?
103        };
104        self.out = Some(out);
105        Ok(Prepared::Resume { received, partial })
106    }
107
108    async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), Error> {
109        let out = self
110            .out
111            .as_mut()
112            .expect("write_chunk before prepare or after finalize");
113        out.write_all(chunk).await?;
114        Ok(())
115    }
116
117    async fn restart(&mut self) -> Result<(), Error> {
118        let replacement = tokio::fs::OpenOptions::new()
119            .write(true)
120            .truncate(true)
121            .create(true)
122            .open(&self.partial_path)
123            .await?;
124        self.out = Some(replacement);
125        Ok(())
126    }
127
128    async fn finalize(mut self) -> Result<Self::Location, Error> {
129        let out = self.out.take().expect("finalize without prepare");
130        out.sync_all().await?;
131        drop(out);
132        tokio::fs::rename(&self.partial_path, &self.final_path).await?;
133        Ok(self.final_path)
134    }
135}
136
137// Skip-existing rule:
138// - checksum from the caller: only a hash match is good enough.
139// - no checksum: fall back to size match. Re-downloading a multi-GB WARC every
140//   run when no hash was supplied is the alternative; size is the cheapest
141//   evidence we have.
142async fn existing_matches_destination(
143    path: &Path,
144    checksum: Option<&Checksum>,
145    size: u64,
146) -> Result<bool, Error> {
147    if !tokio::fs::try_exists(path).await? {
148        return Ok(false);
149    }
150    match checksum {
151        Some(expected) => {
152            let mut hasher = Hasher::for_checksum(Some(expected));
153            seed_hasher_from_file(path, &mut hasher).await?;
154            Ok(hasher.finalize_hex().as_deref() == Some(expected.hex()))
155        }
156        None => {
157            let m = tokio::fs::metadata(path).await?;
158            Ok(m.len() == size)
159        }
160    }
161}
162
163async fn examine_partial(
164    partial_path: &Path,
165    expected_size: u64,
166    checksum: Option<&Checksum>,
167) -> Result<(u64, Hasher), Error> {
168    if !tokio::fs::try_exists(partial_path).await? {
169        return Ok((0, Hasher::for_checksum(checksum)));
170    }
171    let m = tokio::fs::metadata(partial_path).await?;
172    if m.len() > expected_size {
173        tokio::fs::remove_file(partial_path).await?;
174        return Ok((0, Hasher::for_checksum(checksum)));
175    }
176    let mut hasher = Hasher::for_checksum(checksum);
177    if m.len() > 0 {
178        seed_hasher_from_file(partial_path, &mut hasher).await?;
179    }
180    Ok((m.len(), hasher))
181}
182
183async fn seed_hasher_from_file(path: &Path, hasher: &mut Hasher) -> Result<(), Error> {
184    let mut f = tokio::fs::File::open(path).await?;
185    let mut buf = vec![0u8; 64 * 1024];
186    loop {
187        let n = f.read(&mut buf).await?;
188        if n == 0 {
189            break;
190        }
191        hasher.update(&buf[..n]);
192    }
193    Ok(())
194}
195
196fn partial_path(path: &Path) -> Result<PathBuf, Error> {
197    let mut file_name = path
198        .file_name()
199        .ok_or_else(|| Error::InvalidDownloadPath { path: path.into() })?
200        .to_os_string();
201    file_name.push(".part");
202    Ok(path.with_file_name(file_name))
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// The staging name is the final name with `.part` appended.
    #[test]
    fn partial_path_appends_part_suffix() {
        let staged = partial_path(Path::new("/tmp/foo.warc.gz")).unwrap();
        assert_eq!(staged, PathBuf::from("/tmp/foo.warc.gz.part"));
    }

    /// A path with no filename component cannot get a staging name.
    #[test]
    fn partial_path_rejects_path_with_no_filename() {
        let outcome = partial_path(Path::new("/"));
        assert!(matches!(outcome, Err(Error::InvalidDownloadPath { .. })));
    }

    /// An ordinary filename passes through unchanged.
    #[test]
    fn single_component_accepts_a_plain_filename() {
        let accepted = single_component("foo.warc.gz");
        assert_eq!(accepted, Some(OsStr::new("foo.warc.gz")));
    }

    /// Anything that is empty, absolute, relative-special, or made of several
    /// components is refused.
    #[test]
    fn single_component_rejects_traversal_and_multi_component_names() {
        let rejected = [
            "",
            "..",
            ".",
            "/etc/passwd",
            "../../etc/passwd",
            "sub/dir/file.warc.gz",
        ];
        for name in rejected {
            assert!(single_component(name).is_none());
        }
    }
}