1use crate::config::Config;
2use crate::errors::DbResult;
3use crate::{ByteCount, DbError, TableId};
4use std::fs::{self, File, OpenOptions};
5use std::io::{Read, Write};
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8
9#[cfg(not(target_family = "wasm"))]
10use fs2::FileExt;
11
12pub struct LogFormat<'a> {
13 pub index: usize,
14 pub table_id: TableId,
15 pub bytes: &'a [u8],
16}
17
18#[derive(Clone, Debug)]
19pub struct Logger {
20 inner: Arc<Mutex<LoggerInner>>,
21}
22
23#[derive(Debug)]
24struct LoggerInner {
25 config: Config,
26 file: Option<File>,
27 log_metadata: Option<LogMetadata>,
28 incomplete_write: bool,
29 current_txs: usize,
30 tx_data: Option<Vec<u8>>,
31}
32
33impl Logger {
34 pub fn init(config: Config) -> DbResult<Self> {
35 if config.create_path {
36 fs::create_dir_all(&config.path)?;
38 }
39
40 let mut file = if config.no_io {
41 None
42 } else {
43 Self::handle_migration(&config)?;
44 Some(Self::open_file(&config, &config.db_location_v2()?)?)
45 };
46
47 let incomplete_write = false;
48 let tx_data = None;
49 let current_txs = 0;
50
51 let log_metadata = Self::read_or_stamp_metadata(&config, &mut file)?;
52
53 let inner = Arc::new(Mutex::new(LoggerInner {
54 file,
55 config,
56 incomplete_write,
57 tx_data,
58 current_txs,
59 log_metadata,
60 }));
61
62 Ok(Self { inner })
63 }
64
65 pub fn get_bytes(&self) -> DbResult<Vec<u8>> {
66 let mut buffer: Vec<u8> = Vec::new();
67
68 let mut inner = self.inner.lock()?;
69 if let Some(file) = inner.file.as_mut() {
70 file.read_to_end(&mut buffer)?;
71 }
72
73 Ok(buffer)
74 }
75
76 pub fn get_entries<'a>(&self, buffer: &'a [u8]) -> DbResult<Vec<LogFormat<'a>>> {
77 let mut index = 0;
78 let mut entries = vec![];
79
80 while index < buffer.len() {
81 let index_capture = index + 2;
82 println!("{index}, {index_capture}");
83 if buffer.len() < index + 4 + 1 {
84 eprintln!("size missing!");
85 self.inner.lock()?.incomplete_write = true;
86 return Ok(entries);
87 }
88
89 let table_id = buffer[index];
90 index += 1;
91
92 let size = ByteCount::from_be_bytes(
93 buffer[index..index + 4]
94 .try_into()
95 .expect("slice with incorrect length"),
96 ) as usize;
97 index += 4;
98
99 if buffer.len() < index + size {
100 eprintln!("expected {} found {}", index + size, buffer.len());
101 self.inner.lock()?.incomplete_write = true;
102 return Ok(entries);
103 }
104
105 if table_id == 0 {
106 println!("TABLE_ID == 0");
107 continue;
108 }
109
110 let bytes = &buffer[index..index + size];
111 entries.push(LogFormat { index: index_capture, table_id, bytes });
112 index += size;
113 }
114
115 Ok(entries)
116 }
117
118 pub fn begin_tx(&self) -> DbResult<TxHandle> {
119 let h = TxHandle { inner: self.clone() };
120 let mut inner = self.inner.lock()?;
121 if inner.tx_data.is_none() {
122 inner.tx_data = Some(vec![]);
123 }
124 inner.current_txs += 1;
125 Ok(h)
126 }
127
128 pub fn end_tx(&self) -> DbResult<()> {
129 let mut inner = self.inner.lock()?;
130 if inner.current_txs == 0 {
131 return Ok(());
132 }
133
134 inner.current_txs -= 1;
135 if inner.current_txs == 0 {
136 let data = inner.tx_data.take();
137 drop(inner);
138 if let Some(data) = data {
139 self.write_to_file(Self::log_entry(0, data))?;
140 }
141 }
142
143 Ok(())
144 }
145
146 pub fn write(&self, id: TableId, mut data: Vec<u8>) -> DbResult<()> {
147 let mut inner = self.inner.lock()?;
148 if inner.config.no_io {
149 return Ok(());
150 }
151
152 if let Some(tx_data) = &mut inner.tx_data {
153 tx_data.extend(Self::header(id, &data));
154 tx_data.append(&mut data);
155 return Ok(());
156 }
157
158 drop(inner);
159
160 self.write_to_file(Self::log_entry(id, data))
161 }
162
163 fn write_to_file(&self, data: Vec<u8>) -> DbResult<()> {
164 let mut inner = self.inner.lock()?;
165 if let Some(file) = inner.file.as_mut() {
166 file.write_all(&data)?;
167 }
168 Ok(())
169 }
170
171 pub fn header(id: TableId, data: &[u8]) -> [u8; 5] {
172 let size_info = (data.len() as ByteCount).to_be_bytes();
173 [id, size_info[0], size_info[1], size_info[2], size_info[3]]
174 }
175
176 pub fn log_entry(id: TableId, mut data: Vec<u8>) -> Vec<u8> {
177 let header = Self::header(id, &data);
178 data.reserve(header.len());
179 data.splice(0..0, header);
180 data
181 }
182
183 pub fn compact_log(&self, data: Vec<u8>) -> DbResult<()> {
184 let mut inner = self.inner.lock()?;
185 if inner.config.no_io {
186 return Ok(());
187 }
188
189 let temp_path = inner.config.compaction_location()?;
190 let final_path = inner.config.db_location_v2()?;
191
192 let mut file = Self::open_file(&inner.config, &temp_path)?;
193
194 let mut log_meta = inner
196 .log_metadata
197 .ok_or(DbError::Unexpected("log meta missing -- no_io == false"))?;
198 log_meta.compaction_count += 1;
199 let metadata_bytes = log_meta.to_bytes();
200 file.write_all(&metadata_bytes)?;
201
202 let compacted_data = Self::log_entry(0, data);
204 file.write_all(&compacted_data)?;
205
206 fs::rename(temp_path, final_path)?;
208 inner.file = Some(file);
209 inner.log_metadata = Some(log_meta);
210
211 Ok(())
212 }
213
214 fn handle_migration(config: &Config) -> DbResult<()> {
215 let v1 = config.db_location_v1()?;
216 let v2 = config.db_location_v2()?;
217 let v2_temp = PathBuf::from(format!("{}.migration", v2.to_string_lossy()));
218
219 if !v1.exists() {
220 return Ok(());
221 }
222
223 if v2_temp.exists() {
224 fs::remove_file(&v2_temp)?;
225 }
226
227 if v2.exists() {
228 return Ok(());
229 }
230
231 let v1_bytes = fs::read(&v1)?;
232 let mut v2_bytes = LogMetadata::default().to_bytes().to_vec();
233 v2_bytes.extend(v1_bytes);
234 fs::write(&v2_temp, v2_bytes)?;
235 fs::rename(v2_temp, v2)?;
236 fs::remove_file(v1)?;
237
238 Ok(())
239 }
240
241 fn open_file(config: &Config, db_location: &Path) -> DbResult<File> {
242 let file = OpenOptions::new()
243 .read(true)
244 .create(config.create_db || config.read_only)
245 .append(!config.read_only)
246 .open(db_location)?;
247
248 #[cfg(not(target_family = "wasm"))]
249 if config.fs_locks {
250 if config.fs_locks_block {
251 file.lock_exclusive()?;
252 } else {
253 file.try_lock_exclusive()?;
254 }
255 }
256 #[cfg(target_family = "wasm")]
257 if config.fs_locks {
258 return Err(DbError::Unexpected("File Locks are not supported on wasm"));
259 }
260
261 Ok(file)
262 }
263
264 pub(crate) fn config(&self) -> DbResult<Config> {
265 Ok(self.inner.lock()?.config.clone())
266 }
267
268 pub(crate) fn incomplete_write(&self) -> DbResult<bool> {
269 Ok(self.inner.lock()?.incomplete_write)
270 }
271
272 fn read_or_stamp_metadata(
273 config: &Config, file: &mut Option<File>,
274 ) -> DbResult<Option<LogMetadata>> {
275 match file {
276 Some(file) => {
277 let mut buffer = [0_u8; 2];
278 let bytes_read = file.read(&mut buffer)?;
279 let mut needs_stamp = false;
280 match bytes_read {
281 0 => {
282 needs_stamp = true;
283 buffer = LogMetadata::default().to_bytes();
284 }
285 2 => {}
286 _ => {
287 return Err(DbError::Unexpected(
288 "Unexpected amount of bytes read from log stamp",
289 ))
290 }
291 };
292
293 if !config.read_only && needs_stamp {
294 file.write_all(&buffer)?;
295 }
296 let meta = LogMetadata::from_bytes(buffer);
297 if meta.log_version != 1 {
298 return Err(DbError::Unexpected("unexpected log format version found"));
299 }
300
301 Ok(Some(meta))
302 }
303 None => Ok(None),
304 }
305 }
306}
307
308#[derive(Debug, Copy, Clone)]
309pub struct LogMetadata {
310 log_version: u8,
314
315 compaction_count: u8,
318}
319
320impl Default for LogMetadata {
321 fn default() -> Self {
322 Self { log_version: 1, compaction_count: 0 }
323 }
324}
325
326impl LogMetadata {
327 fn to_bytes(self) -> [u8; 2] {
328 [self.log_version, self.compaction_count]
329 }
330
331 fn from_bytes(bytes: [u8; 2]) -> Self {
332 Self { log_version: bytes[0], compaction_count: bytes[1] }
333 }
334}
335
336impl Drop for LoggerInner {
337 fn drop(&mut self) {
338 if let Some(file) = &self.file {
339 if self.config.fs_locks {
340 #[cfg(not(target_family = "wasm"))]
341 if let Err(e) = fs2::FileExt::unlock(file) {
342 eprintln!("failed to unlock log lock: {:?}", e);
343 }
344 }
345 }
346 }
347}
348
349#[must_use = "DB stays in Tx mode while this value is in scope. Manually call drop_safely() to handle io errors that may arise when tx terminates."]
350pub struct TxHandle {
351 inner: Logger,
352}
353
354impl TxHandle {
355 pub fn drop_safely(&self) -> DbResult<()> {
356 self.inner.end_tx()
357 }
358}
359
360impl Drop for TxHandle {
361 fn drop(&mut self) {
362 self.drop_safely()
363 .expect("auto tx-end panicked. Call drop_safely() for non-panicking variant.");
364 }
365}