1use crate::config::Config;
2use crate::errors::DbResult;
3use crate::{ByteCount, DbError, TableId};
4use std::fs::{self, File, OpenOptions};
5use std::io::{Read, Write};
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8
9#[cfg(not(target_family = "wasm"))]
10use fs2::FileExt;
11
/// A single record decoded from a raw log buffer.
///
/// Values of this type are produced by `Logger::get_entries`; `bytes` borrows from the
/// buffer that was parsed, so a record cannot outlive that buffer.
pub struct LogFormat<'a> {
    /// Identifier of the table the record belongs to (first byte of the record header).
    pub table_id: TableId,
    /// Payload of the record, without its header.
    pub bytes: &'a [u8],
}
16
/// Cloneable handle to the database log.
///
/// Every clone shares the same `LoggerInner` through an `Arc<Mutex<_>>`, so all handles
/// observe the same file, transaction buffer and flags.
#[derive(Clone, Debug)]
pub struct Logger {
    /// Shared state; each method locks this mutex before reading or modifying it.
    inner: Arc<Mutex<LoggerInner>>,
}
21
/// Mutable state shared by every clone of `Logger`.
#[derive(Debug)]
struct LoggerInner {
    /// Configuration the logger was initialised with.
    config: Config,
    /// Open log file; `None` when `config.no_io` is set.
    file: Option<File>,
    /// Metadata stamp of the current log file; `None` when there is no file.
    log_metadata: Option<LogMetadata>,
    /// Set by `Logger::get_entries` when the parsed buffer ends in a truncated record.
    incomplete_write: bool,
    /// Number of transactions started with `begin_tx` that have not been ended yet.
    current_txs: usize,
    /// Records buffered while a transaction is open; `None` outside of a transaction.
    tx_data: Option<Vec<u8>>,
}
31
32impl Logger {
33 pub fn init(config: Config) -> DbResult<Self> {
34 if config.create_path {
35 fs::create_dir_all(&config.path)?;
37 }
38
39 let mut file = if config.no_io {
40 None
41 } else {
42 Self::handle_migration(&config)?;
43 Some(Self::open_file(&config, &config.db_location_v2()?)?)
44 };
45
46 let incomplete_write = false;
47 let tx_data = None;
48 let current_txs = 0;
49
50 let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;
51
52 let inner = Arc::new(Mutex::new(LoggerInner {
53 file,
54 config,
55 incomplete_write,
56 tx_data,
57 current_txs,
58 log_metadata,
59 }));
60
61 Ok(Self { inner })
62 }
63
64 pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
65 let mut buffer: Vec<u8> = Vec::new();
66
67 let mut inner = self.inner.lock()?;
68 if let Some(file) = inner.file.as_mut() {
69 file.read_to_end(&mut buffer)?;
70 }
71
72 Ok(buffer)
73 }
74
75 pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
76 let mut index = 0;
77 let mut entries = vec![];
78
79 while index < buffer.len() {
80 if buffer.len() < index + 4 + 1 {
81 self.inner.lock()?.incomplete_write = true;
82 return Ok(entries);
83 }
84
85 let table_id = buffer[index];
86 index += 1;
87
88 let size = ByteCount::from_be_bytes(
89 buffer[index..index + 4]
90 .try_into()
91 .expect("slice with incorrect length"),
92 ) as usize;
93 index += 4;
94
95 if buffer.len() < index + size {
96 self.inner.lock()?.incomplete_write = true;
97 return Ok(entries);
98 }
99
100 if table_id == 0 {
101 continue;
102 }
103
104 let bytes = &buffer[index..index + size];
105 entries.push(LogFormat { table_id, bytes });
106 index += size;
107 }
108
109 Ok(entries)
110 }
111
112 pub fn begin_tx(&self) -> DbResult<TxHandle> {
113 let h = TxHandle { inner: self.clone() };
114 let mut inner = self.inner.lock()?;
115 if inner.tx_data.is_none() {
116 inner.tx_data = Some(vec![]);
117 }
118 inner.current_txs += 1;
119 Ok(h)
120 }
121
122 pub fn end_tx(&self) -> DbResult<()> {
123 let mut inner = self.inner.lock()?;
124 if inner.current_txs == 0 {
125 return Ok(());
126 }
127
128 inner.current_txs -= 1;
129 if inner.current_txs == 0 {
130 let data = inner.tx_data.take();
131 drop(inner);
132 if let Some(data) = data {
133 self.write_to_file(Self::log_entry(0, data))?;
134 }
135 }
136
137 Ok(())
138 }
139
140 pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
141 let mut inner = self.inner.lock()?;
142 if inner.config.no_io {
143 return Ok(());
144 }
145
146 if let Some(tx_data) = &mut inner.tx_data {
147 tx_data.extend(Self::header(id, &data));
148 tx_data.append(&mut data);
149 return Ok(());
150 }
151
152 drop(inner);
153
154 self.write_to_file(Self::log_entry(id, data))
155 }
156
157 fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
158 let mut inner = self.inner.lock()?;
159 if let Some(file) = inner.file.as_mut() {
160 file.write_all(&data)?;
161 }
162 Ok(())
163 }
164
165 pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
166 let size_info = (data.len() as ByteCount).to_be_bytes();
167 [id, size_info[0], size_info[1], size_info[2], size_info[3]]
168 }
169
170 pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
171 let header = Self::header(id, &data);
172 data.reserve(header.len());
173 data.splice(0..0, header);
174 data
175 }
176
177 pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
178 let mut inner = self.inner.lock()?;
179 if inner.config.no_io {
180 return Ok(());
181 }
182
183 let temp_path = inner.config.compaction_location()?;
184 let final_path = inner.config.db_location_v2()?;
185
186 let mut file = Self::open_file(&inner.config, &temp_path)?;
187
188 let mut log_meta = inner
190 .log_metadata
191 .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
192 log_meta.compaction_count += 1;
193 let metadata_bytes = log_meta.to_bytes();
194 file.write_all(&metadata_bytes)?;
195
196 let compacted_data = Self::log_entry(0, data);
198 file.write_all(&compacted_data)?;
199
200 fs::rename(temp_path, final_path)?;
202 inner.file = Some(file);
203 inner.log_metadata = Some(log_meta);
204
205 Ok(())
206 }
207
208 fn handle_migration(config: &Config) -> DbResult<()> {
209 let v1 = config.db_location_v1()?;
210 let v2 = config.db_location_v2()?;
211 let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));
212
213 if !v1.exists() {
214 return Ok(());
215 }
216
217 if v2_temp.exists() {
218 fs::remove_file(&v2_temp)?;
219 }
220
221 if v2.exists() {
222 return Ok(());
223 }
224
225 let v1_bytes = fs::read(&v1)?;
226 let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
227 v2_bytes.extend(v1_bytes);
228 fs::write(&v2_temp, v2_bytes)?;
229 fs::rename(v2_temp, v2)?;
230 fs::remove_file(v1)?;
231
232 Ok(())
233 }
234
235 fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
236 let file = OpenOptions::new()
237 .read(true)
238 .create(config.create_db || config.read_only)
239 .append(!config.read_only)
240 .open(db_location)?;
241
242 #[cfg(not(target_family = "wasm"))]
243 if config.fs_locks {
244 if config.fs_locks_block {
245 file.lock_exclusive()?;
246 } else {
247 file.try_lock_exclusive()?;
248 }
249 }
250 #[cfg(target_family = "wasm")]
251 if config.fs_locks {
252 return Err(DbError::Unexpected("File Locks are not supported on wasm"));
253 }
254
255 Ok(file)
256 }
257
258 pub(crate) fn config(&self) -> DbResult<Config> {
259 Ok(self.inner.lock()?.config.clone())
260 }
261
262 pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
263 Ok(self.inner.lock()?.incomplete_write)
264 }
265
266 fn read_or_stamp_metadata(
267 config: &Config, file: &mut Option<File>,
268 ) -> DbResult<Option<LogMetadata>> {
269 match file {
270 Some(file) => {
271 let mut buffer = [0_u8; 2];
272 let bytes_read = file.read(&mut buffer)?;
273 let mut needs_stamp = false;
274 match bytes_read {
275 0 => {
276 needs_stamp = true;
277 buffer = LogMetadata::default().to_bytes();
278 }
279 2 => {}
280 _ => {
281 return Err(DbError::Unexpected(
282 "Unexpected amount of bytes read from log stamp",
283 ))
284 }
285 };
286
287 if !config.read_only && needs_stamp {
288 file.write_all(&buffer)?;
289 }
290 let meta = LogMetadata::from_bytes(buffer);
291 if meta.log_version != 1 {
292 return Err(DbError::Unexpected("unexpected log format version found"));
293 }
294
295 Ok(Some(meta))
296 }
297 None => Ok(None),
298 }
299 }
300}
301
/// Two-byte stamp stored at the start of a log file.
#[derive(Clone, Copy, Debug)]
pub struct LogMetadata {
    /// Version of the log format; the logger only accepts `1`.
    log_version: u8,

    /// Counter incremented each time the log is compacted.
    compaction_count: u8,
}

impl Default for LogMetadata {
    /// Stamp for a brand-new log: format version 1, never compacted.
    fn default() -> Self {
        Self { compaction_count: 0, log_version: 1 }
    }
}

impl LogMetadata {
    /// Serialises the stamp as `[log_version, compaction_count]`.
    fn to_bytes(self) -> [u8; 2] {
        let Self { log_version, compaction_count } = self;
        [log_version, compaction_count]
    }

    /// Inverse of [`Self::to_bytes`]; performs no validation.
    fn from_bytes(bytes: [u8; 2]) -> Self {
        let [log_version, compaction_count] = bytes;
        Self { log_version, compaction_count }
    }
}
329
330impl Drop for LoggerInner {
331 fn drop(&mut self) {
332 if let Some(file) = &self.file {
333 if self.config.fs_locks {
334 #[cfg(not(target_family = "wasm"))]
335 if let Err(e) = fs2::FileExt::unlock(file) {
336 eprintln!("failed to unlock log lock: {:?}", e);
337 }
338 }
339 }
340 }
341}
342
/// Guard for an open transaction, returned by `Logger::begin_tx`.
///
/// Dropping the guard ends the transaction via `Logger::end_tx`.
#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
pub struct TxHandle {
    /// Logger whose transaction this guard keeps open.
    inner: Logger,
}
347
348impl TxHandle {
349 pub fn drop_safely(&self) -> DbResult<()> {
350 self.inner.end_tx()
351 }
352}
353
354impl Drop for TxHandle {
355 fn drop(&mut self) {
356 self.drop_safely()
357 .expect("auto tx-end panicked. Call drop_safely() for non-panicking variant.");
358 }
359}