db_rs/logger.rs

use crate::config::Config;
use crate::errors::DbResult;
use crate::{ByteCount, DbError, TableId};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

#[cfg(not(target_family = "wasm"))]
use fs2::FileExt;

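/// A single decoded log entry: the id of the table it belongs to and a borrowed view of its
/// payload bytes.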
pub struct LogFormat<'a> {
    pub table_id: TableId,
    pub bytes: &'a [u8],
}

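/// Handle to the append-only log. Cloning is cheap: all state lives behind an
/// `Arc<Mutex<LoggerInner>>`, so every clone shares the same file and transaction state.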
#[derive(Clone, Debug)]
pub struct Logger {
    inner: Arc<Mutex<LoggerInner>>,
}

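/// State shared by all clones of a `Logger`, guarded by the mutex in `Logger::inner`.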
#[derive(Debug)]
struct LoggerInner {
    config: Config,
    file: Option<File>,
    log_metadata: Option<LogMetadata>,
    incomplete_write: bool,
    current_txs: usize,
    tx_data: Option<Vec<u8>>,
}

impl Logger {
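    /// Opens (or creates) the log described by `config`, migrating a v1 log to the v2 layout and
    /// stamping metadata on fresh files. With `config.no_io` set, no file is opened and writes
    /// become no-ops.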
    pub fn init(config: Config) -> DbResult<Self> {
        if config.create_path {
            // todo: is this happening for no_io?
            fs::create_dir_all(&config.path)?;
        }

        let mut file = if config.no_io {
            None
        } else {
            Self::handle_migration(&config)?;
            Some(Self::open_file(&config, &config.db_location_v2()?)?)
        };

        let incomplete_write = false;
        let tx_data = None;
        let current_txs = 0;

        let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;

        let inner = Arc::new(Mutex::new(LoggerInner {
            file,
            config,
            incomplete_write,
            tx_data,
            current_txs,
            log_metadata,
        }));

        Ok(Self { inner })
    }

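    /// Reads the rest of the log file into memory from the current cursor position (immediately
    /// after `init`, that is just past the metadata stamp). Returns an empty buffer when running
    /// with `no_io`.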
    pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
        let mut buffer: Vec<u8> = Vec::new();

        let mut inner = self.inner.lock()?;
        if let Some(file) = inner.file.as_mut() {
            file.read_to_end(&mut buffer)?;
        }

        Ok(buffer)
    }

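    /// Splits a raw buffer (as returned by `get_bytes`) into individual log entries. A truncated
    /// trailing entry is not an error: parsing stops there and `incomplete_write` is flagged.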
    pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
        let mut index = 0;
        let mut entries = vec![];

        while index < buffer.len() {
            // not enough bytes left for a full header (1-byte table id + 4-byte size)
            if buffer.len() < index + 4 + 1 {
                self.inner.lock()?.incomplete_write = true;
                return Ok(entries);
            }

            let table_id = buffer[index];
            index += 1;

            let size = ByteCount::from_be_bytes(
                buffer[index..index + 4]
                    .try_into()
                    .expect("slice with incorrect length"),
            ) as usize;
            index += 4;

            // the payload was cut short by an interrupted write
            if buffer.len() < index + size {
                self.inner.lock()?.incomplete_write = true;
                return Ok(entries);
            }

            // table id 0 wraps a transaction (or a compaction); its payload is itself a sequence
            // of entries, so parse it in place rather than skipping over it
            if table_id == 0 {
                continue;
            }

            let bytes = &buffer[index..index + size];
            entries.push(LogFormat { table_id, bytes });
            index += size;
        }

        Ok(entries)
    }

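    /// Starts (or nests into) a transaction. While the returned handle is alive, writes are
    /// buffered in memory; dropping the last handle flushes them as a single log entry.
    ///
    /// Sketch of the intended flow: `let tx = logger.begin_tx()?; /* writes */ tx.drop_safely()?;`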
    pub fn begin_tx(&self) -> DbResult<TxHandle> {
        let h = TxHandle { inner: self.clone() };
        let mut inner = self.inner.lock()?;
        if inner.tx_data.is_none() {
            inner.tx_data = Some(vec![]);
        }
        inner.current_txs += 1;
        Ok(h)
    }

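    /// Ends one level of transaction nesting. When the outermost transaction ends, the buffered
    /// writes are flushed to disk as one table-id-0 entry.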
    pub fn end_tx(&self) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.current_txs == 0 {
            return Ok(());
        }

        inner.current_txs -= 1;
        if inner.current_txs == 0 {
            let data = inner.tx_data.take();
            // release the lock before writing; write_to_file re-acquires it
            drop(inner);
            if let Some(data) = data {
                self.write_to_file(Self::log_entry(0, data))?;
            }
        }

        Ok(())
    }

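    /// Appends one entry for `id` to the log. Inside a transaction the entry is buffered in
    /// `tx_data`; otherwise it is written to the file immediately. No-op when `no_io` is set.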
    pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.config.no_io {
            return Ok(());
        }

        // inside a transaction: buffer the framed entry instead of touching the file
        if let Some(tx_data) = &mut inner.tx_data {
            tx_data.extend(Self::header(id, &data));
            tx_data.append(&mut data);
            return Ok(());
        }

        // release the lock before writing; write_to_file re-acquires it
        drop(inner);

        self.write_to_file(Self::log_entry(id, data))
    }

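    /// Writes raw, already-framed bytes to the log file, if one is open.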
    fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if let Some(file) = inner.file.as_mut() {
            file.write_all(&data)?;
        }
        Ok(())
    }

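    /// Builds the 5-byte entry header: one byte of table id followed by the payload length as a
    /// big-endian `ByteCount` (4 bytes).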
    pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
        let size_info = (data.len() as ByteCount).to_be_bytes();
        [id, size_info[0], size_info[1], size_info[2], size_info[3]]
    }

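    /// Frames `data` as a complete log entry by prepending its header.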
    pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
        let header = Self::header(id, &data);
        data.reserve(header.len());
        data.splice(0..0, header);
        data
    }

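    /// Replaces the log with `data`: the compacted bytes are written to a temporary file along
    /// with updated metadata, then atomically renamed over the current log. No-op when `no_io`
    /// is set.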
    pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.config.no_io {
            return Ok(());
        }

        let temp_path = inner.config.compaction_location()?;
        let final_path = inner.config.db_location_v2()?;

        let mut file = Self::open_file(&inner.config, &temp_path)?;

        // write compaction count for future IPC reasons
        let mut log_meta = inner
            .log_metadata
            .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
        log_meta.compaction_count += 1;
        let metadata_bytes = log_meta.to_bytes();
        file.write_all(&metadata_bytes)?;

        // write compacted data to a temporary file
        let compacted_data = Self::log_entry(0, data);
        file.write_all(&compacted_data)?;

        // atomically make this the new log
        fs::rename(temp_path, final_path)?;
        inner.file = Some(file);
        inner.log_metadata = Some(log_meta);

        Ok(())
    }

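    /// Upgrades a v1 log to the v2 layout by prepending a default metadata stamp. The copy is
    /// written to a temporary file and renamed into place before the v1 file is removed, so a
    /// crash mid-migration leaves the v1 log intact.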
    fn handle_migration(config: &Config) -> DbResult<()> {
        let v1 = config.db_location_v1()?;
        let v2 = config.db_location_v2()?;
        let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));

        if !v1.exists() {
            return Ok(());
        }

        if v2_temp.exists() {
            fs::remove_file(&v2_temp)?;
        }

        if v2.exists() {
            return Ok(());
        }

        let v1_bytes = fs::read(&v1)?;
        let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
        v2_bytes.extend(v1_bytes);
        fs::write(&v2_temp, v2_bytes)?;
        fs::rename(v2_temp, v2)?;
        fs::remove_file(v1)?;

        Ok(())
    }

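    /// Opens the log file with the access and locking behavior requested by `config`. File locks
    /// are advisory (via `fs2`) and are not supported on wasm targets.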
    fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
        let file = OpenOptions::new()
            .read(true)
            .create(config.create_db || config.read_only)
            .append(!config.read_only)
            .open(db_location)?;

        #[cfg(not(target_family = "wasm"))]
        if config.fs_locks {
            if config.fs_locks_block {
                file.lock_exclusive()?;
            } else {
                file.try_lock_exclusive()?;
            }
        }
        #[cfg(target_family = "wasm")]
        if config.fs_locks {
            return Err(DbError::Unexpected("File Locks are not supported on wasm"));
        }

        Ok(file)
    }

    pub(crate) fn config(&self) -> DbResult<Config> {
        Ok(self.inner.lock()?.config.clone())
    }

    pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
        Ok(self.inner.lock()?.incomplete_write)
    }

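    /// Reads the 2-byte metadata stamp from the start of the log, writing a default stamp first
    /// if the file is empty (and the config is not read-only). Errors on an unknown log version;
    /// returns `None` when there is no file (`no_io`).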
    fn read_or_stamp_metadata(
        config: &Config, file: &mut Option<File>,
    ) -> DbResult<Option<LogMetadata>> {
        match file {
            Some(file) => {
                let mut buffer = [0_u8; 2];
                let bytes_read = file.read(&mut buffer)?;
                let mut needs_stamp = false;
                match bytes_read {
                    0 => {
                        needs_stamp = true;
                        buffer = LogMetadata::default().to_bytes();
                    }
                    2 => {}
                    _ => {
                        return Err(DbError::Unexpected(
                            "Unexpected amount of bytes read from log stamp",
                        ))
                    }
                };

                if !config.read_only && needs_stamp {
                    file.write_all(&buffer)?;
                }
                let meta = LogMetadata::from_bytes(buffer);
                if meta.log_version != 1 {
                    return Err(DbError::Unexpected("unexpected log format version found"));
                }

                Ok(Some(meta))
            }
            None => Ok(None),
        }
    }
}

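/// The 2-byte stamp written at the start of every v2 log file.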
#[derive(Debug, Copy, Clone)]
pub struct LogMetadata {
    /// Knowing the log version we're reading lets us evolve the format and make breaking changes.
    /// At the very least, it lets us return an error in the event of a version mismatch (leaving
    /// the migration up to the client).
    log_version: u8,

    /// The compaction count will be a key data point to read when multiple processes are reading
    /// and operating on the same log.
    compaction_count: u8,
}

impl Default for LogMetadata {
    fn default() -> Self {
        Self { log_version: 1, compaction_count: 0 }
    }
}

impl LogMetadata {
    fn to_bytes(self) -> [u8; 2] {
        [self.log_version, self.compaction_count]
    }

    fn from_bytes(bytes: [u8; 2]) -> Self {
        Self { log_version: bytes[0], compaction_count: bytes[1] }
    }
}

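// Release the advisory file lock (non-wasm only) when the last handle to the logger goes away.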
impl Drop for LoggerInner {
    fn drop(&mut self) {
        if let Some(file) = &self.file {
            if self.config.fs_locks {
                #[cfg(not(target_family = "wasm"))]
                if let Err(e) = fs2::FileExt::unlock(file) {
                    eprintln!("failed to unlock log lock: {:?}", e);
                }
            }
        }
    }
}

#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
pub struct TxHandle {
    inner: Logger,
}

impl TxHandle {
    /// Ends the transaction now, surfacing any io error instead of panicking in `Drop`.
    pub fn drop_safely(&self) -> DbResult<()> {
        self.inner.end_tx()
    }
}

impl Drop for TxHandle {
    fn drop(&mut self) {
        self.drop_safely()
            .expect("auto tx-end failed. Call drop_safely() for a non-panicking variant.");
    }
}