1use std::{cell::RefCell, io, path::PathBuf, rc::Rc};
2
3use bincode::Options;
4use bytes::{BufMut, BytesMut};
5use tempest_io::{Io, IoBuf, OpenOptions, Statx};
6use tempest_rt::{
7 JoinHandle, close_file, list_dir, open_file, read_exact, remove_file, spawn, stat_file,
8 sync::{
9 mpsc::{BoundedSender, bounded},
10 oneshot,
11 },
12 sync_file, write_exact,
13};
14
15use crate::{bincode_options, utils::ByteSize};
16
17mod config;
18mod header;
19
20#[cfg(test)]
21mod tests;
22
23pub use config::{JournalConfig, JournalError, Replayable};
24pub use header::{EditPrefix, JOURNAL_MAGIC_NUM};
25
26use header::{EDIT_PREFIX_SIZE, JOURNAL_HEADER_SIZE, JournalHeader};
27
/// A single append request travelling from a [`JournalHandle`] to the journal worker.
struct JournalMessage<T: Replayable> {
    /// The edit the worker should write to the journal file and apply to the state.
    edit: T::Edit,
    /// One-shot channel on which the worker reports the outcome of this append.
    tx: oneshot::Sender<Result<(), JournalError>>,
}
32
/// Client-side handle to a journal whose worker task was started by [`Journal::new`].
///
/// Appends are forwarded to the worker over a bounded channel; the in-memory state
/// is shared with the worker through an `Rc<RefCell<T>>`.
pub struct JournalHandle<T: Replayable> {
    /// Sending half of the request channel consumed by the journal worker.
    tx: BoundedSender<JournalMessage<T>>,
    /// The journaled state; the worker mutates it after each successful write.
    data: Rc<RefCell<T>>,
}
37
38impl<T: Replayable> JournalHandle<T> {
39 pub async fn append(&self, edit: T::Edit) -> Result<(), JournalError> {
40 let (tx, rx) = oneshot::channel();
41 self.tx
42 .clone()
43 .send(JournalMessage { edit, tx })
44 .await
45 .map_err(|_| JournalError::WorkerDied)?;
46
47 rx.recv()
48 .await
49 .map_err(|_| JournalError::WorkerDied)
50 .flatten()
51 }
52
53 pub fn data(&self) -> std::cell::Ref<'_, T> {
55 self.data.borrow()
56 }
57}
58
/// Worker-side state of a journal: the active file plus the in-memory data it protects.
#[derive(Debug)]
pub struct Journal<T: Replayable, I: Io> {
    /// Directory in which journal files are created and looked up.
    dir: PathBuf,
    /// The journaled state, shared with every [`JournalHandle`].
    data: Rc<RefCell<T>>,
    /// Supplies `growth_factor` and `growth_baseline` for the rotation threshold.
    config: JournalConfig,
    /// Descriptor of the active journal file; `None` until `init` opens or creates one.
    fd: Option<I::Fd>,
    /// Number of the active journal file (used in its file name and header).
    filenum: u64,
    /// Byte offset in the active file at which the next record is written.
    write_offset: u64,
    /// `append` rotates the file once `write_offset` reaches this value.
    rotation_threshold: u64,
}
69
70impl<T: Replayable, I: Io> Journal<T, I> {
71 fn path(&self, filenum: u64) -> PathBuf {
72 self.dir
73 .join(format!("{}-{}", T::filename_prefix(), filenum))
74 }
75
76 fn write_edit(scratch: &mut BytesMut, edit: &T::Edit) -> Result<u64, JournalError> {
77 let initial_size = scratch.len();
78 scratch.put_bytes(0, EDIT_PREFIX_SIZE);
79
80 if let Err(e) = bincode_options().serialize_into(scratch.writer(), edit) {
81 return Err(e.into());
82 }
83
84 let prefix = EditPrefix::new(&scratch[initial_size + EDIT_PREFIX_SIZE..]);
85 scratch[initial_size..initial_size + EDIT_PREFIX_SIZE].copy_from_slice(&prefix.encode());
86
87 Ok((scratch.len() - initial_size) as u64)
88 }
89
90 async fn create_file(
91 &mut self,
92 filenum: u64,
93 old_fd: Option<I::Fd>,
94 ) -> Result<(), JournalError> {
95 let path = self.path(filenum);
96 debug!(filenum, ?path, "creating new journal file");
97
98 let fd = open_file::<I>(
99 path,
100 OpenOptions::new().create(true).write(true).truncate(true),
101 )
102 .await?;
103
104 let mut scratch = BytesMut::new();
105 let header = JournalHeader::new(filenum).encode();
106 scratch.put_slice(&header);
107 Self::write_edit(&mut scratch, &self.data.borrow().snapshot())?;
108
109 let initial_size = scratch.len() as u64;
110 let (result, _) = write_exact::<_, I>(fd, scratch, 0).await;
111 result?;
112 sync_file::<I>(fd).await?;
113
114 if let Some(old_fd) = old_fd {
115 let old_path = self.path(self.filenum);
116 close_file::<I>(old_fd).await?;
117 if let Err(e) = remove_file::<I>(old_path).await {
118 warn!("could not remove old journal file: {}", e);
119 }
120 }
121
122 let factored = initial_size * self.config.growth_factor;
123 let rotation_threshold = factored.max(self.config.growth_baseline);
124
125 self.fd = Some(fd);
126 self.filenum = filenum;
127 self.write_offset = initial_size;
128 self.rotation_threshold = rotation_threshold;
129 Ok(())
130 }
131
132 async fn replay(&mut self, file_size: u64) -> Result<(), JournalError> {
133 let fd = self.fd.expect("replay called before fd is set");
134 let mut read_offset = JOURNAL_HEADER_SIZE as u64;
135
136 while read_offset < file_size {
137 let mut prefix_buf = BytesMut::zeroed(EDIT_PREFIX_SIZE);
139 let (result, slice) =
140 read_exact::<_, I>(fd, prefix_buf.slice(0..EDIT_PREFIX_SIZE), read_offset).await;
141 prefix_buf = slice.into_inner();
142 result?;
143 let prefix = EditPrefix::decode_from_slice(&prefix_buf);
144 read_offset += EDIT_PREFIX_SIZE as u64;
145
146 let edit_len = prefix.len() as usize;
148 let mut edit_buf = BytesMut::zeroed(edit_len);
149 let (result, slice) =
150 read_exact::<_, I>(fd, edit_buf.slice(0..edit_len), read_offset).await;
151 edit_buf = slice.into_inner();
152 result?;
153
154 if !prefix.is_valid(&edit_buf) {
155 return Err(io::Error::new(
156 io::ErrorKind::InvalidData,
157 "journal record prefix checksum mismatch: potential corruption",
158 )
159 .into());
160 }
161
162 let edit: T::Edit = bincode_options().deserialize(&edit_buf)?;
163 self.data.borrow_mut().apply(edit);
164 read_offset += edit_len as u64;
165 }
166
167 self.write_offset = read_offset;
168 Ok(())
169 }
170
171 async fn append(&mut self, edit: T::Edit) -> Result<(), JournalError> {
172 let fd = self.fd.expect("append called before init");
173 let original_offset = self.write_offset;
174
175 let mut scratch = BytesMut::new();
176 Self::write_edit(&mut scratch, &edit)?;
177
178 let len = scratch.len() as u64;
179 let (result, _) = write_exact::<_, I>(fd, scratch, self.write_offset).await;
180 result?;
181 sync_file::<I>(fd).await?;
182
183 self.data.borrow_mut().apply(edit);
184 self.write_offset += len;
185
186 trace!(size=?ByteSize(self.write_offset - original_offset), "wrote edit");
187
188 if self.write_offset >= self.rotation_threshold {
190 debug!(
191 size = ?ByteSize(self.write_offset),
192 threshold = ?ByteSize(self.rotation_threshold),
193 "journal size exceeds rotation threshold, rotating file",
194 );
195 self.rotate().await?;
196 }
197
198 Ok(())
199 }
200
201 pub async fn rotate(&mut self) -> Result<(), JournalError> {
202 debug!("rotating journal file");
203 let old_fd = self.fd.take();
204 let new_filenum = self.filenum + 1;
205 self.create_file(new_filenum, old_fd).await
206 }
207
208 pub async fn close(mut self) -> Result<(), JournalError> {
209 if let Some(fd) = self.fd.take() {
210 close_file::<I>(fd).await?;
211 }
212 Ok(())
213 }
214
215 async fn init(&mut self) -> Result<(), JournalError> {
216 let entries = list_dir::<I>(self.dir.clone())?;
217 let files: Vec<PathBuf> = entries
218 .into_iter()
219 .filter(|e| !e.is_dir)
220 .map(|e| e.path)
221 .collect();
222
223 let mut winner: Option<(u64, PathBuf)> = None;
225 for path in files {
226 let fd = match open_file::<I>(path.clone(), OpenOptions::new().read(true)).await {
227 Ok(fd) => fd,
228 Err(err) => {
229 warn!("could not open journal candidate (skipping): {err}");
230 continue;
231 }
232 };
233 let mut scratch = BytesMut::zeroed(JOURNAL_HEADER_SIZE);
234 let (result, slice) =
235 read_exact::<_, I>(fd, scratch.slice(0..JOURNAL_HEADER_SIZE), 0).await;
236 scratch = slice.into_inner();
237 close_file::<I>(fd).await.ok();
238 if result.is_err() {
239 continue;
240 }
241 match JournalHeader::decode_from_slice(&scratch) {
242 Ok(header) => {
243 if winner.as_ref().map_or(true, |(n, _)| header.filenum > *n) {
244 winner = Some((header.filenum, path));
245 }
246 }
247 Err(err) => {
248 warn!("could not decode journal candidate (skipping): {err}");
249 continue;
250 }
251 }
252 }
253
254 match winner {
255 None => self.create_file(0, None).await?,
256 Some((filenum, path)) => {
257 let fd = open_file::<I>(path, OpenOptions::new().read(true).write(true)).await?;
258 let stat = stat_file::<I>(fd).await?;
259 self.fd = Some(fd);
260 self.filenum = filenum;
261 self.write_offset = JOURNAL_HEADER_SIZE as u64;
262
263 if stat.stx_size() > JOURNAL_HEADER_SIZE as u64 {
264 self.replay(stat.stx_size()).await?;
265 }
266 }
267 }
268
269 Ok(())
270 }
271
272 pub async fn new(
273 dir: PathBuf,
274 config: JournalConfig,
275 ) -> Result<(JournalHandle<T>, JoinHandle<()>), JournalError> {
276 let data = Rc::new(RefCell::new(T::initial()));
277
278 let mut instance = Self {
279 dir,
280 data: data.clone(),
281 config,
282 fd: None,
283 filenum: 0,
284 write_offset: 0,
285 rotation_threshold: 0,
286 };
287
288 instance.init().await?;
289
290 let (tx, mut rx) = bounded(256);
291
292 let journal_handle = JournalHandle { tx, data };
293
294 let join_handle = spawn(async move {
295 loop {
296 match rx.recv().await {
297 Ok(msg) => {
298 let result = instance.append(msg.edit).await;
299 if let Err(err) = &result {
300 error!("failed to append edit: {err}");
301 break;
302 }
303 let _ = msg.tx.send(result);
304 }
305 Err(_) => break,
306 }
307 }
308
309 if let Err(err) = instance.close().await {
310 error!("failed to close journal: {err}");
311 }
312 });
313
314 Ok((journal_handle, join_handle))
315 }
316}