// khata_rs/reader.rs

//! Reader for consuming messages from a persistent queue.

3use std::sync::Arc;
4
5use crate::error::{Error, Result};
6use crate::queue::Queue;
7use crate::store::Store;
8
/// Where a [`Reader`]'s cursor currently stands relative to the queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReaderState {
    /// Freshly created, or the queue had no cycles when the reader tried to
    /// position itself.
    Uninitialized,
    /// The cursor has been placed inside an attached cycle.
    Found,
    /// A forward read ran out of messages and no later cycle could be entered.
    EndOfCycle,
    /// The cycle that was asked for could not be opened.
    CycleNotFound,
    /// Backward reading has run past the first message available to it.
    BeforeStart,
    /// The cursor is beyond the last message written so far (for example after
    /// `seek_end`, or a `seek` to an index that has not been written yet).
    NotReached,
}

/// Which way a [`Reader`] moves its cursor after each successful read.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Direction {
    /// Step on to the following message after each read (the default).
    #[default]
    Forward,
    /// Step back to the preceding message after each read.
    Backward,
    /// Leave the cursor where it is, so repeated reads see the same message.
    None,
}

38/// A reader for consuming messages from a [`Queue`].
39///
40/// Provides sequential and random access to messages, automatically handling
41/// cycle boundaries.
42///
43/// # Thread Safety
44///
45/// A single `Reader` should only be used by one thread. Create separate
46/// instances for concurrent readers.
47///
48/// # Example
49///
50/// ```no_run
51/// use khata_rs::Queue;
52///
53/// let queue = Queue::new("/tmp/my-queue").build()?;
54/// let mut reader = queue.reader()?;
55///
56/// reader.rewind()?;
57/// while let Some(data) = reader.read()? {
58///     println!("Read: {:?}", String::from_utf8_lossy(&data));
59/// }
60/// # Ok::<(), khata_rs::Error>(())
61/// ```
62pub struct Reader<'q> {
63    queue: &'q Queue,
64    cycle: i32,
65    index: u64,
66    sequence: u64,
67    store: Option<Arc<Store>>,
68    state: ReaderState,
69    direction: Direction,
70    /// Cached position of next message for O(1) sequential reads.
71    /// Format: position in file (0 = unknown/invalid).
72    next_position: u64,
73}
74
75impl<'q> Reader<'q> {
76    /// Creates a new reader for the given queue.
77    pub(crate) fn new(queue: &'q Queue) -> Result<Self> {
78        Ok(Self {
79            queue,
80            cycle: 0,
81            index: 0,
82            sequence: 0,
83            store: None,
84            state: ReaderState::Uninitialized,
85            direction: Direction::Forward,
86            next_position: 0,
87        })
88    }
89
90    /// Reads the next message (zero-copy).
91    ///
92    /// Returns a reference to the message data directly in the memory-mapped file.
93    /// Returns `None` if there are no more messages. Automatically advances
94    /// based on the configured direction.
95    ///
96    /// # Zero-Copy Performance
97    ///
98    /// This method returns a slice into the memory-mapped file without copying.
99    /// Sequential reads are O(1) due to internal position caching.
100    /// The reference is valid as long as the Reader is not dropped or moved to
101    /// a different cycle.
102    ///
103    /// # Example
104    ///
105    /// ```no_run
106    /// # use khata_rs::Queue;
107    /// # let queue = Queue::new("/tmp/q").build().unwrap();
108    /// let mut reader = queue.reader()?;
109    /// reader.rewind()?;
110    ///
111    /// while let Some(data) = reader.read()? {
112    ///     // Process data directly without allocation
113    ///     println!("Length: {}", data.len());
114    /// }
115    /// # Ok::<(), khata_rs::Error>(())
116    /// ```
117    #[inline]
118    pub fn read(&mut self) -> Result<Option<&[u8]>> {
119        if self.state == ReaderState::Uninitialized {
120            self.rewind()?;
121        }
122
123        loop {
124            // Try fast path with cached position first
125            if let Some((ptr, len, next_pos)) = self.read_fast_path()? {
126                // Only cache next position if we're actually advancing
127                if self.direction == Direction::Forward {
128                    self.next_position = next_pos;
129                }
130                self.advance()?;
131                // SAFETY: The slice data lives in the mmap which is owned by
132                // self.store (Arc<Store>). advance() only modifies cursor state
133                // (sequence/index numbers), not the mmap contents. The mmap
134                // remains valid for the lifetime of &self since self holds the
135                // Arc<Store>. The borrow checker ensures self is not dropped
136                // while this reference is alive.
137                let data = unsafe { std::slice::from_raw_parts(ptr, len) };
138                return Ok(Some(data));
139            }
140
141            if !self.try_next_cycle()? {
142                return Ok(None);
143            }
144        }
145    }
146
147    /// Reads the next message with a closure for zero-copy access.
148    ///
149    /// # Example
150    ///
151    /// ```no_run
152    /// # use khata_rs::Queue;
153    /// # let queue = Queue::new("/tmp/q").build().unwrap();
154    /// let mut reader = queue.reader()?;
155    /// reader.rewind()?;
156    ///
157    /// while let Some(len) = reader.read_with(|data| Ok(data.len()))? {
158    ///     println!("Message length: {len}");
159    /// }
160    /// # Ok::<(), khata_rs::Error>(())
161    /// ```
162    pub fn read_with<F, T>(&mut self, f: F) -> Result<Option<T>>
163    where
164        F: FnOnce(&[u8]) -> Result<T>,
165    {
166        if self.state == ReaderState::Uninitialized {
167            self.rewind()?;
168        }
169
170        loop {
171            if let Some(store) = &self.store {
172                if let Some(data) = store.read_ref(self.sequence)? {
173                    let result = f(data)?;
174                    self.advance()?;
175                    return Ok(Some(result));
176                }
177            }
178
179            if !self.try_next_cycle()? {
180                return Ok(None);
181            }
182        }
183    }
184
185    /// Moves to the start of the queue.
186    pub fn rewind(&mut self) -> Result<&mut Self> {
187        self.next_position = 0; // Reset position cache
188        if let Some(first_cycle) = self.queue.first_cycle() {
189            self.move_to_cycle(first_cycle)?;
190            self.sequence = 0;
191            self.update_index();
192            self.state = ReaderState::Found;
193            // Set initial position to start of data (after file header)
194            self.next_position = crate::store::FILE_HEADER_SIZE;
195        } else {
196            self.cycle = self.queue.current_cycle();
197            self.sequence = 0;
198            self.store = None;
199            self.state = ReaderState::Uninitialized;
200        }
201        Ok(self)
202    }
203
204    /// Moves to the end of the queue.
205    ///
206    /// Subsequent reads will return new messages as they are written.
207    pub fn seek_end(&mut self) -> Result<&mut Self> {
208        self.next_position = 0; // Reset position cache
209        if let Some(last_cycle) = self.queue.last_cycle() {
210            self.move_to_cycle(last_cycle)?;
211
212            if let Some(ref store) = self.store {
213                self.sequence = store.message_count();
214            }
215            self.update_index();
216            self.state = ReaderState::NotReached;
217        } else {
218            self.cycle = self.queue.current_cycle();
219            self.sequence = 0;
220            self.store = None;
221            self.state = ReaderState::Uninitialized;
222        }
223        Ok(self)
224    }
225
226    /// Seeks to a specific index.
227    ///
228    /// Returns `true` if successful, `false` if the index doesn't exist.
229    pub fn seek(&mut self, index: u64) -> Result<bool> {
230        self.next_position = 0; // Reset position cache
231        let cycle = self.queue.roll_cycle().to_cycle(index);
232        let sequence = self.queue.roll_cycle().to_sequence(index);
233
234        if let Err(Error::CycleNotFound(_)) = self.move_to_cycle(cycle) {
235            self.state = ReaderState::CycleNotFound;
236            return Ok(false);
237        }
238
239        if let Some(ref store) = self.store {
240            if sequence >= store.message_count() {
241                self.state = ReaderState::NotReached;
242                return Ok(false);
243            }
244        }
245
246        self.sequence = sequence;
247        self.index = index;
248        self.state = ReaderState::Found;
249        Ok(true)
250    }
251
252    /// Moves to a specific cycle.
253    pub fn move_to_cycle(&mut self, cycle: i32) -> Result<&mut Self> {
254        if self.cycle == cycle && self.store.is_some() {
255            return Ok(self);
256        }
257
258        self.next_position = 0; // Reset position cache on cycle change
259        match self.queue.acquire_store(cycle) {
260            Ok(store) => {
261                self.store = Some(store);
262                self.cycle = cycle;
263                self.sequence = 0;
264                self.update_index();
265                self.state = ReaderState::Found;
266                Ok(self)
267            }
268            Err(e) => {
269                self.state = ReaderState::CycleNotFound;
270                Err(e)
271            }
272        }
273    }
274
275    /// Returns the current index.
276    #[inline]
277    #[must_use]
278    pub fn index(&self) -> u64 {
279        self.index
280    }
281
282    /// Returns the current cycle number.
283    #[inline]
284    #[must_use]
285    pub fn cycle(&self) -> i32 {
286        self.cycle
287    }
288
289    /// Returns the current sequence within the cycle.
290    #[inline]
291    #[must_use]
292    pub fn sequence(&self) -> u64 {
293        self.sequence
294    }
295
296    /// Returns the current state.
297    #[inline]
298    #[must_use]
299    pub fn state(&self) -> ReaderState {
300        self.state
301    }
302
303    /// Returns the current direction.
304    #[inline]
305    #[must_use]
306    pub fn direction(&self) -> Direction {
307        self.direction
308    }
309
310    /// Sets the reading direction.
311    pub fn set_direction(&mut self, direction: Direction) -> &mut Self {
312        self.direction = direction;
313        self
314    }
315
316    /// Peeks at the next message without advancing (zero-copy).
317    ///
318    /// Returns a reference to the message data directly in the memory-mapped file.
319    pub fn peek(&self) -> Result<Option<&[u8]>> {
320        if let Some(ref store) = self.store {
321            return store.read_ref(self.sequence);
322        }
323        Ok(None)
324    }
325
326    /// Fast path for sequential reads with O(1) position lookup.
327    ///
328    /// Returns (ptr, len, next_position) where next_position is the file position
329    /// of the next message for caching.
330    #[inline(always)]
331    fn read_fast_path(&self) -> Result<Option<(*const u8, usize, u64)>> {
332        let store = match &self.store {
333            Some(s) => s,
334            None => return Ok(None),
335        };
336
337        // Use cached position if available
338        if self.next_position > 0 {
339            // Fast path: read directly at cached position
340            return store.read_at_position_raw(self.next_position);
341        }
342
343        // Slow path: first read or after seek, use Store's read mechanism
344        if let Some(data) = store.read_ref(self.sequence)? {
345            return Ok(Some((data.as_ptr(), data.len(), 0)));
346        }
347
348        Ok(None)
349    }
350
351    /// Advances the sequence after a successful read.
352    fn advance(&mut self) -> Result<()> {
353        match self.direction {
354            Direction::Forward => {
355                self.sequence += 1;
356                self.update_index();
357            }
358            Direction::Backward => {
359                if self.sequence > 0 {
360                    self.sequence -= 1;
361                    self.update_index();
362                }
363            }
364            Direction::None => {}
365        }
366        Ok(())
367    }
368
369    /// Tries to move to the next cycle.
370    fn try_next_cycle(&mut self) -> Result<bool> {
371        match self.direction {
372            Direction::Forward => {
373                let next_cycle = self.cycle + 1;
374
375                if let Some(last_cycle) = self.queue.last_cycle() {
376                    if next_cycle <= last_cycle && self.move_to_cycle(next_cycle).is_ok() {
377                        self.sequence = 0;
378                        self.update_index();
379                        return Ok(true);
380                    }
381                }
382
383                self.state = ReaderState::EndOfCycle;
384                Ok(false)
385            }
386            Direction::Backward => {
387                let prev_cycle = self.cycle - 1;
388
389                if let Some(first_cycle) = self.queue.first_cycle() {
390                    if prev_cycle >= first_cycle && self.move_to_cycle(prev_cycle).is_ok() {
391                        if let Some(ref store) = self.store {
392                            let count = store.message_count();
393                            self.sequence = if count > 0 { count - 1 } else { 0 };
394                        }
395                        self.update_index();
396                        return Ok(true);
397                    }
398                }
399
400                self.state = ReaderState::BeforeStart;
401                Ok(false)
402            }
403            Direction::None => Ok(false),
404        }
405    }
406
407    /// Updates the index from cycle and sequence.
408    fn update_index(&mut self) {
409        self.index = self.queue.roll_cycle().to_index(self.cycle, self.sequence);
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a queue inside `dir` and appends `messages` to it in order.
    fn create_queue_with_messages(dir: &TempDir, messages: &[&[u8]]) -> Queue {
        let queue = Queue::new(dir.path()).build().unwrap();
        let mut writer = queue.writer().unwrap();
        for message in messages {
            writer.write(message).unwrap();
        }
        writer.flush().unwrap();
        // Release the writer (and its borrow of `queue`) before returning the queue.
        drop(writer);
        queue
    }

    #[test]
    fn test_creation() {
        let dir = TempDir::new().unwrap();
        let queue = Queue::new(dir.path()).build().unwrap();

        assert_eq!(queue.reader().unwrap().state(), ReaderState::Uninitialized);
    }

    #[test]
    fn test_empty_queue() {
        let dir = TempDir::new().unwrap();
        let queue = Queue::new(dir.path()).build().unwrap();

        let mut reader = queue.reader().unwrap();
        assert_eq!(reader.read().unwrap(), None);
    }

    #[test]
    fn test_single_message() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"Hello"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();

        assert_eq!(reader.read().unwrap(), Some(b"Hello".as_slice()));
        assert_eq!(reader.read().unwrap(), None);
    }

    #[test]
    fn test_multiple_messages() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"One", b"Two", b"Three"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();

        let expected: [&[u8]; 3] = [b"One", b"Two", b"Three"];
        for message in expected {
            assert_eq!(reader.read().unwrap(), Some(message));
        }
        assert_eq!(reader.read().unwrap(), None);
    }

    #[test]
    fn test_rewind() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"First", b"Second"]);

        let mut reader = queue.reader().unwrap();

        // Rewinding twice must yield the first message both times.
        for _ in 0..2 {
            reader.rewind().unwrap();
            assert_eq!(reader.read().unwrap(), Some(b"First".as_slice()));
        }
    }

    #[test]
    fn test_seek_end() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"One", b"Two"]);

        let mut reader = queue.reader().unwrap();
        reader.seek_end().unwrap();

        assert_eq!(reader.read().unwrap(), None);
    }

    #[test]
    fn test_read_with() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"Hello, World!"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();

        let length = reader
            .read_with(|bytes| Ok(bytes.len()))
            .unwrap()
            .unwrap();
        assert_eq!(length, 13);
    }

    #[test]
    fn test_peek() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"Test"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();

        // Peeking repeatedly must not consume the message.
        for _ in 0..2 {
            assert_eq!(reader.peek().unwrap(), Some(b"Test".as_slice()));
        }
        assert_eq!(reader.read().unwrap(), Some(b"Test".as_slice()));
        assert_eq!(reader.peek().unwrap(), None);
    }

    #[test]
    fn test_direction_none() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"A", b"B"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();
        reader.set_direction(Direction::None);

        for _ in 0..2 {
            assert_eq!(reader.read().unwrap(), Some(b"A".as_slice()));
        }
    }

    #[test]
    fn test_seek() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"A", b"B", b"C", b"D", b"E"]);

        let mut reader = queue.reader().unwrap();

        reader.rewind().unwrap();
        // Consume "A" (seq 0) and "B" (seq 1) so the cursor lands on "C".
        reader.read().unwrap();
        reader.read().unwrap();
        let index_c = reader.index();

        reader.rewind().unwrap();
        assert_eq!(reader.read().unwrap(), Some(b"A".as_slice()));

        assert!(reader.seek(index_c).unwrap());
        assert_eq!(reader.read().unwrap(), Some(b"C".as_slice()));
        assert_eq!(reader.read().unwrap(), Some(b"D".as_slice()));
    }

    #[test]
    fn test_sequence_tracking() {
        let dir = TempDir::new().unwrap();
        let queue = create_queue_with_messages(&dir, &[b"One", b"Two", b"Three"]);

        let mut reader = queue.reader().unwrap();
        reader.rewind().unwrap();

        assert_eq!(reader.sequence(), 0);
        for expected_sequence in 1..=3 {
            reader.read().unwrap();
            assert_eq!(reader.sequence(), expected_sequence);
        }
    }
}