// uts_journal/lib.rs
1//! RocksDB-backed journal implementation for UTS
2//!
3//! This crate provides the same functionality as `uts-journal` but uses RocksDB
4//! for persistence instead of a custom WAL, providing better reliability and
5//! crash recovery guarantees.
6
7// Copyright (C) 2026 UTS Contributors
8//
9// This program is free software: you can redistribute it and/or modify
10// it under the terms of the GNU Affero General Public License as published by
11// the Free Software Foundation, either version 3 of the License, or
12// (at your option) any later version.
13//
14// This program is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17// GNU Affero General Public License for more details.
18//
19// You should have received a copy of the GNU Affero General Public License
20// along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
22#[macro_use]
23extern crate tracing;
24
25use crate::{helper::FatalErrorExt, reader::JournalReader};
26use rocksdb::{ColumnFamily, DB, Options, WriteBatch};
27use std::{
28    fmt,
29    path::PathBuf,
30    sync::{
31        Arc, Mutex,
32        atomic::{AtomicBool, AtomicU64, Ordering},
33    },
34    task::Waker,
35};
36
37/// Journal reader.
38pub mod reader;
39
40mod helper;
/// Errors returned by journal operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The journal is in a fatal state, caller should stop using it and drop it as soon as possible.
    #[error("fatal error happened")]
    Fatal,
    /// The journal buffer is full, new entries cannot be accepted until some entries are consumed
    /// and the buffer has space.
    #[error("journal buffer is full")]
    Full,
}
52
/// Column family holding journal entries, keyed by the big-endian entry index.
const CF_ENTRIES: &str = "entries";
/// Column family holding journal metadata (the write/consumed index values).
const CF_META: &str = "meta";
55
/// Configuration for the journal.
///
/// Construct via [`Default`] for the conventional `journal_db` path, or set
/// `db_path` explicitly (e.g. a per-test temporary directory).
#[derive(Debug, Clone)]
pub struct JournalConfig {
    /// Directory for the RocksDB database that stores journal entries and metadata.
    pub db_path: PathBuf,
}
62
63impl Default for JournalConfig {
64    fn default() -> Self {
65        Self {
66            db_path: PathBuf::from("journal_db"),
67        }
68    }
69}
70
/// An `At-Least-Once` journal for storing fixed-size entries, with
/// RocksDB-backed persistence.
///
/// All indices here are monotonic u64, wrapping around on overflow.
///
/// Invariant: `consumed_index` <= `write_index`.
///
/// Writes go directly to RocksDB (synchronous and durable), so there is no
/// separate "persisted" boundary — `write_index` **is** the persisted boundary.
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct Journal {
    // Shared state; the single `JournalReader` holds another Arc to it.
    inner: Arc<JournalInner>,
}
84
/// Shared state behind [`Journal`] clones and the journal reader.
pub(crate) struct JournalInner {
    /// The RocksDB instance storing entries and metadata.
    db: DB,
    /// Maximum number of in-flight (written but not yet consumed) entries.
    capacity: u64,
    /// Next write position – also the durable frontier because every commit
    /// is a synchronous RocksDB write.
    write_index: AtomicU64,
    /// Last consumed index, updated by the reader's `commit()`.
    consumed_index: AtomicU64,
    /// Serializes the write path so entries are numbered consecutively.
    write_lock: Mutex<()>,
    /// Whether a reader has been acquired.
    reader_taken: AtomicBool,
    /// Waker for the consumer waiting for new entries.
    consumer_wait: Mutex<Option<ConsumerWait>>,
    /// Whether the journal is in a fatal error state. If true, all operations will fail and the
    /// journal should be dropped.
    fatal_error: AtomicBool,
}
105
106impl fmt::Debug for Journal {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.debug_struct("Journal").finish()
109    }
110}
111
112impl Journal {
113    /// Create a new journal with the specified capacity and default configuration.
114    pub fn with_capacity(capacity: usize) -> Result<Self, Error> {
115        Self::with_capacity_and_config(capacity, JournalConfig::default())
116    }
117
118    /// Create a new journal with the specified capacity and configuration.
119    pub fn with_capacity_and_config(capacity: usize, config: JournalConfig) -> Result<Self, Error> {
120        let capacity = capacity as u64;
121
122        let mut global_options = Options::default();
123        global_options.create_if_missing(true);
124        global_options.create_missing_column_families(true);
125
126        let db = match DB::open_cf(&global_options, &config.db_path, [CF_ENTRIES, CF_META]) {
127            Ok(db) => db,
128            Err(e) => {
129                error!("Failed to open RocksDB at {:?}: {e}", config.db_path);
130                return Err(Error::Fatal);
131            }
132        };
133
134        let inner = Arc::new(JournalInner {
135            db,
136            capacity,
137            write_index: AtomicU64::new(0),
138            consumed_index: AtomicU64::new(0),
139            write_lock: Mutex::new(()),
140            reader_taken: AtomicBool::new(false),
141            consumer_wait: Mutex::new(None),
142            fatal_error: AtomicBool::new(false),
143        });
144
145        // Recover state
146        let write_index = inner.read_write_index_from_db()?;
147        let consumed_index = inner.read_consumed_index_from_db()?;
148        if consumed_index > write_index {
149            error!("Consumed index {consumed_index} is greater than write index {write_index}");
150            return Err(Error::Fatal);
151        }
152        info!("Journal recovered: write_index={write_index}, consumed_index={consumed_index}");
153
154        inner.set_write_index(write_index);
155        inner.set_consumed_index(consumed_index);
156
157        Ok(Self { inner })
158    }
159
160    /// Acquires a reader for this journal.
161    ///
162    /// # Panics
163    ///
164    /// Panics if a reader is already taken.
165    pub fn reader(&self) -> JournalReader {
166        self.try_reader().expect("Journal reader already taken")
167    }
168
169    /// Try acquires a reader for this journal.
170    ///
171    /// If a reader is already taken, returns None.
172    pub fn try_reader(&self) -> Option<JournalReader> {
173        if self.inner.reader_taken.swap(true, Ordering::AcqRel) {
174            return None;
175        }
176        Some(JournalReader::new(self.inner.clone()))
177    }
178
179    /// Commit a new entry to the journal.
180    ///
181    /// The entry is written to RocksDB synchronously.
182    ///
183    /// # Panics
184    ///
185    /// Panics if the journal is full or has encountered a fatal error.
186    pub fn commit(&self, data: &[u8]) {
187        self.try_commit(data).expect("Journal is unavailable")
188    }
189
    /// Try commit a new entry to the journal.
    ///
    /// The entry is written to RocksDB synchronously.
    ///
    /// # Errors
    ///
    /// * [`Error::Fatal`] if the journal previously hit an unrecoverable error.
    /// * [`Error::Full`] if `capacity` entries are already in flight
    ///   (written but not yet consumed).
    pub fn try_commit(&self, data: &[u8]) -> Result<(), Error> {
        if self.inner.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        // Serialize writes so indices are strictly consecutive.
        let _guard = self.inner.write_lock.lock().unwrap();

        let write_idx = self.inner.write_index();
        let consumed = self.inner.consumed_index();

        // Wrapping subtraction keeps the in-flight count correct even after
        // index overflow (indices are monotonic wrapping u64s, see type docs).
        if write_idx.wrapping_sub(consumed) >= self.inner.capacity {
            return Err(Error::Full);
        }

        let cf_entries = self.inner.cf_entries();
        // Write entry + updated write_index atomically via WriteBatch.
        // Entry keys are big-endian — presumably so numeric order matches
        // RocksDB's byte-wise key order; TODO confirm against the reader.
        let new_write_idx = write_idx.wrapping_add(1);
        let mut batch = WriteBatch::default();
        batch.put_cf(cf_entries, write_idx.to_be_bytes(), data);
        self.inner
            .write_write_index_batched(&mut batch, new_write_idx);
        // NOTE(review): `stop_if_error` maps the RocksDB error to
        // `Error::Fatal` — presumably also setting `fatal_error`; see
        // `helper::FatalErrorExt` to confirm.
        self.inner.db.write(batch).stop_if_error(&self.inner)?;

        // Publish the new index only after the batch is durably written.
        self.inner.set_write_index(new_write_idx);

        // drop write_lock before notifying consumer
        drop(_guard);

        // Notify consumer if it is waiting for entries.
        self.inner.notify_consumer();

        Ok(())
    }
227
228    /// Get the current consumed index.
229    #[inline]
230    pub fn consumed_index(&self) -> u64 {
231        self.inner.consumed_index()
232    }
233
234    /// Get the current write index.
235    #[inline]
236    pub fn write_index(&self) -> u64 {
237        self.inner.write_index()
238    }
239}
240
241impl JournalInner {
    /// Meta-CF key under which the durable write index is stored.
    const META_WRITE_INDEX_KEY: &[u8] = &[0x00];
    /// Meta-CF key under which the durable consumed index is stored.
    const META_CONSUMED_INDEX_KEY: &[u8] = &[0x01];
244
245    /// Wake the consumer waker if the write index has reached its target.
246    fn notify_consumer(&self) {
247        let mut guard = self.consumer_wait.lock().unwrap();
248        if let Some(wait) = guard.as_ref()
249            && (self.write_index() >= wait.target_index || self.fatal_error.load(Ordering::Acquire))
250        {
251            guard.take().unwrap().waker.wake();
252        }
253    }
254
    /// Load the reader's committed frontier.
    #[inline]
    pub(crate) fn consumed_index(&self) -> u64 {
        self.consumed_index.load(Ordering::Acquire)
    }

    /// Publish a new consumed index (called from the reader's commit path).
    #[inline]
    pub(crate) fn set_consumed_index(&self, idx: u64) {
        self.consumed_index.store(idx, Ordering::Release);
    }

    /// Load the next write position (also the durable frontier).
    #[inline]
    pub(crate) fn write_index(&self) -> u64 {
        self.write_index.load(Ordering::Acquire)
    }

    /// Publish a new write index after its batch has been written to RocksDB.
    #[inline]
    pub(crate) fn set_write_index(&self, idx: u64) {
        self.write_index.store(idx, Ordering::Release);
    }
274
    /// Handle to the entries column family.
    ///
    /// The `expect` cannot fire in practice: both CFs are created by
    /// `with_capacity_and_config` via `create_missing_column_families`.
    #[inline]
    pub(crate) fn cf_entries(&self) -> &ColumnFamily {
        self.db.cf_handle(CF_ENTRIES).expect("missing entries CF")
    }

    /// Handle to the metadata column family (see `cf_entries` note).
    #[inline]
    pub(crate) fn cf_meta(&self) -> &ColumnFamily {
        self.db.cf_handle(CF_META).expect("missing meta CF")
    }
284
    /// Recover the reader's committed index from the meta CF (0 if absent).
    #[inline]
    pub(crate) fn read_consumed_index_from_db(&self) -> Result<u64, Error> {
        self.read_meta(Self::META_CONSUMED_INDEX_KEY)
    }

    /// Recover the durable write index from the meta CF (0 if absent).
    #[inline]
    pub(crate) fn read_write_index_from_db(&self) -> Result<u64, Error> {
        self.read_meta(Self::META_WRITE_INDEX_KEY)
    }
294
295    #[inline]
296    fn read_meta(&self, key: &[u8]) -> Result<u64, Error> {
297        let cf = self.cf_meta();
298        let Some(value) = self.db.get_cf(cf, key).stop_if_error(self)? else {
299            // If the key is missing, assume index 0 (fresh journal).
300            return Ok(0);
301        };
302        if value.len() != 8 {
303            error!(
304                "Invalid meta value for key {:?}: expected 8 bytes, got {}",
305                key,
306                value.len()
307            );
308            return Err(Error::Fatal);
309        }
310        Ok(u64::from_le_bytes(value.as_slice().try_into().unwrap()))
311    }
312
    /// Stage the new consumed index into `batch`, so it becomes durable
    /// atomically with whatever else the batch carries.
    #[inline]
    pub(crate) fn write_consumed_index_batched(&self, batch: &mut WriteBatch, new: u64) {
        self.write_meta_batched(Self::META_CONSUMED_INDEX_KEY, batch, new)
    }

    /// Stage the new write index into `batch`; `try_commit` pairs it with the
    /// entry put so both land in one atomic RocksDB write.
    #[inline]
    pub(crate) fn write_write_index_batched(&self, batch: &mut WriteBatch, new: u64) {
        self.write_meta_batched(Self::META_WRITE_INDEX_KEY, batch, new)
    }

    /// Stage a little-endian `u64` meta value into `batch`.
    /// (Meta values are little-endian while entry keys are big-endian —
    /// `read_meta` matches this encoding.)
    #[inline]
    fn write_meta_batched(&self, key: &[u8], batch: &mut WriteBatch, new: u64) {
        batch.put_cf(self.cf_meta(), key, new.to_le_bytes())
    }
327}
328
/// A consumer wait entry.
///
/// Registered under `JournalInner::consumer_wait`; `notify_consumer` wakes
/// `waker` once the write index reaches `target_index` (or on fatal error).
pub(crate) struct ConsumerWait {
    /// Waker of the task blocked waiting for more entries.
    pub(crate) waker: Waker,
    /// The write index the consumer is waiting to be reached.
    pub(crate) target_index: u64,
}
334
#[cfg(test)]
pub(crate) mod tests {
    /// Size in bytes of each fixed-size test entry.
    pub const ENTRY_SIZE: usize = 8;
    /// Ten distinct fixtures, each filled with its own index value.
    pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[
        [0u8; ENTRY_SIZE],
        [1u8; ENTRY_SIZE],
        [2u8; ENTRY_SIZE],
        [3u8; ENTRY_SIZE],
        [4u8; ENTRY_SIZE],
        [5u8; ENTRY_SIZE],
        [6u8; ENTRY_SIZE],
        [7u8; ENTRY_SIZE],
        [8u8; ENTRY_SIZE],
        [9u8; ENTRY_SIZE],
    ];
    /// Local alias so the tests below read the same as in sibling journal crates.
    pub type Journal = crate::Journal;

    /// Create a journal with an isolated temporary directory for the RocksDB database.
    /// Returns the journal and the temp dir guard (must be kept alive for the test duration).
    pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) {
        let tmp = tempfile::tempdir().expect("failed to create temp dir");
        let config = crate::JournalConfig {
            db_path: tmp.path().join("journal_db"),
        };
        let journal =
            Journal::with_capacity_and_config(capacity, config).expect("failed to create journal");
        (journal, tmp)
    }
363
364    #[tokio::test(flavor = "current_thread")]
365    async fn try_reader_is_exclusive() -> eyre::Result<()> {
366        let (journal, _tmp) = test_journal(2);
367
368        let reader = journal.try_reader().unwrap();
369
370        assert!(
371            journal.try_reader().is_none(),
372            "second reader acquisition should fail"
373        );
374
375        drop(reader);
376        assert!(
377            journal.try_reader().is_some(),
378            "reader acquisition should succeed after drop"
379        );
380
381        Ok(())
382    }
383
384    #[tokio::test(flavor = "current_thread")]
385    async fn commit_and_read_round_trip() -> eyre::Result<()> {
386        let (journal, _tmp) = test_journal(4);
387        let mut reader = journal.reader();
388
389        journal.commit(&TEST_DATA[0]);
390        journal.commit(&TEST_DATA[1]);
391
392        {
393            let entries = reader.read(2)?;
394            assert_eq!(entries.len(), 2);
395            assert_eq!(*entries[0], TEST_DATA[0]);
396            assert_eq!(*entries[1], TEST_DATA[1]);
397        }
398
399        reader.commit()?;
400        assert_eq!(reader.available(), 0);
401        Ok(())
402    }
403
404    #[tokio::test(flavor = "current_thread")]
405    async fn commit_returns_error_when_full() -> eyre::Result<()> {
406        let (journal, _tmp) = test_journal(2);
407
408        journal.commit(&TEST_DATA[1]);
409        journal.commit(&TEST_DATA[2]);
410
411        let err = journal
412            .try_commit(&TEST_DATA[3])
413            .expect_err("buffer should report full on third commit");
414        assert!(
415            matches!(err, crate::Error::Full),
416            "expected Full, got {err:?}"
417        );
418        Ok(())
419    }
420
421    #[tokio::test(flavor = "current_thread")]
422    async fn reader_handles_sequential_reads() -> eyre::Result<()> {
423        let (journal, _tmp) = test_journal(4);
424        let mut reader = journal.reader();
425
426        for entry in TEST_DATA.iter().take(4) {
427            journal.commit(entry);
428        }
429
430        {
431            let entries = reader.read(2)?;
432            assert_eq!(entries.len(), 2);
433            assert_eq!(*entries[0], TEST_DATA[0]);
434            assert_eq!(*entries[1], TEST_DATA[1]);
435        }
436        reader.commit()?;
437
438        for entry in TEST_DATA.iter().skip(4).take(2) {
439            journal.commit(entry);
440        }
441
442        {
443            let entries = reader.read(4)?;
444            assert_eq!(entries.len(), 4);
445            assert_eq!(*entries[0], TEST_DATA[2]);
446            assert_eq!(*entries[1], TEST_DATA[3]);
447            assert_eq!(*entries[2], TEST_DATA[4]);
448            assert_eq!(*entries[3], TEST_DATA[5]);
449        }
450
451        reader.commit()?;
452        assert_eq!(reader.available(), 0);
453        Ok(())
454    }
455}