1#[macro_use]
23extern crate tracing;
24
25use crate::{helper::FatalErrorExt, reader::JournalReader};
26use rocksdb::{ColumnFamily, DB, Options, WriteBatch};
27use std::{
28 fmt,
29 path::PathBuf,
30 sync::{
31 Arc, Mutex,
32 atomic::{AtomicBool, AtomicU64, Ordering},
33 },
34 task::Waker,
35};
36
37pub mod reader;
39
40mod helper;
/// Errors returned by [`Journal`] operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An unrecoverable failure occurred (e.g. a RocksDB open/read/write
    /// error or inconsistent recovered state); once latched, subsequent
    /// operations also fail with this variant.
    #[error("fatal error happened")]
    Fatal,
    /// The unconsumed backlog (`write_index - consumed_index`) has reached
    /// the configured capacity; the reader must consume entries before
    /// further commits can succeed.
    #[error("journal buffer is full")]
    Full,
}
52
// Column family holding journal entries, keyed by the big-endian write index
// (big-endian so lexicographic key order matches numeric order).
const CF_ENTRIES: &str = "entries";
// Column family holding persisted metadata (write/consumed indices).
const CF_META: &str = "meta";
55
/// Configuration for opening a [`Journal`].
#[derive(Debug, Clone)]
pub struct JournalConfig {
    /// Filesystem path of the RocksDB database backing the journal.
    pub db_path: PathBuf,
}
62
63impl Default for JournalConfig {
64 fn default() -> Self {
65 Self {
66 db_path: PathBuf::from("journal_db"),
67 }
68 }
69}
70
/// A persistent, bounded journal backed by RocksDB, with many writers and a
/// single reader.
///
/// Cloning is cheap: every clone shares the same [`JournalInner`] via `Arc`.
#[derive(Clone)]
pub struct Journal {
    inner: Arc<JournalInner>,
}
84
/// State shared by all [`Journal`] clones and the single [`JournalReader`].
pub(crate) struct JournalInner {
    // RocksDB handle owning the `entries` and `meta` column families.
    db: DB,
    // Maximum number of unconsumed entries allowed in the journal.
    capacity: u64,
    // Next index to be written; published after the entry batch is durable.
    write_index: AtomicU64,
    // Index up to which the reader has committed consumption.
    consumed_index: AtomicU64,
    // Serializes writers so the index-read / batch-write / index-publish
    // sequence in `try_commit` is atomic with respect to other writers.
    write_lock: Mutex<()>,
    // Set once a reader has been handed out (single-reader guarantee).
    reader_taken: AtomicBool,
    // A parked consumer waiting for more entries; woken by `notify_consumer`.
    consumer_wait: Mutex<Option<ConsumerWait>>,
    // Latched on unrecoverable errors; presumably set by
    // `FatalErrorExt::stop_if_error` — confirm in the `helper` module.
    fatal_error: AtomicBool,
}
105
106impl fmt::Debug for Journal {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.debug_struct("Journal").finish()
109 }
110}
111
impl Journal {
    /// Opens (or creates) a journal with `capacity` at the default
    /// [`JournalConfig`] path.
    ///
    /// # Errors
    /// [`Error::Fatal`] if the database cannot be opened or recovery fails.
    pub fn with_capacity(capacity: usize) -> Result<Self, Error> {
        Self::with_capacity_and_config(capacity, JournalConfig::default())
    }

    /// Opens (or creates) the RocksDB database from `config`, recovers the
    /// persisted write/consumed indices, and returns a journal allowing at
    /// most `capacity` unconsumed entries.
    ///
    /// # Errors
    /// [`Error::Fatal`] if the database cannot be opened, either persisted
    /// index cannot be read, or the recovered state is inconsistent
    /// (consumed index ahead of write index).
    pub fn with_capacity_and_config(capacity: usize, config: JournalConfig) -> Result<Self, Error> {
        let capacity = capacity as u64;

        let mut global_options = Options::default();
        global_options.create_if_missing(true);
        global_options.create_missing_column_families(true);

        let db = match DB::open_cf(&global_options, &config.db_path, [CF_ENTRIES, CF_META]) {
            Ok(db) => db,
            Err(e) => {
                error!("Failed to open RocksDB at {:?}: {e}", config.db_path);
                return Err(Error::Fatal);
            }
        };

        // Indices start at 0 and are overwritten once recovered below; `inner`
        // is not yet shared with any other thread, so the two-step init is safe.
        let inner = Arc::new(JournalInner {
            db,
            capacity,
            write_index: AtomicU64::new(0),
            consumed_index: AtomicU64::new(0),
            write_lock: Mutex::new(()),
            reader_taken: AtomicBool::new(false),
            consumer_wait: Mutex::new(None),
            fatal_error: AtomicBool::new(false),
        });

        let write_index = inner.read_write_index_from_db()?;
        let consumed_index = inner.read_consumed_index_from_db()?;
        if consumed_index > write_index {
            error!("Consumed index {consumed_index} is greater than write index {write_index}");
            return Err(Error::Fatal);
        }
        info!("Journal recovered: write_index={write_index}, consumed_index={consumed_index}");

        inner.set_write_index(write_index);
        inner.set_consumed_index(consumed_index);

        Ok(Self { inner })
    }

    /// Takes the single reader.
    ///
    /// # Panics
    /// Panics if a [`JournalReader`] is already outstanding.
    pub fn reader(&self) -> JournalReader {
        self.try_reader().expect("Journal reader already taken")
    }

    /// Takes the single reader, or `None` if one is already outstanding.
    /// The atomic `swap` guarantees at most one winner even under
    /// concurrent calls.
    pub fn try_reader(&self) -> Option<JournalReader> {
        if self.inner.reader_taken.swap(true, Ordering::AcqRel) {
            return None;
        }
        Some(JournalReader::new(self.inner.clone()))
    }

    /// Appends `data`, panicking on failure.
    ///
    /// # Panics
    /// Panics whenever [`try_commit`](Self::try_commit) fails — i.e. when
    /// the journal is fatally errored *or* full.
    pub fn commit(&self, data: &[u8]) {
        self.try_commit(data).expect("Journal is unavailable")
    }

    /// Durably appends `data` as the next entry and wakes a waiting consumer.
    ///
    /// # Errors
    /// - [`Error::Fatal`] if a fatal error was latched or the DB write fails.
    /// - [`Error::Full`] if the unconsumed backlog has reached capacity.
    pub fn try_commit(&self, data: &[u8]) -> Result<(), Error> {
        if self.inner.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        // Serialize writers: the index read, batch write, and index publish
        // below must happen as one unit.
        let _guard = self.inner.write_lock.lock().unwrap();

        let write_idx = self.inner.write_index();
        let consumed = self.inner.consumed_index();

        if write_idx.wrapping_sub(consumed) >= self.inner.capacity {
            return Err(Error::Full);
        }

        // Entry keys are big-endian so RocksDB's lexicographic key order
        // matches numeric index order.
        let cf_entries = self.inner.cf_entries();
        let new_write_idx = write_idx.wrapping_add(1);
        let mut batch = WriteBatch::default();
        batch.put_cf(cf_entries, write_idx.to_be_bytes(), data);
        self.inner
            .write_write_index_batched(&mut batch, new_write_idx);
        // NOTE(review): on a write failure this returns early without calling
        // `notify_consumer`; presumably `stop_if_error` wakes any parked
        // consumer itself — confirm in the `helper` module.
        self.inner.db.write(batch).stop_if_error(&self.inner)?;

        // Publish the new index only after the batch is durable.
        self.inner.set_write_index(new_write_idx);

        // Release the write lock before waking the consumer so we do not hold
        // it across the wake-up.
        drop(_guard);

        self.inner.notify_consumer();

        Ok(())
    }

    /// Index up to which the reader has committed consumption.
    #[inline]
    pub fn consumed_index(&self) -> u64 {
        self.inner.consumed_index()
    }

    /// Next index that will be written.
    #[inline]
    pub fn write_index(&self) -> u64 {
        self.inner.write_index()
    }
}
240
241impl JournalInner {
242 const META_WRITE_INDEX_KEY: &[u8] = &[0x00];
243 const META_CONSUMED_INDEX_KEY: &[u8] = &[0x01];
244
245 fn notify_consumer(&self) {
247 let mut guard = self.consumer_wait.lock().unwrap();
248 if let Some(wait) = guard.as_ref()
249 && (self.write_index() >= wait.target_index || self.fatal_error.load(Ordering::Acquire))
250 {
251 guard.take().unwrap().waker.wake();
252 }
253 }
254
255 #[inline]
256 pub(crate) fn consumed_index(&self) -> u64 {
257 self.consumed_index.load(Ordering::Acquire)
258 }
259
260 #[inline]
261 pub(crate) fn set_consumed_index(&self, idx: u64) {
262 self.consumed_index.store(idx, Ordering::Release);
263 }
264
265 #[inline]
266 pub(crate) fn write_index(&self) -> u64 {
267 self.write_index.load(Ordering::Acquire)
268 }
269
270 #[inline]
271 pub(crate) fn set_write_index(&self, idx: u64) {
272 self.write_index.store(idx, Ordering::Release);
273 }
274
275 #[inline]
276 pub(crate) fn cf_entries(&self) -> &ColumnFamily {
277 self.db.cf_handle(CF_ENTRIES).expect("missing entries CF")
278 }
279
280 #[inline]
281 pub(crate) fn cf_meta(&self) -> &ColumnFamily {
282 self.db.cf_handle(CF_META).expect("missing meta CF")
283 }
284
285 #[inline]
286 pub(crate) fn read_consumed_index_from_db(&self) -> Result<u64, Error> {
287 self.read_meta(Self::META_CONSUMED_INDEX_KEY)
288 }
289
290 #[inline]
291 pub(crate) fn read_write_index_from_db(&self) -> Result<u64, Error> {
292 self.read_meta(Self::META_WRITE_INDEX_KEY)
293 }
294
295 #[inline]
296 fn read_meta(&self, key: &[u8]) -> Result<u64, Error> {
297 let cf = self.cf_meta();
298 let Some(value) = self.db.get_cf(cf, key).stop_if_error(self)? else {
299 return Ok(0);
301 };
302 if value.len() != 8 {
303 error!(
304 "Invalid meta value for key {:?}: expected 8 bytes, got {}",
305 key,
306 value.len()
307 );
308 return Err(Error::Fatal);
309 }
310 Ok(u64::from_le_bytes(value.as_slice().try_into().unwrap()))
311 }
312
313 #[inline]
314 pub(crate) fn write_consumed_index_batched(&self, batch: &mut WriteBatch, new: u64) {
315 self.write_meta_batched(Self::META_CONSUMED_INDEX_KEY, batch, new)
316 }
317
318 #[inline]
319 pub(crate) fn write_write_index_batched(&self, batch: &mut WriteBatch, new: u64) {
320 self.write_meta_batched(Self::META_WRITE_INDEX_KEY, batch, new)
321 }
322
323 #[inline]
324 fn write_meta_batched(&self, key: &[u8], batch: &mut WriteBatch, new: u64) {
325 batch.put_cf(self.cf_meta(), key, new.to_le_bytes())
326 }
327}
328
/// A parked consumer: `waker` is fired by `JournalInner::notify_consumer`
/// once the write index reaches `target_index` (or a fatal error is latched).
pub(crate) struct ConsumerWait {
    pub(crate) waker: Waker,
    pub(crate) target_index: u64,
}
334
#[cfg(test)]
pub(crate) mod tests {
    /// Byte length of every fixture entry.
    pub const ENTRY_SIZE: usize = 8;
    /// Ten distinct fixture entries: entry `i` is eight copies of the byte `i`.
    pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[
        [0u8; ENTRY_SIZE],
        [1u8; ENTRY_SIZE],
        [2u8; ENTRY_SIZE],
        [3u8; ENTRY_SIZE],
        [4u8; ENTRY_SIZE],
        [5u8; ENTRY_SIZE],
        [6u8; ENTRY_SIZE],
        [7u8; ENTRY_SIZE],
        [8u8; ENTRY_SIZE],
        [9u8; ENTRY_SIZE],
    ];
    pub type Journal = crate::Journal;

    /// Builds a journal rooted in a fresh temporary directory. Keep the
    /// returned `TempDir` alive for as long as the journal is in use.
    pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let cfg = crate::JournalConfig {
            db_path: dir.path().join("journal_db"),
        };
        let journal =
            Journal::with_capacity_and_config(capacity, cfg).expect("failed to create journal");
        (journal, dir)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn try_reader_is_exclusive() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        // First acquisition wins; a second must be refused until the first
        // reader is dropped.
        let first = journal.try_reader().unwrap();
        assert!(
            journal.try_reader().is_none(),
            "second reader acquisition should fail"
        );

        drop(first);
        assert!(
            journal.try_reader().is_some(),
            "reader acquisition should succeed after drop"
        );

        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_and_read_round_trip() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in &TEST_DATA[..2] {
            journal.commit(entry);
        }

        // Entries come back in commit order.
        {
            let batch = reader.read(2)?;
            assert_eq!(batch.len(), 2);
            assert_eq!(*batch[0], TEST_DATA[0]);
            assert_eq!(*batch[1], TEST_DATA[1]);
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_returns_error_when_full() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        // Fill to capacity.
        for entry in &TEST_DATA[1..3] {
            journal.commit(entry);
        }

        let err = journal
            .try_commit(&TEST_DATA[3])
            .expect_err("buffer should report full on third commit");
        assert!(
            matches!(err, crate::Error::Full),
            "expected Full, got {err:?}"
        );
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn reader_handles_sequential_reads() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in &TEST_DATA[..4] {
            journal.commit(entry);
        }

        // Partial read of the first two entries.
        {
            let batch = reader.read(2)?;
            assert_eq!(batch.len(), 2);
            assert_eq!(*batch[0], TEST_DATA[0]);
            assert_eq!(*batch[1], TEST_DATA[1]);
        }
        reader.commit()?;

        for entry in &TEST_DATA[4..6] {
            journal.commit(entry);
        }

        // The next read resumes where the previous commit left off.
        {
            let batch = reader.read(4)?;
            assert_eq!(batch.len(), 4);
            assert_eq!(*batch[0], TEST_DATA[2]);
            assert_eq!(*batch[1], TEST_DATA[3]);
            assert_eq!(*batch[2], TEST_DATA[4]);
            assert_eq!(*batch[3], TEST_DATA[5]);
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }
}