1#[macro_use]
8extern crate tracing;
9
10use crate::{helper::FatalErrorExt, reader::JournalReader};
11use rocksdb::{ColumnFamily, DB, Options, WriteBatch};
12use std::{
13 fmt,
14 path::PathBuf,
15 sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, AtomicU64, Ordering},
18 },
19 task::Waker,
20};
21
22pub mod reader;
24
25mod helper;
/// Errors returned by [`Journal`] operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An unrecoverable failure occurred (e.g. the RocksDB open or a DB
    /// write failed, or persisted state is corrupt). Once latched, further
    /// commits are refused.
    #[error("fatal error happened")]
    Fatal,
    /// The number of committed-but-unconsumed entries has reached the
    /// journal's capacity; retry after the reader consumes entries.
    #[error("journal buffer is full")]
    Full,
}
37
// Column family holding journal entries, keyed by the big-endian write index.
const CF_ENTRIES: &str = "entries";
// Column family holding persisted index metadata (write/consumed indices).
const CF_META: &str = "meta";
40
/// Configuration for opening a [`Journal`].
#[derive(Debug, Clone)]
pub struct JournalConfig {
    /// Filesystem path of the RocksDB directory; created if missing.
    pub db_path: PathBuf,
}
47
48impl Default for JournalConfig {
49 fn default() -> Self {
50 Self {
51 db_path: PathBuf::from("journal_db"),
52 }
53 }
54}
55
/// Handle to a persistent journal backed by RocksDB.
///
/// Cloning is cheap: clones share the same underlying state through `Arc`,
/// so any clone can commit entries.
#[derive(Clone)]
pub struct Journal {
    inner: Arc<JournalInner>,
}
69
/// State shared between the [`Journal`] handle(s) and the reader.
pub(crate) struct JournalInner {
    // Backing RocksDB instance with the "entries" and "meta" column families.
    db: DB,
    // Maximum number of unconsumed entries before commits report `Error::Full`.
    capacity: u64,
    // Index at which the next entry will be written.
    write_index: AtomicU64,
    // Index up to which the reader has consumed; invariant: <= write_index.
    consumed_index: AtomicU64,
    // Serializes committers so the capacity check and batched DB write
    // cannot interleave.
    write_lock: Mutex<()>,
    // Set once a `JournalReader` has been handed out (at most one exists).
    reader_taken: AtomicBool,
    // Waker slot for a consumer parked until more entries are available.
    consumer_wait: Mutex<Option<ConsumerWait>>,
    // Latched on unrecoverable failures; checked before commits and when
    // waking the consumer. Presumably set by `FatalErrorExt::stop_if_error`
    // in the helper module — TODO confirm.
    fatal_error: AtomicBool,
}
90
91impl fmt::Debug for Journal {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("Journal").finish()
94 }
95}
96
impl Journal {
    /// Creates a journal with the given entry capacity using the default
    /// [`JournalConfig`].
    pub fn with_capacity(capacity: usize) -> Result<Self, Error> {
        Self::with_capacity_and_config(capacity, JournalConfig::default())
    }

    /// Opens (creating if necessary) the RocksDB at `config.db_path`,
    /// recovers the persisted write/consumed indices, and validates them.
    ///
    /// # Errors
    /// Returns [`Error::Fatal`] if the DB cannot be opened, a persisted
    /// index is malformed, or recovery finds `consumed_index > write_index`.
    pub fn with_capacity_and_config(capacity: usize, config: JournalConfig) -> Result<Self, Error> {
        let capacity = capacity as u64;

        let mut global_options = Options::default();
        global_options.create_if_missing(true);
        global_options.create_missing_column_families(true);

        let db = match DB::open_cf(&global_options, &config.db_path, [CF_ENTRIES, CF_META]) {
            Ok(db) => db,
            Err(e) => {
                error!("Failed to open RocksDB at {:?}: {e}", config.db_path);
                return Err(Error::Fatal);
            }
        };

        let inner = Arc::new(JournalInner {
            db,
            capacity,
            // In-memory indices start at 0 and are overwritten below once
            // the persisted values have been read back.
            write_index: AtomicU64::new(0),
            consumed_index: AtomicU64::new(0),
            write_lock: Mutex::new(()),
            reader_taken: AtomicBool::new(false),
            consumer_wait: Mutex::new(None),
            fatal_error: AtomicBool::new(false),
        });

        let write_index = inner.read_write_index_from_db()?;
        let consumed_index = inner.read_consumed_index_from_db()?;
        // A consumed index ahead of the write index means the on-disk state
        // is corrupt; refuse to start rather than serve bad data.
        if consumed_index > write_index {
            error!("Consumed index {consumed_index} is greater than write index {write_index}");
            return Err(Error::Fatal);
        }
        info!("Journal recovered: write_index={write_index}, consumed_index={consumed_index}");

        inner.set_write_index(write_index);
        inner.set_consumed_index(consumed_index);

        Ok(Self { inner })
    }

    /// Returns the unique [`JournalReader`] for this journal.
    ///
    /// # Panics
    /// Panics if the reader has already been taken; use
    /// [`Self::try_reader`] for a non-panicking variant.
    pub fn reader(&self) -> JournalReader {
        self.try_reader().expect("Journal reader already taken")
    }

    /// Returns the [`JournalReader`] if it has not been handed out yet,
    /// `None` otherwise. At most one reader exists at a time.
    pub fn try_reader(&self) -> Option<JournalReader> {
        // swap(true) both claims the reader and tells us whether it was
        // already claimed, in a single atomic step.
        if self.inner.reader_taken.swap(true, Ordering::AcqRel) {
            return None;
        }
        Some(JournalReader::new(self.inner.clone()))
    }

    /// Appends `data` to the journal.
    ///
    /// # Panics
    /// Panics if [`Self::try_commit`] fails (fatal state or buffer full).
    pub fn commit(&self, data: &[u8]) {
        self.try_commit(data).expect("Journal is unavailable")
    }

    /// Appends `data` to the journal, persisting the entry and the advanced
    /// write index atomically in a single RocksDB write batch.
    ///
    /// # Errors
    /// - [`Error::Fatal`] if a fatal error was previously latched or the DB
    ///   write fails.
    /// - [`Error::Full`] if `capacity` entries are pending consumption.
    pub fn try_commit(&self, data: &[u8]) -> Result<(), Error> {
        if self.inner.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        // Serialize committers: the capacity check and the batched write
        // below must not interleave with another commit.
        let _guard = self.inner.write_lock.lock().unwrap();

        let write_idx = self.inner.write_index();
        let consumed = self.inner.consumed_index();

        if write_idx.wrapping_sub(consumed) >= self.inner.capacity {
            return Err(Error::Full);
        }

        let cf_entries = self.inner.cf_entries();
        let new_write_idx = write_idx.wrapping_add(1);
        // Entry payload (keyed by big-endian index) and the updated write
        // index go into one batch so they are committed or lost together.
        let mut batch = WriteBatch::default();
        batch.put_cf(cf_entries, write_idx.to_be_bytes(), data);
        self.inner
            .write_write_index_batched(&mut batch, new_write_idx);
        self.inner.db.write(batch).stop_if_error(&self.inner)?;

        // Publish the new index only after the DB write succeeded.
        self.inner.set_write_index(new_write_idx);

        // Release the write lock before waking the consumer; waking does
        // not need it, so hold time stays minimal.
        drop(_guard);

        self.inner.notify_consumer();

        Ok(())
    }

    /// Index up to which the reader has consumed entries.
    #[inline]
    pub fn consumed_index(&self) -> u64 {
        self.inner.consumed_index()
    }

    /// Index at which the next entry will be written.
    #[inline]
    pub fn write_index(&self) -> u64 {
        self.inner.write_index()
    }
}
225
226impl JournalInner {
227 const META_WRITE_INDEX_KEY: &[u8] = &[0x00];
228 const META_CONSUMED_INDEX_KEY: &[u8] = &[0x01];
229
230 fn notify_consumer(&self) {
232 let mut guard = self.consumer_wait.lock().unwrap();
233 if let Some(wait) = guard.as_ref()
234 && (self.write_index() >= wait.target_index || self.fatal_error.load(Ordering::Acquire))
235 {
236 guard.take().unwrap().waker.wake();
237 }
238 }
239
240 #[inline]
241 pub(crate) fn consumed_index(&self) -> u64 {
242 self.consumed_index.load(Ordering::Acquire)
243 }
244
245 #[inline]
246 pub(crate) fn set_consumed_index(&self, idx: u64) {
247 self.consumed_index.store(idx, Ordering::Release);
248 }
249
250 #[inline]
251 pub(crate) fn write_index(&self) -> u64 {
252 self.write_index.load(Ordering::Acquire)
253 }
254
255 #[inline]
256 pub(crate) fn set_write_index(&self, idx: u64) {
257 self.write_index.store(idx, Ordering::Release);
258 }
259
260 #[inline]
261 pub(crate) fn cf_entries(&self) -> &ColumnFamily {
262 self.db.cf_handle(CF_ENTRIES).expect("missing entries CF")
263 }
264
265 #[inline]
266 pub(crate) fn cf_meta(&self) -> &ColumnFamily {
267 self.db.cf_handle(CF_META).expect("missing meta CF")
268 }
269
270 #[inline]
271 pub(crate) fn read_consumed_index_from_db(&self) -> Result<u64, Error> {
272 self.read_meta(Self::META_CONSUMED_INDEX_KEY)
273 }
274
275 #[inline]
276 pub(crate) fn read_write_index_from_db(&self) -> Result<u64, Error> {
277 self.read_meta(Self::META_WRITE_INDEX_KEY)
278 }
279
280 #[inline]
281 fn read_meta(&self, key: &[u8]) -> Result<u64, Error> {
282 let cf = self.cf_meta();
283 let Some(value) = self.db.get_cf(cf, key).stop_if_error(self)? else {
284 return Ok(0);
286 };
287 if value.len() != 8 {
288 error!(
289 "Invalid meta value for key {:?}: expected 8 bytes, got {}",
290 key,
291 value.len()
292 );
293 return Err(Error::Fatal);
294 }
295 Ok(u64::from_le_bytes(value.as_slice().try_into().unwrap()))
296 }
297
298 #[inline]
299 pub(crate) fn write_consumed_index_batched(&self, batch: &mut WriteBatch, new: u64) {
300 self.write_meta_batched(Self::META_CONSUMED_INDEX_KEY, batch, new)
301 }
302
303 #[inline]
304 pub(crate) fn write_write_index_batched(&self, batch: &mut WriteBatch, new: u64) {
305 self.write_meta_batched(Self::META_WRITE_INDEX_KEY, batch, new)
306 }
307
308 #[inline]
309 fn write_meta_batched(&self, key: &[u8], batch: &mut WriteBatch, new: u64) {
310 batch.put_cf(self.cf_meta(), key, new.to_le_bytes())
311 }
312}
313
/// A parked consumer: the waker is fired by `notify_consumer` once the
/// write index reaches `target_index` or a fatal error is latched.
pub(crate) struct ConsumerWait {
    // Waker for the task waiting on more entries.
    pub(crate) waker: Waker,
    // The write index the consumer is waiting to reach.
    pub(crate) target_index: u64,
}
319
#[cfg(test)]
pub(crate) mod tests {
    /// Byte length of every fixture entry.
    pub const ENTRY_SIZE: usize = 8;
    /// Ten distinct fixture entries; entry `i` is filled with the byte `i`.
    pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[
        [0u8; ENTRY_SIZE],
        [1u8; ENTRY_SIZE],
        [2u8; ENTRY_SIZE],
        [3u8; ENTRY_SIZE],
        [4u8; ENTRY_SIZE],
        [5u8; ENTRY_SIZE],
        [6u8; ENTRY_SIZE],
        [7u8; ENTRY_SIZE],
        [8u8; ENTRY_SIZE],
        [9u8; ENTRY_SIZE],
    ];
    pub type Journal = crate::Journal;

    /// Builds a journal of the given capacity in a fresh temp directory.
    /// The returned `TempDir` must be kept alive while the journal is used.
    pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) {
        let tmp = tempfile::tempdir().expect("failed to create temp dir");
        let db_path = tmp.path().join("journal_db");
        let config = crate::JournalConfig { db_path };
        let journal = Journal::with_capacity_and_config(capacity, config)
            .expect("failed to create journal");
        (journal, tmp)
    }

    /// Only one reader may exist at a time; dropping it frees the slot.
    #[tokio::test(flavor = "current_thread")]
    async fn try_reader_is_exclusive() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        let first = journal.try_reader().unwrap();
        assert!(
            journal.try_reader().is_none(),
            "second reader acquisition should fail"
        );

        drop(first);
        assert!(
            journal.try_reader().is_some(),
            "reader acquisition should succeed after drop"
        );

        Ok(())
    }

    /// Committed entries come back in order and commit clears the backlog.
    #[tokio::test(flavor = "current_thread")]
    async fn commit_and_read_round_trip() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in &TEST_DATA[..2] {
            journal.commit(entry);
        }

        {
            let entries = reader.read(2)?;
            assert_eq!(entries.len(), 2);
            for i in 0..2 {
                assert_eq!(*entries[i], TEST_DATA[i]);
            }
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }

    /// A commit past the capacity yields `Error::Full`.
    #[tokio::test(flavor = "current_thread")]
    async fn commit_returns_error_when_full() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        journal.commit(&TEST_DATA[1]);
        journal.commit(&TEST_DATA[2]);

        let err = journal
            .try_commit(&TEST_DATA[3])
            .expect_err("buffer should report full on third commit");
        assert!(
            matches!(err, crate::Error::Full),
            "expected Full, got {err:?}"
        );
        Ok(())
    }

    /// Reads interleaved with commits resume at the consumed offset.
    #[tokio::test(flavor = "current_thread")]
    async fn reader_handles_sequential_reads() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in &TEST_DATA[..4] {
            journal.commit(entry);
        }

        {
            let entries = reader.read(2)?;
            assert_eq!(entries.len(), 2);
            for i in 0..2 {
                assert_eq!(*entries[i], TEST_DATA[i]);
            }
        }
        reader.commit()?;

        for entry in &TEST_DATA[4..6] {
            journal.commit(entry);
        }

        {
            let entries = reader.read(4)?;
            assert_eq!(entries.len(), 4);
            // Reads resume at index 2 — the first uncommitted entry.
            for i in 0..4 {
                assert_eq!(*entries[i], TEST_DATA[i + 2]);
            }
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }
}