kekbit_core/shm/reader.rs
1use crate::api::{ChannelError, InvalidPosition, ReadError, Reader};
2use crate::header::Header;
3use crate::utils::{align, is_aligned, load_atomic_u64, CLOSE, REC_HEADER_LEN, U64_SIZE, WATERMARK};
4use log::{error, info, warn};
5use memmap::MmapMut;
6use std::cmp::Ordering::*;
7use std::ops::FnMut;
8use std::result::Result;
9use std::sync::atomic::Ordering;
10
11const END_OF_TIME: u64 = std::u64::MAX; //this should be good for any time unit including nanos
12
13/// An implementation of the [Reader](trait.Reader.html) which access a persistent channel through
14/// memory mapping. A `ShmReader` must be created using the [shm_reader](fn.shm_reader.html) function.
15///
16/// # Examples
17///
18/// ```
19/// # use kekbit_core::tick::TickUnit::Nanos;
20/// # use kekbit_core::header::Header;
21/// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
22/// use kekbit_core::shm::*;
23/// # const FOREVER: u64 = 99_999_999_999;
24/// let writer_id = 1850;
25/// let channel_id = 42;
26/// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
27/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
28/// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
29/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
30/// println!("{:?}", reader.header());
31///
32/// ```
33#[derive(Debug)]
34pub struct ShmReader {
35 header: Header,
36 data_ptr: *mut u8,
37 read_index: u32,
38 expiration: u64,
39 _mmap: MmapMut,
40}
41
42impl ShmReader {
43 #[allow(clippy::cast_ptr_alignment)]
44 pub(super) fn new(mut mmap: MmapMut) -> Result<ShmReader, ChannelError> {
45 let buf = &mut mmap[..];
46 let header = Header::read(buf)?;
47 let header_ptr = buf.as_ptr() as *mut u64;
48 let data_ptr = unsafe { header_ptr.add(header.len() as usize) } as *mut u8;
49 info!("Kekbit Reader successfully created");
50 Ok(ShmReader {
51 header,
52 data_ptr,
53 read_index: 0,
54 expiration: END_OF_TIME,
55 _mmap: mmap,
56 })
57 }
58 ///Returns a reference to the [Header](struct.Header.html) associated with this channel
59 #[inline]
60 pub fn header(&self) -> &Header {
61 &self.header
62 }
63
64 ///Returns the current read position. It *could* be the `total` amount of bytes read
65 ///so far(including bytes from record headers and the one used for record padding) *if*
66 ///no succesfull [move_to](struct.ShmReader.html#method.move_to) operation was executed on this reader.
67 pub fn position(&self) -> u32 {
68 self.read_index
69 }
70}
71impl Reader for ShmReader {
72 #[allow(clippy::cast_ptr_alignment)]
73 ///Reads up to `message_count` messages from the channel and for each message
74 ///calls the given handler. The handler it is `not` called for `heartbeat` messages.
75 ///This operation is non-blocking, if you want to wait for a message to be available, external
76 ///wait/spin mechanisms must be used.
77 ///
78 ///Returns the amount of bytes read together and/or an error. Even if an error occurred
79 ///there may have been messages which were correctly read, and for which the handler was called.
80 ///
81 /// # Arguments
82 ///
83 /// * `handler` - The function which will be called every time a valid messages is read from the channel.
84 /// The message is just binary data, it's up to the handler to interpret it properly.
85 /// * `message_count` - The `maximum` number of messages to be read on this call.
86 ///
87 /// # Errors
88 /// Various [errors](enum.ReadError.html) may occur such: a `writer` timeout is detected, end of channel is reached, channel is closed or channel data is corrupted.
89 /// However even in such situations, some valid records *may* have been processed.
90 ///
91 /// # Examples
92 ///
93 /// ```
94 /// # use kekbit_core::tick::TickUnit::Nanos;
95 /// # use kekbit_core::header::Header;
96 /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
97 /// use kekbit_core::shm::*;
98 /// use crate::kekbit_core::api::Reader;
99 /// # const FOREVER: u64 = 99_999_999_999;
100 /// let writer_id = 1850;
101 /// let channel_id = 42;
102 /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
103 /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
104 /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
105 /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
106 /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();
107 ///
108 /// ```
109 ///
110 fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError> {
111 let mut msg_read = 0u16;
112 let bytes_at_start = self.read_index;
113 while msg_read < message_count {
114 let crt_index = self.read_index as usize;
115 if crt_index + U64_SIZE >= self.header.capacity() as usize {
116 return Err(ReadError::ChannelFull {
117 bytes_read: self.read_index - bytes_at_start,
118 });
119 }
120 let rec_len: u64 = unsafe { load_atomic_u64(self.data_ptr.add(crt_index) as *mut u64, Ordering::Acquire) };
121 if rec_len <= self.header.max_msg_len() as u64 {
122 let rec_size = align(REC_HEADER_LEN + rec_len as u32);
123 if crt_index + rec_size as usize >= self.header.capacity() as usize {
124 return Err(ReadError::ChannelFull {
125 bytes_read: self.read_index - bytes_at_start,
126 });
127 }
128 if rec_len > 0 {
129 //otherwise is a heartbeat
130 handler(self.read_index, unsafe {
131 std::slice::from_raw_parts(self.data_ptr.add(crt_index + REC_HEADER_LEN as usize), rec_len as usize)
132 });
133 }
134 msg_read += 1;
135 self.read_index += rec_size;
136 } else {
137 match rec_len {
138 WATERMARK => {
139 break;
140 }
141 CLOSE => {
142 info!("Producer closed channel");
143 return Err(ReadError::Closed {
144 bytes_read: self.read_index - bytes_at_start,
145 });
146 }
147 _ => {
148 error!(
149 "Channel corrupted. Unknown Marker {:#016X} at position {} ",
150 rec_len, self.read_index,
151 );
152 return Err(ReadError::Failed {
153 bytes_read: self.read_index - bytes_at_start,
154 });
155 }
156 }
157 }
158 }
159 if msg_read > 0 {
160 self.expiration = END_OF_TIME;
161 } else if self.expiration == END_OF_TIME {
162 self.expiration = self.header.tick_unit().nix_time() + self.header.timeout();
163 //start the timeout clock
164 } else if self.expiration <= self.header.tick_unit().nix_time() {
165 warn!("Writer timeout detected. Channel will be abandoned. No more reads will be performed");
166 return Err(ReadError::Timeout {
167 timeout: self.expiration,
168 });
169 }
170 Ok(self.read_index - bytes_at_start)
171 }
172 /// Tries to move this reader to a given position if it is valid.
173 ///
174 /// Returns the position itself if the operation was successful otherwise some error.
175 ///
176 /// # Arguments
177 ///
178 /// * `position` - Position where will try to point this reader. It must be a valid position on the channel
179 ///
180 /// # Errors
181 /// ///
182 /// # Examples
183 ///
184 /// ```
185 /// # use kekbit_core::tick::TickUnit::Nanos;
186 /// # use kekbit_core::header::Header;
187 /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
188 /// use kekbit_core::shm::*;
189 /// use crate::kekbit_core::api::Reader;
190 /// # const FOREVER: u64 = 99_999_999_999;
191 /// let writer_id = 1850;
192 /// let channel_id = 42;
193 /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
194 /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
195 /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
196 /// let mut reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
197 /// reader.read(&mut |pos,buf| println!("{}->{}",pos, std::str::from_utf8(buf).unwrap()), 10).unwrap();
198 ///
199 /// reader.move_to(0);//start reading from beginning again
200 /// ```
201 ///
202 fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition> {
203 if !is_aligned(position) {
204 return Err(InvalidPosition::Unaligned { position });
205 }
206 if position >= self.header.capacity() {
207 return Err(InvalidPosition::Unavailable { position });
208 }
209 let crt_pos = self.read_index;
210 self.read_index = 0;
211 loop {
212 match self.read_index.cmp(&position) {
213 Less => {
214 match self.read(&mut |_, _| (), 1) {
215 Ok(bytes_read) => {
216 if bytes_read == 0 {
217 // nothing more to read
218 self.read_index = crt_pos;
219 return Err(InvalidPosition::Unavailable { position });
220 }
221 }
222 Err(_) => {
223 self.read_index = crt_pos;
224 return Err(InvalidPosition::Unavailable { position });
225 }
226 }
227 }
228 Equal => return Ok(position),
229 Greater => {
230 self.read_index = crt_pos;
231 return Err(InvalidPosition::Unaligned { position });
232 }
233 }
234 }
235 }
236}