pagecache/
reservation.rs

1use std::sync::Arc;
2
3use super::*;
4
/// A pending log reservation which can be aborted or completed.
/// NB the holder should quickly call `complete` or `abort` as
/// taking too long to decide will cause the underlying IO
/// buffer to become blocked.
///
/// A reservation that is dropped without either call being made is
/// cancelled automatically by its `Drop` implementation.
pub struct Reservation<'a> {
    /// The log this reservation was made against. It is consulted for
    /// its `config` when an aborted blob is removed, and it is notified
    /// through `exit_reservation` once the reservation is flushed.
    pub(super) log: &'a Log,
    /// The IO buffer handed back to `Log::exit_reservation` on flush.
    pub(super) iobuf: Arc<IoBuf>,
    /// The reserved bytes for this message. The first `MSG_HEADER_LEN`
    /// bytes are the header (byte 0 holds the message kind, the last
    /// four header bytes receive the checksum on flush) and the rest
    /// is the message body.
    pub(super) buf: &'a mut [u8],
    /// Set to `true` as soon as the reservation has been flushed, so
    /// that it is never flushed a second time (including from `Drop`).
    pub(super) flushed: bool,
    /// The storage location reported by `ptr()` and `lid()`.
    pub(super) ptr: DiskPtr,
    /// The log sequence number reported by `lsn()`.
    pub(super) lsn: Lsn,
    /// When `true`, `abort` leaves a blob referenced by `ptr` in place
    /// instead of removing it.
    pub(super) is_blob_rewrite: bool,
}
18
19impl<'a> Drop for Reservation<'a> {
20    fn drop(&mut self) {
21        // We auto-abort if the user never uses a reservation.
22        if !self.flushed {
23            self.flush(false).unwrap();
24        }
25    }
26}
27
28impl<'a> Reservation<'a> {
29    /// Cancel the reservation, placing a failed flush on disk, returning
30    /// the (cancelled) log sequence number and file offset.
31    pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
32        if self.ptr.is_blob() && !self.is_blob_rewrite {
33            // we don't want to remove this blob if something
34            // else may still be using it.
35
36            trace!("removing blob for aborted reservation at lsn {}", self.ptr);
37
38            remove_blob(self.ptr.blob().1, &self.log.config)?;
39        }
40
41        self.flush(false)
42    }
43
44    /// Complete the reservation, placing the buffer on disk. returns
45    /// the log sequence number of the write, and the file offset.
46    pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
47        self.flush(true)
48    }
49
50    /// Get the log file offset for reading this buffer in the future.
51    pub fn lid(&self) -> LogId {
52        self.ptr.lid()
53    }
54
55    /// Get the log sequence number for this update.
56    pub fn lsn(&self) -> Lsn {
57        self.lsn
58    }
59
60    /// Get the underlying storage location for the written value.
61    /// Note that an blob write still has a pointer in the
62    /// log at the provided lid location.
63    pub fn ptr(&self) -> DiskPtr {
64        self.ptr
65    }
66
67    /// Returns the length of the on-log reservation.
68    pub fn reservation_len(&self) -> usize {
69        self.buf.len()
70    }
71
72    /// Refills the reservation buffer with new data.
73    /// Must supply a buffer of an identical length
74    /// as the one initially provided. Don't use this
75    /// on messages subject to compression etc...
76    ///
77    /// # Panics
78    ///
79    /// Will panic if the reservation is not the correct
80    /// size to hold a serialized Lsn.
81    #[doc(hidden)]
82    pub fn mark_writebatch(&mut self, lsn: Lsn) {
83        self.buf[0] = MessageKind::BatchManifest.into();
84
85        let buf = u64_to_arr(u64::try_from(lsn).unwrap());
86
87        let dst = &mut self.buf[MSG_HEADER_LEN..];
88
89        dst.copy_from_slice(&buf);
90    }
91
92    fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
93        if self.flushed {
94            panic!("flushing already-flushed reservation!");
95        }
96
97        self.flushed = true;
98
99        if !valid {
100            // don't actually zero the message, still check its hash
101            // on recovery to find corruption.
102            self.buf[0] = MessageKind::Cancelled.into();
103        }
104
105        // the order of hashing must be the
106        // same here as during calls to
107        // LogReader::read_message
108        let mut hasher = crc32fast::Hasher::new();
109        hasher.update(&self.buf[MSG_HEADER_LEN..]);
110        hasher.update(&self.buf[..MSG_HEADER_LEN]);
111        let crc32 = hasher.finalize();
112        let crc32_arr = u32_to_arr(crc32 ^ 0xFFFF_FFFF);
113
114        unsafe {
115            std::ptr::copy_nonoverlapping(
116                crc32_arr.as_ptr(),
117                self.buf
118                    .as_mut_ptr()
119                    .add(MSG_HEADER_LEN - std::mem::size_of::<u32>()),
120                std::mem::size_of::<u32>(),
121            );
122        }
123        self.log.exit_reservation(&self.iobuf)?;
124
125        Ok((self.lsn(), self.ptr()))
126    }
127}