1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use std::fs::*;
use std::io::{self};
use std::path::{Path, PathBuf};

use crate::header::Header;
use crate::state::{QueueState, QueueStatePersistence};
use crate::sync::{FileGuard, SyncFollower};
use crate::version::check_queue_version;

use super::try_acquire_recv_lock;
use super::{segment_filename, HEADER_EOF};

/// An [`Iterator`] that iterates over the elements of the queue, until it hts
/// the end for the first time. Use this structure instead of
/// [`crate::Receiver`] if you just need to read the data stored in a queue.
///
/// Three good reasons for this are:
///
/// 1. The API is synchronous, since there is no need to wait for new elements.
/// 2. There is no transactional mechanism involved, since there is no need for
/// one and, because of this,
/// 3. Elements are not buffered in memory, as opposed to what
/// [`crate::Receiver::recv_batch`] does.
///
/// And you also get some extra percents of performance from a simpler
/// implementation. Don't pay for what you don't use!
pub struct QueueIter {
    _file_guard: FileGuard,
    base: PathBuf,
    state: QueueState,
    sync_follower: SyncFollower,
}

impl QueueIter {
    /// Opens a queue for reading. The access will be exclusive, based on the
    /// existence of the temporary file `recv.lock` inside the queue folder.
    ///
    /// # Errors
    ///
    /// This function will return an IO error if the queue is already in use for
    /// receiving, which is indicated by a lock file. Also, any other IO error
    /// encountered while opening will be sent.
    ///
    /// # Panics
    ///
    /// This function will panic if it is not able to set up the notification
    /// handler to watch for file changes.
    pub fn open<P: AsRef<Path>>(base: P) -> io::Result<QueueIter> {
        // Guarantee that the queue exists:
        create_dir_all(base.as_ref())?;

        log::trace!("created queue directory");

        // Versioning stuff (this should be lightning-fast. Therefore, shameless block):
        check_queue_version(base.as_ref())?;

        // Acquire guard and state:
        let file_guard = try_acquire_recv_lock(base.as_ref())?;
        let mut persistence = QueueStatePersistence::new();
        let state = persistence.open(base.as_ref())?;

        log::trace!("receiver lock acquired. Iter state now is {:?}", state);

        // Put the needle on the groove (oh! the 70's):
        let mut sync_follower = SyncFollower::open(segment_filename(base.as_ref(), state.segment))?;
        sync_follower.seek(io::SeekFrom::Start(state.position))?;

        log::trace!("last segment opened fo reading");

        Ok(QueueIter {
            _file_guard: file_guard,
            state,
            base: PathBuf::from(base.as_ref()),
            sync_follower,
        })
    }

    /// Puts the queue in another position in another segment. This forcibly
    /// discards the old tail follower and fetches a fresh new one, so be
    /// careful.
    fn advance_segment(&mut self) -> io::Result<()> {
        let current_segment = self.state.segment;
        self.state.advance_segment();
        let next_segment = self.state.segment;

        log::debug!(
            "advanced segment from {:?} to {:?}",
            current_segment,
            next_segment
        );

        log::debug!("opening segment {}", next_segment);
        self.sync_follower = SyncFollower::open(segment_filename(&self.base, next_segment))?;

        Ok(())
    }

    /// Reads the header. This operation is atomic.
    fn read_header(&mut self) -> io::Result<Header> {
        // Read header:
        let mut header = [0; 4];
        self.sync_follower.read_exact(&mut header)?;

        // If the header is EOF, advance segment:
        if header == HEADER_EOF {
            log::trace!("got EOF header. Advancing...");
            self.advance_segment()?;

            // Re-read the header:
            log::trace!("re-reading new header from new file");
            self.sync_follower.read_exact(&mut header)?;
        }

        // Now, you set the header!
        let decoded = Header::decode(header);

        log::trace!("got header {:?} (read {} bytes)", header, decoded.len());

        Ok(decoded)
    }

    /// Reads one element from the queue.
    fn read_one(&mut self) -> io::Result<Vec<u8>> {
        // Get the length:
        let header = self.read_header()?;

        // With the length, read the data:
        let mut data = vec![0; header.len() as usize];
        self.sync_follower
            .read_exact(&mut data)
            .expect("poisoned queue");

        Ok(data)
    }
}

impl Iterator for QueueIter {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
        match self.read_one() {
            Ok(item) => Some(Ok(item)),
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
                log::trace!("got interrupted by eof");
                None
            }
            Err(err) => Some(Err(err)),
        }
    }
}