pagecache/reservation.rs
1use std::sync::Arc;
2
3use super::*;
4
5/// A pending log reservation which can be aborted or completed.
6/// NB the holder should quickly call `complete` or `abort` as
7/// taking too long to decide will cause the underlying IO
8/// buffer to become blocked.
9pub struct Reservation<'a> {
10 pub(super) log: &'a Log,
11 pub(super) iobuf: Arc<IoBuf>,
12 pub(super) buf: &'a mut [u8],
13 pub(super) flushed: bool,
14 pub(super) ptr: DiskPtr,
15 pub(super) lsn: Lsn,
16 pub(super) is_blob_rewrite: bool,
17}
18
19impl<'a> Drop for Reservation<'a> {
20 fn drop(&mut self) {
21 // We auto-abort if the user never uses a reservation.
22 if !self.flushed {
23 self.flush(false).unwrap();
24 }
25 }
26}
27
28impl<'a> Reservation<'a> {
29 /// Cancel the reservation, placing a failed flush on disk, returning
30 /// the (cancelled) log sequence number and file offset.
31 pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
32 if self.ptr.is_blob() && !self.is_blob_rewrite {
33 // we don't want to remove this blob if something
34 // else may still be using it.
35
36 trace!("removing blob for aborted reservation at lsn {}", self.ptr);
37
38 remove_blob(self.ptr.blob().1, &self.log.config)?;
39 }
40
41 self.flush(false)
42 }
43
44 /// Complete the reservation, placing the buffer on disk. returns
45 /// the log sequence number of the write, and the file offset.
46 pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
47 self.flush(true)
48 }
49
50 /// Get the log file offset for reading this buffer in the future.
51 pub fn lid(&self) -> LogId {
52 self.ptr.lid()
53 }
54
55 /// Get the log sequence number for this update.
56 pub fn lsn(&self) -> Lsn {
57 self.lsn
58 }
59
60 /// Get the underlying storage location for the written value.
61 /// Note that an blob write still has a pointer in the
62 /// log at the provided lid location.
63 pub fn ptr(&self) -> DiskPtr {
64 self.ptr
65 }
66
67 /// Returns the length of the on-log reservation.
68 pub fn reservation_len(&self) -> usize {
69 self.buf.len()
70 }
71
72 /// Refills the reservation buffer with new data.
73 /// Must supply a buffer of an identical length
74 /// as the one initially provided. Don't use this
75 /// on messages subject to compression etc...
76 ///
77 /// # Panics
78 ///
79 /// Will panic if the reservation is not the correct
80 /// size to hold a serialized Lsn.
81 #[doc(hidden)]
82 pub fn mark_writebatch(&mut self, lsn: Lsn) {
83 self.buf[0] = MessageKind::BatchManifest.into();
84
85 let buf = u64_to_arr(u64::try_from(lsn).unwrap());
86
87 let dst = &mut self.buf[MSG_HEADER_LEN..];
88
89 dst.copy_from_slice(&buf);
90 }
91
92 fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
93 if self.flushed {
94 panic!("flushing already-flushed reservation!");
95 }
96
97 self.flushed = true;
98
99 if !valid {
100 // don't actually zero the message, still check its hash
101 // on recovery to find corruption.
102 self.buf[0] = MessageKind::Cancelled.into();
103 }
104
105 // the order of hashing must be the
106 // same here as during calls to
107 // LogReader::read_message
108 let mut hasher = crc32fast::Hasher::new();
109 hasher.update(&self.buf[MSG_HEADER_LEN..]);
110 hasher.update(&self.buf[..MSG_HEADER_LEN]);
111 let crc32 = hasher.finalize();
112 let crc32_arr = u32_to_arr(crc32 ^ 0xFFFF_FFFF);
113
114 unsafe {
115 std::ptr::copy_nonoverlapping(
116 crc32_arr.as_ptr(),
117 self.buf
118 .as_mut_ptr()
119 .add(MSG_HEADER_LEN - std::mem::size_of::<u32>()),
120 std::mem::size_of::<u32>(),
121 );
122 }
123 self.log.exit_reservation(&self.iobuf)?;
124
125 Ok((self.lsn(), self.ptr()))
126 }
127}