1use crate::config::Config;
2use crate::errors::DbResult;
3use crate::{ByteCount, DbError, TableId};
4use fs2::FileExt;
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10pub struct LogFormat<'a> {
11 pub table_id: TableId,
12 pub bytes: &'a [u8],
13}
14
15#[derive(Clone, Debug)]
16pub struct Logger {
17 inner: Arc<Mutex<LoggerInner>>,
18}
19
20#[derive(Debug)]
21struct LoggerInner {
22 config: Config,
23 file: Option<File>,
24 log_metadata: Option<LogMetadata>,
25 incomplete_write: bool,
26 current_txs: usize,
27 tx_data: Option<Vec<u8>>,
28}
29
30impl Logger {
31 pub fn init(config: Config) -> DbResult<Self> {
32 if config.create_path {
33 fs::create_dir_all(&config.path)?;
35 }
36
37 let mut file = if config.no_io {
38 None
39 } else {
40 Self::handle_migration(&config)?;
41 Some(Self::open_file(&config, &config.db_location_v2()?)?)
42 };
43
44 let incomplete_write = false;
45 let tx_data = None;
46 let current_txs = 0;
47
48 let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;
49
50 let inner = Arc::new(Mutex::new(LoggerInner {
51 file,
52 config,
53 incomplete_write,
54 tx_data,
55 current_txs,
56 log_metadata,
57 }));
58
59 Ok(Self { inner })
60 }
61
62 pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
63 let mut buffer: Vec<u8> = Vec::new();
64
65 let mut inner = self.inner.lock()?;
66 if let Some(file) = inner.file.as_mut() {
67 file.read_to_end(&mut buffer)?;
68 }
69
70 Ok(buffer)
71 }
72
73 pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
74 let mut index = 0;
75 let mut entries = vec![];
76
77 while index < buffer.len() {
78 if buffer.len() < index + 4 + 1 {
79 self.inner.lock()?.incomplete_write = true;
80 return Ok(entries);
81 }
82
83 let table_id = buffer[index];
84 index += 1;
85
86 let size = ByteCount::from_be_bytes(
87 buffer[index..index + 4]
88 .try_into()
89 .expect("slice with incorrect length"),
90 ) as usize;
91 index += 4;
92
93 if buffer.len() < index + size {
94 self.inner.lock()?.incomplete_write = true;
95 return Ok(entries);
96 }
97
98 if table_id == 0 {
99 continue;
100 }
101
102 let bytes = &buffer[index..index + size];
103 entries.push(LogFormat { table_id, bytes });
104 index += size;
105 }
106
107 Ok(entries)
108 }
109
110 pub fn begin_tx(&self) -> DbResult<TxHandle> {
111 let h = TxHandle { inner: self.clone() };
112 let mut inner = self.inner.lock()?;
113 if inner.tx_data.is_none() {
114 inner.tx_data = Some(vec![]);
115 }
116 inner.current_txs += 1;
117 Ok(h)
118 }
119
120 pub fn end_tx(&self) -> DbResult<()> {
121 let mut inner = self.inner.lock()?;
122 if inner.current_txs == 0 {
123 return Ok(());
124 }
125
126 inner.current_txs -= 1;
127 if inner.current_txs == 0 {
128 let data = inner.tx_data.take();
129 drop(inner);
130 if let Some(data) = data {
131 self.write_to_file(Self::log_entry(0, data))?;
132 }
133 }
134
135 Ok(())
136 }
137
138 pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
139 let mut inner = self.inner.lock()?;
140 if inner.config.no_io {
141 return Ok(());
142 }
143
144 if let Some(tx_data) = &mut inner.tx_data {
145 tx_data.extend(Self::header(id, &data));
146 tx_data.append(&mut data);
147 return Ok(());
148 }
149
150 drop(inner);
151
152 self.write_to_file(Self::log_entry(id, data))
153 }
154
155 fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
156 let mut inner = self.inner.lock()?;
157 if let Some(file) = inner.file.as_mut() {
158 file.write_all(&data)?;
159 }
160 Ok(())
161 }
162
163 pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
164 let size_info = (data.len() as ByteCount).to_be_bytes();
165 [id, size_info[0], size_info[1], size_info[2], size_info[3]]
166 }
167
168 pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
169 let header = Self::header(id, &data);
170 data.reserve(header.len());
171 data.splice(0..0, header);
172 data
173 }
174
175 pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
176 let mut inner = self.inner.lock()?;
177 if inner.config.no_io {
178 return Ok(());
179 }
180
181 let temp_path = inner.config.compaction_location()?;
182 let final_path = inner.config.db_location_v2()?;
183
184 let mut file = Self::open_file(&inner.config, &temp_path)?;
185
186 let mut log_meta = inner
188 .log_metadata
189 .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
190 log_meta.compaction_count += 1;
191 let metadata_bytes = log_meta.to_bytes();
192 file.write_all(&metadata_bytes)?;
193
194 let compacted_data = Self::log_entry(0, data);
196 file.write_all(&compacted_data)?;
197
198 fs::rename(temp_path, final_path)?;
200 inner.file = Some(file);
201 inner.log_metadata = Some(log_meta);
202
203 Ok(())
204 }
205
206 fn handle_migration(config: &Config) -> DbResult<()> {
207 let v1 = config.db_location_v1()?;
208 let v2 = config.db_location_v2()?;
209 let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));
210
211 if !v1.exists() {
212 return Ok(());
213 }
214
215 if v2_temp.exists() {
216 fs::remove_file(&v2_temp)?;
217 }
218
219 if v2.exists() {
220 return Ok(());
221 }
222
223 let v1_bytes = fs::read(&v1)?;
224 let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
225 v2_bytes.extend(v1_bytes);
226 fs::write(&v2_temp, v2_bytes)?;
227 fs::rename(v2_temp, v2)?;
228 fs::remove_file(v1)?;
229
230 Ok(())
231 }
232
233 fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
234 let file = OpenOptions::new()
235 .read(true)
236 .create(config.create_db || config.read_only)
237 .append(!config.read_only)
238 .open(db_location)?;
239
240 if config.fs_locks {
241 if config.fs_locks_block {
242 file.lock_exclusive()?;
243 } else {
244 file.try_lock_exclusive()?;
245 }
246 }
247
248 Ok(file)
249 }
250
251 pub(crate) fn config(&self) -> DbResult<Config> {
252 Ok(self.inner.lock()?.config.clone())
253 }
254
255 pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
256 Ok(self.inner.lock()?.incomplete_write)
257 }
258
259 fn read_or_stamp_metadata(
260 config: &Config, file: &mut Option<File>,
261 ) -> DbResult<Option<LogMetadata>> {
262 match file {
263 Some(file) => {
264 let mut buffer = [0_u8; 2];
265 let bytes_read = file.read(&mut buffer)?;
266 let mut needs_stamp = false;
267 match bytes_read {
268 0 => {
269 needs_stamp = true;
270 buffer = LogMetadata::default().to_bytes();
271 }
272 2 => {}
273 _ => {
274 return Err(DbError::Unexpected(
275 "Unexpected amount of bytes read from log stamp",
276 ))
277 }
278 };
279
280 if !config.read_only && needs_stamp {
281 file.write_all(&buffer)?;
282 }
283 let meta = LogMetadata::from_bytes(buffer);
284 if meta.log_version != 1 {
285 return Err(DbError::Unexpected("unexpected log format version found"));
286 }
287
288 Ok(Some(meta))
289 }
290 None => Ok(None),
291 }
292 }
293}
294
295#[derive(Debug, Copy, Clone)]
296pub struct LogMetadata {
297 log_version: u8,
301
302 compaction_count: u8,
305}
306
307impl Default for LogMetadata {
308 fn default() -> Self {
309 Self { log_version: 1, compaction_count: 0 }
310 }
311}
312
313impl LogMetadata {
314 fn to_bytes(self) -> [u8; 2] {
315 [self.log_version, self.compaction_count]
316 }
317
318 fn from_bytes(bytes: [u8; 2]) -> Self {
319 Self { log_version: bytes[0], compaction_count: bytes[1] }
320 }
321}
322
323impl Drop for LoggerInner {
324 fn drop(&mut self) {
325 if let Some(file) = &self.file {
326 if self.config.fs_locks {
327 if let Err(e) = file.unlock() {
328 eprintln!("failed to unlock log lock: {:?}", e);
329 }
330 }
331 }
332 }
333}
334
335#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
336pub struct TxHandle {
337 inner: Logger,
338}
339
340impl TxHandle {
341 pub fn drop_safely(&self) -> DbResult<()> {
342 self.inner.end_tx()
343 }
344}
345
346impl Drop for TxHandle {
347 fn drop(&mut self) {
348 self.drop_safely()
349 .expect("auto tx-end panicked. Call drop_safely() for non-panicking variant.");
350 }
351}