1use std::ffi::OsStr;
2use std::path::{Component, Path, PathBuf};
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use crate::{Checksum, Error, Hasher, Prepared, Sink, SinkFactory, Target};
7
/// A [`SinkFactory`] that stores every download inside one directory.
///
/// Each target is written to `dir` joined with the target's name. That name must be a
/// single plain path component, so downloads cannot escape `dir`.
#[derive(Debug, Clone)]
pub struct LocalDir {
    /// Directory that downloaded files are written into.
    pub dir: PathBuf,
}
15
16impl LocalDir {
17 pub fn create_all(dir: impl Into<PathBuf>) -> std::io::Result<Self> {
21 let dir = dir.into();
22 std::fs::create_dir_all(&dir)?;
23 Ok(Self { dir })
24 }
25}
26
27impl SinkFactory for LocalDir {
28 type Sink = LocalSink;
29 type Location = PathBuf;
30
31 async fn make(&mut self, target: Target<'_>) -> Result<LocalSink, Error> {
32 let name = single_component(target.name).ok_or_else(|| Error::InvalidDownloadPath {
36 path: PathBuf::from(target.name),
37 })?;
38 LocalSink::new(self.dir.join(name))
39 }
40}
41
/// Returns `name` as an `OsStr` when it is exactly one normal path component.
///
/// Returns `None` in these cases:
/// - the empty string;
/// - `.` and `..`;
/// - absolute paths (root or prefix components);
/// - any name that splits into two or more components, e.g. `a/b`.
fn single_component(name: &str) -> Option<&OsStr> {
    let mut parts = Path::new(name).components();
    let first = parts.next()?;
    if parts.next().is_some() {
        return None;
    }
    if let Component::Normal(segment) = first {
        Some(segment)
    } else {
        None
    }
}
52
/// A [`SinkFactory`] that writes every download to one fixed file path.
///
/// The target's name is ignored; each sink it makes targets `path`.
#[derive(Debug, Clone)]
pub struct LocalPath {
    /// File that every download is written to.
    pub path: PathBuf,
}
57
58impl SinkFactory for LocalPath {
59 type Sink = LocalSink;
60 type Location = PathBuf;
61
62 async fn make(&mut self, _target: Target<'_>) -> Result<LocalSink, Error> {
63 LocalSink::new(self.path.clone())
64 }
65}
66
67pub struct LocalSink {
68 final_path: PathBuf,
69 partial_path: PathBuf,
70 out: Option<tokio::fs::File>,
71}
72
73impl LocalSink {
74 pub(crate) fn new(final_path: PathBuf) -> Result<Self, Error> {
75 let partial_path = partial_path(&final_path)?;
76 Ok(Self {
77 final_path,
78 partial_path,
79 out: None,
80 })
81 }
82}
83
84impl Sink for LocalSink {
85 type Location = PathBuf;
86
87 async fn prepare(&mut self, target: Target<'_>) -> Result<Prepared<Self::Location>, Error> {
88 if existing_matches_destination(&self.final_path, target.checksum, target.size).await? {
89 return Ok(Prepared::Skip {
90 location: self.final_path.clone(),
91 });
92 }
93
94 let (received, partial) =
95 examine_partial(&self.partial_path, target.size, target.checksum).await?;
96 let out = if received > 0 {
97 tokio::fs::OpenOptions::new()
98 .append(true)
99 .open(&self.partial_path)
100 .await?
101 } else {
102 tokio::fs::File::create(&self.partial_path).await?
103 };
104 self.out = Some(out);
105 Ok(Prepared::Resume { received, partial })
106 }
107
108 async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), Error> {
109 let out = self
110 .out
111 .as_mut()
112 .expect("write_chunk before prepare or after finalize");
113 out.write_all(chunk).await?;
114 Ok(())
115 }
116
117 async fn restart(&mut self) -> Result<(), Error> {
118 let replacement = tokio::fs::OpenOptions::new()
119 .write(true)
120 .truncate(true)
121 .create(true)
122 .open(&self.partial_path)
123 .await?;
124 self.out = Some(replacement);
125 Ok(())
126 }
127
128 async fn finalize(mut self) -> Result<Self::Location, Error> {
129 let out = self.out.take().expect("finalize without prepare");
130 out.sync_all().await?;
131 drop(out);
132 tokio::fs::rename(&self.partial_path, &self.final_path).await?;
133 Ok(self.final_path)
134 }
135}
136
137async fn existing_matches_destination(
143 path: &Path,
144 checksum: Option<&Checksum>,
145 size: u64,
146) -> Result<bool, Error> {
147 if !tokio::fs::try_exists(path).await? {
148 return Ok(false);
149 }
150 match checksum {
151 Some(expected) => {
152 let mut hasher = Hasher::for_checksum(Some(expected));
153 seed_hasher_from_file(path, &mut hasher).await?;
154 Ok(hasher.finalize_hex().as_deref() == Some(expected.hex()))
155 }
156 None => {
157 let m = tokio::fs::metadata(path).await?;
158 Ok(m.len() == size)
159 }
160 }
161}
162
163async fn examine_partial(
164 partial_path: &Path,
165 expected_size: u64,
166 checksum: Option<&Checksum>,
167) -> Result<(u64, Hasher), Error> {
168 if !tokio::fs::try_exists(partial_path).await? {
169 return Ok((0, Hasher::for_checksum(checksum)));
170 }
171 let m = tokio::fs::metadata(partial_path).await?;
172 if m.len() > expected_size {
173 tokio::fs::remove_file(partial_path).await?;
174 return Ok((0, Hasher::for_checksum(checksum)));
175 }
176 let mut hasher = Hasher::for_checksum(checksum);
177 if m.len() > 0 {
178 seed_hasher_from_file(partial_path, &mut hasher).await?;
179 }
180 Ok((m.len(), hasher))
181}
182
183async fn seed_hasher_from_file(path: &Path, hasher: &mut Hasher) -> Result<(), Error> {
184 let mut f = tokio::fs::File::open(path).await?;
185 let mut buf = vec![0u8; 64 * 1024];
186 loop {
187 let n = f.read(&mut buf).await?;
188 if n == 0 {
189 break;
190 }
191 hasher.update(&buf[..n]);
192 }
193 Ok(())
194}
195
196fn partial_path(path: &Path) -> Result<PathBuf, Error> {
197 let mut file_name = path
198 .file_name()
199 .ok_or_else(|| Error::InvalidDownloadPath { path: path.into() })?
200 .to_os_string();
201 file_name.push(".part");
202 Ok(path.with_file_name(file_name))
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn partial_path_appends_part_suffix() {
        let source = Path::new("/tmp/foo.warc.gz");
        let expected = PathBuf::from("/tmp/foo.warc.gz.part");
        assert_eq!(partial_path(source).unwrap(), expected);
    }

    #[test]
    fn partial_path_rejects_path_with_no_filename() {
        let outcome = partial_path(Path::new("/"));
        assert!(
            matches!(outcome, Err(Error::InvalidDownloadPath { .. })),
            "a path without a file name must be rejected"
        );
    }

    #[test]
    fn single_component_accepts_a_plain_filename() {
        let expected = OsStr::new("foo.warc.gz");
        assert_eq!(single_component("foo.warc.gz"), Some(expected));
    }

    #[test]
    fn single_component_rejects_traversal_and_multi_component_names() {
        let rejected = [
            "",
            "..",
            ".",
            "/etc/passwd",
            "../../etc/passwd",
            "sub/dir/file.warc.gz",
        ];
        for name in rejected {
            assert!(
                single_component(name).is_none(),
                "{name:?} should be rejected"
            );
        }
    }
}