1use std::ffi::OsString;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5
6use fs2::FileExt;
7use sha2::{Digest, Sha256};
8
9use crate::chunk::{ChunkPlan, normalize_chunk_size};
10use crate::metadata::{PartMetadata, RemoteInfo, slot_size_for};
11use crate::{HashConfig, Result, TakanawaError, hash_url};
12
/// Handle to an in-progress download stored as `<target>.part`.
///
/// The part file holds the content bytes followed by two fixed-size metadata slots; the
/// handle also keeps the `<target>.part.lock` file open while it holds the exclusive lock
/// taken by `acquire_lock`.
#[derive(Debug)]
pub struct PartFile {
    // Read/write handle to the `.part` file (content + two metadata slots).
    file: File,
    // Open lock file; the exclusive lock is released when this handle is dropped.
    lock_file: File,
    // Path of the lock file, removed best-effort in `finalize`.
    lock_path: PathBuf,
    // Path of the `.part` file, renamed onto the target in `finalize`.
    part_path: PathBuf,
    // Size in bytes of each of the two metadata slots.
    slot_size: u64,
    // Index (0 or 1) of the slot holding the most recently written metadata.
    active_slot: u8,
    // In-memory copy of the current metadata (bitmap, generation, hash config, ...).
    metadata: PartMetadata,
}
23
24impl PartFile {
25 pub fn open_or_create(
26 target_path: &Path,
27 url: &str,
28 remote: &RemoteInfo,
29 chunk_size: u64,
30 hash: HashConfig,
31 ) -> Result<Self> {
32 if target_path.exists() {
33 return Err(TakanawaError::TargetExists(target_path.to_owned()));
34 }
35
36 let chunk_size = normalize_chunk_size(chunk_size)?;
37 let part_path = part_path_for(target_path);
38 let lock_path = part_lock_path_for(target_path);
39 let lock_file = acquire_lock(&lock_path)?;
40 let slot_size = slot_size_for(remote.content_len, chunk_size)?;
41 let expected_len = remote
42 .content_len
43 .checked_add(slot_size.checked_mul(2).ok_or_else(|| {
44 TakanawaError::InvalidConfig("part file length overflow".to_owned())
45 })?)
46 .ok_or_else(|| TakanawaError::InvalidConfig("part file length overflow".to_owned()))?;
47 let url_hash = hash_url(url);
48
49 if part_path.exists() {
50 let mut file = OpenOptions::new().read(true).write(true).open(&part_path)?;
51 let actual_len = file.metadata()?.len();
52 if actual_len != expected_len {
53 return Err(TakanawaError::PartSizeMismatch {
54 expected: expected_len,
55 actual: actual_len,
56 });
57 }
58
59 let (metadata, active_slot) =
60 read_best_metadata(&mut file, remote.content_len, slot_size)?;
61 metadata.ensure_compatible(url_hash, remote, chunk_size, hash)?;
62 return Ok(Self {
63 file,
64 lock_file,
65 lock_path,
66 part_path,
67 slot_size,
68 active_slot,
69 metadata,
70 });
71 }
72
73 let mut file = OpenOptions::new()
74 .read(true)
75 .write(true)
76 .create_new(true)
77 .open(&part_path)?;
78 file.set_len(expected_len)?;
79
80 let metadata = PartMetadata::new(url_hash, remote, chunk_size, hash)?;
81 let slot = metadata.encode_slot(slot_size)?;
82 file.seek(SeekFrom::Start(remote.content_len))?;
83 file.write_all(&slot)?;
84 file.sync_all()?;
85
86 Ok(Self {
87 file,
88 lock_file,
89 lock_path,
90 part_path,
91 slot_size,
92 active_slot: 0,
93 metadata,
94 })
95 }
96
97 #[must_use]
98 pub const fn metadata(&self) -> &PartMetadata {
99 &self.metadata
100 }
101
102 #[must_use]
103 pub fn incomplete_chunks(&self) -> Vec<u64> {
104 self.metadata.bitmap.incomplete_indices()
105 }
106
107 pub fn write_chunk(&mut self, index: u64, bytes: &[u8]) -> Result<()> {
108 let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
109 let chunk = plan.chunk(index)?;
110 if bytes.len() != usize::try_from(chunk.len).unwrap_or(usize::MAX) {
111 return Err(TakanawaError::HttpProtocol(format!(
112 "chunk {index} length mismatch: expected {}, got {}",
113 chunk.len,
114 bytes.len()
115 )));
116 }
117 if self.metadata.bitmap.is_complete(index)? {
118 return Ok(());
119 }
120
121 self.write_chunk_bytes(index, 0, bytes)?;
122 self.commit_chunk(index)
123 }
124
125 pub fn write_chunk_bytes(&mut self, index: u64, chunk_offset: u64, bytes: &[u8]) -> Result<()> {
126 let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
127 let chunk = plan.chunk(index)?;
128 let len = u64::try_from(bytes.len()).map_err(|_| {
129 TakanawaError::InvalidConfig(format!(
130 "chunk {index} write length does not fit in file offsets"
131 ))
132 })?;
133 let end = chunk_offset.checked_add(len).ok_or_else(|| {
134 TakanawaError::InvalidConfig(format!("chunk {index} write offset overflow"))
135 })?;
136 if end > chunk.len {
137 return Err(TakanawaError::InvalidConfig(format!(
138 "chunk {index} write range {chunk_offset}..{end} exceeds chunk length {}",
139 chunk.len
140 )));
141 }
142 if bytes.is_empty() || self.metadata.bitmap.is_complete(index)? {
143 return Ok(());
144 }
145
146 self.file
147 .seek(SeekFrom::Start(chunk.start + chunk_offset))?;
148 self.file.write_all(bytes)?;
149 Ok(())
150 }
151
152 pub fn commit_chunk(&mut self, index: u64) -> Result<()> {
153 let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
154 let _chunk = plan.chunk(index)?;
155 if self.metadata.bitmap.is_complete(index)? {
156 return Ok(());
157 }
158
159 self.file.sync_data()?;
160
161 self.metadata.bitmap.mark_complete(index)?;
162 self.commit_metadata()
163 }
164
165 pub fn finalize(mut self, target_path: &Path) -> Result<()> {
166 if target_path.exists() {
167 return Err(TakanawaError::TargetExists(target_path.to_owned()));
168 }
169 if !self.metadata.all_complete() {
170 return Err(TakanawaError::InvalidConfig(
171 "cannot finalize an incomplete part file".to_owned(),
172 ));
173 }
174
175 if let HashConfig::Sha256(expected) = self.metadata.hash {
176 let actual = self.compute_sha256()?;
177 if actual != expected {
178 return Err(TakanawaError::HashMismatch);
179 }
180 }
181
182 let PartFile {
183 file,
184 lock_file,
185 lock_path,
186 part_path,
187 metadata,
188 ..
189 } = self;
190 file.set_len(metadata.content_len)?;
191 file.sync_all()?;
192 drop(file);
193 fs::rename(&part_path, target_path)?;
194 sync_parent_dir(target_path);
195 drop(lock_file);
196 let _ = fs::remove_file(lock_path);
197 Ok(())
198 }
199
200 fn commit_metadata(&mut self) -> Result<()> {
201 self.metadata.generation = self.metadata.generation.checked_add(1).ok_or_else(|| {
202 TakanawaError::InvalidConfig("metadata generation overflow".to_owned())
203 })?;
204 self.active_slot = (self.metadata.generation % 2) as u8;
205 let slot = self.metadata.encode_slot(self.slot_size)?;
206 let offset = self.metadata.content_len + u64::from(self.active_slot) * self.slot_size;
207 self.file.seek(SeekFrom::Start(offset))?;
208 self.file.write_all(&slot)?;
209 self.file.sync_all()?;
210 Ok(())
211 }
212
213 fn compute_sha256(&mut self) -> Result<[u8; 32]> {
214 let mut hasher = Sha256::new();
215 let mut remaining = self.metadata.content_len;
216 let mut buffer = vec![0; 1024 * 1024];
217 self.file.seek(SeekFrom::Start(0))?;
218 while remaining > 0 {
219 let read_len = usize::try_from(remaining.min(buffer.len() as u64))
220 .expect("bounded by buffer length");
221 self.file.read_exact(&mut buffer[..read_len])?;
222 hasher.update(&buffer[..read_len]);
223 remaining -= read_len as u64;
224 }
225 Ok(hasher.finalize().into())
226 }
227}
228
/// Returns the path of the in-progress download file: `target_path` with `.part` appended.
///
/// The suffix is appended to the raw OS string rather than treated as an extension, so
/// `file.bin` becomes `file.bin.part`.
#[must_use]
pub fn part_path_for(target_path: &Path) -> PathBuf {
    let mut raw = OsString::from(target_path.as_os_str());
    raw.push(".part");
    raw.into()
}
235
/// Returns the path of the lock file guarding the part file: `target_path` with
/// `.part.lock` appended to the raw OS string.
#[must_use]
pub fn part_lock_path_for(target_path: &Path) -> PathBuf {
    let mut raw = target_path.as_os_str().to_os_string();
    raw.push(".part.lock");
    PathBuf::from(raw)
}
242
243fn acquire_lock(lock_path: &Path) -> Result<File> {
244 let lock_file = OpenOptions::new()
245 .read(true)
246 .write(true)
247 .create(true)
248 .truncate(false)
249 .open(lock_path)?;
250 match lock_file.try_lock_exclusive() {
251 Ok(()) => Ok(lock_file),
252 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
253 Err(TakanawaError::PartBusy(lock_path.to_owned()))
254 }
255 Err(err) => Err(TakanawaError::Io(err)),
256 }
257}
258
259fn read_best_metadata(
260 file: &mut File,
261 content_len: u64,
262 slot_size: u64,
263) -> Result<(PartMetadata, u8)> {
264 let slot_len = usize::try_from(slot_size)
265 .map_err(|_| TakanawaError::PartCorrupt("slot size overflow".to_owned()))?;
266 let mut slots = Vec::new();
267
268 for slot_index in 0..2_u8 {
269 let offset = content_len + u64::from(slot_index) * slot_size;
270 let mut buffer = vec![0; slot_len];
271 file.seek(SeekFrom::Start(offset))?;
272 file.read_exact(&mut buffer)?;
273 if let Ok(metadata) = PartMetadata::decode_slot(&buffer) {
274 slots.push((metadata, slot_index));
275 }
276 }
277
278 slots
279 .into_iter()
280 .max_by_key(|(metadata, _)| metadata.generation)
281 .ok_or_else(|| TakanawaError::PartCorrupt("no valid metadata slot found".to_owned()))
282}
283
/// Best-effort fsync of the directory containing `target_path`, so that a preceding rename
/// into it is durable. Failures to open or sync the directory are ignored.
#[cfg(unix)]
fn sync_parent_dir(target_path: &Path) {
    let parent_dir = target_path
        .parent()
        .and_then(|parent| File::open(parent).ok());
    if let Some(dir) = parent_dir {
        let _ = dir.sync_all();
    }
}
292
/// No-op on non-Unix targets: the Unix variant's directory fsync is not attempted here.
#[cfg(not(unix))]
fn sync_parent_dir(target_path: &Path) {
    let _ = target_path;
}
295
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;

    /// Builds a `RemoteInfo` with fixed validators and the given length.
    fn remote(content_len: u64) -> RemoteInfo {
        RemoteInfo {
            content_len,
            etag: Some("etag".to_owned()),
            last_modified: Some("now".to_owned()),
        }
    }

    #[test]
    fn resumes_valid_part() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        {
            let mut part = PartFile::open_or_create(
                &target,
                "https://example.test/file",
                &remote(6),
                3,
                HashConfig::None,
            )
            .unwrap();
            part.write_chunk(0, b"abc").unwrap();
        }

        let part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();

        assert_eq!(part.metadata().completed_chunks(), 1);
        assert_eq!(part.incomplete_chunks(), vec![1]);
    }

    /// Two commits land in alternating metadata slots; reopening must pick the newest one.
    #[test]
    fn resumes_after_alternating_metadata_commits() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        {
            let mut part = PartFile::open_or_create(
                &target,
                "https://example.test/file",
                &remote(9),
                3,
                HashConfig::None,
            )
            .unwrap();
            part.write_chunk(0, b"abc").unwrap();
            part.write_chunk(2, b"ghi").unwrap();
        }

        let part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(9),
            3,
            HashConfig::None,
        )
        .unwrap();

        assert_eq!(part.metadata().completed_chunks(), 2);
        assert_eq!(part.incomplete_chunks(), vec![1]);
    }

    #[test]
    fn partial_chunk_write_is_not_committed_on_reopen() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        {
            let mut part = PartFile::open_or_create(
                &target,
                "https://example.test/file",
                &remote(6),
                3,
                HashConfig::None,
            )
            .unwrap();
            part.write_chunk_bytes(0, 0, b"ab").unwrap();
        }

        let part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();

        assert_eq!(part.metadata().completed_chunks(), 0);
        assert_eq!(part.incomplete_chunks(), vec![0, 1]);
    }

    #[test]
    fn partial_chunk_can_be_overwritten_and_committed() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();
        part.write_chunk_bytes(0, 0, b"xx").unwrap();
        part.write_chunk_bytes(0, 0, b"abc").unwrap();
        part.commit_chunk(0).unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
    }

    #[test]
    fn partial_chunk_write_rejects_out_of_bounds_ranges() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();

        let err = part.write_chunk_bytes(0, 2, b"bc").unwrap_err();

        assert!(matches!(err, TakanawaError::InvalidConfig(_)));
    }

    #[test]
    fn rejects_part_size_mismatch() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let part_path = part_path_for(&target);
        fs::write(&part_path, b"too short").unwrap();

        let err = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap_err();

        assert!(matches!(err, TakanawaError::PartSizeMismatch { .. }));
    }

    #[test]
    fn finalizes_and_strips_metadata() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
        assert!(!part_path_for(&target).exists());
    }

    #[test]
    fn finalize_rejects_incomplete_part() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::None,
        )
        .unwrap();
        part.write_chunk(0, b"abc").unwrap();

        let err = part.finalize(&target).unwrap_err();

        assert!(matches!(err, TakanawaError::InvalidConfig(_)));
        assert!(!target.exists());
    }

    #[test]
    fn finalize_accepts_matching_sha256() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let expected: [u8; 32] = Sha256::digest(b"abcdef").into();
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::Sha256(expected),
        )
        .unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
    }

    #[test]
    fn finalize_rejects_sha256_mismatch() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = PartFile::open_or_create(
            &target,
            "https://example.test/file",
            &remote(6),
            3,
            HashConfig::Sha256([0; 32]),
        )
        .unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.write_chunk(1, b"def").unwrap();

        let err = part.finalize(&target).unwrap_err();

        assert!(matches!(err, TakanawaError::HashMismatch));
        assert!(!target.exists());
    }
}