1use std::ffi::OsStr;
2use std::path::{Component, Path, PathBuf};
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use crate::{Checksum, Error, Hasher, Prepared, Sink, SinkFactory, Target};
7
/// A [`SinkFactory`] that places every download inside `dir`, using the
/// target's name (which must be a single plain path component) as the file
/// name.
#[derive(Debug, Clone)]
pub struct LocalDir {
    /// Directory that downloaded files are written into.
    pub dir: PathBuf,
}
14
15impl LocalDir {
16 pub fn create_all(dir: impl Into<PathBuf>) -> std::io::Result<Self> {
20 let dir = dir.into();
21 std::fs::create_dir_all(&dir)?;
22 Ok(Self { dir })
23 }
24}
25
26impl SinkFactory for LocalDir {
27 type Sink = LocalSink;
28 type Location = PathBuf;
29
30 async fn make(&mut self, target: Target<'_>) -> Result<LocalSink, Error> {
31 let name = single_component(target.name).ok_or_else(|| Error::InvalidDownloadPath {
35 path: PathBuf::from(target.name),
36 })?;
37 LocalSink::new(self.dir.join(name))
38 }
39}
40
/// Returns `name` as an `OsStr` if it is exactly one ordinary path component.
///
/// Anything else — an empty string, `.`, `..`, a root or prefix, or a name with
/// more than one component — yields `None`.
fn single_component(name: &str) -> Option<&OsStr> {
    let mut parts = Path::new(name).components();
    let Some(Component::Normal(segment)) = parts.next() else {
        return None;
    };
    // Reject anything that continues past the first component.
    parts.next().is_none().then_some(segment)
}
51
/// A [`SinkFactory`] that always writes to one fixed file path, regardless of
/// the target's name.
#[derive(Debug, Clone)]
pub struct LocalPath {
    /// Destination path for the downloaded file.
    pub path: PathBuf,
}
56
57impl SinkFactory for LocalPath {
58 type Sink = LocalSink;
59 type Location = PathBuf;
60
61 async fn make(&mut self, _target: Target<'_>) -> Result<LocalSink, Error> {
62 LocalSink::new(self.path.clone())
63 }
64}
65
66pub struct LocalSink {
67 final_path: PathBuf,
68 partial_path: PathBuf,
69 out: Option<tokio::fs::File>,
70}
71
72impl LocalSink {
73 pub(crate) fn new(final_path: PathBuf) -> Result<Self, Error> {
74 let partial_path = partial_path(&final_path)?;
75 Ok(Self {
76 final_path,
77 partial_path,
78 out: None,
79 })
80 }
81}
82
83impl Sink for LocalSink {
84 type Location = PathBuf;
85
86 async fn prepare(&mut self, target: Target<'_>) -> Result<Prepared<Self::Location>, Error> {
87 if existing_matches_destination(&self.final_path, target.checksum, target.size).await? {
88 return Ok(Prepared::Skip {
89 location: self.final_path.clone(),
90 });
91 }
92
93 let (received, partial) =
94 examine_partial(&self.partial_path, target.size, target.checksum).await?;
95 let out = if received > 0 {
96 tokio::fs::OpenOptions::new()
97 .append(true)
98 .open(&self.partial_path)
99 .await?
100 } else {
101 tokio::fs::File::create(&self.partial_path).await?
102 };
103 self.out = Some(out);
104 Ok(Prepared::Resume { received, partial })
105 }
106
107 async fn write_chunk(&mut self, chunk: &[u8]) -> Result<(), Error> {
108 let out = self
109 .out
110 .as_mut()
111 .expect("write_chunk before prepare or after finalize");
112 out.write_all(chunk).await?;
113 Ok(())
114 }
115
116 async fn restart(&mut self) -> Result<(), Error> {
117 let replacement = tokio::fs::OpenOptions::new()
118 .write(true)
119 .truncate(true)
120 .create(true)
121 .open(&self.partial_path)
122 .await?;
123 self.out = Some(replacement);
124 Ok(())
125 }
126
127 async fn finalize(mut self) -> Result<Self::Location, Error> {
128 let out = self.out.take().expect("finalize without prepare");
129 out.sync_all().await?;
130 drop(out);
131 tokio::fs::rename(&self.partial_path, &self.final_path).await?;
132 Ok(self.final_path)
133 }
134}
135
136async fn existing_matches_destination(
142 path: &Path,
143 checksum: Option<&Checksum>,
144 size: u64,
145) -> Result<bool, Error> {
146 if !tokio::fs::try_exists(path).await? {
147 return Ok(false);
148 }
149 match checksum {
150 Some(expected) => {
151 let mut hasher = Hasher::for_checksum(Some(expected));
152 seed_hasher_from_file(path, &mut hasher).await?;
153 Ok(hasher.finalize_hex().as_deref() == Some(expected.hex()))
154 }
155 None => {
156 let m = tokio::fs::metadata(path).await?;
157 Ok(m.len() == size)
158 }
159 }
160}
161
162async fn examine_partial(
163 partial_path: &Path,
164 expected_size: u64,
165 checksum: Option<&Checksum>,
166) -> Result<(u64, Hasher), Error> {
167 if !tokio::fs::try_exists(partial_path).await? {
168 return Ok((0, Hasher::for_checksum(checksum)));
169 }
170 let m = tokio::fs::metadata(partial_path).await?;
171 if m.len() > expected_size {
172 tokio::fs::remove_file(partial_path).await?;
173 return Ok((0, Hasher::for_checksum(checksum)));
174 }
175 let mut hasher = Hasher::for_checksum(checksum);
176 if m.len() > 0 {
177 seed_hasher_from_file(partial_path, &mut hasher).await?;
178 }
179 Ok((m.len(), hasher))
180}
181
182async fn seed_hasher_from_file(path: &Path, hasher: &mut Hasher) -> Result<(), Error> {
183 let mut f = tokio::fs::File::open(path).await?;
184 let mut buf = vec![0u8; 64 * 1024];
185 loop {
186 let n = f.read(&mut buf).await?;
187 if n == 0 {
188 break;
189 }
190 hasher.update(&buf[..n]);
191 }
192 Ok(())
193}
194
195fn partial_path(path: &Path) -> Result<PathBuf, Error> {
196 let mut file_name = path
197 .file_name()
198 .ok_or_else(|| Error::InvalidDownloadPath { path: path.into() })?
199 .to_os_string();
200 file_name.push(".part");
201 Ok(path.with_file_name(file_name))
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn partial_path_appends_part_suffix() {
        let result = partial_path(Path::new("/tmp/foo.warc.gz")).unwrap();
        assert_eq!(result, PathBuf::from("/tmp/foo.warc.gz.part"));
    }

    #[test]
    fn partial_path_handles_relative_paths() {
        let result = partial_path(Path::new("foo.warc.gz")).unwrap();
        assert_eq!(result, PathBuf::from("foo.warc.gz.part"));
    }

    #[test]
    fn partial_path_rejects_path_with_no_filename() {
        let err = partial_path(Path::new("/")).unwrap_err();
        assert!(matches!(err, Error::InvalidDownloadPath { .. }));
    }

    #[test]
    fn partial_path_rejects_path_ending_in_parent_dir() {
        // `Path::file_name` is `None` for paths that end in `..`.
        let err = partial_path(Path::new("downloads/..")).unwrap_err();
        assert!(matches!(err, Error::InvalidDownloadPath { .. }));
    }

    #[test]
    fn single_component_accepts_a_plain_filename() {
        assert_eq!(
            single_component("foo.warc.gz"),
            Some(OsStr::new("foo.warc.gz"))
        );
    }

    #[test]
    fn single_component_rejects_traversal_and_multi_component_names() {
        assert!(single_component("").is_none());
        assert!(single_component("..").is_none());
        assert!(single_component(".").is_none());
        assert!(single_component("/etc/passwd").is_none());
        assert!(single_component("../../etc/passwd").is_none());
        assert!(single_component("sub/dir/file.warc.gz").is_none());
        assert!(single_component("./foo.warc.gz").is_none());
        assert!(single_component("foo.warc.gz/..").is_none());
    }
}