#![forbid(non_ascii_idents)]
#![deny(
    macro_use_extern_crate,
    missing_copy_implementations,
    missing_debug_implementations,
    rust_2018_idioms,
    rust_2021_compatibility,
    trivial_casts,
    trivial_numeric_casts,
    unused_extern_crates,
    unused_import_braces,
    unused_qualifications
)]
#![warn(
    clippy::nursery,
    clippy::pedantic,
    clippy::mutex_atomic,
    clippy::rc_buffer,
    clippy::rc_mutex,
)]
#![allow(
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss,
    clippy::missing_errors_doc,
    clippy::missing_panics_doc,
    clippy::must_use_candidate
)]
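
//! A file-backed FIFO queue. The format notes below are derived from the
//! code in this file.
//!
//! The file starts with a header. The versioned header is 32 bytes: a `u32`
//! whose high bit is set and whose low 31 bits hold the version, then the
//! file length (`u64`), element count (`u32`), first element position
//! (`u64`), and last element position (`u64`). The legacy header is 16
//! bytes: the same four fields, each a big-endian `u32`. Elements are stored
//! in a ring after the header, each as a 4-byte big-endian length followed
//! by its data.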

use std::cmp::min;
use std::collections::VecDeque;
use std::fs::{rename, File, OpenOptions};
use std::io;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::ManuallyDrop;
use std::path::Path;

use bytes::{Buf, BufMut, BytesMut};
use snafu::{ensure, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(context(false))]
    Io { source: std::io::Error },
    #[snafu(display("too many elements"))]
    TooManyElements {},
    #[snafu(display("element too big"))]
    ElementTooBig {},
    #[snafu(display("corrupted file: {}", msg))]
    CorruptedFile { msg: String },
    #[snafu(display(
        "unsupported version {}. supported versions are {} and legacy",
        detected,
        supported
    ))]
    UnsupportedVersion { detected: u32, supported: u32 },
}

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug)]
pub struct QueueFile {
    inner: QueueFileInner,
    /// True when the backing file uses the 32-byte versioned header.
    versioned: bool,
    /// Header length in bytes: 32 (versioned) or 16 (legacy).
    header_len: u64,
    /// Number of elements currently stored.
    elem_cnt: usize,
    /// Pointer to the first (eldest) element.
    first: Element,
    /// Pointer to the last (newest) element.
    last: Element,
    /// Minimum file size the queue shrinks back to on `clear`.
    capacity: u64,
    /// When true, removed element data is zeroed out on disk.
    overwrite_on_remove: bool,
    /// When true, `add` defers header writes; the header is synced on drop
    /// or via `sync_all` instead.
    skip_write_header_on_add: bool,
    /// Scratch buffer reused for writes and loaned to `Iter`.
    write_buf: Vec<u8>,
    /// Cached `(element index, element)` pairs, ordered by index.
    cached_offsets: VecDeque<(usize, Element)>,
    offset_cache_kind: Option<OffsetCacheKind>,
}

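/// Policy for caching element offsets so that seeks in large queues do not
/// have to walk every element header. `Linear { offset }` caches roughly
/// every `offset`-th element; `Quadratic` caches elements at perfect-square
/// indices, so the cache stays small as the queue grows. A hedged usage
/// sketch (assumes this file is the root of the `queue_file` crate; the
/// path is illustrative):
///
/// ```no_run
/// # use queue_file::{OffsetCacheKind, QueueFile};
/// let mut qf = QueueFile::open("example.qf")?;
/// qf.set_cache_offset_policy(OffsetCacheKind::Linear { offset: 16 });
/// # Ok::<(), queue_file::Error>(())
/// ```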
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetCacheKind {
    Linear { offset: usize },
    Quadratic,
}

#[derive(Debug)]
struct QueueFileInner {
    file: ManuallyDrop<File>,
    /// Cached file length; kept in sync with `set_len` calls.
    file_len: u64,
    /// Position the next read/write expects; the OS-level seek is deferred.
    expected_seek: u64,
    /// Last position the OS-level cursor is known to be at, if any.
    last_seek: Option<u64>,
    /// File offset of the first byte held in `read_buffer`, if any.
    read_buffer_offset: Option<u64>,
    read_buffer: Vec<u8>,
    /// Reusable buffer for `transfer`; taken while a transfer is running.
    transfer_buf: Option<Box<[u8]>>,
    /// When true, every write is followed by `sync_data`.
    sync_writes: bool,
}

impl Drop for QueueFile {
    fn drop(&mut self) {
        // If header writes were deferred, flush the final header now.
        if self.skip_write_header_on_add {
            let _ = self.sync_header();
        }

        unsafe {
            ManuallyDrop::drop(&mut self.inner.file);
        }
    }
}

impl QueueFile {
    const BLOCK_LENGTH: u64 = 4096;
    const INITIAL_LENGTH: u64 = 4096;
    const READ_BUFFER_SIZE: usize = 4096;
    const VERSIONED_HEADER: u32 = 0x8000_0001;
    const ZEROES: [u8; 4096] = [0; 4096];

    fn init(path: &Path, force_legacy: bool, capacity: u64) -> Result<()> {
        // Note: `with_extension(".tmp")` would produce a doubled dot in the
        // file name; the extension is passed without the leading dot.
        let tmp_path = path.with_extension("tmp");

        {
            let mut file =
                OpenOptions::new().read(true).write(true).create(true).open(&tmp_path)?;

            file.set_len(capacity)?;

            let mut buf = BytesMut::with_capacity(16);

            if force_legacy {
                buf.put_u32(capacity as u32);
            } else {
                buf.put_u32(Self::VERSIONED_HEADER);
                buf.put_u64(capacity);
            }

            file.write_all(buf.as_ref())?;
        }

        // Atomically move the fully initialized file into place.
        rename(tmp_path, path)?;

        Ok(())
    }

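    /// Opens or creates a queue backed by the file at `path`, pre-sizing the
    /// file to `capacity` bytes. A minimal hedged sketch (assumes this file
    /// is the root of the `queue_file` crate; the path is illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::with_capacity("example.qf", 16 * 4096)?;
    /// qf.add(b"hello")?;
    /// # Ok::<(), queue_file::Error>(())
    /// ```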
    pub fn with_capacity<P: AsRef<Path>>(path: P, capacity: u64) -> Result<Self> {
        Self::open_internal(path, true, false, capacity)
    }

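    /// Opens or creates a queue at `path` with the default initial file
    /// size. Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let qf = QueueFile::open("example.qf")?;
    /// println!("queue holds {} element(s)", qf.size());
    /// # Ok::<(), queue_file::Error>(())
    /// ```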
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::with_capacity(path, Self::INITIAL_LENGTH)
    }

    pub fn open_legacy<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_internal(path, true, true, Self::INITIAL_LENGTH)
    }

    fn open_internal<P: AsRef<Path>>(
        path: P, overwrite_on_remove: bool, force_legacy: bool, capacity: u64,
    ) -> Result<Self> {
        if !path.as_ref().exists() {
            Self::init(path.as_ref(), force_legacy, capacity)?;
        }

        let mut file = OpenOptions::new().read(true).write(true).open(path)?;

        let mut buf = [0u8; 32];

        let bytes_read = file.read(&mut buf)?;

        ensure!(bytes_read >= 32, CorruptedFileSnafu { msg: "file too short" });

        // The high bit of the first byte distinguishes the versioned header.
        let versioned = !force_legacy && (buf[0] & 0x80) != 0;

        let header_len: u64;
        let file_len: u64;
        let elem_cnt: usize;
        let first_pos: u64;
        let last_pos: u64;

        let mut buf = BytesMut::from(&buf[..]);

        if versioned {
            header_len = 32;

            let version = buf.get_u32() & 0x7FFF_FFFF;

            ensure!(version == 1, UnsupportedVersionSnafu { detected: version, supported: 1u32 });

            file_len = buf.get_u64();
            elem_cnt = buf.get_u32() as usize;
            first_pos = buf.get_u64();
            last_pos = buf.get_u64();

            ensure!(i64::try_from(file_len).is_ok(), CorruptedFileSnafu {
                msg: "file length in header is greater than i64::MAX"
            });
            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
                msg: "element count in header is greater than i32::MAX"
            });
            ensure!(i64::try_from(first_pos).is_ok(), CorruptedFileSnafu {
                msg: "first element position in header is greater than i64::MAX"
            });
            ensure!(i64::try_from(last_pos).is_ok(), CorruptedFileSnafu {
                msg: "last element position in header is greater than i64::MAX"
            });
        } else {
            header_len = 16;

            file_len = u64::from(buf.get_u32());
            elem_cnt = buf.get_u32() as usize;
            first_pos = u64::from(buf.get_u32());
            last_pos = u64::from(buf.get_u32());

            ensure!(i32::try_from(file_len).is_ok(), CorruptedFileSnafu {
                msg: "file length in header is greater than i32::MAX"
            });
            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
                msg: "element count in header is greater than i32::MAX"
            });
            ensure!(i32::try_from(first_pos).is_ok(), CorruptedFileSnafu {
                msg: "first element position in header is greater than i32::MAX"
            });
            ensure!(i32::try_from(last_pos).is_ok(), CorruptedFileSnafu {
                msg: "last element position in header is greater than i32::MAX"
            });
        }

        let real_file_len = file.metadata()?.len();

        ensure!(file_len <= real_file_len, CorruptedFileSnafu {
            msg: format!(
                "file is truncated. expected length was {file_len} but actual length is {real_file_len}"
            )
        });
        ensure!(file_len >= header_len, CorruptedFileSnafu {
            msg: format!("length stored in header ({file_len}) is invalid")
        });
        ensure!(first_pos <= file_len, CorruptedFileSnafu {
            msg: format!("position of the first element ({first_pos}) is beyond the file")
        });
        ensure!(last_pos <= file_len, CorruptedFileSnafu {
            msg: format!("position of the last element ({last_pos}) is beyond the file")
        });

        let mut queue_file = Self {
            inner: QueueFileInner {
                file: ManuallyDrop::new(file),
                file_len,
                expected_seek: 0,
                // The header read above left the OS cursor at byte 32.
                last_seek: Some(32),
                read_buffer_offset: None,
                read_buffer: vec![0; Self::READ_BUFFER_SIZE],
                transfer_buf: Some(
                    vec![0u8; QueueFileInner::TRANSFER_BUFFER_SIZE].into_boxed_slice(),
                ),
                sync_writes: cfg!(not(test)),
            },
            versioned,
            header_len,
            elem_cnt,
            first: Element::EMPTY,
            last: Element::EMPTY,
            capacity,
            overwrite_on_remove,
            skip_write_header_on_add: false,
            write_buf: Vec::new(),
            cached_offsets: VecDeque::new(),
            offset_cache_kind: None,
        };

        if file_len < capacity {
            queue_file.inner.sync_set_len(queue_file.capacity)?;
        }

        queue_file.first = queue_file.read_element(first_pos)?;
        queue_file.last = queue_file.read_element(last_pos)?;

        Ok(queue_file)
    }

    #[inline]
    pub const fn overwrite_on_remove(&self) -> bool {
        self.overwrite_on_remove
    }

    #[deprecated(since = "1.4.7", note = "Use `overwrite_on_remove` instead.")]
    pub const fn get_overwrite_on_remove(&self) -> bool {
        self.overwrite_on_remove()
    }

    #[inline]
    pub fn set_overwrite_on_remove(&mut self, value: bool) {
        self.overwrite_on_remove = value;
    }

    #[inline]
    pub const fn sync_writes(&self) -> bool {
        self.inner.sync_writes
    }

    #[deprecated(since = "1.4.7", note = "Use `sync_writes` instead.")]
    pub const fn get_sync_writes(&self) -> bool {
        self.sync_writes()
    }

    #[inline]
    pub fn set_sync_writes(&mut self, value: bool) {
        self.inner.sync_writes = value;
    }

    #[inline]
    pub const fn skip_write_header_on_add(&self) -> bool {
        self.skip_write_header_on_add
    }

    #[deprecated(since = "1.4.7", note = "Use `skip_write_header_on_add` instead.")]
    pub const fn get_skip_write_header_on_add(&self) -> bool {
        self.skip_write_header_on_add()
    }

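    /// When `true`, `add`/`add_n` stop rewriting the file header on every
    /// call; the header is instead written by `sync_all`, `into_inner_file`,
    /// or drop. This speeds up batches of adds at the cost of durability:
    /// elements added after the last header write may be lost on a crash.
    /// Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// qf.set_skip_write_header_on_add(true);
    /// for i in 0..1000u32 {
    ///     qf.add(&i.to_be_bytes())?;
    /// }
    /// qf.sync_all()?; // persist the header once, at the end
    /// # Ok::<(), queue_file::Error>(())
    /// ```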
    #[inline]
    pub fn set_skip_write_header_on_add(&mut self, value: bool) {
        self.skip_write_header_on_add = value;
    }

    pub fn set_read_buffer_size(&mut self, size: usize) {
        if self.inner.read_buffer.len() < size {
            self.inner.read_buffer_offset = None;
        }
        self.inner.read_buffer.resize(size, 0);
    }

    #[inline]
    pub const fn cache_offset_policy(&self) -> Option<OffsetCacheKind> {
        self.offset_cache_kind
    }

    #[deprecated(since = "1.4.7", note = "Use `cache_offset_policy` instead.")]
    pub const fn get_cache_offset_policy(&self) -> Option<OffsetCacheKind> {
        self.cache_offset_policy()
    }

    #[inline]
    pub fn set_cache_offset_policy(&mut self, kind: impl Into<Option<OffsetCacheKind>>) {
        self.offset_cache_kind = kind.into();

        if self.offset_cache_kind.is_none() {
            self.cached_offsets.clear();
        }
    }

    /// Returns `true` if the queue contains no elements.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.elem_cnt == 0
    }

    /// Returns the number of elements in the queue.
    #[inline]
    pub const fn size(&self) -> usize {
        self.elem_cnt
    }

    /// Flushes any deferred header write, then syncs file data and metadata.
    pub fn sync_all(&mut self) -> Result<()> {
        if self.skip_write_header_on_add {
            self.sync_header()?;
        }

        Ok(self.inner.file.sync_all()?)
    }

    fn cache_last_offset_if_needed(&mut self, affected_items: usize) {
        if self.elem_cnt == 0 {
            return;
        }

        self.cache_elem_if_needed(self.elem_cnt - 1, self.last, affected_items);
    }

    fn cache_elem_if_needed(&mut self, index: usize, elem: Element, affected_items: usize) {
        debug_assert!(index <= self.elem_cnt);
        debug_assert!(index + 1 >= affected_items);

        let need_to_cache = self.offset_cache_kind.map_or(false, |kind| match kind {
            OffsetCacheKind::Linear { offset } => {
                // Cache once we've moved at least `offset` elements past the
                // last cached entry.
                let last_cached_index = self.cached_offsets.back().map_or(0, |(idx, _)| *idx);
                index.saturating_sub(last_cached_index) >= offset
            }
            OffsetCacheKind::Quadratic => {
                // Cache when a perfect square falls within the range of
                // indices affected by this operation.
                let x = (index as f64).sqrt() as usize;
                x > 1 && (index + 1 - affected_items..=index).contains(&(x * x))
            }
        });

        if need_to_cache {
            // Keep `cached_offsets` strictly ordered by element index.
            if let Some((last_cached_index, last_cached_elem)) = self.cached_offsets.back() {
                if *last_cached_index >= index {
                    if *last_cached_index == index {
                        debug_assert_eq!(last_cached_elem.pos, elem.pos);
                        debug_assert_eq!(last_cached_elem.len, elem.len);
                    }

                    return;
                }
            }

            self.cached_offsets.push_back((index, elem));
        }
    }

    /// Index into `cached_offsets` of the last entry whose element index is
    /// `<= i`, if any.
    #[inline]
    fn cached_index_up_to(&self, i: usize) -> Option<usize> {
        self.cached_offsets
            .binary_search_by(|(idx, _)| idx.cmp(&i))
            .map_or_else(|i| i.checked_sub(1), Some)
    }

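    /// Adds several elements in a single ring write. Hedged sketch (path
    /// illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// qf.add_n([&b"first"[..], &b"second"[..]])?;
    /// # Ok::<(), queue_file::Error>(())
    /// ```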
    pub fn add_n(
        &mut self, elems: impl IntoIterator<Item = impl AsRef<[u8]>> + Clone,
    ) -> Result<()> {
        // First pass: count the elements and the total bytes required.
        let (count, total_len) = elems
            .clone()
            .into_iter()
            .fold((0, 0), |(c, l), elem| (c + 1, l + Element::HEADER_LENGTH + elem.as_ref().len()));

        if count == 0 {
            return Ok(());
        }

        ensure!(self.elem_cnt + count < i32::MAX as usize, TooManyElementsSnafu {});

        self.expand_if_necessary(total_len as u64)?;

        let was_empty = self.is_empty();
        let mut pos = if was_empty {
            self.header_len
        } else {
            self.wrap_pos(self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64)
        };

        let mut first_added = None;
        let mut last_added = None;

        self.write_buf.clear();

        // Second pass: serialize every element (4-byte big-endian length +
        // data) into the scratch buffer.
        for elem in elems {
            let elem = elem.as_ref();
            let len = elem.len();

            if first_added.is_none() {
                first_added = Some(Element::new(pos, len)?);
            }
            last_added = Some(Element::new(pos, len)?);

            self.write_buf.extend(&(len as u32).to_be_bytes());
            self.write_buf.extend(elem);

            pos = self.wrap_pos(pos + Element::HEADER_LENGTH as u64 + len as u64);
        }

        let first_added = first_added.unwrap();
        self.ring_write_buf(first_added.pos)?;

        if was_empty {
            self.first = first_added;
        }
        self.last = last_added.unwrap();

        // Header writes may be deferred; see `set_skip_write_header_on_add`.
        if !self.skip_write_header_on_add {
            self.write_header(
                self.file_len(),
                self.elem_cnt + count,
                self.first.pos,
                self.last.pos,
            )?;
        }
        self.elem_cnt += count;

        self.cache_last_offset_if_needed(count);

        Ok(())
    }

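    /// Adds one element to the back of the queue; shorthand for `add_n` with
    /// a single-element iterator. Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// qf.add(b"payload")?;
    /// # Ok::<(), queue_file::Error>(())
    /// ```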
    #[inline]
    pub fn add(&mut self, buf: &[u8]) -> Result<()> {
        self.add_n(std::iter::once(buf))
    }

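    /// Returns a copy of the eldest element without removing it, or `None`
    /// if the queue is empty. Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// if let Some(front) = qf.peek()? {
    ///     println!("eldest element is {} bytes", front.len());
    /// }
    /// # Ok::<(), queue_file::Error>(())
    /// ```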
    pub fn peek(&mut self) -> Result<Option<Box<[u8]>>> {
        if self.is_empty() {
            Ok(None)
        } else {
            let len = self.first.len;
            let mut data = vec![0; len].into_boxed_slice();

            self.ring_read(self.first.pos + Element::HEADER_LENGTH as u64, &mut data)?;

            Ok(Some(data))
        }
    }

    #[inline]
    pub fn remove(&mut self) -> Result<()> {
        self.remove_n(1)
    }

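    /// Removes the `n` eldest elements (everything if `n >= size()`).
    /// Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// qf.add_n([&b"a"[..], &b"b"[..], &b"c"[..]])?;
    /// qf.remove_n(2)?; // drops the two eldest elements
    /// # Ok::<(), queue_file::Error>(())
    /// ```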
    pub fn remove_n(&mut self, n: usize) -> Result<()> {
        if n == 0 || self.is_empty() {
            return Ok(());
        }

        if n >= self.elem_cnt {
            return self.clear();
        }

        // Cached offsets must be strictly ordered by element index.
        debug_assert!(
            self.cached_offsets
                .iter()
                .zip(self.cached_offsets.iter().skip(1))
                .all(|(a, b)| a.0 < b.0),
            "{:?}",
            self.cached_offsets
        );

        let erase_start_pos = self.first.pos;
        let mut erase_total_len = 0usize;

        let mut new_first_pos = self.first.pos;
        let mut new_first_len = self.first.len;

        // Use the offset cache, if possible, to jump ahead instead of
        // walking every element header from the front.
        let cached_index = self.cached_index_up_to(n - 1);
        let to_remove = if let Some(i) = cached_index {
            let (index, elem) = self.cached_offsets[i];

            if let Some(index) = index.checked_sub(1) {
                erase_total_len += Element::HEADER_LENGTH * index;
                erase_total_len += (elem.pos
                    + if self.first.pos < elem.pos {
                        0
                    } else {
                        self.file_len() - self.first.pos - self.header_len
                    }) as usize;
            }

            new_first_pos = elem.pos;
            new_first_len = elem.len;
            n - index
        } else {
            n
        };

        // Walk the remaining headers to locate the new first element.
        for _ in 0..to_remove {
            erase_total_len += Element::HEADER_LENGTH + new_first_len;
            new_first_pos =
                self.wrap_pos(new_first_pos + Element::HEADER_LENGTH as u64 + new_first_len as u64);

            let mut buf: [u8; 4] = [0; 4];
            self.ring_read(new_first_pos, &mut buf)?;
            new_first_len = u32::from_be_bytes(buf) as usize;
        }

        self.write_header(self.file_len(), self.elem_cnt - n, new_first_pos, self.last.pos)?;
        self.elem_cnt -= n;
        self.first = Element::new(new_first_pos, new_first_len)?;

        // Drop cache entries for removed elements and shift the rest.
        if let Some(cached_index) = cached_index {
            self.cached_offsets.drain(..=cached_index);
        }
        self.cached_offsets.iter_mut().for_each(|(i, _)| *i -= n);

        if self.overwrite_on_remove {
            self.ring_erase(erase_start_pos, erase_total_len)?;
        }

        Ok(())
    }

    pub fn clear(&mut self) -> Result<()> {
        self.write_header(self.capacity, 0, 0, 0)?;

        if self.overwrite_on_remove {
            // Zero the whole data region in block-sized chunks.
            self.inner.seek(self.header_len);
            let first_block = self.capacity.min(Self::BLOCK_LENGTH) - self.header_len;
            self.inner.write(&Self::ZEROES[..first_block as usize])?;

            if let Some(left) = self.capacity.checked_sub(Self::BLOCK_LENGTH) {
                for _ in 0..left / Self::BLOCK_LENGTH {
                    self.inner.write(&Self::ZEROES)?;
                }

                let tail = left % Self::BLOCK_LENGTH;

                if tail != 0 {
                    self.inner.write(&Self::ZEROES[..tail as usize])?;
                }
            }
        }

        self.cached_offsets.clear();

        self.elem_cnt = 0;
        self.first = Element::EMPTY;
        self.last = Element::EMPTY;

        // Shrink the file back to its configured capacity.
        if self.file_len() > self.capacity {
            self.inner.sync_set_len(self.capacity)?;
        }

        Ok(())
    }

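    /// Returns an iterator over the elements, eldest first. Elements are
    /// yielded as owned boxes; see `Iter::borrowed_next` for a
    /// non-allocating variant. Hedged sketch (path illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// for elem in qf.iter() {
    ///     println!("{} bytes", elem.len());
    /// }
    /// # Ok::<(), queue_file::Error>(())
    /// ```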
    pub fn iter(&mut self) -> Iter<'_> {
        let pos = self.first.pos;

        Iter {
            // Reuse the queue's scratch buffer; `Iter::drop` hands it back.
            buffer: std::mem::take(&mut self.write_buf),
            queue_file: self,
            next_elem_index: 0,
            next_elem_pos: pos,
        }
    }

    /// Current length of the backing file in bytes.
    #[inline]
    pub const fn file_len(&self) -> u64 {
        self.inner.file_len
    }

    /// Number of bytes in use, including the file header and element headers.
    #[inline]
    pub const fn used_bytes(&self) -> u64 {
        if self.elem_cnt == 0 {
            self.header_len
        } else if self.last.pos >= self.first.pos {
            // Contiguous case: first..last plus the last element's bytes.
            (self.last.pos - self.first.pos)
                + Element::HEADER_LENGTH as u64
                + self.last.len as u64
                + self.header_len
        } else {
            // Wrapped case: the used region spans the end of the file.
            self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64 + self.file_len()
                - self.first.pos
        }
    }

    /// Consumes the queue and returns the underlying `File`, flushing any
    /// deferred header write first.
    pub fn into_inner_file(mut self) -> File {
        if self.skip_write_header_on_add {
            let _ = self.sync_header();
        }

        let file = unsafe { ManuallyDrop::take(&mut self.inner.file) };
        std::mem::forget(self);

        file
    }

    #[inline]
    const fn remaining_bytes(&self) -> u64 {
        self.file_len() - self.used_bytes()
    }

    fn sync_header(&mut self) -> Result<()> {
        self.write_header(self.file_len(), self.size(), self.first.pos, self.last.pos)
    }

    fn write_header(
        &mut self, file_len: u64, elem_cnt: usize, first_pos: u64, last_pos: u64,
    ) -> Result<()> {
        let mut header = [0; 32];
        let mut header_buf = &mut header[..];

        if self.versioned {
            ensure!(i64::try_from(file_len).is_ok(), CorruptedFileSnafu {
                msg: "file length in header will exceed i64::MAX"
            });
            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
                msg: "element count in header will exceed i32::MAX"
            });
            ensure!(i64::try_from(first_pos).is_ok(), CorruptedFileSnafu {
                msg: "first element position in header will exceed i64::MAX"
            });
            ensure!(i64::try_from(last_pos).is_ok(), CorruptedFileSnafu {
                msg: "last element position in header will exceed i64::MAX"
            });

            header_buf.put_u32(Self::VERSIONED_HEADER);
            header_buf.put_u64(file_len);
            header_buf.put_i32(elem_cnt as i32);
            header_buf.put_u64(first_pos);
            header_buf.put_u64(last_pos);
        } else {
            ensure!(i32::try_from(file_len).is_ok(), CorruptedFileSnafu {
                msg: "file length in header will exceed i32::MAX"
            });
            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
                msg: "element count in header will exceed i32::MAX"
            });
            ensure!(i32::try_from(first_pos).is_ok(), CorruptedFileSnafu {
                msg: "first element position in header will exceed i32::MAX"
            });
            ensure!(i32::try_from(last_pos).is_ok(), CorruptedFileSnafu {
                msg: "last element position in header will exceed i32::MAX"
            });

            header_buf.put_i32(file_len as i32);
            header_buf.put_i32(elem_cnt as i32);
            header_buf.put_i32(first_pos as i32);
            header_buf.put_i32(last_pos as i32);
        }

        self.inner.seek(0);
        self.inner.write(&header.as_ref()[..self.header_len as usize])
    }

    fn read_element(&mut self, pos: u64) -> Result<Element> {
        // Position 0 is the sentinel for "no element".
        if pos == 0 {
            Ok(Element::EMPTY)
        } else {
            let mut buf: [u8; 4] = [0; Element::HEADER_LENGTH];
            self.ring_read(pos, &mut buf)?;

            Element::new(pos, u32::from_be_bytes(buf) as usize)
        }
    }

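    /// Maps a logical position back into the ring: positions past the end of
    /// the file continue right after the header. For example, with a
    /// 4096-byte file and a 32-byte header, position 4100 wraps to
    /// `32 + 4100 - 4096 = 36`.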
    #[inline]
    const fn wrap_pos(&self, pos: u64) -> u64 {
        if pos < self.file_len() { pos } else { self.header_len + pos - self.file_len() }
    }

    fn ring_write_buf(&mut self, pos: u64) -> Result<()> {
        let pos = self.wrap_pos(pos);

        if pos + self.write_buf.len() as u64 <= self.file_len() {
            self.inner.seek(pos);
            self.inner.write(&self.write_buf)
        } else {
            // The write wraps: split it at EOF and continue after the header.
            let before_eof = (self.file_len() - pos) as usize;

            self.inner.seek(pos);
            self.inner.write(&self.write_buf[..before_eof])?;
            self.inner.seek(self.header_len);
            self.inner.write(&self.write_buf[before_eof..])
        }
    }

    fn ring_erase(&mut self, pos: u64, n: usize) -> Result<()> {
        let mut pos = pos;
        let mut len = n;

        // Zero out `n` bytes starting at `pos`, chunk by chunk.
        self.write_buf.clear();
        self.write_buf.extend(Self::ZEROES);

        while len > 0 {
            let chunk_len = min(len, Self::ZEROES.len());
            self.write_buf.truncate(chunk_len);

            self.ring_write_buf(pos)?;

            len -= chunk_len;
            pos += chunk_len as u64;
        }

        Ok(())
    }

    fn ring_read(&mut self, pos: u64, buf: &mut [u8]) -> io::Result<()> {
        let pos = self.wrap_pos(pos);

        if pos + buf.len() as u64 <= self.file_len() {
            self.inner.seek(pos);
            self.inner.read(buf)
        } else {
            // The read wraps: split it at EOF and continue after the header.
            let before_eof = (self.file_len() - pos) as usize;

            self.inner.seek(pos);
            self.inner.read(&mut buf[..before_eof])?;
            self.inner.seek(self.header_len);
            self.inner.read(&mut buf[before_eof..])
        }
    }

    fn expand_if_necessary(&mut self, data_len: u64) -> Result<()> {
        let mut rem_bytes = self.remaining_bytes();

        if rem_bytes >= data_len {
            return Ok(());
        }

        // Double the file length until the new data fits.
        let orig_file_len = self.file_len();
        let mut prev_len = orig_file_len;
        let mut new_len = prev_len;

        while rem_bytes < data_len {
            rem_bytes += prev_len;
            new_len = prev_len << 1;
            prev_len = new_len;
        }

        let bytes_used_before = self.used_bytes();

        let end_of_last_elem =
            self.wrap_pos(self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64);
        self.inner.sync_set_len(new_len)?;

        let mut count = 0u64;

        // If the queue wrapped, move the wrapped prefix into the newly grown
        // region so the data becomes contiguous again.
        if end_of_last_elem <= self.first.pos {
            count = end_of_last_elem - self.header_len;

            self.inner.transfer(self.header_len, orig_file_len, count)?;
        }

        if self.last.pos < self.first.pos {
            let new_last_pos = orig_file_len + self.last.pos - self.header_len;
            self.last = Element::new(new_last_pos, self.last.len)?;
        }

        // Cached positions may have moved; start over.
        self.cached_offsets.clear();

        if self.overwrite_on_remove {
            self.ring_erase(self.header_len, count as usize)?;
        }

        let bytes_used_after = self.used_bytes();
        debug_assert_eq!(bytes_used_before, bytes_used_after);

        Ok(())
    }
}

impl QueueFileInner {
    const TRANSFER_BUFFER_SIZE: usize = 128 * 1024;

    /// Records the position the next read/write expects; the OS-level seek
    /// is deferred until `real_seek`.
    #[inline]
    fn seek(&mut self, pos: u64) -> u64 {
        self.expected_seek = pos;

        pos
    }

    fn real_seek(&mut self) -> io::Result<u64> {
        // Skip the syscall when the cursor is already where we expect.
        if Some(self.expected_seek) == self.last_seek {
            return Ok(self.expected_seek);
        }

        let res = self.file.seek(SeekFrom::Start(self.expected_seek));
        self.last_seek = res.as_ref().ok().copied();

        res
    }

    fn read(&mut self, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let size = buf.len();

        // Check whether the request is already covered by `read_buffer`.
        let not_enough_data = if let Some(left) = self.read_buffer.len().checked_sub(size) {
            self.read_buffer_offset
                .and_then(|o| self.expected_seek.checked_sub(o))
                .and_then(|skip| left.checked_sub(skip as usize))
                .is_none()
        } else {
            self.read_buffer.resize(size, 0);

            true
        };

        if not_enough_data {
            use std::io::{Error, ErrorKind};

            self.real_seek()?;

            let mut read = 0;
            let mut res = Ok(());

            // Fill the whole read buffer; the loop exits via `Ok(0)` once
            // the buffer is full or EOF is reached.
            while !buf.is_empty() {
                match self.file.read(&mut self.read_buffer[read..]) {
                    Ok(0) => break,
                    Ok(n) => read += n,
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
                    Err(e) => {
                        res = Err(e);
                        break;
                    }
                }
            }

            if res.is_ok() && read < size {
                res = Err(Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer"));
            }

            if let Err(err) = res {
                self.read_buffer_offset = None;
                self.last_seek = None;

                return Err(err);
            }

            self.read_buffer_offset = Some(self.expected_seek);

            if let Some(seek) = &mut self.last_seek {
                *seek += read as u64;
            }
        }

        let start = (self.expected_seek - self.read_buffer_offset.unwrap()) as usize;

        buf.copy_from_slice(&self.read_buffer[start..start + size]);

        Ok(())
    }

    fn write(&mut self, buf: &[u8]) -> Result<()> {
        self.real_seek()?;

        self.file.write_all(buf)?;

        if let Some(seek) = &mut self.last_seek {
            *seek += buf.len() as u64;
        }

        // Patch any overlap between this write and the cached read buffer so
        // subsequent buffered reads stay coherent.
        if let Some(read_buffer_offset) = self.read_buffer_offset {
            let write_size_u64 = buf.len() as u64;
            let read_buffer_end_offset = read_buffer_offset + self.read_buffer.len() as u64;
            let read_buffered = read_buffer_offset..read_buffer_end_offset;

            let has_start = read_buffered.contains(&self.expected_seek);
            let buf_end = self.expected_seek + write_size_u64;
            let has_end = read_buffered.contains(&buf_end);

            match (has_start, has_end) {
                // The write falls entirely inside the read buffer.
                (true, true) => {
                    let start = (self.expected_seek - read_buffer_offset) as usize;

                    self.read_buffer[start..start + buf.len()].copy_from_slice(buf);
                }
                // The write starts before the read buffer and ends inside it.
                (false, true) => {
                    let need_to_skip = (read_buffer_offset - self.expected_seek) as usize;
                    let need_to_copy = buf.len() - need_to_skip;

                    self.read_buffer[..need_to_copy].copy_from_slice(&buf[need_to_skip..]);
                }
                // The write starts inside the read buffer and runs past it.
                (true, false) => {
                    let need_to_skip = (self.expected_seek - read_buffer_offset) as usize;
                    let need_to_copy = self.read_buffer.len() - need_to_skip;

                    self.read_buffer[need_to_skip..need_to_skip + need_to_copy]
                        .copy_from_slice(&buf[..need_to_copy]);
                }
                // The write completely covers the read buffer.
                (false, false)
                    if (self.expected_seek + 1..buf_end).contains(&read_buffer_offset) =>
                {
                    let need_to_skip = (read_buffer_offset - self.expected_seek) as usize;
                    let need_to_copy = self.read_buffer.len();

                    self.read_buffer[..]
                        .copy_from_slice(&buf[need_to_skip..need_to_skip + need_to_copy]);
                }
                // No overlap at all.
                (false, false) => {}
            }
        }

        if self.sync_writes {
            self.file.sync_data()?;
        }

        Ok(())
    }

    fn transfer_inner(
        &mut self, buf: &mut [u8], mut read_pos: u64, mut write_pos: u64, count: u64,
    ) -> Result<()> {
        debug_assert!(read_pos < self.file_len);
        debug_assert!(write_pos <= self.file_len);
        debug_assert!(count < self.file_len);
        debug_assert!(i64::try_from(count).is_ok());

        let mut bytes_left = count as i64;

        // Copy `count` bytes from `read_pos` to `write_pos`, chunk by chunk.
        while bytes_left > 0 {
            self.seek(read_pos);
            let bytes_to_read = min(bytes_left as usize, Self::TRANSFER_BUFFER_SIZE);
            self.read(&mut buf[..bytes_to_read])?;

            self.seek(write_pos);
            self.write(&buf[..bytes_to_read])?;

            read_pos += bytes_to_read as u64;
            write_pos += bytes_to_read as u64;
            bytes_left -= bytes_to_read as i64;
        }

        if self.sync_writes {
            self.file.sync_data()?;
        }

        Ok(())
    }

    fn transfer(&mut self, read_pos: u64, write_pos: u64, count: u64) -> Result<()> {
        // Temporarily take the shared buffer so `transfer_inner` can borrow
        // `self` mutably alongside it.
        let mut buf = self.transfer_buf.take().unwrap();
        let res = self.transfer_inner(&mut buf, read_pos, write_pos, count);
        self.transfer_buf = Some(buf);

        res
    }

    fn sync_set_len(&mut self, new_len: u64) -> io::Result<()> {
        self.file.set_len(new_len)?;
        self.file_len = new_len;
        self.file.sync_all()
    }
}

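/// Pointer to a stored element: its absolute file position and data length.
/// On disk an element is a 4-byte big-endian length followed by `len` bytes
/// of data; `pos == 0` is the sentinel for "no element".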
#[derive(Copy, Clone, Debug)]
struct Element {
    pos: u64,
    len: usize,
}

impl Element {
    const EMPTY: Self = Self { pos: 0, len: 0 };
    const HEADER_LENGTH: usize = 4;

    #[inline]
    fn new(pos: u64, len: usize) -> Result<Self> {
        ensure!(i64::try_from(pos).is_ok(), CorruptedFileSnafu {
            msg: "element position must be less than or equal to i64::MAX"
        });
        ensure!(i32::try_from(len).is_ok(), ElementTooBigSnafu);

        Ok(Self { pos, len })
    }
}

#[derive(Debug)]
pub struct Iter<'a> {
    queue_file: &'a mut QueueFile,
    buffer: Vec<u8>,
    next_elem_index: usize,
    next_elem_pos: u64,
}

impl<'a> Iterator for Iter<'a> {
    type Item = Box<[u8]>;

    fn next(&mut self) -> Option<Self::Item> {
        let buffer = self.borrowed_next()?;

        Some(buffer.to_vec().into_boxed_slice())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let elems_left = self.queue_file.elem_cnt - self.next_elem_index;

        (elems_left, Some(elems_left))
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        if self.queue_file.elem_cnt - self.next_elem_index < n {
            self.next_elem_index = self.queue_file.elem_cnt;

            return None;
        }

        // Jump ahead with the offset cache when possible, then walk the
        // remaining headers one by one.
        let left = if let Some(i) = self.queue_file.cached_index_up_to(n) {
            let (index, elem) = self.queue_file.cached_offsets[i];
            if index > self.next_elem_index {
                self.next_elem_index = index;
                self.next_elem_pos = elem.pos;
            }

            n - self.next_elem_index
        } else {
            n
        };

        for _ in 0..left {
            self.borrowed_next();
        }

        self.next()
    }
}

impl Iter<'_> {
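    /// Like `next`, but lends the bytes out of the iterator's internal
    /// buffer instead of allocating per element. Hedged sketch (path
    /// illustrative):
    ///
    /// ```no_run
    /// # use queue_file::QueueFile;
    /// let mut qf = QueueFile::open("example.qf")?;
    /// let mut iter = qf.iter();
    /// while let Some(bytes) = iter.borrowed_next() {
    ///     println!("{} bytes", bytes.len());
    /// }
    /// # Ok::<(), queue_file::Error>(())
    /// ```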
    pub fn borrowed_next(&mut self) -> Option<&[u8]> {
        if self.next_elem_index >= self.queue_file.elem_cnt {
            return None;
        }

        let current = self.queue_file.read_element(self.next_elem_pos).ok()?;
        // Step past the 4-byte length header to the element data.
        self.next_elem_pos = self.queue_file.wrap_pos(current.pos + Element::HEADER_LENGTH as u64);

        if current.len > self.buffer.len() {
            self.buffer.resize(current.len, 0);
        }
        self.queue_file.ring_read(self.next_elem_pos, &mut self.buffer[..current.len]).ok()?;

        // Advance to the start of the next element.
        self.next_elem_pos = self
            .queue_file
            .wrap_pos(current.pos + Element::HEADER_LENGTH as u64 + current.len as u64);

        self.queue_file.cache_elem_if_needed(self.next_elem_index, current, 1);
        self.next_elem_index += 1;

        Some(&self.buffer[..current.len])
    }
}

impl Drop for Iter<'_> {
    fn drop(&mut self) {
        // Return the scratch buffer borrowed from the queue in `iter`.
        self.queue_file.write_buf = std::mem::take(&mut self.buffer);
    }
}