//! db_rs/logger.rs

1use crate::config::Config;
2use crate::errors::DbResult;
3use crate::{ByteCount, DbError, TableId};
4use fs2::FileExt;
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10pub struct LogFormat<'a> {
11    pub table_id: TableId,
12    pub bytes: &'a [u8],
13}
14
/// Cheaply clonable handle to the log; all file IO and bookkeeping lives
/// behind a single shared `Arc<Mutex<LoggerInner>>`.
#[derive(Clone, Debug)]
pub struct Logger {
    inner: Arc<Mutex<LoggerInner>>,
}
19
#[derive(Debug)]
struct LoggerInner {
    config: Config,
    // None when `config.no_io` is set; otherwise the open log file
    file: Option<File>,
    // None when `config.no_io` is set (stamped/read during init)
    log_metadata: Option<LogMetadata>,
    // set when a truncated/partial entry is found while parsing the log
    incomplete_write: bool,
    // number of outstanding TxHandles; writes buffer into `tx_data` while > 0
    current_txs: usize,
    // buffered entries for the active transaction, flushed when the last tx ends
    tx_data: Option<Vec<u8>>,
}
29
30impl Logger {
31    pub fn init(config: Config) -> DbResult<Self> {
32        if config.create_path {
33            // todo: is this happening for no_io?
34            fs::create_dir_all(&config.path)?;
35        }
36
37        let mut file = if config.no_io {
38            None
39        } else {
40            Self::handle_migration(&config)?;
41            Some(Self::open_file(&config, &config.db_location_v2()?)?)
42        };
43
44        let incomplete_write = false;
45        let tx_data = None;
46        let current_txs = 0;
47
48        let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;
49
50        let inner = Arc::new(Mutex::new(LoggerInner {
51            file,
52            config,
53            incomplete_write,
54            tx_data,
55            current_txs,
56            log_metadata,
57        }));
58
59        Ok(Self { inner })
60    }
61
62    pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
63        let mut buffer: Vec<u8> = Vec::new();
64
65        let mut inner = self.inner.lock()?;
66        if let Some(file) = inner.file.as_mut() {
67            file.read_to_end(&mut buffer)?;
68        }
69
70        Ok(buffer)
71    }
72
73    pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
74        let mut index = 0;
75        let mut entries = vec![];
76
77        while index < buffer.len() {
78            if buffer.len() < index + 4 + 1 {
79                self.inner.lock()?.incomplete_write = true;
80                return Ok(entries);
81            }
82
83            let table_id = buffer[index];
84            index += 1;
85
86            let size = ByteCount::from_be_bytes(
87                buffer[index..index + 4]
88                    .try_into()
89                    .expect("slice with incorrect length"),
90            ) as usize;
91            index += 4;
92
93            if buffer.len() < index + size {
94                self.inner.lock()?.incomplete_write = true;
95                return Ok(entries);
96            }
97
98            if table_id == 0 {
99                continue;
100            }
101
102            let bytes = &buffer[index..index + size];
103            entries.push(LogFormat { table_id, bytes });
104            index += size;
105        }
106
107        Ok(entries)
108    }
109
110    pub fn begin_tx(&self) -> DbResult<TxHandle> {
111        let h = TxHandle { inner: self.clone() };
112        let mut inner = self.inner.lock()?;
113        if inner.tx_data.is_none() {
114            inner.tx_data = Some(vec![]);
115        }
116        inner.current_txs += 1;
117        Ok(h)
118    }
119
120    pub fn end_tx(&self) -> DbResult<()> {
121        let mut inner = self.inner.lock()?;
122        if inner.current_txs == 0 {
123            return Ok(());
124        }
125
126        inner.current_txs -= 1;
127        if inner.current_txs == 0 {
128            let data = inner.tx_data.take();
129            drop(inner);
130            if let Some(data) = data {
131                self.write_to_file(Self::log_entry(0, data))?;
132            }
133        }
134
135        Ok(())
136    }
137
138    pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
139        let mut inner = self.inner.lock()?;
140        if inner.config.no_io {
141            return Ok(());
142        }
143
144        if let Some(tx_data) = &mut inner.tx_data {
145            tx_data.extend(Self::header(id, &data));
146            tx_data.append(&mut data);
147            return Ok(());
148        }
149
150        drop(inner);
151
152        self.write_to_file(Self::log_entry(id, data))
153    }
154
155    fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
156        let mut inner = self.inner.lock()?;
157        if let Some(file) = inner.file.as_mut() {
158            file.write_all(&data)?;
159        }
160        Ok(())
161    }
162
163    pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
164        let size_info = (data.len() as ByteCount).to_be_bytes();
165        [id, size_info[0], size_info[1], size_info[2], size_info[3]]
166    }
167
168    pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
169        let header = Self::header(id, &data);
170        data.reserve(header.len());
171        data.splice(0..0, header);
172        data
173    }
174
175    pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
176        let mut inner = self.inner.lock()?;
177        if inner.config.no_io {
178            return Ok(());
179        }
180
181        let temp_path = inner.config.compaction_location()?;
182        let final_path = inner.config.db_location_v2()?;
183
184        let mut file = Self::open_file(&inner.config, &temp_path)?;
185
186        // write compaction count for future IPC reasons
187        let mut log_meta = inner
188            .log_metadata
189            .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
190        log_meta.compaction_count += 1;
191        let metadata_bytes = log_meta.to_bytes();
192        file.write_all(&metadata_bytes)?;
193
194        // write compacted data to a temporary file
195        let compacted_data = Self::log_entry(0, data);
196        file.write_all(&compacted_data)?;
197
198        // atomically make this the new log
199        fs::rename(temp_path, final_path)?;
200        inner.file = Some(file);
201        inner.log_metadata = Some(log_meta);
202
203        Ok(())
204    }
205
206    fn handle_migration(config: &Config) -> DbResult<()> {
207        let v1 = config.db_location_v1()?;
208        let v2 = config.db_location_v2()?;
209        let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));
210
211        if !v1.exists() {
212            return Ok(());
213        }
214
215        if v2_temp.exists() {
216            fs::remove_file(&v2_temp)?;
217        }
218
219        if v2.exists() {
220            return Ok(());
221        }
222
223        let v1_bytes = fs::read(&v1)?;
224        let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
225        v2_bytes.extend(v1_bytes);
226        fs::write(&v2_temp, v2_bytes)?;
227        fs::rename(v2_temp, v2)?;
228        fs::remove_file(v1)?;
229
230        Ok(())
231    }
232
233    fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
234        let file = OpenOptions::new()
235            .read(true)
236            .create(config.create_db || config.read_only)
237            .append(!config.read_only)
238            .open(db_location)?;
239
240        if config.fs_locks {
241            if config.fs_locks_block {
242                file.lock_exclusive()?;
243            } else {
244                file.try_lock_exclusive()?;
245            }
246        }
247
248        Ok(file)
249    }
250
251    pub(crate) fn config(&self) -> DbResult<Config> {
252        Ok(self.inner.lock()?.config.clone())
253    }
254
255    pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
256        Ok(self.inner.lock()?.incomplete_write)
257    }
258
259    fn read_or_stamp_metadata(
260        config: &Config, file: &mut Option<File>,
261    ) -> DbResult<Option<LogMetadata>> {
262        match file {
263            Some(file) => {
264                let mut buffer = [0_u8; 2];
265                let bytes_read = file.read(&mut buffer)?;
266                let mut needs_stamp = false;
267                match bytes_read {
268                    0 => {
269                        needs_stamp = true;
270                        buffer = LogMetadata::default().to_bytes();
271                    }
272                    2 => {}
273                    _ => {
274                        return Err(DbError::Unexpected(
275                            "Unexpected amount of bytes read from log stamp",
276                        ))
277                    }
278                };
279
280                if !config.read_only && needs_stamp {
281                    file.write_all(&buffer)?;
282                }
283                let meta = LogMetadata::from_bytes(buffer);
284                if meta.log_version != 1 {
285                    return Err(DbError::Unexpected("unexpected log format version found"));
286                }
287
288                Ok(Some(meta))
289            }
290            None => Ok(None),
291        }
292    }
293}
294
/// The two-byte header stamped at the start of every v2 log file.
#[derive(Debug, Copy, Clone)]
pub struct LogMetadata {
    /// knowing the log version that we're reading allows us to evolve the format and make breaking
    /// changes. At the very least, allows us to return an error in the event of a version mismatch
    /// (leaving the migration up to the client)
    log_version: u8,

    /// compaction count is going to be a key data point to read when there are multiple processes
    /// reading and operating on the same log
    compaction_count: u8,
}
306
307impl Default for LogMetadata {
308    fn default() -> Self {
309        Self { log_version: 1, compaction_count: 0 }
310    }
311}
312
313impl LogMetadata {
314    fn to_bytes(self) -> [u8; 2] {
315        [self.log_version, self.compaction_count]
316    }
317
318    fn from_bytes(bytes: [u8; 2]) -> Self {
319        Self { log_version: bytes[0], compaction_count: bytes[1] }
320    }
321}
322
323impl Drop for LoggerInner {
324    fn drop(&mut self) {
325        if let Some(file) = &self.file {
326            if self.config.fs_locks {
327                if let Err(e) = file.unlock() {
328                    eprintln!("failed to unlock log lock: {:?}", e);
329                }
330            }
331        }
332    }
333}
334
/// RAII guard returned by `Logger::begin_tx`; ends its transaction level on
/// drop (panicking on io errors unless `drop_safely` is called first).
#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
pub struct TxHandle {
    inner: Logger,
}
339
340impl TxHandle {
341    pub fn drop_safely(&self) -> DbResult<()> {
342        self.inner.end_tx()
343    }
344}
345
346impl Drop for TxHandle {
347    fn drop(&mut self) {
348        self.drop_safely()
349            .expect("auto tx-end panicked. Call drop_safely() for non-panicking variant.");
350    }
351}