// fjall/journal/writer.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::marker::{serialize_marker_item, Marker};
6use crate::{
7    batch::item::Item as BatchItem, file::fsync_directory, journal::recovery::JournalId, InnerItem,
8    PartitionHandle,
9};
10use lsm_tree::{coding::Encode, EncodeError, SeqNo, ValueType};
11use std::{
12    fs::{File, OpenOptions},
13    hash::Hasher,
14    io::{BufWriter, Write},
15    path::{Path, PathBuf},
16};
17
// TODO: this should be a keyspace configuration
/// Size in bytes that journal files are pre-allocated to on creation
pub const PRE_ALLOCATED_BYTES: u64 = 32 * 1_024 * 1_024;

/// Capacity in bytes of the journal writer's IO buffer
pub const JOURNAL_BUFFER_BYTES: usize = 8 * 1_024;
22
/// Writes to the active journal file
pub struct Writer {
    /// Path of the journal file on disk
    pub(crate) path: PathBuf,
    /// Buffered handle to the journal file
    file: BufWriter<File>,
    /// Scratch buffer that markers and items are serialized into
    /// before being written to the file
    buf: Vec<u8>,

    /// Whether data has been written since the last flush;
    /// lets `persist` skip flushing a clean buffer
    is_buffer_dirty: bool,
}
30
/// The persist mode allows setting the durability guarantee of previous writes
///
/// Durability increases from [`PersistMode::Buffer`] (no syscall beyond the
/// buffer flush) to [`PersistMode::SyncAll`] (full fsync).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum PersistMode {
    /// Flushes data to OS buffers. This allows the OS to write out data in case of an
    /// application crash.
    ///
    /// When this function returns, data is **not** guaranteed to be persisted in case
    /// of a power loss event or OS crash.
    Buffer,

    /// Flushes data using `fdatasync`.
    ///
    /// Use if you know that `fdatasync` is sufficient for your file system and/or operating system.
    SyncData,

    /// Flushes data + metadata using `fsync`.
    SyncAll,
}
49
50impl Writer {
51    pub fn len(&self) -> crate::Result<u64> {
52        Ok(self.file.get_ref().metadata()?.len())
53    }
54
    /// Seals the active journal and starts a fresh one.
    ///
    /// The new journal's ID is the current file name (parsed as a number)
    /// plus one, placed in the same folder. The folder is fsynced afterwards
    /// so the new directory entry is durable.
    ///
    /// Returns `(sealed_path, new_path)`.
    pub fn rotate(&mut self) -> crate::Result<(PathBuf, PathBuf)> {
        // Make everything in the old journal durable before sealing it
        self.persist(PersistMode::SyncAll)?;

        log::debug!(
            "Sealing active journal at {:?}, len={}B",
            self.path,
            self.path
                .metadata()
                .inspect_err(|e| {
                    log::error!(
                        "Failed to get file metadata of journal file at {:?}: {e:?}",
                        self.path
                    );
                })?
                .len(),
        );

        let prev_path = self.path.clone();

        let folder = self
            .path
            .parent()
            .expect("should have parent")
            .to_path_buf();

        // Journal files are named after their numeric ID, so the next ID
        // is derived from the current file name
        let journal_id = self
            .path
            .file_name()
            .expect("should be valid file name")
            .to_str()
            .expect("should be valid journal file name")
            .parse::<JournalId>()
            .expect("should be valid journal ID");

        let new_path = folder.join((journal_id + 1).to_string());
        log::debug!("Rotating active journal to {new_path:?}");

        // Swap ourselves out for a writer on the new journal file
        *self = Self::create_new(new_path.clone())?;

        // IMPORTANT: fsync folder on Unix
        fsync_directory(&folder)?;

        Ok((prev_path, new_path))
    }
99
100    pub fn create_new<P: Into<PathBuf>>(path: P) -> crate::Result<Self> {
101        let path = path.into();
102
103        let file = File::create(&path).inspect_err(|e| {
104            log::error!("Failed to create journal file at {path:?}: {e:?}");
105        })?;
106
107        file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
108            log::error!(
109                "Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {path:?}: {e:?}"
110            );
111        })?;
112
113        file.sync_all().inspect_err(|e| {
114            log::error!("Failed to fsync journal file at {path:?}: {e:?}");
115        })?;
116
117        Ok(Self {
118            path,
119            file: BufWriter::new(file),
120            buf: Vec::new(),
121            is_buffer_dirty: false,
122        })
123    }
124
125    pub fn from_file<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
126        let path = path.as_ref();
127
128        if !path.try_exists()? {
129            let file = OpenOptions::new()
130                .create_new(true)
131                .write(true)
132                .open(path)
133                .inspect_err(|e| {
134                    log::error!("Failed to create journal file at {path:?}: {e:?}");
135                })?;
136
137            file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
138                log::error!(
139                    "Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {path:?}: {e:?}"
140                );
141            })?;
142
143            file.sync_all().inspect_err(|e| {
144                log::error!("Failed to fsync journal file at {path:?}: {e:?}");
145            })?;
146
147            return Ok(Self {
148                path: path.into(),
149                file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
150                buf: Vec::new(),
151                is_buffer_dirty: false,
152            });
153        }
154
155        let file = OpenOptions::new()
156            .append(true)
157            .open(path)
158            .inspect_err(|e| {
159                log::error!("Failed to open journal file at {path:?}: {e:?}");
160            })?;
161
162        Ok(Self {
163            path: path.into(),
164            file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
165            buf: Vec::new(),
166            is_buffer_dirty: false,
167        })
168    }
169
170    /// Persists the journal file.
171    pub(crate) fn persist(&mut self, mode: PersistMode) -> std::io::Result<()> {
172        log::trace!("Persisting journal at {:?} with mode={mode:?}", self.path);
173
174        if self.is_buffer_dirty {
175            self.file.flush().inspect_err(|e| {
176                log::error!(
177                    "Failed to flush journal IO buffers at {:?}: {e:?}",
178                    self.path
179                );
180            })?;
181            self.is_buffer_dirty = false;
182        }
183
184        match mode {
185            PersistMode::SyncAll => self.file.get_mut().sync_all().inspect_err(|e| {
186                log::error!("Failed to fsync journal file at {:?}: {e:?}", self.path);
187            }),
188            PersistMode::SyncData => self.file.get_mut().sync_data().inspect_err(|e| {
189                log::error!("Failed to fsyncdata journal file at {:?}: {e:?}", self.path);
190            }),
191            PersistMode::Buffer => Ok(()),
192        }
193    }
194
195    /// Writes a batch start marker to the journal
196    fn write_start(&mut self, item_count: u32, seqno: SeqNo) -> Result<usize, EncodeError> {
197        debug_assert!(self.buf.is_empty());
198
199        Marker::Start {
200            item_count,
201            seqno,
202            compression: lsm_tree::CompressionType::None,
203        }
204        .encode_into(&mut self.buf)?;
205
206        self.file.write_all(&self.buf)?;
207
208        Ok(self.buf.len())
209    }
210
211    /// Writes a batch end marker to the journal
212    fn write_end(&mut self, checksum: u64) -> Result<usize, EncodeError> {
213        debug_assert!(self.buf.is_empty());
214
215        Marker::End(checksum).encode_into(&mut self.buf)?;
216
217        self.file.write_all(&self.buf)?;
218
219        Ok(self.buf.len())
220    }
221
222    pub(crate) fn write_raw(
223        &mut self,
224        partition: &str,
225        key: &[u8],
226        value: &[u8],
227        value_type: ValueType,
228        seqno: u64,
229    ) -> crate::Result<usize> {
230        self.is_buffer_dirty = true;
231
232        let mut hasher = xxhash_rust::xxh3::Xxh3::new();
233        let mut byte_count = 0;
234
235        self.buf.clear();
236        byte_count += self.write_start(1, seqno)?;
237        self.buf.clear();
238
239        serialize_marker_item(&mut self.buf, partition, key, value, value_type)?;
240
241        self.file.write_all(&self.buf)?;
242
243        hasher.update(&self.buf);
244        byte_count += self.buf.len();
245
246        self.buf.clear();
247        let checksum = hasher.finish();
248        byte_count += self.write_end(checksum)?;
249
250        Ok(byte_count)
251    }
252
253    pub fn write_optimized_batch(
254        &mut self,
255        tuples: &[(&PartitionHandle, Vec<InnerItem>)],
256        batch_size: usize,
257        seqno: SeqNo,
258    ) -> crate::Result<usize> {
259        if batch_size == 0 {
260            return Ok(0);
261        }
262
263        self.is_buffer_dirty = true;
264
265        self.buf.clear();
266
267        // NOTE: entries.len() is surely never > u32::MAX
268        #[allow(clippy::cast_possible_truncation)]
269        let item_count = batch_size as u32;
270
271        let mut hasher = xxhash_rust::xxh3::Xxh3::new();
272        let mut byte_count = 0;
273
274        byte_count += self.write_start(item_count, seqno)?;
275        self.buf.clear();
276
277        for (partition, items) in tuples {
278            let partition = partition.name.clone();
279
280            for item in items {
281                debug_assert!(self.buf.is_empty());
282
283                serialize_marker_item(
284                    &mut self.buf,
285                    &partition,
286                    &item.key,
287                    &item.value,
288                    item.value_type,
289                )?;
290
291                self.file.write_all(&self.buf)?;
292
293                hasher.update(&self.buf);
294                byte_count += self.buf.len();
295
296                self.buf.clear();
297            }
298        }
299
300        let checksum = hasher.finish();
301        byte_count += self.write_end(checksum)?;
302
303        Ok(byte_count)
304    }
305
306    pub fn write_batch<'a>(
307        &mut self,
308        items: impl Iterator<Item = &'a BatchItem>,
309        batch_size: usize,
310        seqno: SeqNo,
311    ) -> crate::Result<usize> {
312        if batch_size == 0 {
313            return Ok(0);
314        }
315
316        self.is_buffer_dirty = true;
317
318        self.buf.clear();
319
320        // NOTE: entries.len() is surely never > u32::MAX
321        #[allow(clippy::cast_possible_truncation)]
322        let item_count = batch_size as u32;
323
324        let mut hasher = xxhash_rust::xxh3::Xxh3::new();
325        let mut byte_count = 0;
326
327        byte_count += self.write_start(item_count, seqno)?;
328        self.buf.clear();
329
330        for item in items {
331            debug_assert!(self.buf.is_empty());
332
333            serialize_marker_item(
334                &mut self.buf,
335                &item.partition,
336                &item.key,
337                &item.value,
338                item.value_type,
339            )?;
340
341            self.file.write_all(&self.buf)?;
342
343            hasher.update(&self.buf);
344            byte_count += self.buf.len();
345
346            self.buf.clear();
347        }
348
349        let checksum = hasher.finish();
350        byte_count += self.write_end(checksum)?;
351
352        Ok(byte_count)
353    }
354}