1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::io::{Read, Seek, Write};
use std::os::unix::io::AsRawFd;
use rand::{Rng, thread_rng};
use super::*;
/// A log whose operations are all carried out through a single `IOBufs`
/// value: reservations and flushing are delegated to it, and reads and
/// hole punching go through the file handle it owns.
pub struct LockFreeLog {
/// Buffer and file state backing this log; visible to the parent module only.
pub(super) iobufs: IOBufs,
}
// These impls assert that a `LockFreeLog` may be moved to, and shared between,
// threads. The compiler does not check this claim.
// NOTE(review): soundness depends entirely on how `IOBufs` (defined elsewhere)
// synchronizes its internal state — TODO confirm against its definition.
unsafe impl Send for LockFreeLog {}
unsafe impl Sync for LockFreeLog {}
impl LockFreeLog {
    /// Opens the backing file (creating it if needed) and wraps it in a log.
    ///
    /// * `Some(path)`: the file at `path` is opened for reading and writing,
    ///   and its current length is passed to `IOBufs::new` as the second
    ///   argument.
    /// * `None`: a file named `__rsdb_memory_<nonce>.log`, where `<nonce>` is
    ///   ten random ASCII characters, is opened via a relative path and then
    ///   removed by name right away while the open handle is kept. `IOBufs::new`
    ///   receives `0` as the second argument.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened, its metadata cannot be read, or
    /// the scratch file cannot be removed.
    pub fn start_system(path: Option<String>) -> LockFreeLog {
        let mut open_opts = fs::OpenOptions::new();
        open_opts.create(true).read(true).write(true);

        let iobufs = match path {
            Some(location) => {
                let file = open_opts.open(location).unwrap();
                let existing_len = file.metadata().unwrap().len();
                IOBufs::new(file, existing_len)
            }
            None => {
                let nonce: String = thread_rng().gen_ascii_chars().take(10).collect();
                let scratch_path = format!("__rsdb_memory_{}.log", nonce);
                let file = open_opts.open(&scratch_path).unwrap();
                // Unlink the name immediately; the handle opened above is
                // still passed on to `IOBufs::new`.
                fs::remove_file(scratch_path).unwrap();
                IOBufs::new(file, 0)
            }
        };

        LockFreeLog { iobufs }
    }
}
impl Log for LockFreeLog {
    /// Reserves space for `buf` by delegating to `IOBufs::reserve` and returns
    /// the resulting `Reservation`.
    fn reserve(&self, buf: Vec<u8>) -> Reservation {
        self.iobufs.reserve(buf)
    }

    /// Reserves space for `buf` and completes the reservation right away,
    /// returning the `LogID` produced by `complete`.
    fn write(&self, buf: Vec<u8>) -> LogID {
        self.iobufs.reserve(buf).complete()
    }

    /// Reads the message stored at file offset `id`.
    ///
    /// The bytes are interpreted in this order, starting at `id`:
    ///
    /// 1. one "valid" byte (`0` means the entry is not readable),
    /// 2. four bytes decoded by `ops::array_to_usize` into the payload length,
    /// 3. two bytes of stored CRC16,
    /// 4. the payload itself.
    ///
    /// Returns `Ok(None)` when the valid byte is `0`, otherwise `Ok(Some(payload))`.
    ///
    /// # Errors
    ///
    /// * any I/O error from seeking or reading the file;
    /// * `ErrorKind::Other` if the decoded length exceeds `MAX_BUF_SZ`;
    /// * `ErrorKind::Other` if the payload's CRC16 differs from the stored one.
    ///
    /// # Panics
    ///
    /// Panics if the file mutex is poisoned.
    fn read(&self, id: LogID) -> io::Result<Option<Vec<u8>>> {
        let mut f = self.iobufs.file.lock().unwrap();
        f.seek(SeekFrom::Start(id))?;

        let mut valid = [0u8; 1];
        f.read_exact(&mut valid)?;
        if valid[0] == 0 {
            return Ok(None);
        }

        let mut len_buf = [0u8; 4];
        f.read_exact(&mut len_buf)?;
        let len = ops::array_to_usize(len_buf);
        // Reject oversized lengths before allocating a buffer for them.
        if len > MAX_BUF_SZ {
            let msg = format!("read invalid message length, {} should be <= {}",
                              len,
                              MAX_BUF_SZ);
            return Err(Error::new(ErrorKind::Other, msg));
        }

        let mut crc16_buf = [0u8; 2];
        f.read_exact(&mut crc16_buf)?;

        // Zero-initialise the payload buffer: handing uninitialised memory
        // (`with_capacity` + `set_len`) to `read_exact` is undefined behaviour.
        let mut buf = vec![0u8; len];
        f.read_exact(&mut buf)?;

        let checksum = crc16_arr(&buf);
        if checksum != crc16_buf {
            let msg = format!("read data failed crc16 checksum, {:?} should be {:?}",
                              checksum,
                              crc16_buf);
            return Err(Error::new(ErrorKind::Other, msg));
        }

        Ok(Some(buf))
    }

    /// Returns the value currently reported by `IOBufs::stable`.
    fn stable_offset(&self) -> LogID {
        self.iobufs.stable()
    }

    /// Repeatedly calls `IOBufs::flush` until `IOBufs::stable` reports a value
    /// strictly greater than `id`.
    ///
    /// This is a busy loop with no sleep or yield between iterations; it does
    /// not return until the condition holds.
    fn make_stable(&self, id: LogID) {
        loop {
            self.iobufs.flush();
            if self.iobufs.stable() > id {
                return;
            }
        }
    }

    /// Invalidates the entry at file offset `id` and, on Linux, deallocates
    /// the storage behind it.
    ///
    /// The valid byte at `id` is overwritten with `0`, so a later `read(id)`
    /// returns `Ok(None)`. The four length bytes that follow are then read.
    /// On Linux, `fallocate` is called with
    /// `FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE` over the `len + 2` bytes
    /// starting at `id + 5`, which covers the stored CRC16 and the payload.
    /// The valid byte and the length bytes are left in place. On other
    /// targets only the valid byte is cleared.
    ///
    /// # Panics
    ///
    /// Panics if the file mutex is poisoned or if seeking, writing, or
    /// reading the entry header fails.
    fn punch_hole(&self, id: LogID) {
        let mut f = self.iobufs.file.lock().unwrap();

        // Clear the valid byte.
        f.seek(SeekFrom::Start(id)).unwrap();
        f.write_all(&[0u8]).unwrap();

        // Read the length bytes that follow the valid byte. This happens on
        // every target, so a truncated header panics regardless of platform.
        f.seek(SeekFrom::Start(id + 1)).unwrap();
        let mut len_buf = [0u8; 4];
        f.read_exact(&mut len_buf).unwrap();

        #[cfg(target_os = "linux")]
        {
            let len = ops::array_to_usize(len_buf);
            let mode = FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE;
            let fd = f.as_raw_fd();
            // The return value is not inspected, so a failed hole punch goes
            // unnoticed; the entry has already been marked invalid above.
            // SAFETY (NOTE(review): confirm `fallocate` is the libc binding):
            // `fd` comes from the file behind the guard `f`, which is held
            // for the duration of the call.
            unsafe {
                fallocate(fd, mode, id as i64 + 5, len as i64 + 2);
            }
        }
    }
}