// uts_journal/lib.rs
1//! RocksDB-backed journal implementation for UTS
2//!
3//! This crate provides the same functionality as `uts-journal` but uses RocksDB
4//! for persistence instead of a custom WAL, providing better reliability and
5//! crash recovery guarantees.
6
7#[macro_use]
8extern crate tracing;
9
10use crate::{helper::FatalErrorExt, reader::JournalReader};
11use rocksdb::{ColumnFamily, DB, Options, WriteBatch};
12use std::{
13    fmt,
14    path::PathBuf,
15    sync::{
16        Arc, Mutex,
17        atomic::{AtomicBool, AtomicU64, Ordering},
18    },
19    task::Waker,
20};
21
22/// Journal reader.
23pub mod reader;
24
25mod helper;
/// Errors returned by journal operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The journal is in a fatal state, caller should stop using it and drop it as soon as possible.
    /// Once set, every subsequent operation fails with this variant.
    #[error("fatal error happened")]
    Fatal,
    /// The journal buffer is full, new entries cannot be accepted until some entries are consumed
    /// and the buffer has space. Unlike [`Error::Fatal`], this is transient: retry after the
    /// reader commits.
    #[error("journal buffer is full")]
    Full,
}
37
// Column family holding entry payloads, keyed by big-endian entry index
// (big-endian so RocksDB's byte-wise key order matches numeric order).
const CF_ENTRIES: &str = "entries";
// Column family holding journal metadata (the write/consumed index values).
const CF_META: &str = "meta";
40
/// Configuration for the journal.
#[derive(Debug, Clone)]
pub struct JournalConfig {
    /// Directory for the RocksDB database that stores journal entries and metadata.
    /// Created if missing (the DB is opened with `create_if_missing`).
    pub db_path: PathBuf,
}
47
48impl Default for JournalConfig {
49    fn default() -> Self {
50        Self {
51            db_path: PathBuf::from("journal_db"),
52        }
53    }
54}
55
/// An `At-Least-Once` journal for storing fixed-size entries, with
/// RocksDB-backed persistence.
///
/// All indices here are monotonic u64, wrapping around on overflow.
///
/// Invariant: `consumed_index` <= `write_index`.
///
/// Writes go directly to RocksDB (synchronous and durable), so there is no
/// separate "persisted" boundary — `write_index` **is** the persisted boundary.
///
/// Cloning is cheap: clones share the same underlying state via `Arc`.
#[derive(Clone)]
pub struct Journal {
    // Shared with every clone and with the single JournalReader.
    inner: Arc<JournalInner>,
}
69
/// Shared state behind [`Journal`] clones and the journal reader.
pub(crate) struct JournalInner {
    /// The RocksDB instance storing entries and metadata.
    db: DB,
    /// Maximum number of in-flight (written but not yet consumed) entries.
    capacity: u64,
    /// Next write position – also the durable frontier because every commit
    /// is a synchronous RocksDB write.
    write_index: AtomicU64,
    /// Last consumed index, updated by the reader's `commit()`.
    consumed_index: AtomicU64,
    /// Serializes the write path so entries are numbered consecutively.
    write_lock: Mutex<()>,
    /// Whether a reader has been acquired. At most one reader may exist at a time.
    reader_taken: AtomicBool,
    /// Waker for the consumer waiting for new entries.
    consumer_wait: Mutex<Option<ConsumerWait>>,
    /// Whether the journal is in a fatal error state. If true, all operations will fail and the
    /// journal should be dropped.
    fatal_error: AtomicBool,
}
90
91impl fmt::Debug for Journal {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("Journal").finish()
94    }
95}
96
97impl Journal {
98    /// Create a new journal with the specified capacity and default configuration.
99    pub fn with_capacity(capacity: usize) -> Result<Self, Error> {
100        Self::with_capacity_and_config(capacity, JournalConfig::default())
101    }
102
103    /// Create a new journal with the specified capacity and configuration.
104    pub fn with_capacity_and_config(capacity: usize, config: JournalConfig) -> Result<Self, Error> {
105        let capacity = capacity as u64;
106
107        let mut global_options = Options::default();
108        global_options.create_if_missing(true);
109        global_options.create_missing_column_families(true);
110
111        let db = match DB::open_cf(&global_options, &config.db_path, [CF_ENTRIES, CF_META]) {
112            Ok(db) => db,
113            Err(e) => {
114                error!("Failed to open RocksDB at {:?}: {e}", config.db_path);
115                return Err(Error::Fatal);
116            }
117        };
118
119        let inner = Arc::new(JournalInner {
120            db,
121            capacity,
122            write_index: AtomicU64::new(0),
123            consumed_index: AtomicU64::new(0),
124            write_lock: Mutex::new(()),
125            reader_taken: AtomicBool::new(false),
126            consumer_wait: Mutex::new(None),
127            fatal_error: AtomicBool::new(false),
128        });
129
130        // Recover state
131        let write_index = inner.read_write_index_from_db()?;
132        let consumed_index = inner.read_consumed_index_from_db()?;
133        if consumed_index > write_index {
134            error!("Consumed index {consumed_index} is greater than write index {write_index}");
135            return Err(Error::Fatal);
136        }
137        info!("Journal recovered: write_index={write_index}, consumed_index={consumed_index}");
138
139        inner.set_write_index(write_index);
140        inner.set_consumed_index(consumed_index);
141
142        Ok(Self { inner })
143    }
144
145    /// Acquires a reader for this journal.
146    ///
147    /// # Panics
148    ///
149    /// Panics if a reader is already taken.
150    pub fn reader(&self) -> JournalReader {
151        self.try_reader().expect("Journal reader already taken")
152    }
153
154    /// Try acquires a reader for this journal.
155    ///
156    /// If a reader is already taken, returns None.
157    pub fn try_reader(&self) -> Option<JournalReader> {
158        if self.inner.reader_taken.swap(true, Ordering::AcqRel) {
159            return None;
160        }
161        Some(JournalReader::new(self.inner.clone()))
162    }
163
164    /// Commit a new entry to the journal.
165    ///
166    /// The entry is written to RocksDB synchronously.
167    ///
168    /// # Panics
169    ///
170    /// Panics if the journal is full or has encountered a fatal error.
171    pub fn commit(&self, data: &[u8]) {
172        self.try_commit(data).expect("Journal is unavailable")
173    }
174
175    /// Try commit a new entry to the journal.
176    ///
177    /// The entry is written to RocksDB synchronously.
178    pub fn try_commit(&self, data: &[u8]) -> Result<(), Error> {
179        if self.inner.fatal_error.load(Ordering::Acquire) {
180            return Err(Error::Fatal);
181        }
182
183        // Serialize writes so indices are strictly consecutive.
184        let _guard = self.inner.write_lock.lock().unwrap();
185
186        let write_idx = self.inner.write_index();
187        let consumed = self.inner.consumed_index();
188
189        if write_idx.wrapping_sub(consumed) >= self.inner.capacity {
190            return Err(Error::Full);
191        }
192
193        let cf_entries = self.inner.cf_entries();
194        // Write entry + updated write_index atomically via WriteBatch.
195        let new_write_idx = write_idx.wrapping_add(1);
196        let mut batch = WriteBatch::default();
197        batch.put_cf(cf_entries, write_idx.to_be_bytes(), data);
198        self.inner
199            .write_write_index_batched(&mut batch, new_write_idx);
200        self.inner.db.write(batch).stop_if_error(&self.inner)?;
201
202        self.inner.set_write_index(new_write_idx);
203
204        // drop write_lock before notifying consumer
205        drop(_guard);
206
207        // Notify consumer if it is waiting for entries.
208        self.inner.notify_consumer();
209
210        Ok(())
211    }
212
213    /// Get the current consumed index.
214    #[inline]
215    pub fn consumed_index(&self) -> u64 {
216        self.inner.consumed_index()
217    }
218
219    /// Get the current write index.
220    #[inline]
221    pub fn write_index(&self) -> u64 {
222        self.inner.write_index()
223    }
224}
225
226impl JournalInner {
227    const META_WRITE_INDEX_KEY: &[u8] = &[0x00];
228    const META_CONSUMED_INDEX_KEY: &[u8] = &[0x01];
229
230    /// Wake the consumer waker if the write index has reached its target.
231    fn notify_consumer(&self) {
232        let mut guard = self.consumer_wait.lock().unwrap();
233        if let Some(wait) = guard.as_ref()
234            && (self.write_index() >= wait.target_index || self.fatal_error.load(Ordering::Acquire))
235        {
236            guard.take().unwrap().waker.wake();
237        }
238    }
239
240    #[inline]
241    pub(crate) fn consumed_index(&self) -> u64 {
242        self.consumed_index.load(Ordering::Acquire)
243    }
244
245    #[inline]
246    pub(crate) fn set_consumed_index(&self, idx: u64) {
247        self.consumed_index.store(idx, Ordering::Release);
248    }
249
250    #[inline]
251    pub(crate) fn write_index(&self) -> u64 {
252        self.write_index.load(Ordering::Acquire)
253    }
254
255    #[inline]
256    pub(crate) fn set_write_index(&self, idx: u64) {
257        self.write_index.store(idx, Ordering::Release);
258    }
259
260    #[inline]
261    pub(crate) fn cf_entries(&self) -> &ColumnFamily {
262        self.db.cf_handle(CF_ENTRIES).expect("missing entries CF")
263    }
264
265    #[inline]
266    pub(crate) fn cf_meta(&self) -> &ColumnFamily {
267        self.db.cf_handle(CF_META).expect("missing meta CF")
268    }
269
270    #[inline]
271    pub(crate) fn read_consumed_index_from_db(&self) -> Result<u64, Error> {
272        self.read_meta(Self::META_CONSUMED_INDEX_KEY)
273    }
274
275    #[inline]
276    pub(crate) fn read_write_index_from_db(&self) -> Result<u64, Error> {
277        self.read_meta(Self::META_WRITE_INDEX_KEY)
278    }
279
280    #[inline]
281    fn read_meta(&self, key: &[u8]) -> Result<u64, Error> {
282        let cf = self.cf_meta();
283        let Some(value) = self.db.get_cf(cf, key).stop_if_error(self)? else {
284            // If the key is missing, assume index 0 (fresh journal).
285            return Ok(0);
286        };
287        if value.len() != 8 {
288            error!(
289                "Invalid meta value for key {:?}: expected 8 bytes, got {}",
290                key,
291                value.len()
292            );
293            return Err(Error::Fatal);
294        }
295        Ok(u64::from_le_bytes(value.as_slice().try_into().unwrap()))
296    }
297
298    #[inline]
299    pub(crate) fn write_consumed_index_batched(&self, batch: &mut WriteBatch, new: u64) {
300        self.write_meta_batched(Self::META_CONSUMED_INDEX_KEY, batch, new)
301    }
302
303    #[inline]
304    pub(crate) fn write_write_index_batched(&self, batch: &mut WriteBatch, new: u64) {
305        self.write_meta_batched(Self::META_WRITE_INDEX_KEY, batch, new)
306    }
307
308    #[inline]
309    fn write_meta_batched(&self, key: &[u8], batch: &mut WriteBatch, new: u64) {
310        batch.put_cf(self.cf_meta(), key, new.to_le_bytes())
311    }
312}
313
/// A consumer wait entry.
///
/// Registered while the consumer waits for more entries; `notify_consumer`
/// wakes `waker` once `write_index` reaches `target_index` (or the journal
/// goes fatal).
pub(crate) struct ConsumerWait {
    /// Waker invoked when the wait condition is satisfied.
    pub(crate) waker: Waker,
    /// The write index the consumer is waiting to reach.
    pub(crate) target_index: u64,
}
319
#[cfg(test)]
pub(crate) mod tests {
    pub const ENTRY_SIZE: usize = 8;
    // Ten distinct fixed-size payloads, each filled with its own index value.
    pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[
        [0u8; ENTRY_SIZE],
        [1u8; ENTRY_SIZE],
        [2u8; ENTRY_SIZE],
        [3u8; ENTRY_SIZE],
        [4u8; ENTRY_SIZE],
        [5u8; ENTRY_SIZE],
        [6u8; ENTRY_SIZE],
        [7u8; ENTRY_SIZE],
        [8u8; ENTRY_SIZE],
        [9u8; ENTRY_SIZE],
    ];
    pub type Journal = crate::Journal;

    /// Create a journal with an isolated temporary directory for the RocksDB database.
    /// Returns the journal and the temp dir guard (must be kept alive for the test duration).
    pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let cfg = crate::JournalConfig {
            db_path: dir.path().join("journal_db"),
        };
        let journal =
            Journal::with_capacity_and_config(capacity, cfg).expect("failed to create journal");
        (journal, dir)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn try_reader_is_exclusive() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        let first = journal.try_reader().unwrap();
        assert!(
            journal.try_reader().is_none(),
            "second reader acquisition should fail"
        );

        // Dropping the reader releases exclusivity.
        drop(first);
        assert!(
            journal.try_reader().is_some(),
            "reader acquisition should succeed after drop"
        );

        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_and_read_round_trip() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for payload in &TEST_DATA[..2] {
            journal.commit(payload);
        }

        {
            let batch = reader.read(2)?;
            assert_eq!(batch.len(), 2);
            assert_eq!(*batch[0], TEST_DATA[0]);
            assert_eq!(*batch[1], TEST_DATA[1]);
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn commit_returns_error_when_full() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(2);

        // Fill the journal to capacity.
        journal.commit(&TEST_DATA[1]);
        journal.commit(&TEST_DATA[2]);

        let err = journal
            .try_commit(&TEST_DATA[3])
            .expect_err("buffer should report full on third commit");
        assert!(
            matches!(err, crate::Error::Full),
            "expected Full, got {err:?}"
        );
        Ok(())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn reader_handles_sequential_reads() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for payload in &TEST_DATA[..4] {
            journal.commit(payload);
        }

        // Read and commit the first half.
        {
            let batch = reader.read(2)?;
            assert_eq!(batch.len(), 2);
            assert_eq!(*batch[0], TEST_DATA[0]);
            assert_eq!(*batch[1], TEST_DATA[1]);
        }
        reader.commit()?;

        // Committing after the first consume should free capacity.
        for payload in &TEST_DATA[4..6] {
            journal.commit(payload);
        }

        {
            let batch = reader.read(4)?;
            assert_eq!(batch.len(), 4);
            assert_eq!(*batch[0], TEST_DATA[2]);
            assert_eq!(*batch[1], TEST_DATA[3]);
            assert_eq!(*batch[2], TEST_DATA[4]);
            assert_eq!(*batch[3], TEST_DATA[5]);
        }

        reader.commit()?;
        assert_eq!(reader.available(), 0);
        Ok(())
    }
}
440}