Skip to main content

http_ferry/
local.rs

1use std::ffi::OsStr;
2use std::path::{Component, Path, PathBuf};
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use crate::{Checksum, Error, Hasher, Prepared, Sink, SinkFactory, Target};
7
/// Collection destination: writes each file to `dir/<name>`.
///
/// `dir` must already exist before driving, so a bad output path fails the
/// stream once rather than once per file; use [`LocalDir::create_all`] to
/// create it up front.
#[derive(Debug, Clone)]
pub struct LocalDir {
    /// Directory that every collected file is written into.
    pub dir: PathBuf,
}
14
15impl LocalDir {
16    /// Build a `LocalDir`, creating `dir` (and parents) first with a blocking
17    /// `std::fs::create_dir_all`. Call it during setup, not inside the transfer
18    /// loop.
19    pub fn create_all(dir: impl Into<PathBuf>) -> std::io::Result<Self> {
20        let dir = dir.into();
21        std::fs::create_dir_all(&dir)?;
22        Ok(Self { dir })
23    }
24}
25
26impl SinkFactory for LocalDir {
27    type Sink = LocalSink;
28    type Location = PathBuf;
29
30    async fn make(&mut self, target: Target<'_>) -> Result<LocalSink, Error> {
31        // `name` comes from the source listing, not the caller, so it is
32        // untrusted. `Path::join` silently escapes the directory on an absolute
33        // path or `..`, so require a single, plain filename component.
34        let name = single_component(target.name).ok_or_else(|| Error::InvalidDownloadPath {
35            path: PathBuf::from(target.name),
36        })?;
37        LocalSink::new(self.dir.join(name))
38    }
39}
40
/// Returns the lone normal filename component of `name`, or `None` if `name` is
/// empty, absolute, contains a separator, or includes `.`/`..` — anything that
/// could place the output outside the destination directory.
///
/// `Path::components` normalizes its input: trailing separators and a trailing
/// `.` are dropped, so `"foo/"` and `"foo/."` both parse as the single
/// component `foo`. To honor the "no separator" contract, the component must
/// also equal `name` verbatim.
fn single_component(name: &str) -> Option<&OsStr> {
    let mut components = Path::new(name).components();
    match (components.next(), components.next()) {
        // Exactly one component, it is a plain name, and nothing was
        // normalized away while parsing.
        (Some(Component::Normal(only)), None) if only == OsStr::new(name) => Some(only),
        _ => None,
    }
}
51
/// Singular destination: writes one file to a fixed, caller-chosen path.
#[derive(Debug, Clone)]
pub struct LocalPath {
    /// Final path of the downloaded file.
    pub path: PathBuf,
}
56
57impl SinkFactory for LocalPath {
58    type Sink = LocalSink;
59    type Location = PathBuf;
60
61    async fn make(&mut self, _target: Target<'_>) -> Result<LocalSink, Error> {
62        LocalSink::new(self.path.clone())
63    }
64}
65
66pub struct LocalSink {
67    final_path: PathBuf,
68    partial_path: PathBuf,
69    out: Option<tokio::fs::File>,
70}
71
72impl LocalSink {
73    pub(crate) fn new(final_path: PathBuf) -> Result<Self, Error> {
74        let partial_path = partial_path(&final_path)?;
75        Ok(Self {
76            final_path,
77            partial_path,
78            out: None,
79        })
80    }
81}
82
83impl Sink for LocalSink {
84    type Location = PathBuf;
85
86    async fn prepare(&mut self, target: Target<'_>) -> Result<Prepared<Self::Location>, Error> {
87        if existing_matches_destination(&self.final_path, target.checksum, target.size).await? {
88            return Ok(Prepared::Skip {
89                location: self.final_path.clone(),
90            });
91        }
92
93        let (received, partial) =
94            examine_partial(&self.partial_path, target.size, target.checksum).await?;
95        let out = if received > 0 {
96            tokio::fs::OpenOptions::new()
97                .append(true)
98                .open(&self.partial_path)
99                .await?
100        } else {
101            tokio::fs::File::create(&self.partial_path).await?
102        };
103        self.out = Some(out);
104        Ok(Prepared::Resume { received, partial })
105    }
106
107    async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), Error> {
108        let out = self
109            .out
110            .as_mut()
111            .expect("write_chunk before prepare or after finalize");
112        out.write_all(chunk).await?;
113        Ok(())
114    }
115
116    async fn restart(&mut self) -> Result<(), Error> {
117        let replacement = tokio::fs::OpenOptions::new()
118            .write(true)
119            .truncate(true)
120            .create(true)
121            .open(&self.partial_path)
122            .await?;
123        self.out = Some(replacement);
124        Ok(())
125    }
126
127    async fn finalize(mut self) -> Result<Self::Location, Error> {
128        let out = self.out.take().expect("finalize without prepare");
129        out.sync_all().await?;
130        drop(out);
131        tokio::fs::rename(&self.partial_path, &self.final_path).await?;
132        Ok(self.final_path)
133    }
134}
135
136// Skip-existing rule:
137// - checksum from the caller: only a hash match is good enough.
138// - no checksum: fall back to size match. Re-downloading a multi-GB WARC every
139//   run when no hash was supplied is the alternative; size is the cheapest
140//   evidence we have.
141async fn existing_matches_destination(
142    path: &Path,
143    checksum: Option<&Checksum>,
144    size: u64,
145) -> Result<bool, Error> {
146    if !tokio::fs::try_exists(path).await? {
147        return Ok(false);
148    }
149    match checksum {
150        Some(expected) => {
151            let mut hasher = Hasher::for_checksum(Some(expected));
152            seed_hasher_from_file(path, &mut hasher).await?;
153            Ok(hasher.finalize_hex().as_deref() == Some(expected.hex()))
154        }
155        None => {
156            let m = tokio::fs::metadata(path).await?;
157            Ok(m.len() == size)
158        }
159    }
160}
161
162async fn examine_partial(
163    partial_path: &Path,
164    expected_size: u64,
165    checksum: Option<&Checksum>,
166) -> Result<(u64, Hasher), Error> {
167    if !tokio::fs::try_exists(partial_path).await? {
168        return Ok((0, Hasher::for_checksum(checksum)));
169    }
170    let m = tokio::fs::metadata(partial_path).await?;
171    if m.len() > expected_size {
172        tokio::fs::remove_file(partial_path).await?;
173        return Ok((0, Hasher::for_checksum(checksum)));
174    }
175    let mut hasher = Hasher::for_checksum(checksum);
176    if m.len() > 0 {
177        seed_hasher_from_file(partial_path, &mut hasher).await?;
178    }
179    Ok((m.len(), hasher))
180}
181
182async fn seed_hasher_from_file(path: &Path, hasher: &mut Hasher) -> Result<(), Error> {
183    let mut f = tokio::fs::File::open(path).await?;
184    let mut buf = vec![0u8; 64 * 1024];
185    loop {
186        let n = f.read(&mut buf).await?;
187        if n == 0 {
188            break;
189        }
190        hasher.update(&buf[..n]);
191    }
192    Ok(())
193}
194
195fn partial_path(path: &Path) -> Result<PathBuf, Error> {
196    let mut file_name = path
197        .file_name()
198        .ok_or_else(|| Error::InvalidDownloadPath { path: path.into() })?
199        .to_os_string();
200    file_name.push(".part");
201    Ok(path.with_file_name(file_name))
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    // `.part` is appended to the full filename, extensions included.
    #[test]
    fn partial_path_appends_part_suffix() {
        let partial = partial_path(Path::new("/tmp/foo.warc.gz")).unwrap();
        assert_eq!(partial, PathBuf::from("/tmp/foo.warc.gz.part"));
    }

    // The filesystem root has no filename to extend.
    #[test]
    fn partial_path_rejects_path_with_no_filename() {
        let outcome = partial_path(Path::new("/"));
        assert!(matches!(outcome, Err(Error::InvalidDownloadPath { .. })));
    }

    #[test]
    fn single_component_accepts_a_plain_filename() {
        let expected = OsStr::new("foo.warc.gz");
        assert_eq!(single_component("foo.warc.gz"), Some(expected));
    }

    // Empty, dot, absolute, traversing and nested names must all be refused.
    #[test]
    fn single_component_rejects_traversal_and_multi_component_names() {
        let rejected = [
            "",
            "..",
            ".",
            "/etc/passwd",
            "../../etc/passwd",
            "sub/dir/file.warc.gz",
        ];
        for name in rejected {
            assert!(
                single_component(name).is_none(),
                "{name:?} should be rejected"
            );
        }
    }
}