1use crate::{ConsumerWait, Error, JournalInner, helper::FatalErrorExt};
2use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch};
3use std::{
4 fmt,
5 pin::Pin,
6 sync::{Arc, atomic::Ordering},
7 task::{Context, Poll},
8};
9
/// Consumer handle over a journal: reads entries in index order and
/// persists the consumed boundary back to the database on [`commit`].
/// Dropping the reader clears the journal's `reader_taken` flag so a new
/// reader can be obtained.
pub struct JournalReader {
    // Shared journal state (DB handle, indices, waiter slot).
    journal: Arc<JournalInner>,
    // Local, not-yet-persisted consumed index; advanced by `read`,
    // written to the DB and published by `commit`.
    consumed: u64,
    // Reusable buffer holding the entries returned by the last `read`;
    // cleared and refilled on each call to avoid re-allocating.
    read_buf: Vec<Box<[u8]>>,
}
22
23impl fmt::Debug for JournalReader {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 f.debug_struct("JournalReader")
26 .field("consumed", &self.consumed)
27 .finish()
28 }
29}
30
impl Drop for JournalReader {
    fn drop(&mut self) {
        // Release the reader slot so the journal can hand out a new
        // `JournalReader` after this one goes away.
        self.journal.reader_taken.store(false, Ordering::Release);
    }
}
36
impl JournalReader {
    /// Creates a reader positioned at the journal's currently persisted
    /// consumed index.
    pub(super) fn new(journal: Arc<JournalInner>) -> Self {
        let consumed = journal.consumed_index();
        Self {
            journal,
            consumed,
            read_buf: Vec::new(),
        }
    }

    /// Number of entries written to the journal that this reader has not
    /// yet returned from [`read`](Self::read).
    #[inline]
    pub fn available(&self) -> usize {
        let write_idx = self.journal.write_index();
        // Indices are monotonically increasing u64 counters; the wrapping
        // difference is the unread entry count.
        write_idx.wrapping_sub(self.consumed) as usize
    }

    /// Waits until at least `min` entries are available to this reader.
    ///
    /// # Errors
    /// Returns [`Error::Fatal`] if the journal has recorded a fatal error,
    /// checked both up front and on every poll while waiting.
    ///
    /// # Panics
    /// Panics if the wait can never complete: the target index would
    /// exceed `capacity` entries past the *committed* consumed index.
    pub async fn wait_at_least(&mut self, min: usize) -> Result<(), Error> {
        if self.journal.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }
        if self.available() >= min {
            return Ok(());
        }

        let target_index = self.consumed.wrapping_add(min as u64);
        {
            // Sanity guard: the writer can only get `capacity` entries
            // ahead of the committed consumed index, so a larger target
            // would hang forever. Note this uses the committed index, not
            // the local (dirty) `self.consumed`.
            let capacity = self.journal.capacity;
            let current_consumed = self.journal.consumed_index();
            let max_possible_target = current_consumed.wrapping_add(capacity);
            if target_index > max_possible_target {
                panic!(
                    "requested ({target_index}) exceeds max possible ({max_possible_target}): journal.capacity={capacity}, journal.consumed_index={current_consumed}"
                );
            }
        }

        // Hand-rolled future so the waker registration can be removed
        // deterministically in `Drop` if the wait is cancelled.
        struct WaitForBatch<'a> {
            reader: &'a JournalReader,
            target_index: u64,
        }

        impl Future for WaitForBatch<'_> {
            type Output = Result<(), Error>;
            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                if self.reader.journal.fatal_error.load(Ordering::Acquire) {
                    return Poll::Ready(Err(Error::Fatal));
                }

                // Fast path: enough entries arrived before taking the lock.
                if self.reader.journal.write_index() >= self.target_index {
                    return Poll::Ready(Ok(()));
                }

                let mut guard = self
                    .reader
                    .journal
                    .consumer_wait
                    .lock()
                    .expect("Mutex poisoned");
                // Re-check under the lock to close the race between the
                // check above and registering the waker.
                // NOTE(review): assumes the writer advances write_index and
                // wakes waiters under this same lock — confirm on the
                // producer side, otherwise a wakeup could still be lost.
                if self.reader.journal.write_index() >= self.target_index {
                    return Poll::Ready(Ok(()));
                }
                *guard = Some(ConsumerWait {
                    waker: cx.waker().clone(),
                    target_index: self.target_index,
                });

                Poll::Pending
            }
        }

        impl Drop for WaitForBatch<'_> {
            fn drop(&mut self) {
                // Deregister our waker so a cancelled/timed-out wait does
                // not leave a stale `ConsumerWait` behind.
                let mut guard = self
                    .reader
                    .journal
                    .consumer_wait
                    .lock()
                    .expect("Mutex poisoned");
                if let Some(wait) = guard.as_ref() {
                    // Any registered wait should be this future's own
                    // (there is at most one reader at a time).
                    debug_assert_eq!(wait.target_index, self.target_index);
                    guard.take();
                }
            }
        }

        WaitForBatch {
            reader: self,
            target_index,
        }
        .await
    }

    /// Reads up to `max` entries starting at the local consumed position
    /// and returns them as a borrowed slice.
    ///
    /// Advances the local `consumed` cursor but does NOT persist it; call
    /// [`commit`](Self::commit) to advance the journal-wide boundary and
    /// delete the consumed entries. Returns an empty slice when nothing
    /// is available.
    ///
    /// # Errors
    /// Returns [`Error::Fatal`] if the journal is in a fatal state, a
    /// RocksDB read fails, or fewer entries than expected are found.
    pub fn read(&mut self, max: usize) -> Result<&[Box<[u8]>], Error> {
        if self.journal.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        let available = self.available();
        if available == 0 {
            return Ok(&[]);
        }

        let count = available.min(max);
        self.read_buf.clear();

        // Entry keys are big-endian u64 indices; scan the half-open range
        // [consumed, consumed + count).
        let start_key = self.consumed.to_be_bytes();
        let end_key = self
            .consumed
            .checked_add(count as u64)
            .expect("let's handle overflow 10000 years later")
            .to_be_bytes();

        let mut options = ReadOptions::default();
        options.set_iterate_lower_bound(start_key);
        options.set_iterate_upper_bound(end_key);
        options.set_auto_readahead_size(true);

        let iter = self.journal.db.iterator_cf_opt(
            self.journal.cf_entries(),
            options,
            IteratorMode::From(&start_key, Direction::Forward),
        );
        for (idx, data) in iter.enumerate() {
            let (_key, value) = data.stop_if_error(&self.journal)?;
            // Keys are expected to be dense: entry `idx` carries key
            // `consumed + idx`.
            debug_assert_eq!((self.consumed + idx as u64).to_be_bytes(), _key.as_ref(),);
            self.read_buf.push(value);
        }

        let read = self.read_buf.len();
        if read != count {
            // A gap between the in-memory indices and what the DB actually
            // holds means the journal is inconsistent; treat as fatal.
            error!(
                "journal reader short read: expected {count} entries, got {read}; treating as fatal"
            );
            return Err(Error::Fatal);
        }

        self.consumed += count as u64;
        Ok(&self.read_buf)
    }

    /// Persists the local consumed position: atomically (in one RocksDB
    /// batch) writes the new consumed index and deletes the entries read
    /// since the last commit, then publishes the boundary to the shared
    /// in-memory atomic.
    ///
    /// # Errors
    /// Returns [`Error::Fatal`] if the journal is in a fatal state or the
    /// batched write fails.
    pub fn commit(&mut self) -> Result<(), Error> {
        if self.journal.fatal_error.load(Ordering::Acquire) {
            return Err(Error::Fatal);
        }

        let old_consumed = self.journal.consumed_index();

        let mut batch = WriteBatch::default();
        self.journal
            .write_consumed_index_batched(&mut batch, self.consumed);
        // delete_range_cf's end key is exclusive, so this removes exactly
        // [old_consumed, self.consumed) — the entries consumed since the
        // previous commit.
        batch.delete_range_cf(
            self.journal.cf_entries(),
            old_consumed.to_be_bytes(),
            self.consumed.to_be_bytes(),
        );
        self.journal.db.write(batch).stop_if_error(&self.journal)?;

        // Publish only after the batch write succeeded, so readers of the
        // atomic never observe a boundary that is not durable.
        self.journal
            .consumed_index
            .store(self.consumed, Ordering::Release);
        Ok(())
    }
}
215
#[cfg(test)]
mod tests {
    use crate::tests::*;
    use tokio::time::{Duration, sleep, timeout};

    // `available()` reflects writes immediately and shrinks as `read`
    // advances the local cursor.
    #[tokio::test(flavor = "current_thread")]
    async fn available_tracks_written_entries() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        assert_eq!(reader.available(), 0);

        journal.commit(&TEST_DATA[0]);
        assert_eq!(reader.available(), 1);

        journal.commit(&TEST_DATA[1]);
        assert_eq!(reader.available(), 2);

        let slice = reader.read(1)?;
        assert_eq!(slice.len(), 1);
        assert_eq!(reader.available(), 1);
        Ok(())
    }

    // `read` only moves the reader-local cursor; the journal-wide consumed
    // boundary advances when `commit` is called.
    #[tokio::test(flavor = "current_thread")]
    async fn commit_updates_shared_consumed_boundary() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        for entry in TEST_DATA.iter().take(3) {
            journal.commit(entry);
        }

        let slice = reader.read(2)?;
        assert_eq!(slice.len(), 2);
        assert_eq!(reader.available(), 1);
        assert_eq!(
            reader
                .journal
                .consumed_index
                .load(std::sync::atomic::Ordering::Acquire),
            0,
            "global consumed boundary should not advance before commit",
        );

        reader.commit()?;
        assert_eq!(
            reader
                .journal
                .consumed_index
                .load(std::sync::atomic::Ordering::Acquire),
            2
        );
        Ok(())
    }

    // A waiter parked in `wait_at_least` is woken by a write from another
    // task.
    #[tokio::test(flavor = "current_thread")]
    async fn wait_at_least_resumes_after_write() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        let journal_clone = journal.clone();
        let task = tokio::spawn(async move {
            sleep(Duration::from_millis(5)).await;
            journal_clone.commit(&TEST_DATA[0]);
        });

        reader.wait_at_least(1).await?;
        assert_eq!(reader.available(), 1);

        task.await?;
        Ok(())
    }

    // `wait_at_least(3)` must stay pending through the first two writes
    // and resume only once the third lands.
    #[tokio::test(flavor = "current_thread")]
    async fn wait_at_least_waits_for_correct_count() -> eyre::Result<()> {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        let journal_clone = journal.clone();
        let task = tokio::spawn(async move {
            for entry in TEST_DATA.iter().take(4) {
                journal_clone.commit(entry);
                sleep(Duration::from_millis(5)).await;
            }
        });

        timeout(Duration::from_secs(10), reader.wait_at_least(3)).await??;
        assert!(reader.available() >= 3);

        task.await?;
        Ok(())
    }

    // Asking for more entries than the journal's capacity can ever hold
    // panics instead of hanging forever.
    #[tokio::test(flavor = "current_thread")]
    #[should_panic(
        expected = "requested (5) exceeds max possible (4): journal.capacity=4, journal.consumed_index=0"
    )]
    async fn wait_at_least_exceeds_capacity() {
        let (journal, _tmp) = test_journal(4);
        let mut reader = journal.reader();

        timeout(Duration::from_secs(1), reader.wait_at_least(5))
            .await
            .unwrap()
            .unwrap();
    }

    // The capacity guard is measured from the *committed* consumed index
    // (still 0 here), not the reader's dirty cursor, so read+wait(4) also
    // overshoots and panics.
    #[tokio::test(flavor = "current_thread")]
    #[should_panic(
        expected = "requested (5) exceeds max possible (4): journal.capacity=4, journal.consumed_index=0"
    )]
    async fn wait_at_least_dirty_read_exceeds_available() {
        let (journal, _tmp) = test_journal(4);
        journal.commit(&TEST_DATA[0]);

        let mut reader = journal.reader();
        reader.read(1).unwrap();

        timeout(Duration::from_secs(1), reader.wait_at_least(4))
            .await
            .unwrap()
            .unwrap();
    }
}