queue_file/
lib.rs

1// Java version: Copyright (C) 2010 Square, Inc.
2// Rust version: Copyright (C) 2019 ING Systems
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//      http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! The `queue-file` crate is a feature-complete and binary-compatible port of the `QueueFile`
//! class from Tape2 by Square, Inc. Check out the original project [here](https://github.com/square/tape).
18
19#![forbid(non_ascii_idents)]
20#![deny(
21    macro_use_extern_crate,
22    missing_copy_implementations,
23    missing_debug_implementations,
24    rust_2018_idioms,
25    rust_2021_compatibility,
26    trivial_casts,
27    trivial_numeric_casts,
28    unused_extern_crates,
29    unused_import_braces,
30    unused_qualifications
31)]
32#![warn(
33    clippy::nursery,
34    clippy::pedantic,
35    clippy::mutex_atomic,
36    clippy::rc_buffer,
37    clippy::rc_mutex,
38    // clippy::expect_used,
39    // clippy::unwrap_used,
40)]
41#![allow(
42    clippy::cast_possible_truncation,
43    clippy::cast_possible_wrap,
44    clippy::cast_precision_loss,
45    clippy::cast_sign_loss,
46    clippy::missing_errors_doc,
47    clippy::missing_panics_doc,
48    clippy::must_use_candidate
49)]
50
51use std::cmp::min;
52use std::collections::VecDeque;
53use std::fs::{rename, File, OpenOptions};
54use std::io;
55use std::io::{Read, Seek, SeekFrom, Write};
56use std::mem::ManuallyDrop;
57use std::path::Path;
58
59use bytes::{Buf, BufMut, BytesMut};
60use snafu::{ensure, Snafu};
61
62#[derive(Debug, Snafu)]
63pub enum Error {
64    #[snafu(context(false))]
65    Io { source: std::io::Error },
66    #[snafu(display("too many elements"))]
67    TooManyElements {},
68    #[snafu(display("element too big"))]
69    ElementTooBig {},
70    #[snafu(display("corrupted file: {}", msg))]
71    CorruptedFile { msg: String },
72    #[snafu(display(
        "unsupported version {}. supported versions are {} and legacy",
74        detected,
75        supported
76    ))]
77    UnsupportedVersion { detected: u32, supported: u32 },
78}
79
80type Result<T, E = Error> = std::result::Result<T, E>;
81
82/// `QueueFile` is a lightning-fast, transactional, file-based FIFO.
83///
84/// Addition and removal from an instance is an O(1) operation and is atomic.
85/// Writes are synchronous by default; data will be written to disk before an operation returns.
86///
/// The underlying file uses a ring buffer to store entries. It is designed so that a modification
88/// isn't committed or visible until we write the header. The header is much smaller than a
89/// segment. So long as the underlying file system supports atomic segment writes, changes to the
90/// queue are atomic. Storing the file length ensures we can recover from a failed expansion
91/// (i.e. if setting the file length succeeds but the process dies before the data can be copied).
92///
93/// # Example
94/// ```
95/// use queue_file::QueueFile;
96///
97/// # let path = auto_delete_path::AutoDeletePath::temp();
98/// let mut qf = QueueFile::open(path)
99///     .expect("cannot open queue file");
100/// let data = "Welcome to QueueFile!".as_bytes();
101///
102/// qf.add(&data).expect("add failed");
103///
104/// if let Ok(Some(bytes)) = qf.peek() {
105///     assert_eq!(data, bytes.as_ref());
106/// }
107///
108/// qf.remove().expect("remove failed");
109/// ```
110/// # File format
111///
112/// ```text
113///   16-32 bytes      Header
114///   ...              Data
115/// ```
116/// This implementation supports two versions of the header format.
117/// ```text
118/// Versioned Header (32 bytes):
119///   1 bit            Versioned indicator [0 = legacy, 1 = versioned]
120///   31 bits          Version, always 1
121///   8 bytes          File length
122///   4 bytes          Element count
123///   8 bytes          Head element position
124///   8 bytes          Tail element position
125///
126/// Legacy Header (16 bytes):
127///   1 bit            Legacy indicator, always 0
128///   31 bits          File length
129///   4 bytes          Element count
130///   4 bytes          Head element position
131///   4 bytes          Tail element position
132/// ```
133/// Each element stored is represented by:
134/// ```text
135/// Element:
136///   4 bytes          Data length
137///   ...              Data
138/// ```
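///
/// As a worked example (illustrative, assuming the default 4096-byte initial capacity and the
/// versioned format), a queue holding the single element `b"hi"` would look like this:
/// ```text
/// offset  0: 80 00 00 01              Versioned indicator + version 1
/// offset  4: 00 00 00 00 00 00 10 00  File length (4096)
/// offset 12: 00 00 00 01              Element count (1)
/// offset 16: 00 00 00 00 00 00 00 20  Head element position (32)
/// offset 24: 00 00 00 00 00 00 00 20  Tail element position (32)
/// offset 32: 00 00 00 02 68 69        Element: length 2, data "hi"
/// ```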
139#[derive(Debug)]
140pub struct QueueFile {
141    inner: QueueFileInner,
    /// True when using the versioned header format; otherwise the legacy format is used.
143    versioned: bool,
144    /// The header length in bytes: 16 or 32.
145    header_len: u64,
146    /// Number of elements.
147    elem_cnt: usize,
148    /// Pointer to first (or eldest) element.
149    first: Element,
150    /// Pointer to last (or newest) element.
151    last: Element,
152    /// Minimum number of bytes the file shrinks to.
153    capacity: u64,
154    /// When true, removing an element will also overwrite data with zero bytes.
155    /// It's true by default.
156    overwrite_on_remove: bool,
157    /// When true, skips header update upon adding.
158    /// It's false by default.
159    skip_write_header_on_add: bool,
160    /// Write buffering.
161    write_buf: Vec<u8>,
    /// Offset cache mapping index -> `Element`. Sorted in ascending order; indices are always unique.
    /// Indices form perfect squares, though they may skew after removals.
164    cached_offsets: VecDeque<(usize, Element)>,
165    /// Offset caching policy.
166    offset_cache_kind: Option<OffsetCacheKind>,
167}
168
169/// Policy for offset caching if enabled.
/// Note that the offset frequency might become skewed after a series of additions and removals.
/// This does not affect functional properties, only performance.
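///
/// A minimal usage sketch (the policy is set on an existing queue):
/// ```
/// # use queue_file::{OffsetCacheKind, QueueFile};
/// # let path = auto_delete_path::AutoDeletePath::temp();
/// let mut qf = QueueFile::open(path).expect("failed to open queue");
/// qf.set_cache_offset_policy(OffsetCacheKind::Quadratic);
/// ```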
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub enum OffsetCacheKind {
    /// Linear offsetting.
175    ///
176    /// Next offset would be cached after `offset` additions.
177    Linear { offset: usize },
    /// Quadratic offsetting.
179    ///
180    /// Cached offsets form a sequence of perfect squares (e.g. cached 1st, 4th, 9th, .. offsets).
181    Quadratic,
182}
183
184#[derive(Debug)]
185struct QueueFileInner {
186    file: ManuallyDrop<File>,
187    /// Cached file length. Always a power of 2.
188    file_len: u64,
    /// Intended seek offset.
190    expected_seek: u64,
191    /// Real last seek offset.
192    last_seek: Option<u64>,
193    /// Offset for the next read from buffer.
194    read_buffer_offset: Option<u64>,
195    /// Buffer for reads.
196    read_buffer: Vec<u8>,
197    /// Buffer used by `transfer` function.
198    transfer_buf: Option<Box<[u8]>>,
199    /// When true, every write to file will be followed by `sync_data()` call.
200    /// It's true by default.
201    sync_writes: bool,
202}
203
204impl Drop for QueueFile {
205    fn drop(&mut self) {
206        if self.skip_write_header_on_add {
207            let _ = self.sync_header();
208        }
209
210        unsafe {
211            ManuallyDrop::drop(&mut self.inner.file);
212        }
213    }
214}
215
216impl QueueFile {
217    const BLOCK_LENGTH: u64 = 4096;
218    const INITIAL_LENGTH: u64 = 4096;
219    const READ_BUFFER_SIZE: usize = 4096;
220    const VERSIONED_HEADER: u32 = 0x8000_0001;
221    const ZEROES: [u8; 4096] = [0; 4096];
222
223    fn init(path: &Path, force_legacy: bool, capacity: u64) -> Result<()> {
224        let tmp_path = path.with_extension(".tmp");
225
226        // Use a temp file so we don't leave a partially-initialized file.
227        {
228            let mut file =
229                OpenOptions::new().read(true).write(true).create(true).open(&tmp_path)?;
230
231            file.set_len(capacity)?;
232
233            let mut buf = BytesMut::with_capacity(16);
234
235            if force_legacy {
236                buf.put_u32(capacity as u32);
237            } else {
238                buf.put_u32(Self::VERSIONED_HEADER);
239                buf.put_u64(capacity);
240            }
241
242            file.write_all(buf.as_ref())?;
243        }
244
245        // A rename is atomic.
246        rename(tmp_path, path)?;
247
248        Ok(())
249    }
250
251    /// Open or create [`QueueFile`] at `path` with specified minimal file size.
252    ///
253    /// # Example
254    ///
255    /// ```
256    /// # use queue_file::QueueFile;
257    /// # let path = auto_delete_path::AutoDeletePath::temp();
258    /// let qf = QueueFile::with_capacity(path, 120).expect("failed to open queue");
259    /// ```
260    pub fn with_capacity<P: AsRef<Path>>(path: P, capacity: u64) -> Result<Self> {
261        Self::open_internal(path, true, false, capacity)
262    }
263
264    /// Open or create [`QueueFile`] at `path`.
265    ///
266    /// # Example
267    ///
268    /// ```
269    /// # use queue_file::QueueFile;
270    /// # let path = auto_delete_path::AutoDeletePath::temp();
271    /// let qf = QueueFile::open(path).expect("failed to open queue");
272    /// ```
273    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
274        Self::with_capacity(path, Self::INITIAL_LENGTH)
275    }
276
277    /// Open or create [`QueueFile`] at `path` forcing legacy format.
278    ///
279    /// # Example
280    ///
281    /// ```
282    /// # use queue_file::QueueFile;
283    /// # let path = auto_delete_path::AutoDeletePath::temp();
284    /// let qf = QueueFile::open_legacy(path).expect("failed to open queue");
285    /// ```
286    pub fn open_legacy<P: AsRef<Path>>(path: P) -> Result<Self> {
287        Self::open_internal(path, true, true, Self::INITIAL_LENGTH)
288    }
289
290    fn open_internal<P: AsRef<Path>>(
291        path: P, overwrite_on_remove: bool, force_legacy: bool, capacity: u64,
292    ) -> Result<Self> {
293        if !path.as_ref().exists() {
294            Self::init(path.as_ref(), force_legacy, capacity)?;
295        }
296
297        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
298
299        let mut buf = [0u8; 32];
300
301        let bytes_read = file.read(&mut buf)?;
302
303        ensure!(bytes_read >= 32, CorruptedFileSnafu { msg: "file too short" });
304
305        let versioned = !force_legacy && (buf[0] & 0x80) != 0;
306
307        let header_len: u64;
308        let file_len: u64;
309        let elem_cnt: usize;
310        let first_pos: u64;
311        let last_pos: u64;
312
313        let mut buf = BytesMut::from(&buf[..]);
314
315        if versioned {
316            header_len = 32;
317
318            let version = buf.get_u32() & 0x7FFF_FFFF;
319
320            ensure!(version == 1, UnsupportedVersionSnafu { detected: version, supported: 1u32 });
321
322            file_len = buf.get_u64();
323            elem_cnt = buf.get_u32() as usize;
324            first_pos = buf.get_u64();
325            last_pos = buf.get_u64();
326
327            ensure!(i64::try_from(file_len).is_ok(), CorruptedFileSnafu {
328                msg: "file length in header is greater than i64::MAX"
329            });
330            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
331                msg: "element count in header is greater than i32::MAX"
332            });
333            ensure!(i64::try_from(first_pos).is_ok(), CorruptedFileSnafu {
334                msg: "first element position in header is greater than i64::MAX"
335            });
336            ensure!(i64::try_from(last_pos).is_ok(), CorruptedFileSnafu {
337                msg: "last element position in header is greater than i64::MAX"
338            });
339        } else {
340            header_len = 16;
341
342            file_len = u64::from(buf.get_u32());
343            elem_cnt = buf.get_u32() as usize;
344            first_pos = u64::from(buf.get_u32());
345            last_pos = u64::from(buf.get_u32());
346
347            ensure!(i32::try_from(file_len).is_ok(), CorruptedFileSnafu {
348                msg: "file length in header is greater than i32::MAX"
349            });
350            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
351                msg: "element count in header is greater than i32::MAX"
352            });
353            ensure!(i32::try_from(first_pos).is_ok(), CorruptedFileSnafu {
354                msg: "first element position in header is greater than i32::MAX"
355            });
356            ensure!(i32::try_from(last_pos).is_ok(), CorruptedFileSnafu {
357                msg: "last element position in header is greater than i32::MAX"
358            });
359        }
360
361        let real_file_len = file.metadata()?.len();
362
363        ensure!(file_len <= real_file_len, CorruptedFileSnafu {
364            msg: format!(
365                "file is truncated. expected length was {file_len} but actual length is {real_file_len}"
366            )
367        });
368        ensure!(file_len >= header_len, CorruptedFileSnafu {
369            msg: format!("length stored in header ({file_len}) is invalid")
370        });
371        ensure!(first_pos <= file_len, CorruptedFileSnafu {
372            msg: format!("position of the first element ({first_pos}) is beyond the file")
373        });
374        ensure!(last_pos <= file_len, CorruptedFileSnafu {
375            msg: format!("position of the last element ({last_pos}) is beyond the file")
376        });
377
378        let mut queue_file = Self {
379            inner: QueueFileInner {
380                file: ManuallyDrop::new(file),
381                file_len,
382                expected_seek: 0,
383                last_seek: Some(32),
384                read_buffer_offset: None,
385                read_buffer: vec![0; Self::READ_BUFFER_SIZE],
386                transfer_buf: Some(
387                    vec![0u8; QueueFileInner::TRANSFER_BUFFER_SIZE].into_boxed_slice(),
388                ),
389                sync_writes: cfg!(not(test)),
390            },
391            versioned,
392            header_len,
393            elem_cnt,
394            first: Element::EMPTY,
395            last: Element::EMPTY,
396            capacity,
397            overwrite_on_remove,
398            skip_write_header_on_add: false,
399            write_buf: Vec::new(),
400            cached_offsets: VecDeque::new(),
401            offset_cache_kind: None,
402        };
403
404        if file_len < capacity {
405            queue_file.inner.sync_set_len(queue_file.capacity)?;
406        }
407
408        queue_file.first = queue_file.read_element(first_pos)?;
409        queue_file.last = queue_file.read_element(last_pos)?;
410
411        Ok(queue_file)
412    }
413
414    /// Returns true if removing an element will also overwrite data with zero bytes.
415    #[inline]
416    pub const fn overwrite_on_remove(&self) -> bool {
417        self.overwrite_on_remove
418    }
419
420    #[deprecated(since = "1.4.7", note = "Use `overwrite_on_remove` instead.")]
421    pub const fn get_overwrite_on_remove(&self) -> bool {
422        self.overwrite_on_remove()
423    }
424
    /// If set to true, removing an element will also overwrite the data with zero bytes.
426    #[inline]
427    pub fn set_overwrite_on_remove(&mut self, value: bool) {
428        self.overwrite_on_remove = value;
429    }
430
    /// Returns true if every write to the file is followed by a `sync_data()` call.
432    #[inline]
433    pub const fn sync_writes(&self) -> bool {
434        self.inner.sync_writes
435    }
436
437    #[deprecated(since = "1.4.7", note = "Use `sync_writes` instead.")]
438    pub const fn get_sync_writes(&self) -> bool {
439        self.sync_writes()
440    }
441
    /// If set to true, every write to the file will be followed by a `sync_data()` call.
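    ///
    /// A sketch of trading durability for throughput by batching writes and syncing once
    /// (this assumes the data loss window on a crash is acceptable):
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.set_sync_writes(false);
    /// for i in 0..100u8 {
    ///     qf.add(&[i]).expect("add failed");
    /// }
    /// qf.sync_all().expect("sync failed");
    /// ```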
443    #[inline]
444    pub fn set_sync_writes(&mut self, value: bool) {
445        self.inner.sync_writes = value;
446    }
447
    /// Returns true if header updates are skipped when adding elements.
449    #[inline]
450    pub const fn skip_write_header_on_add(&self) -> bool {
451        self.skip_write_header_on_add
452    }
453
454    #[deprecated(since = "1.4.7", note = "Use `skip_write_header_on_add` instead.")]
455    pub const fn get_skip_write_header_on_add(&self) -> bool {
456        self.skip_write_header_on_add()
457    }
458
    /// If set to true, the header update is skipped when adding elements.
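    ///
    /// When enabled, the header is committed later, e.g. by [`QueueFile::sync_all`] or on drop.
    /// A minimal sketch:
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.set_skip_write_header_on_add(true);
    /// qf.add(b"elem").expect("add failed");
    /// qf.sync_all().expect("sync failed"); // header is written here
    /// ```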
460    #[inline]
461    pub fn set_skip_write_header_on_add(&mut self, value: bool) {
462        self.skip_write_header_on_add = value;
463    }
464
465    /// Changes buffer size used for data reading.
466    pub fn set_read_buffer_size(&mut self, size: usize) {
467        if self.inner.read_buffer.len() < size {
468            self.inner.read_buffer_offset = None;
469        }
470        self.inner.read_buffer.resize(size, 0);
471    }
472
473    #[inline]
474    pub const fn cache_offset_policy(&self) -> Option<OffsetCacheKind> {
475        self.offset_cache_kind
476    }
477
478    #[deprecated(since = "1.4.7", note = "Use `cache_offset_policy` instead.")]
479    pub const fn get_cache_offset_policy(&self) -> Option<OffsetCacheKind> {
480        self.cache_offset_policy()
481    }
482
483    #[inline]
484    pub fn set_cache_offset_policy(&mut self, kind: impl Into<Option<OffsetCacheKind>>) {
485        self.offset_cache_kind = kind.into();
486
487        if self.offset_cache_kind.is_none() {
488            self.cached_offsets.clear();
489        }
490    }
491
492    /// Returns true if this queue contains no entries.
493    #[inline]
494    pub const fn is_empty(&self) -> bool {
495        self.elem_cnt == 0
496    }
497
498    /// Returns the number of elements in this queue.
499    #[inline]
500    pub const fn size(&self) -> usize {
501        self.elem_cnt
502    }
503
    /// Synchronizes the underlying file; see the [`File::sync_all`] docs for more info.
505    pub fn sync_all(&mut self) -> Result<()> {
506        if self.skip_write_header_on_add {
507            self.sync_header()?;
508        }
509
510        Ok(self.inner.file.sync_all()?)
511    }
512
513    fn cache_last_offset_if_needed(&mut self, affected_items: usize) {
514        if self.elem_cnt == 0 {
515            return;
516        }
517
518        self.cache_elem_if_needed(self.elem_cnt - 1, self.last, affected_items);
519    }
520
521    fn cache_elem_if_needed(&mut self, index: usize, elem: Element, affected_items: usize) {
522        debug_assert!(index <= self.elem_cnt);
523        debug_assert!(index + 1 >= affected_items);
524
525        let need_to_cache = self.offset_cache_kind.map_or(false, |kind| match kind {
526            OffsetCacheKind::Linear { offset } => {
527                let last_cached_index = self.cached_offsets.back().map_or(0, |(idx, _)| *idx);
528                index.saturating_sub(last_cached_index) >= offset
529            }
530            OffsetCacheKind::Quadratic => {
531                let x = (index as f64).sqrt() as usize;
532                x > 1 && (index + 1 - affected_items..=index).contains(&(x * x))
533            }
534        });
535
536        if need_to_cache {
537            if let Some((last_cached_index, last_cached_elem)) = self.cached_offsets.back() {
538                if *last_cached_index >= index {
539                    if *last_cached_index == index {
540                        debug_assert_eq!(last_cached_elem.pos, elem.pos);
541                        debug_assert_eq!(last_cached_elem.len, elem.len);
542                    }
543
544                    return;
545                }
546            }
547
548            self.cached_offsets.push_back((index, elem));
549        }
550    }
551
552    #[inline]
553    fn cached_index_up_to(&self, i: usize) -> Option<usize> {
554        self.cached_offsets
555            .binary_search_by(|(idx, _)| idx.cmp(&i))
556            .map_or_else(|i| i.checked_sub(1), Some)
557    }
558
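    /// Adds multiple elements to the end of the queue in a single batch.
    ///
    /// A usage sketch:
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.add_n(vec![b"first".to_vec(), b"second".to_vec()]).expect("add_n failed");
    /// assert_eq!(qf.size(), 2);
    /// ```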
559    pub fn add_n(
560        &mut self, elems: impl IntoIterator<Item = impl AsRef<[u8]>> + Clone,
561    ) -> Result<()> {
562        let (count, total_len) = elems
563            .clone()
564            .into_iter()
565            .fold((0, 0), |(c, l), elem| (c + 1, l + Element::HEADER_LENGTH + elem.as_ref().len()));
566
567        if count == 0 {
568            return Ok(());
569        }
570
        ensure!(self.elem_cnt + count < i32::MAX as usize, TooManyElementsSnafu {});
572
573        self.expand_if_necessary(total_len as u64)?;
574
575        let was_empty = self.is_empty();
576        let mut pos = if was_empty {
577            self.header_len
578        } else {
579            self.wrap_pos(self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64)
580        };
581
582        let mut first_added = None;
583        let mut last_added = None;
584
585        self.write_buf.clear();
586
587        for elem in elems {
588            let elem = elem.as_ref();
589            let len = elem.len();
590
591            if first_added.is_none() {
592                first_added = Some(Element::new(pos, len)?);
593            }
594            last_added = Some(Element::new(pos, len)?);
595
596            self.write_buf.extend(&(len as u32).to_be_bytes());
597            self.write_buf.extend(elem);
598
599            pos = self.wrap_pos(pos + Element::HEADER_LENGTH as u64 + len as u64);
600        }
601
602        let first_added = first_added.unwrap();
603        self.ring_write_buf(first_added.pos)?;
604
605        if was_empty {
606            self.first = first_added;
607        }
608        self.last = last_added.unwrap();
609
        if !self.skip_write_header_on_add {
            self.write_header(
                self.file_len(),
                self.elem_cnt + count,
                self.first.pos,
                self.last.pos,
            )?;
        }
611        self.elem_cnt += count;
612
613        self.cache_last_offset_if_needed(count);
614
615        Ok(())
616    }
617
618    /// Adds an element to the end of the queue.
619    #[inline]
620    pub fn add(&mut self, buf: &[u8]) -> Result<()> {
621        self.add_n(std::iter::once(buf))
622    }
623
    /// Reads the eldest element. Returns `Ok(None)` if the queue is empty.
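    ///
    /// # Example
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.add(b"data").expect("add failed");
    /// assert_eq!(qf.peek().expect("peek failed").as_deref(), Some(&b"data"[..]));
    /// ```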
625    pub fn peek(&mut self) -> Result<Option<Box<[u8]>>> {
626        if self.is_empty() {
627            Ok(None)
628        } else {
629            let len = self.first.len;
630            let mut data = vec![0; len].into_boxed_slice();
631
632            self.ring_read(self.first.pos + Element::HEADER_LENGTH as u64, &mut data)?;
633
634            Ok(Some(data))
635        }
636    }
637
638    /// Removes the eldest element.
639    #[inline]
640    pub fn remove(&mut self) -> Result<()> {
641        self.remove_n(1)
642    }
643
644    /// Removes the eldest `n` elements.
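    ///
    /// # Example
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.add_n(vec![vec![1u8], vec![2], vec![3]]).expect("add_n failed");
    /// qf.remove_n(2).expect("remove_n failed");
    /// assert_eq!(qf.size(), 1);
    /// ```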
645    pub fn remove_n(&mut self, n: usize) -> Result<()> {
646        if n == 0 || self.is_empty() {
647            return Ok(());
648        }
649
650        if n >= self.elem_cnt {
651            return self.clear();
652        }
653
654        debug_assert!(
655            self.cached_offsets
656                .iter()
657                .zip(self.cached_offsets.iter().skip(1))
658                .all(|(a, b)| a.0 < b.0),
659            "{:?}",
660            self.cached_offsets
661        );
662
663        let erase_start_pos = self.first.pos;
664        let mut erase_total_len = 0usize;
665
666        // Read the position and length of the new first element.
667        let mut new_first_pos = self.first.pos;
668        let mut new_first_len = self.first.len;
669
670        let cached_index = self.cached_index_up_to(n - 1);
671        let to_remove = if let Some(i) = cached_index {
672            let (index, elem) = self.cached_offsets[i];
673
674            if let Some(index) = index.checked_sub(1) {
675                erase_total_len += Element::HEADER_LENGTH * index;
676                erase_total_len += (elem.pos
677                    + if self.first.pos < elem.pos {
678                        0
679                    } else {
680                        self.file_len() - self.first.pos - self.header_len
681                    }) as usize;
682            }
683
684            new_first_pos = elem.pos;
685            new_first_len = elem.len;
686            n - index
687        } else {
688            n
689        };
690
691        for _ in 0..to_remove {
692            erase_total_len += Element::HEADER_LENGTH + new_first_len;
693            new_first_pos =
694                self.wrap_pos(new_first_pos + Element::HEADER_LENGTH as u64 + new_first_len as u64);
695
696            let mut buf: [u8; 4] = [0; 4];
697            self.ring_read(new_first_pos, &mut buf)?;
698            new_first_len = u32::from_be_bytes(buf) as usize;
699        }
700
701        // Commit the header.
702        self.write_header(self.file_len(), self.elem_cnt - n, new_first_pos, self.last.pos)?;
703        self.elem_cnt -= n;
704        self.first = Element::new(new_first_pos, new_first_len)?;
705
706        if let Some(cached_index) = cached_index {
707            self.cached_offsets.drain(..=cached_index);
708        }
709        self.cached_offsets.iter_mut().for_each(|(i, _)| *i -= n);
710
711        if self.overwrite_on_remove {
712            self.ring_erase(erase_start_pos, erase_total_len)?;
713        }
714
715        Ok(())
716    }
717
    /// Clears this queue. Truncates the file to the configured minimum capacity.
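    ///
    /// # Example
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.add(b"data").expect("add failed");
    /// qf.clear().expect("clear failed");
    /// assert!(qf.is_empty());
    /// ```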
719    pub fn clear(&mut self) -> Result<()> {
720        // Commit the header.
721        self.write_header(self.capacity, 0, 0, 0)?;
722
723        if self.overwrite_on_remove {
724            self.inner.seek(self.header_len);
725            let first_block = self.capacity.min(Self::BLOCK_LENGTH) - self.header_len;
726            self.inner.write(&Self::ZEROES[..first_block as usize])?;
727
728            if let Some(left) = self.capacity.checked_sub(Self::BLOCK_LENGTH) {
729                for _ in 0..left / Self::BLOCK_LENGTH {
730                    self.inner.write(&Self::ZEROES)?;
731                }
732
733                let tail = left % Self::BLOCK_LENGTH;
734
735                if tail != 0 {
736                    self.inner.write(&Self::ZEROES[..tail as usize])?;
737                }
738            }
739        }
740
741        self.cached_offsets.clear();
742
743        self.elem_cnt = 0;
744        self.first = Element::EMPTY;
745        self.last = Element::EMPTY;
746
747        if self.file_len() > self.capacity {
748            self.inner.sync_set_len(self.capacity)?;
749        }
750
751        Ok(())
752    }
753
754    /// Returns an iterator over elements in this queue.
755    ///
756    /// # Example
757    ///
758    /// ```
759    /// # use queue_file::QueueFile;
760    /// # let path = auto_delete_path::AutoDeletePath::temp();
761    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
762    /// let items = vec![vec![1, 2], vec![], vec![3]];
763    /// qf.add_n(&items).expect("failed to add elements to queue");
764    ///
765    /// let stored = qf.iter().map(Vec::from).collect::<Vec<_>>();
766    /// assert_eq!(items, stored);
767    /// ```
768    pub fn iter(&mut self) -> Iter<'_> {
769        let pos = self.first.pos;
770
771        Iter {
            // We reuse the write buffer to reduce the number of allocations.
            // The iterator doesn't modify the data and returns the buffer on drop.
774            buffer: std::mem::take(&mut self.write_buf),
775            queue_file: self,
776            next_elem_index: 0,
777            next_elem_pos: pos,
778        }
779    }
780
    /// Returns the number of bytes used by the backing file.
782    /// Always >= [`Self::used_bytes`].
783    #[inline]
784    pub const fn file_len(&self) -> u64 {
785        self.inner.file_len
786    }
787
    /// Returns the number of bytes used by the queue.
789    #[inline]
790    pub const fn used_bytes(&self) -> u64 {
791        if self.elem_cnt == 0 {
792            self.header_len
793        } else if self.last.pos >= self.first.pos {
794            // Contiguous queue.
795            (self.last.pos - self.first.pos)
796                + Element::HEADER_LENGTH as u64
797                + self.last.len as u64
798                + self.header_len
799        } else {
800            // tail < head. The queue wraps.
801            self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64 + self.file_len()
802                - self.first.pos
803        }
804    }
805
    /// Returns the underlying file of the queue.
807    pub fn into_inner_file(mut self) -> File {
808        if self.skip_write_header_on_add {
809            let _ = self.sync_header();
810        }
811
812        let file = unsafe { ManuallyDrop::take(&mut self.inner.file) };
813        std::mem::forget(self);
814
815        file
816    }
817
818    #[inline]
819    const fn remaining_bytes(&self) -> u64 {
820        self.file_len() - self.used_bytes()
821    }
822
823    fn sync_header(&mut self) -> Result<()> {
824        self.write_header(self.file_len(), self.size(), self.first.pos, self.last.pos)
825    }
826
    /// Writes the header atomically. The arguments contain the updated values. The struct fields
    /// should not have changed yet. This only updates the state in the file. It's up to the caller
    /// to update the struct fields *after* this call succeeds. Assumes segment writes are
    /// atomic in the underlying file system.
831    fn write_header(
832        &mut self, file_len: u64, elem_cnt: usize, first_pos: u64, last_pos: u64,
833    ) -> Result<()> {
834        let mut header = [0; 32];
835        let mut header_buf = &mut header[..];
836
        // Never allow writing values that would render the file unreadable by the Java library.
838        if self.versioned {
839            ensure!(i64::try_from(file_len).is_ok(), CorruptedFileSnafu {
840                msg: "file length in header will exceed i64::MAX"
841            });
842            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
843                msg: "element count in header will exceed i32::MAX"
844            });
845            ensure!(i64::try_from(first_pos).is_ok(), CorruptedFileSnafu {
846                msg: "first element position in header will exceed i64::MAX"
847            });
848            ensure!(i64::try_from(last_pos).is_ok(), CorruptedFileSnafu {
849                msg: "last element position in header will exceed i64::MAX"
850            });
851
852            header_buf.put_u32(Self::VERSIONED_HEADER);
853            header_buf.put_u64(file_len);
854            header_buf.put_i32(elem_cnt as i32);
855            header_buf.put_u64(first_pos);
856            header_buf.put_u64(last_pos);
857        } else {
858            ensure!(i32::try_from(file_len).is_ok(), CorruptedFileSnafu {
859                msg: "file length in header will exceed i32::MAX"
860            });
861            ensure!(i32::try_from(elem_cnt).is_ok(), CorruptedFileSnafu {
862                msg: "element count in header will exceed i32::MAX"
863            });
864            ensure!(i32::try_from(first_pos).is_ok(), CorruptedFileSnafu {
865                msg: "first element position in header will exceed i32::MAX"
866            });
867            ensure!(i32::try_from(last_pos).is_ok(), CorruptedFileSnafu {
868                msg: "last element position in header will exceed i32::MAX"
869            });
870
871            header_buf.put_i32(file_len as i32);
872            header_buf.put_i32(elem_cnt as i32);
873            header_buf.put_i32(first_pos as i32);
874            header_buf.put_i32(last_pos as i32);
875        }
876
877        self.inner.seek(0);
878        self.inner.write(&header.as_ref()[..self.header_len as usize])
879    }
880
881    fn read_element(&mut self, pos: u64) -> Result<Element> {
882        if pos == 0 {
883            Ok(Element::EMPTY)
884        } else {
885            let mut buf: [u8; 4] = [0; Element::HEADER_LENGTH];
886            self.ring_read(pos, &mut buf)?;
887
888            Element::new(pos, u32::from_be_bytes(buf) as usize)
889        }
890    }
891
892    /// Wraps the position if it exceeds the end of the file.
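    ///
    /// For example, with `header_len = 32` and `file_len = 4096`, `wrap_pos(4100)`
    /// returns `32 + 4100 - 4096 = 36`.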
893    #[inline]
894    const fn wrap_pos(&self, pos: u64) -> u64 {
895        if pos < self.file_len() { pos } else { self.header_len + pos - self.file_len() }
896    }
897
    /// Writes the contents of the write buffer to the given position in the file. Automatically
    /// wraps the write if the position is past the end of the file or if the buffer overlaps it.
900    fn ring_write_buf(&mut self, pos: u64) -> Result<()> {
901        let pos = self.wrap_pos(pos);
902
903        if pos + self.write_buf.len() as u64 <= self.file_len() {
904            self.inner.seek(pos);
905            self.inner.write(&self.write_buf)
906        } else {
907            let before_eof = (self.file_len() - pos) as usize;
908
909            self.inner.seek(pos);
910            self.inner.write(&self.write_buf[..before_eof])?;
911            self.inner.seek(self.header_len);
912            self.inner.write(&self.write_buf[before_eof..])
913        }
914    }
915
916    fn ring_erase(&mut self, pos: u64, n: usize) -> Result<()> {
917        let mut pos = pos;
918        let mut len = n;
919
920        self.write_buf.clear();
921        self.write_buf.extend(Self::ZEROES);
922
923        while len > 0 {
924            let chunk_len = min(len, Self::ZEROES.len());
925            self.write_buf.truncate(chunk_len);
926
927            self.ring_write_buf(pos)?;
928
929            len -= chunk_len;
930            pos += chunk_len as u64;
931        }
932
933        Ok(())
934    }
935
    /// Reads `buf.len()` bytes from the file into the buffer. Wraps if necessary.
937    fn ring_read(&mut self, pos: u64, buf: &mut [u8]) -> io::Result<()> {
938        let pos = self.wrap_pos(pos);
939
940        if pos + buf.len() as u64 <= self.file_len() {
941            self.inner.seek(pos);
942            self.inner.read(buf)
943        } else {
944            let before_eof = (self.file_len() - pos) as usize;
945
946            self.inner.seek(pos);
947            self.inner.read(&mut buf[..before_eof])?;
948            self.inner.seek(self.header_len);
949            self.inner.read(&mut buf[before_eof..])
950        }
951    }
952
    /// If necessary, expands the file to accommodate additional data of the given length.
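    ///
    /// The file length is doubled until the new data fits; e.g. a 4096-byte file with 100
    /// remaining bytes grows to 8192 to make room for a 2000-byte element.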
954    fn expand_if_necessary(&mut self, data_len: u64) -> Result<()> {
955        let mut rem_bytes = self.remaining_bytes();
956
957        if rem_bytes >= data_len {
958            return Ok(());
959        }
960
961        let orig_file_len = self.file_len();
962        let mut prev_len = orig_file_len;
963        let mut new_len = prev_len;
964
965        while rem_bytes < data_len {
966            rem_bytes += prev_len;
967            new_len = prev_len << 1;
968            prev_len = new_len;
969        }
970
971        let bytes_used_before = self.used_bytes();
972
973        // Calculate the position of the tail end of the data in the ring buffer
974        let end_of_last_elem =
975            self.wrap_pos(self.last.pos + Element::HEADER_LENGTH as u64 + self.last.len as u64);
976        self.inner.sync_set_len(new_len)?;
977
978        let mut count = 0u64;
979
980        // If the buffer is split, we need to make it contiguous
981        if end_of_last_elem <= self.first.pos {
982            count = end_of_last_elem - self.header_len;
983
984            self.inner.transfer(self.header_len, orig_file_len, count)?;
985        }
986
987        // Commit the expansion.
988        if self.last.pos < self.first.pos {
989            let new_last_pos = orig_file_len + self.last.pos - self.header_len;
990            self.last = Element::new(new_last_pos, self.last.len)?;
991        }
992
993        // TODO: cached offsets might be recalculated after transfer
994        self.cached_offsets.clear();
995
996        if self.overwrite_on_remove {
997            self.ring_erase(self.header_len, count as usize)?;
998        }
999
1000        let bytes_used_after = self.used_bytes();
1001        debug_assert_eq!(bytes_used_before, bytes_used_after);
1002
1003        Ok(())
1004    }
1005}
1006
1007// I/O Helpers
1008impl QueueFileInner {
1009    const TRANSFER_BUFFER_SIZE: usize = 128 * 1024;
1010
1011    #[inline]
1012    fn seek(&mut self, pos: u64) -> u64 {
1013        self.expected_seek = pos;
1014
1015        pos
1016    }
1017
1018    fn real_seek(&mut self) -> io::Result<u64> {
1019        if Some(self.expected_seek) == self.last_seek {
1020            return Ok(self.expected_seek);
1021        }
1022
1023        let res = self.file.seek(SeekFrom::Start(self.expected_seek));
1024        self.last_seek = res.as_ref().ok().copied();
1025
1026        res
1027    }
1028
1029    fn read(&mut self, buf: &mut [u8]) -> io::Result<()> {
1030        if buf.is_empty() {
1031            return Ok(());
1032        }
1033
1034        let size = buf.len();
1035
1036        let not_enough_data = if let Some(left) = self.read_buffer.len().checked_sub(size) {
1037            self.read_buffer_offset
1038                .and_then(|o| self.expected_seek.checked_sub(o))
1039                .and_then(|skip| left.checked_sub(skip as usize))
1040                .is_none()
1041        } else {
1042            self.read_buffer.resize(size, 0);
1043
1044            true
1045        };
1046
1047        if not_enough_data {
1048            use std::io::{Error, ErrorKind};
1049
1050            self.real_seek()?;
1051
1052            let mut read = 0;
1053            let mut res = Ok(());
1054
1055            while !buf.is_empty() {
1056                match self.file.read(&mut self.read_buffer[read..]) {
1057                    Ok(0) => break,
1058                    Ok(n) => read += n,
1059                    Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
1060                    Err(e) => {
1061                        res = Err(e);
1062                        break;
1063                    }
1064                }
1065            }
1066
1067            if res.is_ok() && read < size {
1068                res = Err(Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer"));
1069            }
1070
1071            if let Err(err) = res {
1072                self.read_buffer_offset = None;
1073                self.last_seek = None;
1074
1075                return Err(err);
1076            }
1077
1078            self.read_buffer_offset = Some(self.expected_seek);
1079
1080            if let Some(seek) = &mut self.last_seek {
1081                *seek += read as u64;
1082            }
1083        }
1084
1085        let start = (self.expected_seek - self.read_buffer_offset.unwrap()) as usize;
1086
1087        buf.copy_from_slice(&self.read_buffer[start..start + size]);
1088
1089        Ok(())
1090    }
1091
1092    fn write(&mut self, buf: &[u8]) -> Result<()> {
1093        self.real_seek()?;
1094
1095        self.file.write_all(buf)?;
1096
1097        if let Some(seek) = &mut self.last_seek {
1098            *seek += buf.len() as u64;
1099        }
1100
1101        if let Some(read_buffer_offset) = self.read_buffer_offset {
1102            let write_size_u64 = buf.len() as u64;
1103            let read_buffer_end_offset = read_buffer_offset + self.read_buffer.len() as u64;
1104            let read_buffered = read_buffer_offset..read_buffer_end_offset;
1105
1106            let has_start = read_buffered.contains(&self.expected_seek);
1107            let buf_end = self.expected_seek + write_size_u64;
1108            let has_end = read_buffered.contains(&buf_end);
1109
1110            match (has_start, has_end) {
1111                // rd_buf_offset .. exp_seek .. exp_seek+buf.len .. rd_buf_end
1112                // need to copy whole write buffer
1113                (true, true) => {
1114                    let start = (self.expected_seek - read_buffer_offset) as usize;
1115
1116                    self.read_buffer[start..start + buf.len()].copy_from_slice(buf);
1117                }
1118                // exp_seek .. rd_buf_offset .. exp_seek+buf.len .. rd_buf_end
1119                // need to copy only a tail of write buffer
1120                (false, true) => {
1121                    let need_to_skip = (read_buffer_offset - self.expected_seek) as usize;
1122                    let need_to_copy = buf.len() - need_to_skip;
1123
1124                    self.read_buffer[..need_to_copy].copy_from_slice(&buf[need_to_skip..]);
1125                }
1126                // rd_buf_offset .. exp_seek .. rd_buf_end .. exp_seek+buf.len
1127                // need to copy only a head of write buffer
1128                (true, false) => {
1129                    let need_to_skip = (self.expected_seek - read_buffer_offset) as usize;
1130                    let need_to_copy = self.read_buffer.len() - need_to_skip;
1131
1132                    self.read_buffer[need_to_skip..need_to_skip + need_to_copy]
1133                        .copy_from_slice(&buf[..need_to_copy]);
1134                }
1135                // exp_seek .. rd_buf_offset .. rd_buf_end .. exp_seek+buf.len
1136                // read buffer is inside writing range, need to rewrite it completely
1137                (false, false)
1138                    if (self.expected_seek + 1..buf_end).contains(&read_buffer_offset) =>
1139                {
1140                    let need_to_skip = (read_buffer_offset - self.expected_seek) as usize;
1141                    let need_to_copy = self.read_buffer.len();
1142
1143                    self.read_buffer[..]
1144                        .copy_from_slice(&buf[need_to_skip..need_to_skip + need_to_copy]);
1145                }
1146                // nothing to do, read & write buffers do not overlap
1147                (false, false) => {}
1148            }
1149        }
1150
1151        if self.sync_writes {
1152            self.file.sync_data()?;
1153        }
1154
1155        Ok(())
1156    }
1157
1158    fn transfer_inner(
1159        &mut self, buf: &mut [u8], mut read_pos: u64, mut write_pos: u64, count: u64,
1160    ) -> Result<()> {
1161        debug_assert!(read_pos < self.file_len);
1162        debug_assert!(write_pos <= self.file_len);
1163        debug_assert!(count < self.file_len);
1164        debug_assert!(i64::try_from(count).is_ok());
1165
1166        let mut bytes_left = count as i64;
1167
1168        while bytes_left > 0 {
1169            self.seek(read_pos);
1170            let bytes_to_read = min(bytes_left as usize, Self::TRANSFER_BUFFER_SIZE);
1171            self.read(&mut buf[..bytes_to_read])?;
1172
1173            self.seek(write_pos);
1174            self.write(&buf[..bytes_to_read])?;
1175
1176            read_pos += bytes_to_read as u64;
1177            write_pos += bytes_to_read as u64;
1178            bytes_left -= bytes_to_read as i64;
1179        }
1180
1181        // Should we `sync_data()` in internal loop instead?
1182        if self.sync_writes {
1183            self.file.sync_data()?;
1184        }
1185
1186        Ok(())
1187    }
1188
1189    /// Transfer `count` bytes starting from `read_pos` to `write_pos`.
1190    fn transfer(&mut self, read_pos: u64, write_pos: u64, count: u64) -> Result<()> {
1191        let mut buf = self.transfer_buf.take().unwrap();
1192        let res = self.transfer_inner(&mut buf, read_pos, write_pos, count);
1193        self.transfer_buf = Some(buf);
1194
1195        res
1196    }
1197
1198    fn sync_set_len(&mut self, new_len: u64) -> io::Result<()> {
1199        self.file.set_len(new_len)?;
1200        self.file_len = new_len;
1201        self.file.sync_all()
1202    }
1203}
1204
1205#[derive(Copy, Clone, Debug)]
1206struct Element {
1207    pos: u64,
1208    len: usize,
1209}
1210
1211impl Element {
1212    const EMPTY: Self = Self { pos: 0, len: 0 };
1213    const HEADER_LENGTH: usize = 4;
1214
1215    #[inline]
1216    fn new(pos: u64, len: usize) -> Result<Self> {
1217        ensure!(i64::try_from(pos).is_ok(), CorruptedFileSnafu {
1218            msg: "element position must be less or equal to i64::MAX"
1219        });
1220        ensure!(i32::try_from(len).is_ok(), ElementTooBigSnafu);
1221
1222        Ok(Self { pos, len })
1223    }
1224}
1225
1226/// Iterator over items in the queue.
1227#[derive(Debug)]
1228pub struct Iter<'a> {
1229    queue_file: &'a mut QueueFile,
1230    buffer: Vec<u8>,
1231    next_elem_index: usize,
1232    next_elem_pos: u64,
1233}
1234
1235impl<'a> Iterator for Iter<'a> {
1236    type Item = Box<[u8]>;
1237
1238    fn next(&mut self) -> Option<Self::Item> {
1239        let buffer = self.borrowed_next()?;
1240
1241        Some(buffer.to_vec().into_boxed_slice())
1242    }
1243
1244    fn size_hint(&self) -> (usize, Option<usize>) {
1245        let elems_left = self.queue_file.elem_cnt - self.next_elem_index;
1246
1247        (elems_left, Some(elems_left))
1248    }
1249
1250    fn nth(&mut self, n: usize) -> Option<Self::Item> {
1251        if self.queue_file.elem_cnt - self.next_elem_index < n {
1252            self.next_elem_index = self.queue_file.elem_cnt;
1253
1254            return None;
1255        }
1256
1257        let left = if let Some(i) = self.queue_file.cached_index_up_to(n) {
1258            let (index, elem) = self.queue_file.cached_offsets[i];
1259            if index > self.next_elem_index {
1260                self.next_elem_index = index;
1261                self.next_elem_pos = elem.pos;
1262            }
1263
1264            n - self.next_elem_index
1265        } else {
1266            n
1267        };
1268
1269        for _ in 0..left {
1270            self.borrowed_next();
1271        }
1272
1273        self.next()
1274    }
1275}
1276
1277impl Iter<'_> {
1278    /// Returns the next element in the queue.
    /// Similar to `Iter::next`, but the returned value borrows from an internal buffer,
    /// i.e. it is not allocated on each call.
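    ///
    /// A usage sketch:
    /// ```
    /// # use queue_file::QueueFile;
    /// # let path = auto_delete_path::AutoDeletePath::temp();
    /// let mut qf = QueueFile::open(path).expect("failed to open queue");
    /// qf.add(b"abc").expect("add failed");
    /// let mut iter = qf.iter();
    /// while let Some(elem) = iter.borrowed_next() {
    ///     assert_eq!(elem, &b"abc"[..]);
    /// }
    /// ```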
1281    pub fn borrowed_next(&mut self) -> Option<&[u8]> {
1282        if self.next_elem_index >= self.queue_file.elem_cnt {
1283            return None;
1284        }
1285
1286        let current = self.queue_file.read_element(self.next_elem_pos).ok()?;
1287        self.next_elem_pos = self.queue_file.wrap_pos(current.pos + Element::HEADER_LENGTH as u64);
1288
1289        if current.len > self.buffer.len() {
1290            self.buffer.resize(current.len, 0);
1291        }
1292        self.queue_file.ring_read(self.next_elem_pos, &mut self.buffer[..current.len]).ok()?;
1293
1294        self.next_elem_pos = self
1295            .queue_file
1296            .wrap_pos(current.pos + Element::HEADER_LENGTH as u64 + current.len as u64);
1297
1298        self.queue_file.cache_elem_if_needed(self.next_elem_index, current, 1);
1299        self.next_elem_index += 1;
1300
1301        Some(&self.buffer[..current.len])
1302    }
1303}
1304
1305impl Drop for Iter<'_> {
1306    fn drop(&mut self) {
1307        self.queue_file.write_buf = std::mem::take(&mut self.buffer);
1308    }
1309}