Skip to main content

tempest_core/journal/
mod.rs

1use std::{cell::RefCell, io, path::PathBuf, rc::Rc};
2
3use bincode::Options;
4use bytes::{BufMut, BytesMut};
5use tempest_io::{Io, IoBuf, OpenOptions, Statx};
6use tempest_rt::{
7    JoinHandle, close_file, list_dir, open_file, read_exact, remove_file, spawn, stat_file,
8    sync::{
9        mpsc::{BoundedSender, bounded},
10        oneshot,
11    },
12    sync_file, write_exact,
13};
14
15use crate::{bincode_options, utils::ByteSize};
16
17mod config;
18mod header;
19
20#[cfg(test)]
21mod tests;
22
23pub use config::{JournalConfig, JournalError, Replayable};
24pub use header::{EditPrefix, JOURNAL_MAGIC_NUM};
25
26use header::{EDIT_PREFIX_SIZE, JOURNAL_HEADER_SIZE, JournalHeader};
27
28struct JournalMessage<T: Replayable> {
29    edit: T::Edit,
30    tx: oneshot::Sender<Result<(), JournalError>>,
31}
32
33pub struct JournalHandle<T: Replayable> {
34    tx: BoundedSender<JournalMessage<T>>,
35    data: Rc<RefCell<T>>,
36}
37
38impl<T: Replayable> JournalHandle<T> {
39    pub async fn append(&self, edit: T::Edit) -> Result<(), JournalError> {
40        let (tx, rx) = oneshot::channel();
41        self.tx
42            .clone()
43            .send(JournalMessage { edit, tx })
44            .await
45            .map_err(|_| JournalError::WorkerDied)?;
46
47        rx.recv()
48            .await
49            .map_err(|_| JournalError::WorkerDied)
50            .flatten()
51    }
52
53    // TODO: should this be a RwLock guarded value?
54    pub fn data(&self) -> std::cell::Ref<'_, T> {
55        self.data.borrow()
56    }
57}
58
59#[derive(Debug)]
60pub struct Journal<T: Replayable, I: Io> {
61    dir: PathBuf,
62    data: Rc<RefCell<T>>,
63    config: JournalConfig,
64    fd: Option<I::Fd>,
65    filenum: u64,
66    write_offset: u64,
67    rotation_threshold: u64,
68}
69
70impl<T: Replayable, I: Io> Journal<T, I> {
71    fn path(&self, filenum: u64) -> PathBuf {
72        self.dir
73            .join(format!("{}-{}", T::filename_prefix(), filenum))
74    }
75
76    fn write_edit(scratch: &mut BytesMut, edit: &T::Edit) -> Result<u64, JournalError> {
77        let initial_size = scratch.len();
78        scratch.put_bytes(0, EDIT_PREFIX_SIZE);
79
80        if let Err(e) = bincode_options().serialize_into(scratch.writer(), edit) {
81            return Err(e.into());
82        }
83
84        let prefix = EditPrefix::new(&scratch[initial_size + EDIT_PREFIX_SIZE..]);
85        scratch[initial_size..initial_size + EDIT_PREFIX_SIZE].copy_from_slice(&prefix.encode());
86
87        Ok((scratch.len() - initial_size) as u64)
88    }
89
90    async fn create_file(
91        &mut self,
92        filenum: u64,
93        old_fd: Option<I::Fd>,
94    ) -> Result<(), JournalError> {
95        let path = self.path(filenum);
96        debug!(filenum, ?path, "creating new journal file");
97
98        let fd = open_file::<I>(
99            path,
100            OpenOptions::new().create(true).write(true).truncate(true),
101        )
102        .await?;
103
104        let mut scratch = BytesMut::new();
105        let header = JournalHeader::new(filenum).encode();
106        scratch.put_slice(&header);
107        Self::write_edit(&mut scratch, &self.data.borrow().snapshot())?;
108
109        let initial_size = scratch.len() as u64;
110        let (result, _) = write_exact::<_, I>(fd, scratch, 0).await;
111        result?;
112        sync_file::<I>(fd).await?;
113
114        if let Some(old_fd) = old_fd {
115            let old_path = self.path(self.filenum);
116            close_file::<I>(old_fd).await?;
117            if let Err(e) = remove_file::<I>(old_path).await {
118                warn!("could not remove old journal file: {}", e);
119            }
120        }
121
122        let factored = initial_size * self.config.growth_factor;
123        let rotation_threshold = factored.max(self.config.growth_baseline);
124
125        self.fd = Some(fd);
126        self.filenum = filenum;
127        self.write_offset = initial_size;
128        self.rotation_threshold = rotation_threshold;
129        Ok(())
130    }
131
132    async fn replay(&mut self, file_size: u64) -> Result<(), JournalError> {
133        let fd = self.fd.expect("replay called before fd is set");
134        let mut read_offset = JOURNAL_HEADER_SIZE as u64;
135
136        while read_offset < file_size {
137            // read edit prefix
138            let mut prefix_buf = BytesMut::zeroed(EDIT_PREFIX_SIZE);
139            let (result, slice) =
140                read_exact::<_, I>(fd, prefix_buf.slice(0..EDIT_PREFIX_SIZE), read_offset).await;
141            prefix_buf = slice.into_inner();
142            result?;
143            let prefix = EditPrefix::decode_from_slice(&prefix_buf);
144            read_offset += EDIT_PREFIX_SIZE as u64;
145
146            // read edit body
147            let edit_len = prefix.len() as usize;
148            let mut edit_buf = BytesMut::zeroed(edit_len);
149            let (result, slice) =
150                read_exact::<_, I>(fd, edit_buf.slice(0..edit_len), read_offset).await;
151            edit_buf = slice.into_inner();
152            result?;
153
154            if !prefix.is_valid(&edit_buf) {
155                return Err(io::Error::new(
156                    io::ErrorKind::InvalidData,
157                    "journal record prefix checksum mismatch: potential corruption",
158                )
159                .into());
160            }
161
162            let edit: T::Edit = bincode_options().deserialize(&edit_buf)?;
163            self.data.borrow_mut().apply(edit);
164            read_offset += edit_len as u64;
165        }
166
167        self.write_offset = read_offset;
168        Ok(())
169    }
170
171    async fn append(&mut self, edit: T::Edit) -> Result<(), JournalError> {
172        let fd = self.fd.expect("append called before init");
173        let original_offset = self.write_offset;
174
175        let mut scratch = BytesMut::new();
176        Self::write_edit(&mut scratch, &edit)?;
177
178        let len = scratch.len() as u64;
179        let (result, _) = write_exact::<_, I>(fd, scratch, self.write_offset).await;
180        result?;
181        sync_file::<I>(fd).await?;
182
183        self.data.borrow_mut().apply(edit);
184        self.write_offset += len;
185
186        trace!(size=?ByteSize(self.write_offset - original_offset), "wrote edit");
187
188        // auto-rotate if threshold exceeded
189        if self.write_offset >= self.rotation_threshold {
190            debug!(
191                size = ?ByteSize(self.write_offset),
192                threshold = ?ByteSize(self.rotation_threshold),
193                "journal size exceeds rotation threshold, rotating file",
194            );
195            self.rotate().await?;
196        }
197
198        Ok(())
199    }
200
201    pub async fn rotate(&mut self) -> Result<(), JournalError> {
202        debug!("rotating journal file");
203        let old_fd = self.fd.take();
204        let new_filenum = self.filenum + 1;
205        self.create_file(new_filenum, old_fd).await
206    }
207
208    pub async fn close(mut self) -> Result<(), JournalError> {
209        if let Some(fd) = self.fd.take() {
210            close_file::<I>(fd).await?;
211        }
212        Ok(())
213    }
214
215    async fn init(&mut self) -> Result<(), JournalError> {
216        let entries = list_dir::<I>(self.dir.clone())?;
217        let files: Vec<PathBuf> = entries
218            .into_iter()
219            .filter(|e| !e.is_dir)
220            .map(|e| e.path)
221            .collect();
222
223        // scan candidates: open each, read header, find highest filenum
224        let mut winner: Option<(u64, PathBuf)> = None;
225        for path in files {
226            let fd = match open_file::<I>(path.clone(), OpenOptions::new().read(true)).await {
227                Ok(fd) => fd,
228                Err(err) => {
229                    warn!("could not open journal candidate (skipping): {err}");
230                    continue;
231                }
232            };
233            let mut scratch = BytesMut::zeroed(JOURNAL_HEADER_SIZE);
234            let (result, slice) =
235                read_exact::<_, I>(fd, scratch.slice(0..JOURNAL_HEADER_SIZE), 0).await;
236            scratch = slice.into_inner();
237            close_file::<I>(fd).await.ok();
238            if result.is_err() {
239                continue;
240            }
241            match JournalHeader::decode_from_slice(&scratch) {
242                Ok(header) => {
243                    if winner.as_ref().map_or(true, |(n, _)| header.filenum > *n) {
244                        winner = Some((header.filenum, path));
245                    }
246                }
247                Err(err) => {
248                    warn!("could not decode journal candidate (skipping): {err}");
249                    continue;
250                }
251            }
252        }
253
254        match winner {
255            None => self.create_file(0, None).await?,
256            Some((filenum, path)) => {
257                let fd = open_file::<I>(path, OpenOptions::new().read(true).write(true)).await?;
258                let stat = stat_file::<I>(fd).await?;
259                self.fd = Some(fd);
260                self.filenum = filenum;
261                self.write_offset = JOURNAL_HEADER_SIZE as u64;
262
263                if stat.stx_size() > JOURNAL_HEADER_SIZE as u64 {
264                    self.replay(stat.stx_size()).await?;
265                }
266            }
267        }
268
269        Ok(())
270    }
271
272    pub async fn new(
273        dir: PathBuf,
274        config: JournalConfig,
275    ) -> Result<(JournalHandle<T>, JoinHandle<()>), JournalError> {
276        let data = Rc::new(RefCell::new(T::initial()));
277
278        let mut instance = Self {
279            dir,
280            data: data.clone(),
281            config,
282            fd: None,
283            filenum: 0,
284            write_offset: 0,
285            rotation_threshold: 0,
286        };
287
288        instance.init().await?;
289
290        let (tx, mut rx) = bounded(256);
291
292        let journal_handle = JournalHandle { tx, data };
293
294        let join_handle = spawn(async move {
295            loop {
296                match rx.recv().await {
297                    Ok(msg) => {
298                        let result = instance.append(msg.edit).await;
299                        if let Err(err) = &result {
300                            error!("failed to append edit: {err}");
301                            break;
302                        }
303                        let _ = msg.tx.send(result);
304                    }
305                    Err(_) => break,
306                }
307            }
308
309            if let Err(err) = instance.close().await {
310                error!("failed to close journal: {err}");
311            }
312        });
313
314        Ok((journal_handle, join_handle))
315    }
316}