1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::sync::Arc;
use super::*;
pub struct Reservation<'a> {
pub(super) log: &'a Log,
pub(super) iobuf: Arc<IoBuf>,
pub(super) buf: &'a mut [u8],
pub(super) flushed: bool,
pub(super) ptr: DiskPtr,
pub(super) lsn: Lsn,
pub(super) is_blob_rewrite: bool,
}
impl<'a> Drop for Reservation<'a> {
fn drop(&mut self) {
if !self.flushed {
self.flush(false).unwrap();
}
}
}
impl<'a> Reservation<'a> {
pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
if self.ptr.is_blob() && !self.is_blob_rewrite {
trace!("removing blob for aborted reservation at lsn {}", self.ptr);
remove_blob(self.ptr.blob().1, &self.log.config)?;
}
self.flush(false)
}
pub fn complete(mut self) -> Result<(Lsn, DiskPtr)> {
self.flush(true)
}
pub fn lid(&self) -> LogId {
self.ptr.lid()
}
pub fn lsn(&self) -> Lsn {
self.lsn
}
pub fn ptr(&self) -> DiskPtr {
self.ptr
}
pub fn reservation_len(&self) -> usize {
self.buf.len()
}
#[doc(hidden)]
pub fn mark_writebatch(&mut self, lsn: Lsn) {
self.buf[0] = MessageKind::BatchManifest.into();
let buf = u64_to_arr(u64::try_from(lsn).unwrap());
let dst = &mut self.buf[MSG_HEADER_LEN..];
dst.copy_from_slice(&buf);
}
fn flush(&mut self, valid: bool) -> Result<(Lsn, DiskPtr)> {
if self.flushed {
panic!("flushing already-flushed reservation!");
}
self.flushed = true;
if !valid {
self.buf[0] = MessageKind::Cancelled.into();
}
let mut hasher = crc32fast::Hasher::new();
hasher.update(&self.buf[MSG_HEADER_LEN..]);
hasher.update(&self.buf[..MSG_HEADER_LEN]);
let crc32 = hasher.finalize();
let crc32_arr = u32_to_arr(crc32 ^ 0xFFFF_FFFF);
unsafe {
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
self.buf
.as_mut_ptr()
.add(MSG_HEADER_LEN - std::mem::size_of::<u32>()),
std::mem::size_of::<u32>(),
);
}
self.log.exit_reservation(&self.iobuf)?;
Ok((self.lsn(), self.ptr()))
}
}