kekbit_core/shm/
reader.rs

1use crate::api::{ChannelError, InvalidPosition, ReadError, Reader};
2use crate::header::Header;
3use crate::utils::{align, is_aligned, load_atomic_u64, CLOSE, REC_HEADER_LEN, U64_SIZE, WATERMARK};
4use log::{error, info, warn};
5use memmap::MmapMut;
6use std::cmp::Ordering::*;
7use std::ops::FnMut;
8use std::result::Result;
9use std::sync::atomic::Ordering;
10
/// Sentinel value for `ShmReader::expiration` meaning "the timeout clock is not running".
/// A reader starts with this value, and `read` restores it whenever at least one record was consumed.
const END_OF_TIME: u64 = std::u64::MAX; //this should be good for any time unit including nanos
12
/// An implementation of the [Reader](trait.Reader.html) which accesses a persistent channel through
/// memory mapping. A `ShmReader` must be created using the [shm_reader](fn.shm_reader.html) function.
///
/// # Examples
///
/// ```
/// # use kekbit_core::tick::TickUnit::Nanos;
/// # use kekbit_core::header::Header;
/// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
/// use kekbit_core::shm::*;
/// # const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
/// println!("{:?}", reader.header());
///
/// ```
#[derive(Debug)]
pub struct ShmReader {
    // Channel metadata parsed from the beginning of the mapped region.
    header: Header,
    // Start of the record area; every record access is `data_ptr + read_index`.
    data_ptr: *mut u8,
    // Offset, relative to `data_ptr`, of the next record to be read.
    read_index: u32,
    // Deadline after which the writer is considered timed out, or `END_OF_TIME`
    // while the timeout clock is not running.
    expiration: u64,
    // Owns the mapping that `data_ptr` points into; held so it lives as long as the reader.
    _mmap: MmapMut,
}
41
42impl ShmReader {
43    #[allow(clippy::cast_ptr_alignment)]
44    pub(super) fn new(mut mmap: MmapMut) -> Result<ShmReader, ChannelError> {
45        let buf = &mut mmap[..];
46        let header = Header::read(buf)?;
47        let header_ptr = buf.as_ptr() as *mut u64;
48        let data_ptr = unsafe { header_ptr.add(header.len() as usize) } as *mut u8;
49        info!("Kekbit Reader successfully created");
50        Ok(ShmReader {
51            header,
52            data_ptr,
53            read_index: 0,
54            expiration: END_OF_TIME,
55            _mmap: mmap,
56        })
57    }
58    ///Returns a reference to the [Header](struct.Header.html) associated with this channel
59    #[inline]
60    pub fn header(&self) -> &Header {
61        &self.header
62    }
63
64    ///Returns the current read position. It *could* be the `total` amount of bytes read
65    ///so far(including bytes from record headers and the one used for record padding) *if*
66    ///no succesfull [move_to](struct.ShmReader.html#method.move_to) operation was executed on this reader.
67    pub fn position(&self) -> u32 {
68        self.read_index
69    }
70}
71impl Reader for ShmReader {
72    #[allow(clippy::cast_ptr_alignment)]
73    ///Reads up to `message_count` messages from the channel and for each message  
74    ///calls the given handler. The handler it is `not` called  for `heartbeat` messages.
75    ///This operation is non-blocking, if you want to wait for a message to be available, external
76    ///wait/spin mechanisms must be used.
77    ///
78    ///Returns the amount of bytes read together and/or an error. Even if an error occurred
79    ///there may have been messages which were correctly read, and  for which the handler was called.
80    ///
81    /// # Arguments
82    ///
83    /// * `handler` - The function which will be called every time a valid messages is read from the channel.
84    ///                   The message is just binary data, it's up to the handler to interpret it properly.
85    /// * `message_count` - The `maximum` number of messages to be read on this call.
86    ///
87    /// # Errors
88    /// Various [errors](enum.ReadError.html) may occur such: a `writer` timeout is detected, end of channel is reached, channel is closed or channel data is corrupted.
89    /// However even in such situations, some valid records *may* have been processed.
90    ///
91    /// # Examples
92    ///
93    /// ```
94    /// # use kekbit_core::tick::TickUnit::Nanos;
95    /// # use kekbit_core::header::Header;
96    /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
97    /// use kekbit_core::shm::*;
98    /// use crate::kekbit_core::api::Reader;
99    /// # const FOREVER: u64 = 99_999_999_999;
100    /// let writer_id = 1850;
101    /// let channel_id = 42;
102    /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
103    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
104    /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
105    /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
106    /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();  
107    ///
108    /// ```
109    ///
110    fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError> {
111        let mut msg_read = 0u16;
112        let bytes_at_start = self.read_index;
113        while msg_read < message_count {
114            let crt_index = self.read_index as usize;
115            if crt_index + U64_SIZE >= self.header.capacity() as usize {
116                return Err(ReadError::ChannelFull {
117                    bytes_read: self.read_index - bytes_at_start,
118                });
119            }
120            let rec_len: u64 = unsafe { load_atomic_u64(self.data_ptr.add(crt_index) as *mut u64, Ordering::Acquire) };
121            if rec_len <= self.header.max_msg_len() as u64 {
122                let rec_size = align(REC_HEADER_LEN + rec_len as u32);
123                if crt_index + rec_size as usize >= self.header.capacity() as usize {
124                    return Err(ReadError::ChannelFull {
125                        bytes_read: self.read_index - bytes_at_start,
126                    });
127                }
128                if rec_len > 0 {
129                    //otherwise is a heartbeat
130                    handler(self.read_index, unsafe {
131                        std::slice::from_raw_parts(self.data_ptr.add(crt_index + REC_HEADER_LEN as usize), rec_len as usize)
132                    });
133                }
134                msg_read += 1;
135                self.read_index += rec_size;
136            } else {
137                match rec_len {
138                    WATERMARK => {
139                        break;
140                    }
141                    CLOSE => {
142                        info!("Producer closed channel");
143                        return Err(ReadError::Closed {
144                            bytes_read: self.read_index - bytes_at_start,
145                        });
146                    }
147                    _ => {
148                        error!(
149                            "Channel corrupted. Unknown Marker {:#016X} at position {} ",
150                            rec_len, self.read_index,
151                        );
152                        return Err(ReadError::Failed {
153                            bytes_read: self.read_index - bytes_at_start,
154                        });
155                    }
156                }
157            }
158        }
159        if msg_read > 0 {
160            self.expiration = END_OF_TIME;
161        } else if self.expiration == END_OF_TIME {
162            self.expiration = self.header.tick_unit().nix_time() + self.header.timeout();
163        //start the timeout clock
164        } else if self.expiration <= self.header.tick_unit().nix_time() {
165            warn!("Writer timeout detected. Channel will be abandoned. No more reads will be performed");
166            return Err(ReadError::Timeout {
167                timeout: self.expiration,
168            });
169        }
170        Ok(self.read_index - bytes_at_start)
171    }
172    /// Tries to move this reader to a given position if it is valid.
173    ///
174    /// Returns the position itself if the operation was successful otherwise some error.
175    ///
176    /// # Arguments
177    ///       
178    /// * `position` - Position where will try to point this reader. It must be a valid position on the channel
179    ///
180    /// # Errors
181    ///    ///
182    /// # Examples
183    ///
184    /// ```
185    /// # use kekbit_core::tick::TickUnit::Nanos;
186    /// # use kekbit_core::header::Header;
187    /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
188    /// use kekbit_core::shm::*;
189    /// use crate::kekbit_core::api::Reader;
190    /// # const FOREVER: u64 = 99_999_999_999;
191    /// let writer_id = 1850;
192    /// let channel_id = 42;
193    /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
194    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
195    /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
196    /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
197    /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();  
198    ///
199    /// reader.move_to(0);//start reading from beginning again
200    /// ```
201    ///  
202    fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition> {
203        if !is_aligned(position) {
204            return Err(InvalidPosition::Unaligned { position });
205        }
206        if position >= self.header.capacity() {
207            return Err(InvalidPosition::Unavailable { position });
208        }
209        let crt_pos = self.read_index;
210        self.read_index = 0;
211        loop {
212            match self.read_index.cmp(&position) {
213                Less => {
214                    match self.read(&mut |_, _| (), 1) {
215                        Ok(bytes_read) => {
216                            if bytes_read == 0 {
217                                // nothing more to read
218                                self.read_index = crt_pos;
219                                return Err(InvalidPosition::Unavailable { position });
220                            }
221                        }
222                        Err(_) => {
223                            self.read_index = crt_pos;
224                            return Err(InvalidPosition::Unavailable { position });
225                        }
226                    }
227                }
228                Equal => return Ok(position),
229                Greater => {
230                    self.read_index = crt_pos;
231                    return Err(InvalidPosition::Unaligned { position });
232                }
233            }
234        }
235    }
236}