pub mod file;

pub use std::io::Read;

use anyhow::{anyhow, Result};
use dashmap::DashMap;
use std::{
    fs,
    io::{self, Seek, SeekFrom, Write},
    sync::atomic::{AtomicI64, Ordering},
    sync::Arc,
};

use std::fs::File;

#[cfg(unix)]
use std::os::unix::fs::{FileExt, OpenOptionsExt};

/// Positioned read: fills `buf` starting at `offset` without moving any file cursor.
pub trait ReadAt {
    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;
}

#[cfg(target_os = "zkvm")]
impl ReadAt for File {
    fn read_at(&self, _buf: &mut [u8], _offset: u64) -> io::Result<usize> {
        panic!("read_at is not supported in zkvm target");
    }
}

const PRE_READ_BUF_SIZE: usize = 512 * 1024;
pub const IO_BLK_SIZE: usize = 512;

#[cfg(target_os = "linux")]
const DIRECT: i32 = libc::O_DIRECT;
#[cfg(not(target_os = "linux"))]
const DIRECT: i32 = i32::MIN;

type FileMap = DashMap<
    i64,
    Arc<(
        File,
        bool, // true if this segment file is opened read-only
    )>,
>;

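/// Example: appending and reading back bytes. A minimal sketch, assuming the
/// target directory already exists and that the buffer size evenly divides the
/// segment size, as `new` requires; the path and sizes are illustrative.
///
/// ```ignore
/// // 64 KiB write buffer, 16 MiB segments, direct I/O disabled.
/// let hp = HPFile::new(64 * 1024, 16 * 1024 * 1024, "/tmp/hpfile-demo".to_string(), false)?;
/// let mut wr_buf = Vec::new();
/// let offset = hp.append(b"hello", &mut wr_buf)?; // logical offset of the appended bytes
/// hp.flush(&mut wr_buf, false)?;                  // write buffered bytes to disk
/// let mut out = [0u8; 5];
/// hp.read_at(&mut out, offset)?;
/// assert_eq!(&out, b"hello");
/// ```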
#[derive(Debug)]
pub struct HPFile {
    dir_name: String,
    segment_size: i64,
    buffer_size: i64,
    file_map: FileMap,
    largest_id: AtomicI64,
    latest_file_size: AtomicI64,
    // Logical size, including bytes still sitting in the caller's write buffer.
    file_size: AtomicI64,
    // Bytes physically written so far (may include block-alignment padding).
    file_size_on_disk: AtomicI64,
    directio: bool,
}

impl HPFile {
    pub fn new(
        wr_buf_size: i64,
        segment_size: i64,
        dir_name: String,
        directio: bool,
    ) -> Result<HPFile> {
        if segment_size % wr_buf_size != 0 {
            return Err(anyhow!(
                "Invalid segmentSize:{} writeBufferSize:{}",
                segment_size,
                wr_buf_size
            ));
        }

        if directio && cfg!(not(target_os = "linux")) {
            eprintln!("Directio is only supported on Linux");
        }

        let (id_list, largest_id) = Self::get_file_ids(&dir_name, segment_size)?;
        let (file_map, latest_file_size) =
            Self::load_file_map(&dir_name, segment_size, id_list, largest_id, directio)?;

        let file_size = largest_id * segment_size + latest_file_size;
        Ok(HPFile {
            dir_name,
            segment_size,
            buffer_size: wr_buf_size,
            file_map,
            largest_id: AtomicI64::new(largest_id),
            latest_file_size: AtomicI64::new(latest_file_size),
            file_size: AtomicI64::new(file_size),
            file_size_on_disk: AtomicI64::new(file_size),
            directio,
        })
    }

    pub fn empty() -> HPFile {
        HPFile {
            dir_name: "".to_owned(),
            segment_size: 0,
            buffer_size: 0,
            file_map: DashMap::with_capacity(0),
            largest_id: AtomicI64::new(0),
            latest_file_size: AtomicI64::new(0),
            file_size: AtomicI64::new(0),
            file_size_on_disk: AtomicI64::new(0),
            directio: false,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.segment_size == 0
    }

    fn get_file_ids(dir_name: &str, segment_size: i64) -> Result<(Vec<i64>, i64)> {
        let mut largest_id = 0;
        let mut id_list = Vec::new();

        for entry in fs::read_dir(dir_name)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                continue;
            }

            let file_name = entry.file_name().to_string_lossy().to_string();
            let id = Self::parse_filename(segment_size, &file_name)?;
            largest_id = largest_id.max(id);
            id_list.push(id);
        }

        Ok((id_list, largest_id))
    }

    fn parse_filename(segment_size: i64, file_name: &str) -> Result<i64> {
        let parts: Vec<_> = file_name.split('-').collect();
        if parts.len() != 2 {
            return Err(anyhow!(
                "{} does not match the pattern 'FileId-segmentSize'",
                file_name
            ));
        }

        let id: i64 = parts[0].parse()?;
        let size: i64 = parts[1].parse()?;

        if segment_size != size {
            return Err(anyhow!("Invalid Size! {}!={}", size, segment_size));
        }

        Ok(id)
    }

    fn load_file_map(
        dir_name: &str,
        segment_size: i64,
        id_list: Vec<i64>,
        largest_id: i64,
        directio: bool,
    ) -> Result<(FileMap, i64)> {
        let file_map = DashMap::new();
        let mut latest_file_size = 0;

        for &id in &id_list {
            let file_name = format!("{dir_name}/{id}-{segment_size}");
            let mut options = File::options();
            let file_and_ro = if id == largest_id {
                // The newest segment stays writable.
                let file = options.read(true).write(true).open(file_name)?;
                latest_file_size = file.metadata()?.len() as i64;
                (file, false)
            } else {
                // Older segments are read-only; on Linux they may use O_DIRECT.
                if directio {
                    #[cfg(target_os = "linux")]
                    options.custom_flags(DIRECT);
                }
                (options.read(true).open(file_name)?, true)
            };
            file_map.insert(id, Arc::new(file_and_ro));
        }

        if id_list.is_empty() {
            // Fresh directory: start with segment 0.
            let file_name = format!("{}/{}-{}", &dir_name, 0, segment_size);
            let file = File::create_new(file_name)?;
            file_map.insert(0, Arc::new((file, false)));
        }

        Ok((file_map, latest_file_size))
    }

    /// Logical size in bytes, including data still buffered in memory.
    pub fn size(&self) -> i64 {
        self.file_size.load(Ordering::SeqCst)
    }

    /// Bytes written to disk so far; may include block-alignment padding.
    pub fn size_on_disk(&self) -> i64 {
        self.file_size_on_disk.load(Ordering::SeqCst)
    }

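    /// A minimal sketch of shrinking the file back to an earlier logical
    /// offset, assuming `hp` already holds at least 200 bytes (the numbers are
    /// illustrative):
    ///
    /// ```ignore
    /// // Whole trailing segment files are deleted and the last remaining
    /// // segment is shortened to the requested size.
    /// hp.truncate(200)?;
    /// assert_eq!(hp.size(), 200);
    /// ```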
    pub fn truncate(&self, size: i64) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }

        let mut largest_id = self.largest_id.load(Ordering::SeqCst);

        while size < largest_id * self.segment_size {
            self.file_map.remove(&largest_id);

            #[cfg(unix)]
            {
                let file_name = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
                fs::remove_file(file_name)?;
            }

            self.largest_id.fetch_sub(1, Ordering::SeqCst);
            largest_id -= 1;
        }

        let remaining_size = size - largest_id * self.segment_size;
        let file_name = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
        let mut f = File::options().read(true).write(true).open(file_name)?;
        f.set_len(remaining_size as u64)?;
        f.seek(SeekFrom::End(0))?;

        self.file_map.insert(largest_id, Arc::new((f, false)));
        self.latest_file_size
            .store(remaining_size, Ordering::SeqCst);
        self.file_size.store(size, Ordering::SeqCst);
        self.file_size_on_disk.store(size, Ordering::SeqCst);

        Ok(())
    }

    pub fn flush(&self, buffer: &mut Vec<u8>, eof: bool) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }
        let largest_id = self.largest_id.load(Ordering::SeqCst);
        let mut opt = self.file_map.get_mut(&largest_id);
        let mut f = &opt.as_mut().unwrap().value().0;
        if !buffer.is_empty() {
            let tail_len = buffer.len() % IO_BLK_SIZE;
            if eof && tail_len != 0 {
                // Final flush for this segment: pad the tail to a full I/O block.
                buffer.resize(buffer.len() + IO_BLK_SIZE - tail_len, 0);
            }
            f.seek(SeekFrom::End(0)).unwrap();
            f.write_all(buffer)?;
            self.file_size_on_disk
                .fetch_add(buffer.len() as i64, Ordering::SeqCst);
            buffer.clear();
        }

        f.sync_all()
    }

    pub fn close(&self) {
        self.file_map.clear();
    }

    pub fn get_file_and_pos(&self, offset: i64) -> (Arc<(File, bool)>, i64) {
        let file_id = offset / self.segment_size;
        let opt = self.file_map.get(&file_id);
        let f = opt.as_ref().unwrap().value();
        (f.clone(), offset % self.segment_size)
    }

    pub fn read_at(&self, bz: &mut [u8], offset: i64) -> io::Result<usize> {
        let file_id = offset / self.segment_size;
        let pos = offset % self.segment_size;
        let opt = self.file_map.get(&file_id);
        let f = &opt.as_ref().unwrap().value();
        if self.directio && f.1 {
            Self::read_at_aligned(&f.0, bz, pos)
        } else {
            f.0.read_at(bz, pos as u64)
        }
    }

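    /// A minimal sketch of an exact-length read that may span several segment
    /// files, assuming the requested range has already been flushed to disk
    /// (`offset` is illustrative):
    ///
    /// ```ignore
    /// let mut buf = vec![0u8; 1024];
    /// // Fails with `UnexpectedEof` if fewer than 1024 bytes exist on disk
    /// // at and after `offset`.
    /// hp.read_range(&mut buf, offset)?;
    /// ```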
    pub fn read_range(&self, buf: &mut [u8], offset: i64) -> io::Result<()> {
        let size = self.file_size_on_disk.load(Ordering::SeqCst);
        let end_offset = offset + buf.len() as i64;
        if end_offset > size {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("Read out of range: {} + {} > {}", offset, buf.len(), size),
            ));
        }

        let start_file_id = offset / self.segment_size;
        let end_file_id = end_offset / self.segment_size;
        let mut has_read_size = 0usize;
        for file_id in start_file_id..=end_file_id {
            let opt = self.file_map.get(&file_id);
            if opt.is_none() {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("File ID {file_id} not found"),
                ));
            }

            let f = &opt.as_ref().unwrap().value();
            let pos = (offset + has_read_size as i64) % self.segment_size;
            let read_len = if file_id == end_file_id {
                end_offset - offset - has_read_size as i64
            } else {
                self.segment_size - pos
            } as usize;
            let size = f.0.read_at(
                &mut buf[has_read_size..has_read_size + read_len],
                pos as u64,
            );
            match size {
                Ok(n) if n < read_len => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Short read from file ID {file_id}: expected {read_len}, got {n}"),
                    ));
                }
                Ok(_) => {}
                Err(e) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Failed to read from file ID {file_id}: {e}"),
                    ));
                }
            }
            has_read_size += read_len;
        }

        Ok(())
    }

    fn read_at_aligned(f: &File, bz: &mut [u8], offset: i64) -> io::Result<usize> {
        if bz.len() > 2 * IO_BLK_SIZE {
            panic!("Cannot read more than two io blocks");
        }
        let off_in_blk = offset % (IO_BLK_SIZE as i64);
        let off_start = offset - off_in_blk;
        // Over-allocate and slice so both the buffer start and the file offset
        // are aligned to IO_BLK_SIZE, as required for O_DIRECT reads.
        let mut buf = [0u8; 3 * IO_BLK_SIZE];
        let buf_start = IO_BLK_SIZE - (buf.as_ptr() as usize % IO_BLK_SIZE);
        let mut buf_end = buf_start + IO_BLK_SIZE;
        if off_in_blk != 0 {
            buf_end += IO_BLK_SIZE;
        }
        let buf = &mut buf[buf_start..buf_end];
        if buf.as_ptr() as usize % IO_BLK_SIZE != 0 {
            panic!("Buffer still not aligned!");
        }
        if off_start as usize % IO_BLK_SIZE != 0 {
            panic!("File offset still not aligned!");
        }
        match f.read_at(buf, off_start as u64) {
            Ok(read_len) => {
                let copy_len = usize::min(read_len, bz.len());
                let copy_start = off_in_blk as usize;
                bz[..copy_len].copy_from_slice(&buf[copy_start..copy_start + copy_len]);
                Ok(copy_len)
            }
            Err(e) => panic!("aligned {e}"),
        }
    }

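    /// A minimal sketch of reading through a `PreReader`, which serves small
    /// nearby reads from a block-aligned pre-fetch buffer instead of issuing a
    /// syscall each time (`offset` and the 32-byte length are illustrative):
    ///
    /// ```ignore
    /// let mut pre_reader = PreReader::new();
    /// let mut buf = Vec::new();
    /// hp.read_at_with_pre_reader(&mut buf, 32, offset, &mut pre_reader)?;
    /// // A second read close to `offset` is likely answered from the cache.
    /// hp.read_at_with_pre_reader(&mut buf, 32, offset + 32, &mut pre_reader)?;
    /// ```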
    pub fn read_at_with_pre_reader(
        &self,
        buf: &mut Vec<u8>,
        num_bytes: usize,
        offset: i64,
        pre_reader: &mut PreReader,
    ) -> io::Result<usize> {
        if buf.len() < num_bytes {
            buf.resize(num_bytes, 0);
        }

        let file_id = offset / self.segment_size;
        let pos = offset % self.segment_size;

        // Serve the read from the pre-read buffer if it is already cached.
        if pre_reader.try_read(file_id, pos, &mut buf[..num_bytes]) {
            return Ok(num_bytes);
        }

        let opt = self.file_map.get(&file_id);
        let f = &opt.as_ref().unwrap().value().0;

        if num_bytes >= PRE_READ_BUF_SIZE {
            panic!("Read too many bytes");
        }

        if pos + num_bytes as i64 > self.segment_size {
            return Self::read_at_aligned(f, &mut buf[..num_bytes], pos);
        }

        // Refill the pre-read buffer from a block-aligned position, then retry.
        let blk_size = IO_BLK_SIZE as i64;
        let aligned_pos = (pos / blk_size) * blk_size;
        pre_reader.fill_slice(file_id, aligned_pos, |slice| {
            let end = usize::min(slice.len(), (self.segment_size - aligned_pos) as usize);
            f.read_at(&mut slice[..end], aligned_pos as u64)
                .map(|n| n as i64)
        })?;

        if !pre_reader.try_read(file_id, pos, &mut buf[..num_bytes]) {
            panic!(
                "Internal error: cannot read data just fetched in {} fileID {}",
                self.dir_name, file_id
            );
        }

        Ok(num_bytes)
    }

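    /// A minimal sketch of the buffered-append contract: the caller owns the
    /// write buffer and must eventually `flush` it (the payloads here are
    /// illustrative and must each be no larger than the write buffer size):
    ///
    /// ```ignore
    /// let mut wr_buf = Vec::new();
    /// let first = hp.append(&payload_a, &mut wr_buf)?;  // logical offset of payload_a
    /// let second = hp.append(&payload_b, &mut wr_buf)?; // appended right after it
    /// assert_eq!(second, first + payload_a.len() as i64);
    /// hp.flush(&mut wr_buf, false)?;                    // make both visible on disk
    /// ```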
    pub fn append(&self, bz: &[u8], buffer: &mut Vec<u8>) -> io::Result<i64> {
        if self.is_empty() {
            return Ok(0);
        }

        if bz.len() as i64 > self.buffer_size {
            panic!("bz is too large");
        }

        let mut largest_id = self.largest_id.load(Ordering::SeqCst);
        let start_pos = self.size();
        let old_size = self
            .latest_file_size
            .fetch_add(bz.len() as i64, Ordering::SeqCst);
        self.file_size.fetch_add(bz.len() as i64, Ordering::SeqCst);
        let mut split_pos = 0;
        let extra_bytes = (buffer.len() + bz.len()) as i64 - self.buffer_size;
        if extra_bytes > 0 {
            // The write buffer would overflow: top it up to `buffer_size` and write it out.
            split_pos = bz.len() - extra_bytes as usize;
            buffer.extend_from_slice(&bz[0..split_pos]);
            let mut opt = self.file_map.get_mut(&largest_id);
            let mut f = &opt.as_mut().unwrap().value().0;
            if f.write_all(buffer).is_err() {
                panic!("Failed to write file");
            }
            self.file_size_on_disk
                .fetch_add(buffer.len() as i64, Ordering::SeqCst);
            buffer.clear();
        }
        buffer.extend_from_slice(&bz[split_pos..]);

        let overflow_byte_count = old_size + bz.len() as i64 - self.segment_size;
        if overflow_byte_count >= 0 {
            // The current segment is full: flush everything (padded), optionally
            // reopen it read-only for O_DIRECT, and start a new segment.
            self.flush(buffer, true)?;
            if self.directio {
                let done_file = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
                let mut options = File::options();
                #[cfg(target_os = "linux")]
                options.custom_flags(DIRECT);
                let f = options.read(true).open(&done_file).unwrap();
                self.file_map.insert(largest_id, Arc::new((f, true)));
            }
            largest_id += 1;
            self.largest_id.fetch_add(1, Ordering::SeqCst);
            let new_file = format!("{}/{}-{}", self.dir_name, largest_id, self.segment_size);
            let f = match File::create_new(&new_file) {
                Ok(file) => file,
                Err(_) => File::options()
                    .read(true)
                    .write(true)
                    .open(&new_file)
                    .unwrap(),
            };
            if overflow_byte_count != 0 {
                // The overflowing bytes were already flushed into the old segment;
                // keep zero placeholders so offsets in the new segment line up.
                buffer.clear();
                buffer.resize(overflow_byte_count as usize, 0);
            }
            self.file_map.insert(largest_id, Arc::new((f, false)));
            self.latest_file_size
                .store(overflow_byte_count, Ordering::SeqCst);
        }

        Ok(start_pos)
    }

    /// Removes whole segment files that lie entirely before `offset`.
    pub fn prune_head(&self, offset: i64) -> io::Result<()> {
        if self.is_empty() {
            return Ok(());
        }

        let file_id = offset / self.segment_size;
        let ids_to_remove: Vec<i64> = self
            .file_map
            .iter()
            .filter(|entry| *entry.key() < file_id)
            .map(|entry| *entry.key())
            .collect();

        for id in ids_to_remove {
            self.file_map.remove(&id);

            #[cfg(unix)]
            {
                let file_name = format!("{}/{}-{}", self.dir_name, id, self.segment_size);
                fs::remove_file(file_name)?;
            }
        }

        Ok(())
    }
}

#[derive(Debug)]
pub struct PreReader {
    // Block-aligned scratch buffer holding the window [start, end) of `file_id`.
    buffer: Box<[u8]>,
    file_id: i64,
    start: i64,
    end: i64,
}

impl Default for PreReader {
    fn default() -> Self {
        Self::new()
    }
}

impl PreReader {
    pub fn new() -> Self {
        let v = direct_io::allocate_aligned_vec(PRE_READ_BUF_SIZE, IO_BLK_SIZE);
        Self {
            buffer: v.into_boxed_slice(),
            file_id: 0,
            start: 0,
            end: 0,
        }
    }

    fn fill_slice<F>(&mut self, file_id: i64, start: i64, access: F) -> io::Result<()>
    where
        F: FnOnce(&mut [u8]) -> io::Result<i64>,
    {
        self.file_id = file_id;
        self.start = start;
        let n = access(&mut self.buffer[..])?;
        self.end = start + n;
        Ok(())
    }

    fn try_read(&self, file_id: i64, start: i64, bz: &mut [u8]) -> bool {
        if file_id == self.file_id && self.start <= start && start + bz.len() as i64 <= self.end {
            let offset = (start - self.start) as usize;
            bz.copy_from_slice(&self.buffer[offset..offset + bz.len()]);
            true
        } else {
            false
        }
    }
}

pub mod direct_io {
    use std::alloc::alloc;
    use std::alloc::Layout;

    pub fn is_aligned(ptr: *const u8, alignment: usize) -> bool {
        (ptr as usize) % alignment == 0
    }

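    /// A minimal sketch, assuming the 512-byte `IO_BLK_SIZE` used by this
    /// crate (the 4096-byte length is illustrative):
    ///
    /// ```ignore
    /// let v = allocate_aligned_vec(4096, 512);
    /// assert!(is_aligned(v.as_ptr(), 512));
    /// assert_eq!(v.len(), 4096);
    /// ```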
    pub fn allocate_aligned_vec(size: usize, alignment: usize) -> Vec<u8> {
        assert!(
            alignment.is_power_of_two(),
            "Alignment must be a power of two"
        );
        // Allocate with the requested alignment and hand ownership to a Vec.
        // Note that the Vec will later be deallocated with `u8` alignment.
        let layout = Layout::from_size_align(size, alignment).expect("Invalid layout");
        let ptr = unsafe { alloc(layout) };
        if ptr.is_null() {
            panic!("Failed to allocate memory");
        }
        unsafe { Vec::from_raw_parts(ptr, size, size) }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hp_file_new() {
        let temp_dir = tempfile::Builder::new()
            .prefix("hp_file_test")
            .tempdir()
            .unwrap();
        let dir = temp_dir.path().to_str().unwrap();
        let buffer_size = 64;
        let segment_size = 128;
        let hp = HPFile::new(buffer_size, segment_size, dir.to_string(), false).unwrap();
        assert_eq!(hp.buffer_size, buffer_size);
        assert_eq!(hp.segment_size, segment_size);
        assert_eq!(hp.file_map.len(), 1);

        let slice0 = [1; 44];
        let mut buffer = vec![];
        let mut pos = hp.append(slice0.as_ref(), &mut buffer).unwrap();
        assert_eq!(0, pos);
        assert_eq!(44, hp.size());

        let slice1a = [2; 16];
        let slice1b = [3; 10];
        let mut slice1 = vec![];
        slice1.extend_from_slice(&slice1a);
        slice1.extend_from_slice(&slice1b);
        pos = hp.append(slice1.as_ref(), &mut buffer).unwrap();
        assert_eq!(44, pos);
        assert_eq!(70, hp.size());

        let slice2a = [4; 25];
        let slice2b = [5; 25];
        let mut slice2 = vec![];
        slice2.extend_from_slice(&slice2a);
        slice2.extend_from_slice(&slice2b);
        pos = hp.append(slice2.as_ref(), &mut buffer).unwrap();
        assert_eq!(70, pos);
        assert_eq!(120, hp.size());

        let mut check0 = [0; 44];
        hp.read_at(&mut check0, 0).unwrap();
        assert_eq!(slice0.to_vec(), check0.to_vec());

        hp.flush(&mut buffer, false).unwrap();

        {
            let mut check0 = [0; 44];
            hp.read_range(&mut check0, 0).unwrap();
            assert_eq!(slice0.to_vec(), check0.to_vec());

            let mut check0 = [0; 25];
            hp.read_range(&mut check0, 70).unwrap();
            assert_eq!(slice2a.to_vec(), check0.to_vec());
        }

        let mut check1 = [0; 26];
        hp.read_at(&mut check1, 44).unwrap();
        assert_eq!(slice1, check1);

        let mut check2 = [0; 50];
        hp.read_at(&mut check2, 70).unwrap();
        assert_eq!(slice2, check2);

        let slice3 = [0; 16];
        pos = hp.append(slice3.as_ref(), &mut buffer).unwrap();
        assert_eq!(120, pos);
        assert_eq!(136, hp.size());

        hp.flush(&mut buffer, false).unwrap();

        let hp_new = HPFile::new(64, 128, dir.to_string(), false).unwrap();

        hp_new.read_at(&mut check0, 0).unwrap();
        assert_eq!(slice0.to_vec(), check0.to_vec());

        hp_new.read_at(&mut check1, 44).unwrap();
        assert_eq!(slice1, check1);

        hp_new.read_at(&mut check2, 70).unwrap();
        assert_eq!(slice2, check2);

        let mut check3 = [0; 16];
        hp_new.read_at(&mut check3, 120).unwrap();
        assert_eq!(slice3.to_vec(), check3.to_vec());

        hp_new.prune_head(64).unwrap();
        hp_new.truncate(120).unwrap();
        assert_eq!(hp_new.size(), 120);
        let mut slice4 = vec![];
        hp_new.read_at(&mut slice4, 120).unwrap();
        assert_eq!(slice4.len(), 0);
    }
}