1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
use crate::api::{ChannelError, InvalidPosition, ReadError, Reader};
use crate::header::Header;
use crate::utils::{align, is_aligned, load_atomic_u64, CLOSE, REC_HEADER_LEN, U64_SIZE, WATERMARK};
use log::{error, info, warn};
use memmap::MmapMut;
use std::cmp::Ordering::*;
use std::ops::FnMut;
use std::result::Result;
use std::sync::atomic::Ordering;

/// Sentinel stored in `ShmReader::expiration` while no writer-timeout clock is running.
/// `u64::MAX` is far enough in the future for any time unit, including nanoseconds.
const END_OF_TIME: u64 = std::u64::MAX;

/// An implementation of the [Reader](trait.Reader.html) which accesses a persistent channel through
/// memory mapping. A `ShmReader` must be created using the [shm_reader](fn.shm_reader.html) function.
///
/// The reader keeps its own read position and its own writer-timeout clock; both are advanced
/// by [read](struct.ShmReader.html#method.read).
///
/// # Examples
///
/// ```
/// # use kekbit_core::tick::TickUnit::Nanos;
/// # use kekbit_core::header::Header;
/// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
/// use kekbit_core::shm::*;
/// # const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
/// println!("{:?}", reader.header());
///
/// ```
#[derive(Debug)]
pub struct ShmReader {
    /// Channel metadata parsed from the beginning of the mapped memory.
    header: Header,
    /// Start of the record area inside the mapping; record offsets (`read_index`) are added to it.
    data_ptr: *mut u8,
    /// Offset, in bytes from `data_ptr`, of the next record to be read.
    read_index: u32,
    /// Time at which the writer is considered timed out, or `END_OF_TIME` while the
    /// timeout clock is not running.
    expiration: u64,
    /// The mapping `data_ptr` was derived from. It is never accessed directly; it is stored
    /// so that it lives as long as the reader does.
    _mmap: MmapMut,
}

impl ShmReader {
    /// Builds a reader on top of an already memory mapped channel.
    ///
    /// The channel header is parsed from the start of the mapping and the new reader is
    /// positioned at offset `0` of the record area, with no timeout clock running.
    ///
    /// # Errors
    ///
    /// Propagates the `ChannelError` reported by `Header::read` for this mapping.
    // The mapping is a byte buffer, yet it is addressed in `u64` sized steps; the cast is deliberate.
    #[allow(clippy::cast_ptr_alignment)]
    pub(super) fn new(mut mmap: MmapMut) -> Result<ShmReader, ChannelError> {
        let mapped = &mut mmap[..];
        let header = Header::read(mapped)?;
        let base = mapped.as_ptr() as *mut u64;
        let steps = header.len() as usize;
        // NOTE(review): `add` on a `*mut u64` advances by `steps * 8` bytes. This presumably mirrors
        // the offset used by the writer - confirm against `Header::len` and the writer code.
        // SAFETY: assumes the mapping extends at least that far past its start - TODO confirm.
        let data_ptr = unsafe { base.add(steps) as *mut u8 };
        info!("Kekbit Reader successfully created");
        let reader = Self {
            header,
            data_ptr,
            read_index: 0,
            expiration: END_OF_TIME,
            _mmap: mmap,
        };
        Ok(reader)
    }

    /// Gives access to the [Header](struct.Header.html) of the channel this reader is attached to.
    #[inline]
    pub fn header(&self) -> &Header {
        &self.header
    }

    /// Returns the current read position, as an offset in bytes inside the record area.
    ///
    /// As long as no successful [move_to](struct.ShmReader.html#method.move_to) was executed on
    /// this reader, this is also the `total` amount of bytes read so far, record headers and
    /// record padding included.
    pub fn position(&self) -> u32 {
        self.read_index
    }
}
impl Reader for ShmReader {
    /// Reads up to `message_count` messages from the channel and for each message
    /// calls the given handler. The handler is `not` called for `heartbeat` messages
    /// (records of length zero), but a heartbeat still counts as one of the `message_count` records.
    /// This operation is non-blocking, if you want to wait for a message to be available, external
    /// wait/spin mechanisms must be used.
    ///
    /// Returns the amount of bytes consumed by this call (record headers and padding included),
    /// or an error. Even if an error occurred there may have been messages which were correctly
    /// read, and for which the handler was called.
    ///
    /// Every call which reads at least one record stops the writer-timeout clock. A call which reads
    /// nothing starts the clock if it is not running, and reports a timeout if it has already expired.
    ///
    /// # Arguments
    ///
    /// * `handler` - The function which will be called every time a valid message is read from the channel.
    ///                   It receives the position of the record and its payload. The message is just
    ///                   binary data, it's up to the handler to interpret it properly.
    /// * `message_count` - The `maximum` number of messages to be read on this call.
    ///
    /// # Errors
    /// Various [errors](enum.ReadError.html) may occur:
    ///
    /// * `ReadError::ChannelFull` - the end of the channel was reached.
    /// * `ReadError::Closed` - the writer marked the channel as closed.
    /// * `ReadError::Failed` - an unknown marker was found, the channel data is corrupted.
    /// * `ReadError::Timeout` - nothing was read and the writer timeout has expired.
    ///
    /// The first three carry the number of bytes read by this call before the error was detected,
    /// so even in such situations some valid records *may* have been processed.
    ///
    /// # Examples
    ///
    /// ```
    /// # use kekbit_core::tick::TickUnit::Nanos;
    /// # use kekbit_core::header::Header;
    /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
    /// use kekbit_core::shm::*;
    /// use crate::kekbit_core::api::Reader;
    /// # const FOREVER: u64 = 99_999_999_999;
    /// let writer_id = 1850;
    /// let channel_id = 42;
    /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
    /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
    /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
    /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();
    ///
    /// ```
    ///
    // The record area is a byte buffer, but record lengths are loaded as `u64`; the cast is deliberate.
    #[allow(clippy::cast_ptr_alignment)]
    fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError> {
        let mut msg_read = 0u16;
        let bytes_at_start = self.read_index;
        while msg_read < message_count {
            let crt_index = self.read_index as usize;
            // No room left for even one more record length/marker.
            if crt_index + U64_SIZE >= self.header.capacity() as usize {
                return Err(ReadError::ChannelFull {
                    bytes_read: self.read_index - bytes_at_start,
                });
            }
            // SAFETY: `crt_index + U64_SIZE` is below the channel capacity (checked above); assumes the
            // mapping covers `capacity` bytes past `data_ptr` and that record offsets are `u64`
            // aligned - TODO confirm against the writer.
            let rec_len: u64 = unsafe { load_atomic_u64(self.data_ptr.add(crt_index) as *mut u64, Ordering::Acquire) };
            if rec_len <= self.header.max_msg_len() as u64 {
                // A regular record: the cast cannot truncate as `rec_len` fits in `max_msg_len`.
                let rec_size = align(REC_HEADER_LEN + rec_len as u32);
                if crt_index + rec_size as usize >= self.header.capacity() as usize {
                    return Err(ReadError::ChannelFull {
                        bytes_read: self.read_index - bytes_at_start,
                    });
                }
                if rec_len > 0 {
                    //otherwise is a heartbeat
                    // SAFETY: the whole record was checked above to end before the channel capacity.
                    handler(self.read_index, unsafe {
                        std::slice::from_raw_parts(self.data_ptr.add(crt_index + REC_HEADER_LEN as usize), rec_len as usize)
                    });
                }
                msg_read += 1;
                self.read_index += rec_size;
            } else {
                // Anything larger than the maximum message length must be a marker.
                match rec_len {
                    WATERMARK => {
                        // Nothing was written here yet.
                        break;
                    }
                    CLOSE => {
                        info!("Producer closed channel");
                        return Err(ReadError::Closed {
                            bytes_read: self.read_index - bytes_at_start,
                        });
                    }
                    _ => {
                        error!(
                            "Channel corrupted. Unknown Marker {:#016X} at position {} ",
                            rec_len, self.read_index,
                        );
                        return Err(ReadError::Failed {
                            bytes_read: self.read_index - bytes_at_start,
                        });
                    }
                }
            }
        }
        if msg_read > 0 {
            // The writer is alive, stop the timeout clock.
            self.expiration = END_OF_TIME;
        } else if self.expiration == END_OF_TIME {
            // Start the timeout clock. The addition saturates so that a huge timeout can neither
            // panic nor wrap around into an expiration time which is already in the past.
            self.expiration = self.header.tick_unit().nix_time().saturating_add(self.header.timeout());
        } else if self.expiration <= self.header.tick_unit().nix_time() {
            warn!("Writer timeout detected. Channel will be abandoned. No more reads will be performed");
            return Err(ReadError::Timeout {
                timeout: self.expiration,
            });
        }
        Ok(self.read_index - bytes_at_start)
    }

    /// Tries to move this reader to a given position if it is valid.
    ///
    /// The position is validated by scanning the channel record by record from its beginning,
    /// so only the start of a record (or the position right after the last record available)
    /// can be reached.
    ///
    /// Returns the position itself if the operation was successful otherwise some error.
    /// If the operation fails the read position of this reader is left unchanged.
    ///
    /// # Arguments
    ///
    /// * `position` - Position where will try to point this reader. It must be a valid position on the channel
    ///
    /// # Errors
    ///
    /// * `InvalidPosition::Unaligned` - the position is not properly aligned, or it does not
    ///   match the start of a record.
    /// * `InvalidPosition::Unavailable` - the position is beyond the channel capacity, or it could
    ///   not be reached because there was nothing more to read or a read error occurred before it.
    ///
    /// # Examples
    ///
    /// ```
    /// # use kekbit_core::tick::TickUnit::Nanos;
    /// # use kekbit_core::header::Header;
    /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
    /// use kekbit_core::shm::*;
    /// use crate::kekbit_core::api::Reader;
    /// # const FOREVER: u64 = 99_999_999_999;
    /// let writer_id = 1850;
    /// let channel_id = 42;
    /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
    /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
    /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
    /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();
    ///
    /// reader.move_to(0);//start reading from beginning again
    /// ```
    ///
    fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition> {
        if !is_aligned(position) {
            return Err(InvalidPosition::Unaligned { position });
        }
        if position >= self.header.capacity() {
            return Err(InvalidPosition::Unavailable { position });
        }
        // Remember where we are, so the reader can be restored if the position proves invalid.
        let crt_pos = self.read_index;
        self.read_index = 0;
        // NOTE(review): the scan goes through `read`, so it also updates the writer-timeout clock
        // and may emit log messages - confirm this side effect is intended.
        loop {
            match self.read_index.cmp(&position) {
                Less => match self.read(&mut |_, _| (), 1) {
                    // One more record skipped, keep scanning.
                    Ok(bytes_read) if bytes_read > 0 => {}
                    // Either nothing more to read or the read failed before reaching the position.
                    _ => {
                        self.read_index = crt_pos;
                        return Err(InvalidPosition::Unavailable { position });
                    }
                },
                Equal => return Ok(position),
                Greater => {
                    // The scan jumped over the position, so it points inside a record.
                    self.read_index = crt_pos;
                    return Err(InvalidPosition::Unaligned { position });
                }
            }
        }
    }
}