1use std::sync::Arc;
4
5use crate::error::{Error, Result};
6use crate::queue::Queue;
7use crate::store::Store;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum ReaderState {
12 Uninitialized,
14 Found,
16 EndOfCycle,
18 CycleNotFound,
20 BeforeStart,
22 NotReached,
24}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
28pub enum Direction {
29 #[default]
31 Forward,
32 Backward,
34 None,
36}
37
38pub struct Reader<'q> {
63 queue: &'q Queue,
64 cycle: i32,
65 index: u64,
66 sequence: u64,
67 store: Option<Arc<Store>>,
68 state: ReaderState,
69 direction: Direction,
70 next_position: u64,
73}
74
75impl<'q> Reader<'q> {
76 pub(crate) fn new(queue: &'q Queue) -> Result<Self> {
78 Ok(Self {
79 queue,
80 cycle: 0,
81 index: 0,
82 sequence: 0,
83 store: None,
84 state: ReaderState::Uninitialized,
85 direction: Direction::Forward,
86 next_position: 0,
87 })
88 }
89
90 #[inline]
118 pub fn read(&mut self) -> Result<Option<&[u8]>> {
119 if self.state == ReaderState::Uninitialized {
120 self.rewind()?;
121 }
122
123 loop {
124 if let Some((ptr, len, next_pos)) = self.read_fast_path()? {
126 if self.direction == Direction::Forward {
128 self.next_position = next_pos;
129 }
130 self.advance()?;
131 let data = unsafe { std::slice::from_raw_parts(ptr, len) };
138 return Ok(Some(data));
139 }
140
141 if !self.try_next_cycle()? {
142 return Ok(None);
143 }
144 }
145 }
146
147 pub fn read_with<F, T>(&mut self, f: F) -> Result<Option<T>>
163 where
164 F: FnOnce(&[u8]) -> Result<T>,
165 {
166 if self.state == ReaderState::Uninitialized {
167 self.rewind()?;
168 }
169
170 loop {
171 if let Some(store) = &self.store {
172 if let Some(data) = store.read_ref(self.sequence)? {
173 let result = f(data)?;
174 self.advance()?;
175 return Ok(Some(result));
176 }
177 }
178
179 if !self.try_next_cycle()? {
180 return Ok(None);
181 }
182 }
183 }
184
185 pub fn rewind(&mut self) -> Result<&mut Self> {
187 self.next_position = 0; if let Some(first_cycle) = self.queue.first_cycle() {
189 self.move_to_cycle(first_cycle)?;
190 self.sequence = 0;
191 self.update_index();
192 self.state = ReaderState::Found;
193 self.next_position = crate::store::FILE_HEADER_SIZE;
195 } else {
196 self.cycle = self.queue.current_cycle();
197 self.sequence = 0;
198 self.store = None;
199 self.state = ReaderState::Uninitialized;
200 }
201 Ok(self)
202 }
203
204 pub fn seek_end(&mut self) -> Result<&mut Self> {
208 self.next_position = 0; if let Some(last_cycle) = self.queue.last_cycle() {
210 self.move_to_cycle(last_cycle)?;
211
212 if let Some(ref store) = self.store {
213 self.sequence = store.message_count();
214 }
215 self.update_index();
216 self.state = ReaderState::NotReached;
217 } else {
218 self.cycle = self.queue.current_cycle();
219 self.sequence = 0;
220 self.store = None;
221 self.state = ReaderState::Uninitialized;
222 }
223 Ok(self)
224 }
225
226 pub fn seek(&mut self, index: u64) -> Result<bool> {
230 self.next_position = 0; let cycle = self.queue.roll_cycle().to_cycle(index);
232 let sequence = self.queue.roll_cycle().to_sequence(index);
233
234 if let Err(Error::CycleNotFound(_)) = self.move_to_cycle(cycle) {
235 self.state = ReaderState::CycleNotFound;
236 return Ok(false);
237 }
238
239 if let Some(ref store) = self.store {
240 if sequence >= store.message_count() {
241 self.state = ReaderState::NotReached;
242 return Ok(false);
243 }
244 }
245
246 self.sequence = sequence;
247 self.index = index;
248 self.state = ReaderState::Found;
249 Ok(true)
250 }
251
252 pub fn move_to_cycle(&mut self, cycle: i32) -> Result<&mut Self> {
254 if self.cycle == cycle && self.store.is_some() {
255 return Ok(self);
256 }
257
258 self.next_position = 0; match self.queue.acquire_store(cycle) {
260 Ok(store) => {
261 self.store = Some(store);
262 self.cycle = cycle;
263 self.sequence = 0;
264 self.update_index();
265 self.state = ReaderState::Found;
266 Ok(self)
267 }
268 Err(e) => {
269 self.state = ReaderState::CycleNotFound;
270 Err(e)
271 }
272 }
273 }
274
275 #[inline]
277 #[must_use]
278 pub fn index(&self) -> u64 {
279 self.index
280 }
281
282 #[inline]
284 #[must_use]
285 pub fn cycle(&self) -> i32 {
286 self.cycle
287 }
288
289 #[inline]
291 #[must_use]
292 pub fn sequence(&self) -> u64 {
293 self.sequence
294 }
295
296 #[inline]
298 #[must_use]
299 pub fn state(&self) -> ReaderState {
300 self.state
301 }
302
303 #[inline]
305 #[must_use]
306 pub fn direction(&self) -> Direction {
307 self.direction
308 }
309
310 pub fn set_direction(&mut self, direction: Direction) -> &mut Self {
312 self.direction = direction;
313 self
314 }
315
316 pub fn peek(&self) -> Result<Option<&[u8]>> {
320 if let Some(ref store) = self.store {
321 return store.read_ref(self.sequence);
322 }
323 Ok(None)
324 }
325
326 #[inline(always)]
331 fn read_fast_path(&self) -> Result<Option<(*const u8, usize, u64)>> {
332 let store = match &self.store {
333 Some(s) => s,
334 None => return Ok(None),
335 };
336
337 if self.next_position > 0 {
339 return store.read_at_position_raw(self.next_position);
341 }
342
343 if let Some(data) = store.read_ref(self.sequence)? {
345 return Ok(Some((data.as_ptr(), data.len(), 0)));
346 }
347
348 Ok(None)
349 }
350
351 fn advance(&mut self) -> Result<()> {
353 match self.direction {
354 Direction::Forward => {
355 self.sequence += 1;
356 self.update_index();
357 }
358 Direction::Backward => {
359 if self.sequence > 0 {
360 self.sequence -= 1;
361 self.update_index();
362 }
363 }
364 Direction::None => {}
365 }
366 Ok(())
367 }
368
369 fn try_next_cycle(&mut self) -> Result<bool> {
371 match self.direction {
372 Direction::Forward => {
373 let next_cycle = self.cycle + 1;
374
375 if let Some(last_cycle) = self.queue.last_cycle() {
376 if next_cycle <= last_cycle && self.move_to_cycle(next_cycle).is_ok() {
377 self.sequence = 0;
378 self.update_index();
379 return Ok(true);
380 }
381 }
382
383 self.state = ReaderState::EndOfCycle;
384 Ok(false)
385 }
386 Direction::Backward => {
387 let prev_cycle = self.cycle - 1;
388
389 if let Some(first_cycle) = self.queue.first_cycle() {
390 if prev_cycle >= first_cycle && self.move_to_cycle(prev_cycle).is_ok() {
391 if let Some(ref store) = self.store {
392 let count = store.message_count();
393 self.sequence = if count > 0 { count - 1 } else { 0 };
394 }
395 self.update_index();
396 return Ok(true);
397 }
398 }
399
400 self.state = ReaderState::BeforeStart;
401 Ok(false)
402 }
403 Direction::None => Ok(false),
404 }
405 }
406
407 fn update_index(&mut self) {
409 self.index = self.queue.roll_cycle().to_index(self.cycle, self.sequence);
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use super::*;
416 use tempfile::TempDir;
417
418 fn create_queue_with_messages(dir: &TempDir, messages: &[&[u8]]) -> Queue {
419 let queue = Queue::new(dir.path()).build().unwrap();
420 {
421 let mut writer = queue.writer().unwrap();
422 for msg in messages {
423 writer.write(msg).unwrap();
424 }
425 writer.flush().unwrap();
426 }
427 queue
428 }
429
430 #[test]
431 fn test_creation() {
432 let temp_dir = TempDir::new().unwrap();
433 let queue = Queue::new(temp_dir.path()).build().unwrap();
434
435 let reader = queue.reader().unwrap();
436 assert_eq!(reader.state(), ReaderState::Uninitialized);
437 }
438
439 #[test]
440 fn test_empty_queue() {
441 let temp_dir = TempDir::new().unwrap();
442 let queue = Queue::new(temp_dir.path()).build().unwrap();
443
444 let mut reader = queue.reader().unwrap();
445 let result = reader.read().unwrap();
446 assert!(result.is_none());
447 }
448
449 #[test]
450 fn test_single_message() {
451 let temp_dir = TempDir::new().unwrap();
452 let queue = create_queue_with_messages(&temp_dir, &[b"Hello"]);
453
454 let mut reader = queue.reader().unwrap();
455 reader.rewind().unwrap();
456
457 assert_eq!(reader.read().unwrap(), Some(&b"Hello"[..]));
458 assert!(reader.read().unwrap().is_none());
459 }
460
461 #[test]
462 fn test_multiple_messages() {
463 let temp_dir = TempDir::new().unwrap();
464 let queue = create_queue_with_messages(&temp_dir, &[b"One", b"Two", b"Three"]);
465
466 let mut reader = queue.reader().unwrap();
467 reader.rewind().unwrap();
468
469 assert_eq!(reader.read().unwrap(), Some(&b"One"[..]));
470 assert_eq!(reader.read().unwrap(), Some(&b"Two"[..]));
471 assert_eq!(reader.read().unwrap(), Some(&b"Three"[..]));
472 assert!(reader.read().unwrap().is_none());
473 }
474
475 #[test]
476 fn test_rewind() {
477 let temp_dir = TempDir::new().unwrap();
478 let queue = create_queue_with_messages(&temp_dir, &[b"First", b"Second"]);
479
480 let mut reader = queue.reader().unwrap();
481
482 reader.rewind().unwrap();
483 assert_eq!(reader.read().unwrap(), Some(&b"First"[..]));
484
485 reader.rewind().unwrap();
486 assert_eq!(reader.read().unwrap(), Some(&b"First"[..]));
487 }
488
489 #[test]
490 fn test_seek_end() {
491 let temp_dir = TempDir::new().unwrap();
492 let queue = create_queue_with_messages(&temp_dir, &[b"One", b"Two"]);
493
494 let mut reader = queue.reader().unwrap();
495 reader.seek_end().unwrap();
496
497 assert!(reader.read().unwrap().is_none());
498 }
499
500 #[test]
501 fn test_read_with() {
502 let temp_dir = TempDir::new().unwrap();
503 let queue = create_queue_with_messages(&temp_dir, &[b"Hello, World!"]);
504
505 let mut reader = queue.reader().unwrap();
506 reader.rewind().unwrap();
507
508 let len = reader.read_with(|data| Ok(data.len())).unwrap().unwrap();
509 assert_eq!(len, 13);
510 }
511
512 #[test]
513 fn test_peek() {
514 let temp_dir = TempDir::new().unwrap();
515 let queue = create_queue_with_messages(&temp_dir, &[b"Test"]);
516
517 let mut reader = queue.reader().unwrap();
518 reader.rewind().unwrap();
519
520 assert_eq!(reader.peek().unwrap(), Some(&b"Test"[..]));
521 assert_eq!(reader.peek().unwrap(), Some(&b"Test"[..]));
522 assert_eq!(reader.read().unwrap(), Some(&b"Test"[..]));
523 assert!(reader.peek().unwrap().is_none());
524 }
525
526 #[test]
527 fn test_direction_none() {
528 let temp_dir = TempDir::new().unwrap();
529 let queue = create_queue_with_messages(&temp_dir, &[b"A", b"B"]);
530
531 let mut reader = queue.reader().unwrap();
532 reader.rewind().unwrap();
533 reader.set_direction(Direction::None);
534
535 assert_eq!(reader.read().unwrap(), Some(&b"A"[..]));
536 assert_eq!(reader.read().unwrap(), Some(&b"A"[..]));
537 }
538
539 #[test]
540 fn test_seek() {
541 let temp_dir = TempDir::new().unwrap();
542 let queue = create_queue_with_messages(&temp_dir, &[b"A", b"B", b"C", b"D", b"E"]);
543
544 let mut reader = queue.reader().unwrap();
545
546 reader.rewind().unwrap();
547 reader.read().unwrap(); reader.read().unwrap(); let index_c = reader.index();
550
551 reader.rewind().unwrap();
552 assert_eq!(reader.read().unwrap(), Some(&b"A"[..]));
553
554 assert!(reader.seek(index_c).unwrap());
555 assert_eq!(reader.read().unwrap(), Some(&b"C"[..]));
556 assert_eq!(reader.read().unwrap(), Some(&b"D"[..]));
557 }
558
559 #[test]
560 fn test_sequence_tracking() {
561 let temp_dir = TempDir::new().unwrap();
562 let queue = create_queue_with_messages(&temp_dir, &[b"One", b"Two", b"Three"]);
563
564 let mut reader = queue.reader().unwrap();
565 reader.rewind().unwrap();
566
567 assert_eq!(reader.sequence(), 0);
568 reader.read().unwrap();
569 assert_eq!(reader.sequence(), 1);
570 reader.read().unwrap();
571 assert_eq!(reader.sequence(), 2);
572 reader.read().unwrap();
573 assert_eq!(reader.sequence(), 3);
574 }
575}