Skip to main content

takanawa_core/
part_file.rs

1use std::ffi::OsString;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5
6use fs2::FileExt;
7use sha2::{Digest, Sha256};
8
9use crate::chunk::{ChunkPlan, normalize_chunk_size};
10use crate::metadata::{PartMetadata, RemoteInfo, slot_size_for};
11use crate::{HashConfig, Result, TakanawaError, hash_url};
12
/// A locked, resumable `.part` file backing a single download target.
///
/// On-disk layout: `metadata.content_len` bytes of payload, followed by two
/// metadata slots of `slot_size` bytes each. Every metadata commit bumps the
/// generation and writes slot `generation % 2`, so commits alternate between
/// the two slots.
#[derive(Debug)]
pub struct PartFile {
    /// Read/write handle to the `.part` file.
    file: File,
    /// Handle on which `acquire_lock` took the exclusive lock. Fields drop in
    /// declaration order, so `file` is closed before this handle is dropped.
    lock_file: File,
    /// Path of the `.part.lock` file; `finalize` removes it (best effort).
    lock_path: PathBuf,
    /// Path of the `.part` file; `finalize` renames it onto the target.
    part_path: PathBuf,
    /// Size in bytes of each of the two metadata slots.
    slot_size: u64,
    /// Index (0 or 1) of the slot written by the latest metadata commit, or of
    /// the newest slot found when the file was opened (0 for a new file).
    active_slot: u8,
    /// In-memory copy of the metadata (lengths, chunk bitmap, generation, hash).
    metadata: PartMetadata,
}
23
24impl PartFile {
25    pub fn open_or_create(
26        target_path: &Path,
27        url: &str,
28        remote: &RemoteInfo,
29        chunk_size: u64,
30        hash: HashConfig,
31    ) -> Result<Self> {
32        if target_path.exists() {
33            return Err(TakanawaError::TargetExists(target_path.to_owned()));
34        }
35
36        let chunk_size = normalize_chunk_size(chunk_size)?;
37        let part_path = part_path_for(target_path);
38        let lock_path = part_lock_path_for(target_path);
39        let lock_file = acquire_lock(&lock_path)?;
40        let slot_size = slot_size_for(remote.content_len, chunk_size)?;
41        let expected_len = remote
42            .content_len
43            .checked_add(slot_size.checked_mul(2).ok_or_else(|| {
44                TakanawaError::InvalidConfig("part file length overflow".to_owned())
45            })?)
46            .ok_or_else(|| TakanawaError::InvalidConfig("part file length overflow".to_owned()))?;
47        let url_hash = hash_url(url);
48
49        if part_path.exists() {
50            let mut file = OpenOptions::new().read(true).write(true).open(&part_path)?;
51            let actual_len = file.metadata()?.len();
52            if actual_len != expected_len {
53                return Err(TakanawaError::PartSizeMismatch {
54                    expected: expected_len,
55                    actual: actual_len,
56                });
57            }
58
59            let (metadata, active_slot) =
60                read_best_metadata(&mut file, remote.content_len, slot_size)?;
61            metadata.ensure_compatible(url_hash, remote, chunk_size, hash)?;
62            return Ok(Self {
63                file,
64                lock_file,
65                lock_path,
66                part_path,
67                slot_size,
68                active_slot,
69                metadata,
70            });
71        }
72
73        let mut file = OpenOptions::new()
74            .read(true)
75            .write(true)
76            .create_new(true)
77            .open(&part_path)?;
78        file.set_len(expected_len)?;
79
80        let metadata = PartMetadata::new(url_hash, remote, chunk_size, hash)?;
81        let slot = metadata.encode_slot(slot_size)?;
82        file.seek(SeekFrom::Start(remote.content_len))?;
83        file.write_all(&slot)?;
84        file.sync_all()?;
85
86        Ok(Self {
87            file,
88            lock_file,
89            lock_path,
90            part_path,
91            slot_size,
92            active_slot: 0,
93            metadata,
94        })
95    }
96
97    #[must_use]
98    pub const fn metadata(&self) -> &PartMetadata {
99        &self.metadata
100    }
101
102    #[must_use]
103    pub fn incomplete_chunks(&self) -> Vec<u64> {
104        self.metadata.bitmap.incomplete_indices()
105    }
106
107    pub fn write_chunk(&mut self, index: u64, bytes: &[u8]) -> Result<()> {
108        let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
109        let chunk = plan.chunk(index)?;
110        if bytes.len() != usize::try_from(chunk.len).unwrap_or(usize::MAX) {
111            return Err(TakanawaError::HttpProtocol(format!(
112                "chunk {index} length mismatch: expected {}, got {}",
113                chunk.len,
114                bytes.len()
115            )));
116        }
117        if self.metadata.bitmap.is_complete(index)? {
118            return Ok(());
119        }
120
121        self.write_chunk_bytes(index, 0, bytes)?;
122        self.commit_chunk(index)
123    }
124
125    pub fn write_chunk_bytes(&mut self, index: u64, chunk_offset: u64, bytes: &[u8]) -> Result<()> {
126        let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
127        let chunk = plan.chunk(index)?;
128        let len = u64::try_from(bytes.len()).map_err(|_| {
129            TakanawaError::InvalidConfig(format!(
130                "chunk {index} write length does not fit in file offsets"
131            ))
132        })?;
133        let end = chunk_offset.checked_add(len).ok_or_else(|| {
134            TakanawaError::InvalidConfig(format!("chunk {index} write offset overflow"))
135        })?;
136        if end > chunk.len {
137            return Err(TakanawaError::InvalidConfig(format!(
138                "chunk {index} write range {chunk_offset}..{end} exceeds chunk length {}",
139                chunk.len
140            )));
141        }
142        if bytes.is_empty() || self.metadata.bitmap.is_complete(index)? {
143            return Ok(());
144        }
145
146        self.file
147            .seek(SeekFrom::Start(chunk.start + chunk_offset))?;
148        self.file.write_all(bytes)?;
149        Ok(())
150    }
151
152    pub fn commit_chunk(&mut self, index: u64) -> Result<()> {
153        let plan = ChunkPlan::new(self.metadata.content_len, self.metadata.chunk_size)?;
154        let _chunk = plan.chunk(index)?;
155        if self.metadata.bitmap.is_complete(index)? {
156            return Ok(());
157        }
158
159        self.file.sync_data()?;
160
161        self.metadata.bitmap.mark_complete(index)?;
162        self.commit_metadata()
163    }
164
165    pub fn finalize(mut self, target_path: &Path) -> Result<()> {
166        if target_path.exists() {
167            return Err(TakanawaError::TargetExists(target_path.to_owned()));
168        }
169        if !self.metadata.all_complete() {
170            return Err(TakanawaError::InvalidConfig(
171                "cannot finalize an incomplete part file".to_owned(),
172            ));
173        }
174
175        if let HashConfig::Sha256(expected) = self.metadata.hash {
176            let actual = self.compute_sha256()?;
177            if actual != expected {
178                return Err(TakanawaError::HashMismatch);
179            }
180        }
181
182        let PartFile {
183            file,
184            lock_file,
185            lock_path,
186            part_path,
187            metadata,
188            ..
189        } = self;
190        file.set_len(metadata.content_len)?;
191        file.sync_all()?;
192        drop(file);
193        fs::rename(&part_path, target_path)?;
194        sync_parent_dir(target_path);
195        drop(lock_file);
196        let _ = fs::remove_file(lock_path);
197        Ok(())
198    }
199
200    fn commit_metadata(&mut self) -> Result<()> {
201        self.metadata.generation = self.metadata.generation.checked_add(1).ok_or_else(|| {
202            TakanawaError::InvalidConfig("metadata generation overflow".to_owned())
203        })?;
204        self.active_slot = (self.metadata.generation % 2) as u8;
205        let slot = self.metadata.encode_slot(self.slot_size)?;
206        let offset = self.metadata.content_len + u64::from(self.active_slot) * self.slot_size;
207        self.file.seek(SeekFrom::Start(offset))?;
208        self.file.write_all(&slot)?;
209        self.file.sync_all()?;
210        Ok(())
211    }
212
213    fn compute_sha256(&mut self) -> Result<[u8; 32]> {
214        let mut hasher = Sha256::new();
215        let mut remaining = self.metadata.content_len;
216        let mut buffer = vec![0; 1024 * 1024];
217        self.file.seek(SeekFrom::Start(0))?;
218        while remaining > 0 {
219            let read_len = usize::try_from(remaining.min(buffer.len() as u64))
220                .expect("bounded by buffer length");
221            self.file.read_exact(&mut buffer[..read_len])?;
222            hasher.update(&buffer[..read_len]);
223            remaining -= read_len as u64;
224        }
225        Ok(hasher.finalize().into())
226    }
227}
228
/// Returns the path of the in-progress download for `target_path`.
///
/// `.part` is appended to the full path, so `dir/file.bin` becomes
/// `dir/file.bin.part`; any existing extension is kept.
#[must_use]
pub fn part_path_for(target_path: &Path) -> PathBuf {
    let mut raw = OsString::from(target_path.as_os_str());
    raw.push(".part");
    raw.into()
}
235
/// Returns the path of the lock file guarding the `.part` file for
/// `target_path`: the full path with `.part.lock` appended.
#[must_use]
pub fn part_lock_path_for(target_path: &Path) -> PathBuf {
    const SUFFIX: &str = ".part.lock";
    let mut name = OsString::with_capacity(target_path.as_os_str().len() + SUFFIX.len());
    name.push(target_path);
    name.push(SUFFIX);
    PathBuf::from(name)
}
242
243fn acquire_lock(lock_path: &Path) -> Result<File> {
244    let lock_file = OpenOptions::new()
245        .read(true)
246        .write(true)
247        .create(true)
248        .truncate(false)
249        .open(lock_path)?;
250    match lock_file.try_lock_exclusive() {
251        Ok(()) => Ok(lock_file),
252        Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
253            Err(TakanawaError::PartBusy(lock_path.to_owned()))
254        }
255        Err(err) => Err(TakanawaError::Io(err)),
256    }
257}
258
259fn read_best_metadata(
260    file: &mut File,
261    content_len: u64,
262    slot_size: u64,
263) -> Result<(PartMetadata, u8)> {
264    let slot_len = usize::try_from(slot_size)
265        .map_err(|_| TakanawaError::PartCorrupt("slot size overflow".to_owned()))?;
266    let mut slots = Vec::new();
267
268    for slot_index in 0..2_u8 {
269        let offset = content_len + u64::from(slot_index) * slot_size;
270        let mut buffer = vec![0; slot_len];
271        file.seek(SeekFrom::Start(offset))?;
272        file.read_exact(&mut buffer)?;
273        if let Ok(metadata) = PartMetadata::decode_slot(&buffer) {
274            slots.push((metadata, slot_index));
275        }
276    }
277
278    slots
279        .into_iter()
280        .max_by_key(|(metadata, _)| metadata.generation)
281        .ok_or_else(|| TakanawaError::PartCorrupt("no valid metadata slot found".to_owned()))
282}
283
/// Best-effort `fsync` of the directory containing `target_path`. It is called
/// after creating or renaming an entry in that directory. Failures are ignored.
#[cfg(unix)]
fn sync_parent_dir(target_path: &Path) {
    if let Some(dir) = dir_to_sync(target_path) {
        if let Ok(handle) = File::open(dir) {
            let _ = handle.sync_all();
        }
    }
}

/// Returns the directory that contains `target_path`, or `None` if it has no
/// parent (e.g. `/`).
///
/// For a bare relative name such as `file.bin`, `Path::parent` yields an empty
/// path. Opening that always fails, so it is mapped to the current directory
/// `.` instead of silently skipping the sync.
#[cfg(unix)]
fn dir_to_sync(target_path: &Path) -> Option<&Path> {
    target_path.parent().map(|parent| {
        if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        }
    })
}
292
/// No-op on non-Unix targets; this module only syncs directories on Unix.
#[cfg(not(unix))]
fn sync_parent_dir(target_path: &Path) {
    // Intentionally unused: the signature mirrors the Unix implementation.
    let _ = target_path;
}
295
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;

    const URL: &str = "https://example.test/file";

    fn remote(content_len: u64) -> RemoteInfo {
        RemoteInfo {
            content_len,
            etag: Some("etag".to_owned()),
            last_modified: Some("now".to_owned()),
        }
    }

    /// Opens (or creates) the part file for `target`, describing a 6-byte
    /// download with a requested chunk size of 3.
    fn open_part(target: &Path, hash: HashConfig) -> Result<PartFile> {
        PartFile::open_or_create(target, URL, &remote(6), 3, hash)
    }

    #[test]
    fn resumes_valid_part() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        {
            let mut part = open_part(&target, HashConfig::None).unwrap();
            part.write_chunk(0, b"abc").unwrap();
        }

        let part = open_part(&target, HashConfig::None).unwrap();

        assert_eq!(part.metadata().completed_chunks(), 1);
        assert_eq!(part.incomplete_chunks(), vec![1]);
    }

    #[test]
    fn partial_chunk_write_is_not_committed_on_reopen() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        {
            let mut part = open_part(&target, HashConfig::None).unwrap();
            part.write_chunk_bytes(0, 0, b"ab").unwrap();
        }

        let part = open_part(&target, HashConfig::None).unwrap();

        assert_eq!(part.metadata().completed_chunks(), 0);
        assert_eq!(part.incomplete_chunks(), vec![0, 1]);
    }

    #[test]
    fn partial_chunk_can_be_overwritten_and_committed() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = open_part(&target, HashConfig::None).unwrap();
        part.write_chunk_bytes(0, 0, b"xx").unwrap();
        part.write_chunk_bytes(0, 0, b"abc").unwrap();
        part.commit_chunk(0).unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
    }

    #[test]
    fn partial_chunk_write_rejects_out_of_bounds_ranges() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = open_part(&target, HashConfig::None).unwrap();

        let err = part.write_chunk_bytes(0, 2, b"bc").unwrap_err();

        assert!(matches!(err, TakanawaError::InvalidConfig(_)));
    }

    #[test]
    fn rejects_part_size_mismatch() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let part_path = part_path_for(&target);
        fs::write(&part_path, b"too short").unwrap();

        let err = open_part(&target, HashConfig::None).unwrap_err();

        assert!(matches!(err, TakanawaError::PartSizeMismatch { .. }));
    }

    #[test]
    fn rejects_existing_target() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        fs::write(&target, b"already here").unwrap();

        let err = open_part(&target, HashConfig::None).unwrap_err();

        assert!(matches!(err, TakanawaError::TargetExists(_)));
        assert!(!part_path_for(&target).exists());
    }

    #[test]
    fn finalizes_and_strips_metadata() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = open_part(&target, HashConfig::None).unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
        assert!(!part_path_for(&target).exists());
        assert!(!part_lock_path_for(&target).exists());
    }

    #[test]
    fn finalize_rejects_incomplete_part() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = open_part(&target, HashConfig::None).unwrap();
        part.write_chunk(0, b"abc").unwrap();

        let err = part.finalize(&target).unwrap_err();

        assert!(matches!(err, TakanawaError::InvalidConfig(_)));
        assert!(!target.exists());
        assert!(part_path_for(&target).exists());
    }

    #[test]
    fn finalize_verifies_sha256() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let expected: [u8; 32] = Sha256::digest(b"abcdef").into();
        let mut part = open_part(&target, HashConfig::Sha256(expected)).unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.write_chunk(1, b"def").unwrap();
        part.finalize(&target).unwrap();

        assert_eq!(fs::read(&target).unwrap(), b"abcdef");
    }

    #[test]
    fn finalize_rejects_sha256_mismatch() {
        let dir = TempDir::new().unwrap();
        let target = dir.path().join("file.bin");
        let mut part = open_part(&target, HashConfig::Sha256([0; 32])).unwrap();
        part.write_chunk(0, b"abc").unwrap();
        part.write_chunk(1, b"def").unwrap();

        let err = part.finalize(&target).unwrap_err();

        assert!(matches!(err, TakanawaError::HashMismatch));
        assert!(!target.exists());
        assert!(part_path_for(&target).exists());
    }
}