//! Journal reader (`uts_journal/reader.rs`): consumes settled entries from the journal.

1use crate::{ConsumerWait, Error, JournalInner, helper::FatalErrorExt};
2use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch};
3use std::{
4    fmt,
5    pin::Pin,
6    sync::{Arc, atomic::Ordering},
7    task::{Context, Poll},
8};
9
/// A reader for consuming settled entries from the journal.
///
/// Reader **WON'T** advance the shared consumed boundary until `commit()` is
/// called.  Entries are fetched from RocksDB into an internal buffer on each
/// [`read`](JournalReader::read) call.
pub struct JournalReader {
    /// Shared journal state (RocksDB handle, indices, consumer-wait slot).
    journal: Arc<JournalInner>,
    /// Local consumed cursor – how far this reader has read.
    /// Published to `journal.consumed_index` only by
    /// [`commit`](JournalReader::commit).
    consumed: u64,
    /// Internal buffer populated by [`read`](JournalReader::read).
    /// Cleared and refilled on each call; the returned slice borrows it.
    read_buf: Vec<Box<[u8]>>,
}
22
23impl fmt::Debug for JournalReader {
24    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25        f.debug_struct("JournalReader")
26            .field("consumed", &self.consumed)
27            .finish()
28    }
29}
30
impl Drop for JournalReader {
    fn drop(&mut self) {
        // Free the single-reader slot so another reader can be taken.
        // `Release` publishes this reader's side effects to whoever acquires
        // the slot next — presumably the Acquire side lives in the
        // reader-taking path on `JournalInner`; TODO(review): confirm.
        self.journal.reader_taken.store(false, Ordering::Release);
    }
}
36
37impl JournalReader {
38    pub(super) fn new(journal: Arc<JournalInner>) -> Self {
39        let consumed = journal.consumed_index();
40        Self {
41            journal,
42            consumed,
43            read_buf: Vec::new(),
44        }
45    }
46
47    /// Returns the number of available entries that have been written but not
48    /// yet consumed by this reader.
49    #[inline]
50    pub fn available(&self) -> usize {
51        let write_idx = self.journal.write_index();
52        write_idx.wrapping_sub(self.consumed) as usize
53    }
54
    /// Wait until at least `min` entries are available.
    ///
    /// Fast path: returns `Err(Error::Fatal)` if the journal is fatal, or
    /// `Ok(())` immediately when enough entries are already written. Slow
    /// path: registers a waker in the shared `consumer_wait` slot and parks
    /// until the writer reaches the target index.
    ///
    /// # Panics
    ///
    /// Panics when the request can never be satisfied: the target exceeds
    /// `consumed_index + capacity`. Note the bound is measured from the
    /// *committed* boundary, so entries read but not yet committed still
    /// count against capacity (see `wait_at_least_dirty_read_exceeds_available`).
    pub async fn wait_at_least(&mut self, min: usize) -> Result<(), Error> {
        if self.journal.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }
        if self.available() >= min {
            return Ok(());
        }

        // Absolute write index the writer must reach before we can resume.
        let target_index = self.consumed.wrapping_add(min as u64);
        {
            // The writer can never run more than `capacity` entries ahead of
            // the committed boundary, so a target beyond that would park
            // forever. Fail loudly instead of hanging.
            let capacity = self.journal.capacity;
            let current_consumed = self.journal.consumed_index();
            let max_possible_target = current_consumed.wrapping_add(capacity);
            if target_index > max_possible_target {
                panic!(
                    "requested ({target_index}) exceeds max possible ({max_possible_target}): journal.capacity={capacity}, journal.consumed_index={current_consumed}"
                );
            }
        }

        // Slow path – register a waker and park until the writer catches up.
        struct WaitForBatch<'a> {
            reader: &'a JournalReader,
            target_index: u64,
        }

        impl Future for WaitForBatch<'_> {
            type Output = Result<(), Error>;
            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                if self.reader.journal.fatal_error.load(Ordering::Acquire) {
                    return Poll::Ready(Err(Error::Fatal));
                }

                // Cheap lock-free check first: the common wakeup case.
                if self.reader.journal.write_index() >= self.target_index {
                    return Poll::Ready(Ok(()));
                }

                let mut guard = self
                    .reader
                    .journal
                    .consumer_wait
                    .lock()
                    .expect("Mutex poisoned");
                // Re-check under the lock: closes the race where the writer
                // advances (and inspects the wait slot) between the unlocked
                // check above and our lock acquisition — otherwise we could
                // register a waker that is never woken.
                if self.reader.journal.write_index() >= self.target_index {
                    return Poll::Ready(Ok(()));
                }
                *guard = Some(ConsumerWait {
                    waker: cx.waker().clone(),
                    target_index: self.target_index,
                });

                Poll::Pending
            }
        }

        // handle cancellation by clearing the wait slot if the future is dropped while pending.
        impl Drop for WaitForBatch<'_> {
            fn drop(&mut self) {
                let mut guard = self
                    .reader
                    .journal
                    .consumer_wait
                    .lock()
                    .expect("Mutex poisoned");
                if let Some(wait) = guard.as_ref() {
                    debug_assert_eq!(wait.target_index, self.target_index);
                    // At the same time, only one JournalReader can wait, so if there's a wait
                    // registered, it must be ours.
                    guard.take();
                }
            }
        }

        WaitForBatch {
            reader: self,
            target_index,
        }
        .await
    }
135
    /// Read available entries from RocksDB, up to `max`.
    ///
    /// Bumps the internal consumed cursor by the number of entries yielded.
    /// Caller is responsible for calling [`commit`](JournalReader::commit)
    /// after processing the entries.
    ///
    /// Returns an empty slice when nothing is available. Returns
    /// `Err(Error::Fatal)` if the journal is fatal or if RocksDB yields fewer
    /// entries than the in-memory indices promise (a short read means DB and
    /// indices disagree).
    pub fn read(&mut self, max: usize) -> Result<&[Box<[u8]>], Error> {
        if self.journal.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        let available = self.available();
        if available == 0 {
            return Ok(&[]);
        }

        let count = available.min(max);
        // Clear, don't reallocate: reuse the buffer's capacity across calls.
        self.read_buf.clear();

        // Entry keys are big-endian u64 indices, so lexicographic key order
        // equals numeric order. Scan the half-open range
        // [consumed, consumed + count).
        let start_key = self.consumed.to_be_bytes();
        let end_key = self
            .consumed
            .checked_add(count as u64)
            .expect("let's handle overflow 10000 years later")
            .to_be_bytes();

        let mut options = ReadOptions::default();
        options.set_iterate_lower_bound(start_key);
        // Upper bound is exclusive, matching the half-open range above.
        options.set_iterate_upper_bound(end_key);
        options.set_auto_readahead_size(true);

        let iter = self.journal.db.iterator_cf_opt(
            self.journal.cf_entries(),
            options,
            IteratorMode::From(&start_key, Direction::Forward),
        );
        for (idx, data) in iter.enumerate() {
            // Any iterator error flips the journal to fatal via `stop_if_error`.
            let (_key, value) = data.stop_if_error(&self.journal)?;
            // Keys must be dense: the idx-th scanned entry is index consumed+idx.
            debug_assert_eq!((self.consumed + idx as u64).to_be_bytes(), _key.as_ref(),);
            self.read_buf.push(value);
        }

        let read = self.read_buf.len();
        if read != count {
            error!(
                "journal reader short read: expected {count} entries, got {read}; treating as fatal"
            );
            return Err(Error::Fatal);
        }

        // Advance only the local cursor; the shared boundary moves on commit.
        self.consumed += count as u64;
        Ok(&self.read_buf)
    }
188
189    /// Commit the current consumed index, persisting it to RocksDB and
190    /// deleting consumed entries.
191    pub fn commit(&mut self) -> Result<(), Error> {
192        if self.journal.fatal_error.load(Ordering::Acquire) {
193            return Err(Error::Fatal);
194        }
195
196        let old_consumed = self.journal.consumed_index();
197
198        let mut batch = WriteBatch::default();
199        self.journal
200            .write_consumed_index_batched(&mut batch, self.consumed);
201        // Garbage-collect consumed entries.
202        batch.delete_range_cf(
203            self.journal.cf_entries(),
204            old_consumed.to_be_bytes(),
205            self.consumed.to_be_bytes(),
206        );
207        self.journal.db.write(batch).stop_if_error(&self.journal)?;
208
209        self.journal
210            .consumed_index
211            .store(self.consumed, Ordering::Release);
212        Ok(())
213    }
214}
215
#[cfg(test)]
mod tests {
    use crate::tests::*;
    use tokio::time::{Duration, sleep, timeout};

    /// `available()` grows with each writer commit and shrinks as the reader
    /// consumes entries via `read`.
    #[tokio::test(flavor = "current_thread")]
    async fn available_tracks_written_entries() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        assert_eq!(reader.available(), 0);

        journal.commit(&TEST_DATA[0]);
        assert_eq!(reader.available(), 1);

        journal.commit(&TEST_DATA[1]);
        assert_eq!(reader.available(), 2);

        let slice = reader.read(1)?;
        assert_eq!(slice.len(), 1);
        assert_eq!(reader.available(), 1);
        Ok(())
    }

    /// `read` advances only the reader's local cursor; the shared
    /// `consumed_index` moves only when `commit` is called.
    #[tokio::test(flavor = "current_thread")]
    async fn commit_updates_shared_consumed_boundary() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in TEST_DATA.iter().take(3) {
            journal.commit(entry);
        }

        let slice = reader.read(2)?;
        assert_eq!(slice.len(), 2);
        assert_eq!(reader.available(), 1);
        assert_eq!(
            reader
                .journal
                .consumed_index
                .load(std::sync::atomic::Ordering::Acquire),
            0,
            "global consumed boundary should not advance before commit",
        );

        reader.commit()?;
        assert_eq!(
            reader
                .journal
                .consumed_index
                .load(std::sync::atomic::Ordering::Acquire),
            2
        );
        Ok(())
    }

    /// A parked `wait_at_least(1)` is woken by a writer commit from another
    /// task (exercises the waker registration slow path).
    #[tokio::test(flavor = "current_thread")]
    async fn wait_at_least_resumes_after_write() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        let journal_clone = journal.clone();
        let task = tokio::spawn(async move {
            sleep(Duration::from_millis(5)).await;
            journal_clone.commit(&TEST_DATA[0]);
        });

        reader.wait_at_least(1).await?;
        assert_eq!(reader.available(), 1);

        task.await?;
        Ok(())
    }

    /// `wait_at_least(min)` does not resume until at least `min` entries are
    /// written, even when entries arrive one at a time.
    #[tokio::test(flavor = "current_thread")]
    async fn wait_at_least_waits_for_correct_count() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        let journal_clone = journal.clone();
        let task = tokio::spawn(async move {
            for entry in TEST_DATA.iter().take(4) {
                journal_clone.commit(entry);
                sleep(Duration::from_millis(5)).await;
            }
        });

        timeout(Duration::from_secs(10), reader.wait_at_least(3)).await??;
        assert!(reader.available() >= 3);

        task.await?;
        Ok(())
    }

    /// Asking for more entries than the journal can ever hold panics rather
    /// than parking forever.
    #[tokio::test(flavor = "current_thread")]
    #[should_panic(
        expected = "requested (5) exceeds max possible (4): journal.capacity=4, journal.consumed_index=0"
    )]
    async fn wait_at_least_exceeds_capacity() {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        timeout(Duration::from_secs(1), reader.wait_at_least(5))
            .await
            .unwrap()
            .unwrap();
    }

    /// Uncommitted reads still count against capacity: the panic bound is
    /// measured from the *committed* boundary, not the local cursor.
    #[tokio::test(flavor = "current_thread")]
    #[should_panic(
        expected = "requested (5) exceeds max possible (4): journal.capacity=4, journal.consumed_index=0"
    )]
    async fn wait_at_least_dirty_read_exceeds_available() {
        let (journal, _tmp) = test_journal(4);
        journal.commit(&TEST_DATA[0]);

        let mut reader = journal.reader();
        reader.read(1).unwrap();

        timeout(Duration::from_secs(1), reader.wait_at_least(4))
            .await
            .unwrap()
            .unwrap();
    }
}