db_rs/
logger.rs

use crate::config::Config;
use crate::errors::DbResult;
use crate::{ByteCount, DbError, TableId};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

#[cfg(not(target_family = "wasm"))]
use fs2::FileExt;

pub struct LogFormat<'a> {
    pub index: usize,
    pub table_id: TableId,
    pub bytes: &'a [u8],
}

#[derive(Clone, Debug)]
pub struct Logger {
    inner: Arc<Mutex<LoggerInner>>,
}

#[derive(Debug)]
struct LoggerInner {
    config: Config,
    file: Option<File>,
    log_metadata: Option<LogMetadata>,
    incomplete_write: bool,
    current_txs: usize,
    tx_data: Option<Vec<u8>>,
}

impl Logger {
    pub fn init(config: Config) -> DbResult<Self> {
        if config.create_path {
            // todo: is this happening for no_io?
            fs::create_dir_all(&config.path)?;
        }

        let mut file = if config.no_io {
            None
        } else {
            Self::handle_migration(&config)?;
            Some(Self::open_file(&config, &config.db_location_v2()?)?)
        };

        let incomplete_write = false;
        let tx_data = None;
        let current_txs = 0;

        let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;

        let inner = Arc::new(Mutex::new(LoggerInner {
            file,
            config,
            incomplete_write,
            tx_data,
            current_txs,
            log_metadata,
        }));

        Ok(Self { inner })
    }

    pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
        let mut buffer: Vec<u8> = Vec::new();

        // reads from the file's current position through to EOF; after init this sits just past
        // the 2-byte metadata stamp
        let mut inner = self.inner.lock()?;
        if let Some(file) = inner.file.as_mut() {
            file.read_to_end(&mut buffer)?;
        }

        Ok(buffer)
    }

    pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
        let mut index = 0;
        let mut entries = vec![];

        while index < buffer.len() {
            // offset of this entry within the log file (the buffer handed in starts after the
            // 2-byte metadata stamp)
            let index_capture = index + 2;

            // need at least 1 byte of table id and 4 bytes of length
            if buffer.len() < index + 4 + 1 {
                eprintln!("incomplete write: entry header missing");
                self.inner.lock()?.incomplete_write = true;
                return Ok(entries);
            }

            let table_id = buffer[index];
            index += 1;

            let size = ByteCount::from_be_bytes(
                buffer[index..index + 4]
                    .try_into()
                    .expect("slice with incorrect length"),
            ) as usize;
            index += 4;

            if buffer.len() < index + size {
                eprintln!("incomplete write: expected {} bytes, found {}", index + size, buffer.len());
                self.inner.lock()?.incomplete_write = true;
                return Ok(entries);
            }

            if table_id == 0 {
                // table id 0 wraps a transaction's entries; don't skip its payload, descend into
                // it and parse the wrapped entries directly
                continue;
            }

            let bytes = &buffer[index..index + size];
            entries.push(LogFormat { index: index_capture, table_id, bytes });
            index += size;
        }

        Ok(entries)
    }
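
    // A minimal sketch (not part of the original API) of how a caller might replay the log:
    // read the raw bytes once, then decode them into `LogFormat` entries. The method name and
    // the printing are illustrative only.
    #[allow(dead_code)]
    fn replay_sketch(&self) -> DbResult<()> {
        let bytes = self.get_bytes()?;
        for entry in self.get_entries(&bytes)? {
            // each entry carries the table it belongs to, its payload, and its offset in the file
            println!("table {}: {} bytes at offset {}", entry.table_id, entry.bytes.len(), entry.index);
        }
        Ok(())
    }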

    pub fn begin_tx(&self) -> DbResult<TxHandle> {
        let h = TxHandle { inner: self.clone() };
        let mut inner = self.inner.lock()?;
        if inner.tx_data.is_none() {
            inner.tx_data = Some(vec![]);
        }
        inner.current_txs += 1;
        Ok(h)
    }

    pub fn end_tx(&self) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.current_txs == 0 {
            return Ok(());
        }

        inner.current_txs -= 1;
        if inner.current_txs == 0 {
            let data = inner.tx_data.take();
            drop(inner);
            if let Some(data) = data {
                self.write_to_file(Self::log_entry(0, data))?;
            }
        }

        Ok(())
    }
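
    // A minimal sketch (not part of the original API) of how transactions are intended to be
    // used: while a `TxHandle` is live, `write` buffers entries in `tx_data`; ending the last
    // handle flushes everything as one table-id-0 entry. The table id and payloads below are
    // illustrative only.
    #[allow(dead_code)]
    fn tx_usage_sketch(&self) -> DbResult<()> {
        let tx = self.begin_tx()?;
        self.write(1, vec![0xAA, 0xBB])?; // buffered, not yet on disk
        self.write(1, vec![0xCC])?; // buffered into the same transaction
        tx.drop_safely()?; // flushes both entries wrapped in a single table-id-0 entry
        Ok(())
    }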

    pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.config.no_io {
            return Ok(());
        }

        // inside a transaction, buffer the entry; otherwise write it straight to the log
        if let Some(tx_data) = &mut inner.tx_data {
            tx_data.extend(Self::header(id, &data));
            tx_data.append(&mut data);
            return Ok(());
        }

        drop(inner);

        self.write_to_file(Self::log_entry(id, data))
    }

    fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if let Some(file) = inner.file.as_mut() {
            file.write_all(&data)?;
        }
        Ok(())
    }

    /// 5-byte entry header: the table id followed by the payload length as a big-endian
    /// `ByteCount`
    pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
        let size_info = (data.len() as ByteCount).to_be_bytes();
        [id, size_info[0], size_info[1], size_info[2], size_info[3]]
    }

    /// prepends the entry header to `data`, producing the bytes that go on disk
    pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
        let header = Self::header(id, &data);
        data.reserve(header.len());
        data.splice(0..0, header);
        data
    }
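
    // A short worked example (not part of the original file) of the on-disk entry layout
    // produced above, using an illustrative table id of 3 and a 3-byte payload:
    //
    //   Logger::header(3, &[1, 2, 3])        == [3, 0, 0, 0, 3]
    //   Logger::log_entry(3, vec![1, 2, 3])  == [3, 0, 0, 0, 3, 1, 2, 3]
    //
    // i.e. one byte of table id, four bytes of big-endian length, then the payload.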

    pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
        let mut inner = self.inner.lock()?;
        if inner.config.no_io {
            return Ok(());
        }

        let temp_path = inner.config.compaction_location()?;
        let final_path = inner.config.db_location_v2()?;

        let mut file = Self::open_file(&inner.config, &temp_path)?;

        // write compaction count for future IPC reasons
        let mut log_meta = inner
            .log_metadata
            .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
        log_meta.compaction_count += 1;
        let metadata_bytes = log_meta.to_bytes();
        file.write_all(&metadata_bytes)?;

        // write compacted data to a temporary file
        let compacted_data = Self::log_entry(0, data);
        file.write_all(&compacted_data)?;

        // atomically make this the new log
        fs::rename(temp_path, final_path)?;
        inner.file = Some(file);
        inner.log_metadata = Some(log_meta);

        Ok(())
    }
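
    // Sketch of the compacted file layout (not part of the original file):
    //
    //   [log_version, compaction_count]                       2-byte metadata stamp
    //   [0, len0, len1, len2, len3, ...compacted data...]     one table-id-0 entry
    //
    // `get_entries` treats the table-id-0 wrapper as transparent and parses the entries inside it.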

    fn handle_migration(config: &Config) -> DbResult<()> {
        let v1 = config.db_location_v1()?;
        let v2 = config.db_location_v2()?;
        let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));

        if !v1.exists() {
            return Ok(());
        }

        if v2_temp.exists() {
            fs::remove_file(&v2_temp)?;
        }

        if v2.exists() {
            return Ok(());
        }

        let v1_bytes = fs::read(&v1)?;
        let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
        v2_bytes.extend(v1_bytes);
        fs::write(&v2_temp, v2_bytes)?;
        fs::rename(v2_temp, v2)?;
        fs::remove_file(v1)?;

        Ok(())
    }

    fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
        let file = OpenOptions::new()
            .read(true)
            // only create the file when we're allowed to write to it; create() without write or
            // append access makes open() fail
            .create(config.create_db && !config.read_only)
            .append(!config.read_only)
            .open(db_location)?;

        #[cfg(not(target_family = "wasm"))]
        if config.fs_locks {
            if config.fs_locks_block {
                file.lock_exclusive()?;
            } else {
                file.try_lock_exclusive()?;
            }
        }
        #[cfg(target_family = "wasm")]
        if config.fs_locks {
            return Err(DbError::Unexpected("File Locks are not supported on wasm"));
        }

        Ok(file)
    }

    pub(crate) fn config(&self) -> DbResult<Config> {
        Ok(self.inner.lock()?.config.clone())
    }

    pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
        Ok(self.inner.lock()?.incomplete_write)
    }

    fn read_or_stamp_metadata(
        config: &Config, file: &mut Option<File>,
    ) -> DbResult<Option<LogMetadata>> {
        match file {
            Some(file) => {
                let mut buffer = [0_u8; 2];
                let bytes_read = file.read(&mut buffer)?;
                let mut needs_stamp = false;
                match bytes_read {
                    0 => {
                        needs_stamp = true;
                        buffer = LogMetadata::default().to_bytes();
                    }
                    2 => {}
                    _ => {
                        return Err(DbError::Unexpected(
                            "Unexpected amount of bytes read from log stamp",
                        ))
                    }
                };

                if !config.read_only && needs_stamp {
                    file.write_all(&buffer)?;
                }
                let meta = LogMetadata::from_bytes(buffer);
                if meta.log_version != 1 {
                    return Err(DbError::Unexpected("unexpected log format version found"));
                }

                Ok(Some(meta))
            }
            None => Ok(None),
        }
    }
}

#[derive(Debug, Copy, Clone)]
pub struct LogMetadata {
    /// Knowing the log version we're reading allows us to evolve the format and make breaking
    /// changes. At the very least, it allows us to return an error in the event of a version
    /// mismatch (leaving the migration up to the client).
    log_version: u8,

    /// The compaction count will be a key data point to read when there are multiple processes
    /// reading and operating on the same log.
    compaction_count: u8,
}

impl Default for LogMetadata {
    fn default() -> Self {
        Self { log_version: 1, compaction_count: 0 }
    }
}

impl LogMetadata {
    fn to_bytes(self) -> [u8; 2] {
        [self.log_version, self.compaction_count]
    }

    fn from_bytes(bytes: [u8; 2]) -> Self {
        Self { log_version: bytes[0], compaction_count: bytes[1] }
    }
}
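
// A short worked example (not part of the original file): the default metadata stamp
// `LogMetadata::default().to_bytes()` is `[1, 0]` -- log format version 1, zero compactions --
// and it occupies the first two bytes of every v2 log file.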

impl Drop for LoggerInner {
    fn drop(&mut self) {
        if let Some(file) = &self.file {
            if self.config.fs_locks {
                #[cfg(not(target_family = "wasm"))]
                if let Err(e) = fs2::FileExt::unlock(file) {
                    eprintln!("failed to unlock log lock: {:?}", e);
                }
            }
        }
    }
}

#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
pub struct TxHandle {
    inner: Logger,
}

impl TxHandle {
    pub fn drop_safely(&self) -> DbResult<()> {
        self.inner.end_tx()
    }
}

impl Drop for TxHandle {
    fn drop(&mut self) {
        self.drop_safely()
            .expect("auto tx-end failed. Call drop_safely() for the non-panicking variant.");
    }
}
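
// A minimal test sketch (not part of the original file) exercising the pure helpers above; the
// asserted byte layouts follow directly from `header`, `log_entry`, and `LogMetadata`.
#[cfg(test)]
mod logger_format_tests {
    use super::*;

    #[test]
    fn entry_layout() {
        // 1 byte table id + 4 bytes big-endian length, then the payload
        assert_eq!(Logger::header(3, &[1, 2, 3]), [3, 0, 0, 0, 3]);
        assert_eq!(Logger::log_entry(3, vec![1, 2, 3]), vec![3, 0, 0, 0, 3, 1, 2, 3]);
    }

    #[test]
    fn metadata_round_trip() {
        let meta = LogMetadata::from_bytes([1, 7]);
        assert_eq!(meta.to_bytes(), [1, 7]);
        assert_eq!(LogMetadata::default().to_bytes(), [1, 0]);
    }
}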