hpfile/
lib.rs

1//! # Head-prunable file
2//!
3//! Normal files can not be pruned (truncated) from the beginning to some middle position.
4//! A `HPFile` uses a sequence of small files to simulate one big virtual file. Thus, pruning
5//! from the beginning is to delete the first several small files.
6//!
7//! A `HPFile` can only be read and appended. Any byteslice which was written to it is
8//! immutable.
9//!
10//! To append a new byteslice into a `HPFile`, use the `append` function, which will return
11//! the start position of this byteslice. Later, just pass this start position to `read_at`
12//! for reading this byteslice out. The position passed to `read_at` must be the beginning of a
13//! byteslice that was written before, instead of its middle. Do NOT try to read the later
14//! half (from a middle point to the end) of a byteslice.
15//!
16//! A `HPFile` can also be truncated: discarding the content from a given position to the
17//! end of the file. During truncation, several small files may be removed and one small file
18//! may get truncated.
19//!
20//! A `HPFile` can serve many reader threads. If a reader thread just read random positions,
21//! plain `read_at` is enough. If a reader tends to read many adjacent byteslices in sequence, it
22//! can take advantage of spatial locality by using `read_at_with_pre_reader`, which uses a
23//! `PreReader` to read large chunks of data from file and cache them. Each reader thread can have
24//! its own `PreReader`. A `PreReader` cannot be shared by different `HPFile`s.
25//!
26//! A `HPFile` can serve only one writer thread. The writer thread must own a write buffer that
27//! collects small pieces of written data into one big single write to the underlying OS file,
28//! to avoid the cost of many syscalls writing the OS file. This write buffer must be provided
29//! when calling `append` and `flush`. It is owned by the writer thread, instead of `HPFile`,
30//! because we want `HPFile` to be shared between many reader threads.
31
32pub mod file;
33pub use std::io::Read;
34
35use anyhow::{anyhow, Result};
36use dashmap::DashMap;
37use std::{
38    fs,
39    io::{self, Seek, SeekFrom, Write},
40    sync::atomic::{AtomicI64, Ordering},
41    sync::Arc,
42};
43
44use std::fs::File;
45
46#[cfg(unix)]
47use std::os::unix::fs::{FileExt, OpenOptionsExt};
48
/// A trait for reading at a given offset without affecting the cursor position
///
/// On unix targets `File` already provides such a method via
/// `std::os::unix::fs::FileExt`; the impl below supplies one for the zkvm
/// target, where it panics.
pub trait ReadAt {
    /// Reads a number of bytes starting from a given offset.
    ///
    /// Returns the number of bytes read.
    ///
    /// The offset is relative to the start of the file and thus independent from the current cursor.
    /// The current file cursor is not affected by this function.
    ///
    /// Note that similar to File::read, it is not an error to return with a short read.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;
}
61
#[cfg(target_os = "zkvm")]
impl ReadAt for File {
    /// Positional reads always panic under the zkvm target.
    ///
    /// NOTE(review): a seek-based fallback was left commented out below,
    /// presumably because cloning/seeking the file is also unsupported or
    /// undesirable under zkvm — confirm before re-enabling it.
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        panic!("read_at is not supported in zkvm target");
        // let mut temp_file = self.try_clone()?;
        // temp_file.seek(SeekFrom::Start(offset))?;
        // temp_file.read(buf)
    }
}
71
/// Size (in bytes) of the cache buffer held by each `PreReader`.
const PRE_READ_BUF_SIZE: usize = 512 * 1024;
/// I/O block size: direct (aligned) reads and the eof padding in `flush`
/// both work in units of this many bytes.
pub const IO_BLK_SIZE: usize = 512;

#[cfg(target_os = "linux")]
const DIRECT: i32 = libc::O_DIRECT;
// O_DIRECT does not exist off Linux; this bogus flag makes any attempt to
// open a file with it fail.
#[cfg(not(target_os = "linux"))]
const DIRECT: i32 = i32::MIN; //will cause error

/// Maps a segment id to its open `File` plus a read-only flag: `true` for
/// sealed (fully written) segments, which are the ones read with block
/// alignment when `directio` is enabled.
type FileMap = DashMap<
    i64,
    Arc<(
        File,
        bool, /* whether the segment is sealed read-only (relevant with directio) */
    )>,
>;
87
/// Head-prunable file
#[derive(Debug)]
pub struct HPFile {
    dir_name: String,  // where we store the small files
    segment_size: i64, // the size of each small file
    buffer_size: i64,  // the write buffer's size
    file_map: FileMap, // segment id -> (open file, read-only flag)
    largest_id: AtomicI64, // id of the newest (writable) segment
    latest_file_size: AtomicI64, // logical size of the newest segment, incl. buffered bytes
    file_size: AtomicI64, // virtual file size, incl. non-flushed bytes
    file_size_on_disk: AtomicI64, // portion of `file_size` already written to the OS
    directio: bool, // read sealed segments with O_DIRECT (Linux only)
}
101
impl HPFile {
    /// Create a `HPFile` with a directory. If this directory was used by an old HPFile, the old
    /// HPFile must have the same `segment_size` as this one.
    ///
    /// # Parameters
    ///
    /// - `wr_buf_size`: The write buffer used in `append` will not exceed this size
    /// - `segment_size`: The target size of the small files
    /// - `dir_name`: The name of the directory used to store the small files
    /// - `directio`: Enable directio for readonly files (only use this feature on Linux)
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: A successfully initialized `HPFile`
    /// - `Err`: Encountered some file system error.
    ///
    pub fn new(
        wr_buf_size: i64,
        segment_size: i64,
        dir_name: String,
        directio: bool,
    ) -> Result<HPFile> {
        // Segments must hold a whole number of write buffers so a buffered
        // write never needs to straddle a segment boundary.
        // NOTE(review): a zero `wr_buf_size` would panic on this modulo; the
        // caller is assumed to pass a positive size.
        if segment_size % wr_buf_size != 0 {
            return Err(anyhow!(
                "Invalid segmentSize:{} writeBufferSize:{}",
                segment_size,
                wr_buf_size
            ));
        }

        if directio && cfg!(not(target_os = "linux")) {
            eprintln!("Directio is only supported on Linux");
        }

        // Re-open any segment files left behind by a previous run.
        let (id_list, largest_id) = Self::get_file_ids(&dir_name, segment_size)?;
        let (file_map, latest_file_size) =
            Self::load_file_map(&dir_name, segment_size, id_list, largest_id, directio)?;

        // Virtual size = all segments before the newest one (assumed full)
        // plus the newest one's current size.
        let file_size = largest_id * segment_size + latest_file_size;
        Ok(HPFile {
            dir_name: dir_name.clone(),
            segment_size,
            buffer_size: wr_buf_size,
            file_map,
            largest_id: AtomicI64::new(largest_id),
            latest_file_size: AtomicI64::new(latest_file_size),
            file_size: AtomicI64::new(file_size),
            file_size_on_disk: AtomicI64::new(file_size),
            directio,
        })
    }

    /// Create an empty `HPFile` that has no function and can only be used as placeholder.
    pub fn empty() -> HPFile {
        HPFile {
            dir_name: "".to_owned(),
            segment_size: 0,
            buffer_size: 0,
            file_map: DashMap::with_capacity(0),
            largest_id: AtomicI64::new(0),
            latest_file_size: AtomicI64::new(0),
            file_size: AtomicI64::new(0),
            file_size_on_disk: AtomicI64::new(0),
            directio: false,
        }
    }

    /// Returns whether this `HPFile` is empty.
    ///
    /// The placeholder built by [`HPFile::empty`] is the instance with
    /// `segment_size == 0`.
    pub fn is_empty(&self) -> bool {
        self.segment_size == 0
    }

    /// Scan `dir_name` for segment files, returning every parsed id and the
    /// largest id seen (0 for an empty directory). Sub-directories are
    /// skipped; any file not named `FileId-segmentSize` makes the scan fail.
    fn get_file_ids(dir_name: &str, segment_size: i64) -> Result<(Vec<i64>, i64)> {
        let mut largest_id = 0;
        let mut id_list = Vec::new();

        for entry in fs::read_dir(dir_name)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                continue;
            }

            let file_name = entry.file_name().to_string_lossy().to_string();
            let id = Self::parse_filename(segment_size, &file_name)?;
            largest_id = largest_id.max(id);
            id_list.push(id);
        }

        Ok((id_list, largest_id))
    }

    /// Parse a segment file name of the form `FileId-segmentSize` and return
    /// the id, rejecting names whose recorded size differs from `segment_size`.
    fn parse_filename(segment_size: i64, file_name: &str) -> Result<i64> {
        let parts: Vec<_> = file_name.split("-").collect();
        if parts.len() != 2 {
            return Err(anyhow!(
                "{} does not match the pattern 'FileId-segmentSize'",
                file_name
            ));
        }

        let id: i64 = parts[0].parse()?;
        let size: i64 = parts[1].parse()?;

        if segment_size != size {
            return Err(anyhow!("Invalid Size! {}!={}", size, segment_size));
        }

        Ok(id)
    }

    /// Open every segment listed in `id_list`: the newest one (`largest_id`)
    /// read-write, the older (sealed) ones read-only — with O_DIRECT when
    /// `directio` is set. Creates segment 0 when the directory is empty.
    ///
    /// Returns the file map plus the on-disk size of the newest segment.
    fn load_file_map(
        dir_name: &str,
        segment_size: i64,
        id_list: Vec<i64>,
        largest_id: i64,
        directio: bool,
    ) -> Result<(FileMap, i64)> {
        let file_map = DashMap::new();
        let mut latest_file_size = 0;

        for &id in &id_list {
            let file_name = format!("{dir_name}/{id}-{segment_size}");
            let mut options = File::options();
            let file_and_ro = if id == largest_id {
                // Newest segment: writable; its length seeds `latest_file_size`.
                let file = options.read(true).write(true).open(file_name)?;
                latest_file_size = file.metadata()?.len() as i64;
                (file, false)
            } else {
                if directio {
                    #[cfg(target_os = "linux")]
                    options.custom_flags(DIRECT);
                }
                (options.read(true).open(file_name)?, true)
            };
            file_map.insert(id, Arc::new(file_and_ro));
        }

        if id_list.is_empty() {
            let file_name = format!("{}/{}-{}", &dir_name, 0, segment_size);
            let file = File::create_new(file_name)?;
            file_map.insert(0, Arc::new((file, false)));
        }

        Ok((file_map, latest_file_size))
    }

    /// Returns the size of the virtual large file, including the non-flushed bytes
    pub fn size(&self) -> i64 {
        self.file_size.load(Ordering::SeqCst)
    }

    /// Returns the flushed size of the virtual large file
    pub fn size_on_disk(&self) -> i64 {
        self.file_size_on_disk.load(Ordering::SeqCst)
    }

    /// Truncate the file to make it smaller
    ///
    /// # Parameters
    ///
    /// - `size`: the size of the virtual large file after truncation. It must be smaller
    ///           than the original size.
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: It's truncated successfully
    /// - `Err`: Encountered some file system error.
    ///
    pub fn truncate(&self, size: i64) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }

        let mut largest_id = self.largest_id.load(Ordering::SeqCst);

        // Drop whole segments that lie entirely beyond the new size.
        while size < largest_id * self.segment_size {
            self.file_map.remove(&largest_id);

            // NOTE(review): the on-disk file is removed only on unix targets;
            // elsewhere the map entry is dropped but the file stays on disk —
            // confirm this is intended.
            #[cfg(unix)]
            {
                let file_name = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
                fs::remove_file(file_name)?;
            }

            self.largest_id.fetch_sub(1, Ordering::SeqCst);
            largest_id -= 1;
        }

        // Shrink the newest surviving segment to the remaining length and
        // re-register it as the writable segment.
        let remaining_size = size - largest_id * self.segment_size;
        let file_name = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
        let mut f = File::options().read(true).write(true).open(file_name)?;
        f.set_len(remaining_size as u64)?;
        f.seek(SeekFrom::End(0))?;

        self.file_map.insert(largest_id, Arc::new((f, false)));
        self.latest_file_size
            .store(remaining_size, Ordering::SeqCst);
        self.file_size.store(size, Ordering::SeqCst);
        self.file_size_on_disk.store(size, Ordering::SeqCst);

        Ok(())
    }

    /// Flush the remained data in `buffer` into file system
    ///
    /// # Parameters
    ///
    /// - `buffer`: the write buffer, which is used by the client to call `append`.
    /// - `eof`: indicates if this is the end of file. If so, padding may be added for alignment.
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: It's flushed successfully
    /// - `Err`: Encountered some file system error.
    ///
    pub fn flush(&self, buffer: &mut Vec<u8>, eof: bool) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }
        let largest_id = self.largest_id.load(Ordering::SeqCst);
        let mut opt = self.file_map.get_mut(&largest_id);
        // Writes go through `&File`, which implements Write and Seek.
        let mut f = &opt.as_mut().unwrap().value().0;
        if !buffer.is_empty() {
            let tail_len = buffer.len() % IO_BLK_SIZE;
            if eof && tail_len != 0 {
                // force the file size aligned with IO_BLK_SIZE
                buffer.resize(buffer.len() + IO_BLK_SIZE - tail_len, 0);
            }
            // NOTE(review): a seek failure panics here instead of being
            // propagated like the other I/O errors — confirm intended.
            f.seek(SeekFrom::End(0)).unwrap();
            f.write_all(buffer)?;
            self.file_size_on_disk
                .fetch_add(buffer.len() as i64, Ordering::SeqCst);
            buffer.clear();
        }

        f.sync_all()
    }

    /// Close the opened small files
    pub fn close(&self) {
        // Dropping the map entries drops the `File`s, unless readers still
        // hold `Arc`s cloned out via `get_file_and_pos`.
        self.file_map.clear();
    }

    /// Returns the corresponding file and in-file position given a logical offset. When we
    /// use io_uring to read data from HPFile, the underlying segment files must be exposed.
    ///
    /// # Parameters
    ///
    /// - `offset`: a logical offset of this HPFile
    ///
    /// # Returns
    ///
    /// A tuple. Its first entry is the underlying File and its readonly attribute,
    /// and its second entry is the position within this underlying File.
    ///
    /// # Panics
    ///
    /// Panics when the segment containing `offset` was pruned or truncated away.
    pub fn get_file_and_pos(&self, offset: i64) -> (Arc<(File, bool)>, i64) {
        let file_id = offset / self.segment_size;
        let opt = self.file_map.get(&file_id);
        let f = opt.as_ref().unwrap().value();
        (f.clone(), offset % self.segment_size)
    }

    /// Read data from file at `offset` to fill `bz`
    ///
    /// # Parameters
    ///
    /// - `offset`: the start position of a byteslice that was written before
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: Number of bytes that was filled into `bz`
    /// - `Err`: Encountered some file system error.
    ///
    pub fn read_at(&self, bz: &mut [u8], offset: i64) -> io::Result<usize> {
        let file_id = offset / self.segment_size;
        let pos = offset % self.segment_size;
        let opt = self.file_map.get(&file_id);
        let f = &opt.as_ref().unwrap().value();
        if self.directio && f.1 {
            //readonly file, so we add alignment requirement
            Self::read_at_aligned(&f.0, bz, pos)
        } else {
            f.0.read_at(bz, pos as u64)
        }
    }

    /// Fill `buf` with the flushed bytes starting at logical `offset`,
    /// crossing segment boundaries as needed. Unlike `read_at`, a short read
    /// is reported as an error here.
    ///
    /// # Errors
    ///
    /// - `UnexpectedEof` when the range extends past the flushed size, a
    ///   segment returns fewer bytes than expected, or a read fails.
    /// - `NotFound` when a needed segment was pruned away.
    pub fn read_range(&self, buf: &mut [u8], offset: i64) -> io::Result<()> {
        let size = self.file_size_on_disk.load(Ordering::SeqCst);
        let end_offset = offset + buf.len() as i64;
        if end_offset > size {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("Read out of range: {} + {} > {}", offset, buf.len(), size),
            ));
        }

        let start_file_id = offset / self.segment_size;
        let end_file_id = end_offset / self.segment_size;
        let mut has_read_size = 0usize;
        for file_id in start_file_id..=end_file_id {
            let opt = self.file_map.get(&file_id);
            if opt.is_none() {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("File ID {file_id} not found"),
                ));
            }

            let f = &opt.as_ref().unwrap().value();
            // Read to the end of this segment, or to the end of the request
            // when this is the last segment touched.
            let pos = (offset + has_read_size as i64) % self.segment_size;
            let read_len = if file_id == end_file_id {
                end_offset - offset - has_read_size as i64
            } else {
                self.segment_size - pos
            } as usize;
            let size = f.0.read_at(
                &mut buf[has_read_size..has_read_size + read_len],
                pos as u64,
            );
            match size {
                Ok(n) if n < read_len => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Short read from file ID {file_id}: expected {read_len}, got {n}"),
                    ));
                }
                Ok(_) => {}
                Err(e) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Failed to read from file ID {file_id}: {e}"),
                    ));
                }
            }
            has_read_size += read_len;
        }

        Ok(())
    }

    /// Read into `bz` from a (possibly O_DIRECT) file while satisfying block
    /// alignment: both the in-memory buffer and the file offset handed to the
    /// kernel are aligned to `IO_BLK_SIZE`.
    ///
    /// NOTE(review): when `offset` is unaligned only two blocks are read, so
    /// the final copy needs `offset % IO_BLK_SIZE + bz.len() <= 2 * IO_BLK_SIZE`;
    /// a larger unaligned request would panic on the slice below — confirm
    /// callers respect this.
    fn read_at_aligned(f: &File, bz: &mut [u8], offset: i64) -> io::Result<usize> {
        if bz.len() > 2 * IO_BLK_SIZE {
            panic!("Cannot read more than two io blocks");
        }
        // Round the file offset down to a block boundary.
        let off_in_blk = offset % (IO_BLK_SIZE as i64);
        let off_start = offset - off_in_blk;
        // Over-allocate three blocks on the stack so an aligned window of one
        // or two blocks can always be carved out of it.
        let mut buf = [0u8; 3 * IO_BLK_SIZE];
        let buf_start = IO_BLK_SIZE - (buf.as_ptr() as usize % IO_BLK_SIZE);
        let mut buf_end = buf_start + IO_BLK_SIZE;
        if off_in_blk != 0 {
            buf_end += IO_BLK_SIZE; //we must read two blocks
        }
        let buf = &mut buf[buf_start..buf_end];
        if buf.as_ptr() as usize % IO_BLK_SIZE != 0 {
            panic!("Buffer still not aligned!");
        }
        if off_start as usize % IO_BLK_SIZE != 0 {
            panic!("File offset still not aligned!");
        }
        let res = f.read_at(buf, off_start as u64);
        // NOTE(review): read errors panic here rather than being returned.
        if let Err(e) = res {
            panic!("aligned {e}");
        }
        if let Ok(read_len) = res {
            // Skip the leading bytes that precede the requested offset.
            let copy_len = usize::min(read_len, bz.len());
            let copy_start = off_in_blk as usize;
            bz[..copy_len].copy_from_slice(&buf[copy_start..copy_start + copy_len]);
            return Ok(copy_len);
        }
        res
    }

    /// Read at most `num_bytes` from file at `offset` to fill `buf`
    ///
    /// # Parameters
    ///
    /// - `buf`: a vector to be filled
    /// - `num_bytes`: the wanted number of bytes to be read
    /// - `offset`: the start position of a byteslice that was written before
    /// - `pre_reader`: a PreReader used to take advantage of spatial locality
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: Number of bytes that was filled into `buf`
    /// - `Err`: Encountered some file system error.
    ///
    pub fn read_at_with_pre_reader(
        &self,
        buf: &mut Vec<u8>,
        num_bytes: usize,
        offset: i64,
        pre_reader: &mut PreReader,
    ) -> io::Result<usize> {
        if buf.len() < num_bytes {
            buf.resize(num_bytes, 0);
        }

        let file_id = offset / self.segment_size;
        let pos = offset % self.segment_size;

        // Fast path: the PreReader already caches this range.
        if pre_reader.try_read(file_id, pos, &mut buf[..num_bytes]) {
            return Ok(num_bytes);
        }

        let opt = self.file_map.get(&file_id);
        let f = &opt.as_ref().unwrap().value().0;

        if num_bytes >= PRE_READ_BUF_SIZE {
            panic!("Read too many bytes");
        }

        if pos + num_bytes as i64 > self.segment_size {
            // The request touches the tail of the segment: bypass the cache
            // and read it directly with alignment.
            return Self::read_at_aligned(f, &mut buf[..num_bytes], pos);
        }

        // Refill the cache with a large aligned chunk starting at the block
        // containing `pos`, then serve the request from the cache.
        let blk_size = IO_BLK_SIZE as i64;
        let aligned_pos = (pos / blk_size) * blk_size;
        pre_reader.fill_slice(file_id, aligned_pos, |slice| {
            // Never read past the end of this segment file.
            let end = usize::min(slice.len(), (self.segment_size - aligned_pos) as usize);
            f.read_at(&mut slice[..end], aligned_pos as u64)
                .map(|n| n as i64)
        })?;

        if !pre_reader.try_read(file_id, pos, &mut buf[..num_bytes]) {
            panic!(
                "Internal error: cannot read data just fetched in {} fileID {}",
                self.dir_name, file_id
            );
        }

        Ok(num_bytes)
    }

    /// Append a byteslice to the file. This byteslice may be temporarily held in
    /// `buffer` before flushing.
    ///
    /// # Parameters
    ///
    /// - `bz`: the byteslice to append. It cannot be longer than `wr_buf_size` specified
    ///         in `HPFile::new`.
    /// - `buffer`: the write buffer. It will never be larger than `wr_buf_size`.
    ///
    /// # Returns
    ///
    /// A `Result` which is:
    ///
    /// - `Ok`: the start position where this byteslice locates in the file
    /// - `Err`: Encountered some file system error.
    ///
    pub fn append(&self, bz: &[u8], buffer: &mut Vec<u8>) -> io::Result<i64> {
        if self.is_empty() {
            return Ok(0);
        }

        if bz.len() as i64 > self.buffer_size {
            panic!("bz is too large");
        }

        let mut largest_id = self.largest_id.load(Ordering::SeqCst);
        let start_pos = self.size();
        // Account for the new bytes up front; `old_size` is the newest
        // segment's logical size before this append.
        let old_size = self
            .latest_file_size
            .fetch_add(bz.len() as i64, Ordering::SeqCst);
        self.file_size.fetch_add(bz.len() as i64, Ordering::SeqCst);
        let mut split_pos = 0;
        let extra_bytes = (buffer.len() + bz.len()) as i64 - self.buffer_size;
        if extra_bytes > 0 {
            // flush buffer_size bytes to disk
            split_pos = bz.len() - extra_bytes as usize;
            buffer.extend_from_slice(&bz[0..split_pos]);
            let mut opt = self.file_map.get_mut(&largest_id);
            let mut f = &opt.as_mut().unwrap().value().0;
            // NOTE(review): the write error is swallowed into a panic without
            // its cause — consider propagating it instead.
            if f.write_all(buffer).is_err() {
                panic!("Fail to write file");
            }
            self.file_size_on_disk
                .fetch_add(buffer.len() as i64, Ordering::SeqCst);
            buffer.clear();
        }
        buffer.extend_from_slice(&bz[split_pos..]); //put remained bytes into buffer
        // Did this append reach (or cross) the end of the current segment?
        let overflow_byte_count = old_size + bz.len() as i64 - self.segment_size;
        if overflow_byte_count >= 0 {
            // Flush the whole byteslice into the current segment — including
            // any bytes logically beyond the boundary — so a byteslice never
            // straddles two files.
            self.flush(buffer, true)?;
            if self.directio {
                let done_file = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
                //re-open it as readonly&directio
                // A file that has been written in non-direct mode cannot be read in direct mode unless it is reopened.
                let mut options = File::options();
                #[cfg(target_os = "linux")]
                options.custom_flags(DIRECT);
                let f = options.read(true).open(&done_file).unwrap();
                self.file_map.insert(largest_id, Arc::new((f, true)));
            }
            largest_id += 1;
            self.largest_id.fetch_add(1, Ordering::SeqCst);
            //open new file as writable
            let new_file = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
            let f = match File::create_new(&new_file) {
                Ok(file) => file,
                Err(_) => File::options()
                    .read(true)
                    .write(true)
                    .open(&new_file)
                    .unwrap(),
            };
            if overflow_byte_count != 0 {
                // write zero bytes as placeholder
                // (the real overflow bytes live at the tail of the previous
                // segment; the new segment carries zeros at those positions)
                buffer.clear();
                buffer.resize(overflow_byte_count as usize, 0);
            }
            self.file_map.insert(largest_id, Arc::new((f, false)));
            self.latest_file_size
                .store(overflow_byte_count, Ordering::SeqCst);
        }

        Ok(start_pos)
    }

    /// Prune from the beginning to `offset`. This part of the file cannot be read hereafter.
    pub fn prune_head(&self, offset: i64) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }

        // Only segments that lie entirely before `offset` are removed; the
        // segment containing `offset` survives.
        let file_id = offset / self.segment_size;
        let ids_to_remove: Vec<i64> = self
            .file_map
            .iter()
            .filter(|entry| *entry.key() < file_id)
            .map(|entry| *entry.key())
            .collect();

        for id in ids_to_remove {
            self.file_map.remove(&id);

            // NOTE(review): as in `truncate`, the on-disk file is removed only
            // on unix targets.
            #[cfg(unix)]
            {
                let file_name = format!("{}/{}-{}", self.dir_name, id, self.segment_size);
                fs::remove_file(file_name)?;
            }
        }

        Ok(())
    }
}
658
/// Pre-read a large chunk of data from file for caching
///
/// Caches one contiguous, block-aligned window of a single segment file;
/// `try_read` serves requests that fall entirely inside the window.
#[derive(Debug)]
pub struct PreReader {
    buffer: Box<[u8]>, // size is PRE_READ_BUF_SIZE, aligned to IO_BLK_SIZE
    file_id: i64, // segment the cached window belongs to
    start: i64,   // in-file offset of the first cached byte
    end: i64,     // in-file offset one past the last cached byte
}
667
668impl Default for PreReader {
669    fn default() -> Self {
670        Self::new()
671    }
672}
673
674impl PreReader {
675    pub fn new() -> Self {
676        let v = direct_io::allocate_aligned_vec(PRE_READ_BUF_SIZE, IO_BLK_SIZE);
677        Self {
678            buffer: v.into_boxed_slice(),
679            file_id: 0,
680            start: 0,
681            end: 0,
682        }
683    }
684
685    fn fill_slice<F>(&mut self, file_id: i64, start: i64, access: F) -> io::Result<()>
686    where
687        F: FnOnce(&mut [u8]) -> io::Result<i64>,
688    {
689        self.file_id = file_id;
690        self.start = start;
691        let n = access(&mut self.buffer[..])?;
692        self.end = start + n;
693        Ok(())
694    }
695
696    fn try_read(&self, file_id: i64, start: i64, bz: &mut [u8]) -> bool {
697        if file_id == self.file_id && self.start <= start && start + bz.len() as i64 <= self.end {
698            let offset = (start - self.start) as usize;
699            bz.copy_from_slice(&self.buffer[offset..offset + bz.len()]);
700            true
701        } else {
702            false
703        }
704    }
705}
706
pub mod direct_io {
    use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};

    /// Returns whether `ptr` is aligned to `alignment` bytes.
    pub fn is_aligned(ptr: *const u8, alignment: usize) -> bool {
        (ptr as usize) % alignment == 0
    }

    /// Allocate a zero-initialized `Vec<u8>` of length `size` whose backing
    /// buffer starts at an address aligned to `alignment` bytes (as needed
    /// for O_DIRECT I/O). A request for `size == 0` returns an empty `Vec`
    /// with no backing buffer.
    ///
    /// # Panics
    ///
    /// Panics if `alignment` is not a power of two or the layout is invalid;
    /// aborts via `handle_alloc_error` when the allocation itself fails.
    pub fn allocate_aligned_vec(size: usize, alignment: usize) -> Vec<u8> {
        assert!(
            alignment.is_power_of_two(),
            "Alignment must be a power of two"
        );
        // `GlobalAlloc::alloc` forbids zero-sized layouts; an empty Vec needs
        // no backing buffer anyway.
        if size == 0 {
            return Vec::new();
        }
        let layout = Layout::from_size_align(size, alignment).expect("Invalid layout");
        // Use `alloc_zeroed`, not `alloc`: the buffer is exposed below as a
        // fully initialized Vec<u8>, so handing out uninitialized memory
        // would be undefined behavior for any reader.
        let ptr = unsafe { alloc_zeroed(layout) };
        if ptr.is_null() {
            // The documented response to allocation failure (may abort);
            // panicking here could unwind through allocator-sensitive code.
            handle_alloc_error(layout);
        }
        // SAFETY: `ptr` was just allocated by the global allocator with
        // exactly `size` zero-initialized bytes; length == capacity == size.
        // NOTE(review): `Vec`'s destructor deallocates with `u8` alignment
        // (1), not `alignment`, so drop relies on the system allocator
        // tolerating the layout mismatch — a dedicated aligned-buffer type
        // would be strictly sound.
        unsafe { Vec::from_raw_parts(ptr, size, size) }
    }
}
728
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end smoke test: create an HPFile, append slices (including one
    // that crosses a segment boundary), read them back via read_at and
    // read_range, reload the directory from disk, then prune and truncate.
    #[test]
    fn test_hp_file_new() {
        let temp_dir = tempfile::Builder::new()
            .prefix("hp_file_test")
            .tempdir()
            .unwrap();
        let dir = temp_dir.path().to_str().unwrap();
        let buffer_size = 64;
        let segment_size = 128;
        let hp = HPFile::new(buffer_size, segment_size, dir.to_string(), false).unwrap();
        assert_eq!(hp.buffer_size, buffer_size);
        assert_eq!(hp.segment_size, segment_size);
        assert_eq!(hp.file_map.len(), 1);

        // First append starts at logical position 0.
        let slice0 = [1; 44];
        let mut buffer = vec![];
        let mut pos = hp.append(slice0.as_ref(), &mut buffer).unwrap();
        assert_eq!(0, pos);
        assert_eq!(44, hp.size());

        let slice1a = [2; 16];
        let slice1b = [3; 10];
        let mut slice1 = vec![];
        slice1.extend_from_slice(&slice1a);
        slice1.extend_from_slice(&slice1b);
        pos = hp.append(slice1.as_ref(), &mut buffer).unwrap();
        assert_eq!(44, pos);
        assert_eq!(70, hp.size());

        let slice2a = [4; 25];
        let slice2b = [5; 25];
        let mut slice2 = vec![];
        slice2.extend_from_slice(&slice2a);
        slice2.extend_from_slice(&slice2b);
        pos = hp.append(slice2.as_ref(), &mut buffer).unwrap();
        assert_eq!(70, pos);
        assert_eq!(120, hp.size());

        // slice0 already reached the disk via write-buffer overflow, so it
        // is readable before an explicit flush.
        let mut check0 = [0; 44];
        hp.read_at(&mut check0, 0).unwrap();
        assert_eq!(slice0.to_vec(), check0.to_vec());

        hp.flush(&mut buffer, false).unwrap();

        {
            // read_range
            let mut check0 = [0; 44];
            hp.read_range(&mut check0, 0).unwrap();
            assert_eq!(slice0.to_vec(), check0.to_vec());

            let mut check0 = [0; 25];
            hp.read_range(&mut check0, 70).unwrap();
            assert_eq!(slice2a.to_vec(), check0.to_vec());
        }

        let mut check1 = [0; 26];
        hp.read_at(&mut check1, 44).unwrap();
        assert_eq!(slice1, check1);

        let mut check2 = [0; 50];
        hp.read_at(&mut check2, 70).unwrap();
        assert_eq!(slice2, check2);

        // This append crosses the end of segment 0 (120 + 16 > 128),
        // exercising the segment-rollover path in `append`.
        let slice3 = [0; 16];
        pos = hp.append(slice3.to_vec().as_ref(), &mut buffer).unwrap();
        assert_eq!(120, pos);
        assert_eq!(136, hp.size());

        hp.flush(&mut buffer, false).unwrap();

        // Re-open the same directory: contents must be recovered from disk.
        let hp_new = HPFile::new(64, 128, dir.to_string(), false).unwrap();

        hp_new.read_at(&mut check0, 0).unwrap();
        assert_eq!(slice0.to_vec(), check0.to_vec());

        hp_new.read_at(&mut check1, 44).unwrap();
        assert_eq!(slice1, check1);

        hp_new.read_at(&mut check2, 70).unwrap();
        assert_eq!(slice2, check2);

        let mut check3 = [0; 16];
        hp_new.read_at(&mut check3, 120).unwrap();
        assert_eq!(slice3.to_vec(), check3.to_vec());

        // prune_head(64) keeps segment 0 (64 / 128 == 0); truncate drops the
        // tail back to 120 bytes.
        hp_new.prune_head(64).unwrap();
        hp_new.truncate(120).unwrap();
        assert_eq!(hp_new.size(), 120);
        let mut slice4 = vec![];
        hp_new.read_at(&mut slice4, 120).unwrap();
        assert_eq!(slice4.len(), 0);
    }
}