kekbit_core/shm/writer.rs
1use crate::api::ChannelError::AccessError;
2use crate::api::{ChannelError, WriteError, Writer};
3use crate::header::Header;
4use crate::utils::{align, store_atomic_u64, CLOSE, REC_HEADER_LEN, WATERMARK};
5use kekbit_codecs::codecs::DataFormat;
6use kekbit_codecs::codecs::Encodable;
7use log::{debug, error, info};
8use memmap::MmapMut;
9use std::cmp::min;
10use std::io::Write;
11use std::ptr::copy_nonoverlapping;
12use std::result::Result;
13use std::sync::atomic::Ordering;
14
/// An implementation of the [Writer](trait.Writer.html) which accesses a persistent channel through
/// memory mapping, and encodes messages with a specific [DataFormat](../codecs/trait.DataFormat.html).
/// A `ShmWriter` must be created using the [shm_writer](fn.shm_writer.html) function.
/// A `ShmWriter` exclusively holds the channel it is bound to, and it is *not thread safe*.
/// If multiple threads must write into the same channel they have to be externally synchronized.
///
/// # Examples
///
/// ```
/// use kekbit_core::tick::TickUnit::Nanos;
/// use kekbit_core::shm::*;
/// use kekbit_core::header::Header;
/// use kekbit_core::api::Writer;
/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
///
/// const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// let capacity = 3000;
/// let max_msg_len = 100;
/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// writer.heartbeat().unwrap();
/// ```
pub struct ShmWriter<D: DataFormat> {
    // Channel metadata decoded from the beginning of the mapping when the writer is created.
    header: Header,
    // Start of the record area inside the mapping (computed past the header in `new`).
    data_ptr: *mut u8,
    // Byte offset, relative to `data_ptr`, at which the next record will be written.
    write_offset: u32,
    // The mapping that backs the channel; kept so it can be flushed and stays alive with the writer.
    mmap: MmapMut,
    // Data format handed to `Encodable::encode_to` for every message.
    df: D,
    // Reusable bounded sink which the encoder writes the message payload into.
    write: KekWrite,
}
47
48impl<D: DataFormat> ShmWriter<D> {
49 #[allow(clippy::cast_ptr_alignment)]
50 pub(super) fn new(mut mmap: MmapMut, df: D) -> Result<ShmWriter<D>, ChannelError> {
51 let buf = &mut mmap[..];
52 let header = Header::read(buf)?;
53 let header_ptr = buf.as_ptr() as *mut u64;
54 let head_len = header.len();
55 let data_ptr = unsafe { header_ptr.add(head_len) } as *mut u8;
56 let write = KekWrite::new(data_ptr, header.max_msg_len() as usize);
57 let mut writer = ShmWriter {
58 header,
59 data_ptr,
60 write_offset: 0,
61 mmap,
62 df,
63 write,
64 };
65 info!(
66 "Kekbit channel writer created. Size is {}MB. Max msg size {}KB",
67 writer.header.capacity() / 1_000_000,
68 writer.header.max_msg_len() / 1_000
69 );
70 //sent the very first original heart bear
71 match writer.heartbeat() {
72 Ok(_) => {
73 info!("Initial hearbeat successfully sent!");
74 Ok(writer)
75 }
76 Err(we) => Err(AccessError {
77 reason: format!("Initial heartbeat failed!. Reason {:?}", we),
78 }),
79 }
80 }
81
82 #[inline(always)]
83 fn write_metadata(&mut self, write_ptr: *mut u64, len: u64, aligned_rec_len: u32) {
84 unsafe {
85 //we should always have the 8 bytes required by WATERMARK as they are acounted in the Footer
86 store_atomic_u64(write_ptr.add(aligned_rec_len as usize), WATERMARK, Ordering::Release);
87 }
88 store_atomic_u64(write_ptr, len, Ordering::Release);
89 }
90}
91
92impl<D: DataFormat> Writer<D> for ShmWriter<D> {
93 /// Writes a message into the channel. This operation will encode the data directly into channel.
94 /// While this is a non blocking operation, only one write should be executed at any given time.
95 ///
96 /// Returns the total amount of bytes wrote into the channel which includes, the size of the message,
97 /// the size of the message header and the amount of padding add to that message.
98 ///
99 /// # Arguments
100 ///
101 /// * `data` - The data which to encode and write into the channel.
102 ///
103 /// # Errors
104 ///
105 /// Two kinds of [failures](enum.WriteError.html) may occur. One if the encoding operation failed, the other if the channel
106 /// rejected the message for reasons such data is too large or no space is available in the channel.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use kekbit_core::tick::TickUnit::Nanos;
112 /// use kekbit_core::shm::*;
113 /// use kekbit_core::header::Header;
114 /// use kekbit_core::api::Writer;
115 /// use kekbit_codecs::codecs::raw::RawBinDataFormat;
116 ///
117 /// const FOREVER: u64 = 99_999_999_999;
118 /// let writer_id = 1850;
119 /// let channel_id = 42;
120 /// let capacity = 30_000;
121 /// let max_msg_len = 100;
122 /// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
123 /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
124 /// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
125 /// let msg = "There are 10 kinds of people: those who know binary and those who don't";
126 /// let msg_data = msg.as_bytes();
127 /// writer.write(&msg_data).unwrap();
128 /// ```
129 ///
130 #[allow(clippy::cast_ptr_alignment)]
131 fn write(&mut self, data: &impl Encodable<D>) -> Result<u32, WriteError> {
132 let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
133 let write_ptr = unsafe { read_head_ptr.add(REC_HEADER_LEN as usize) };
134 let available = self.available();
135 if available <= REC_HEADER_LEN {
136 return Err(WriteError::ChannelFull);
137 }
138 let len = min(self.header.max_msg_len(), available - REC_HEADER_LEN) as usize;
139 let write_res = data.encode_to(&self.df, self.write.reset(write_ptr, len));
140 match write_res {
141 Ok(0) => Err(WriteError::NoSpaceForRecord),
142 Ok(_) => {
143 if !self.write.failed {
144 let aligned_rec_len = align(self.write.total as u32 + REC_HEADER_LEN);
145 self.write_metadata(read_head_ptr as *mut u64, self.write.total as u64, aligned_rec_len >> 3);
146 self.write_offset += aligned_rec_len;
147 Ok(aligned_rec_len)
148 } else {
149 Err(WriteError::NoSpaceForRecord)
150 }
151 }
152 Err(io_err) => Err(WriteError::EncodingError(io_err)),
153 }
154 }
155 ///Push a heartbeat message into the channel. Hearbeats are zero sized messages which do not need encoding.
156 ///Reader should never activate callbacks for heartbeat messsages.
157 ///
158 /// Returns RecordHeaderLen, 8 in the current version if the operation succeeds.
159 ///
160 /// # Errors
161 ///
162 /// If the operation fails a *ChannelFull* error will be returned, which signals that the channel will not accept any new messages.
163 ///
164 #[allow(clippy::cast_ptr_alignment)]
165 #[inline]
166 fn heartbeat(&mut self) -> Result<u32, WriteError> {
167 let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
168 let available = self.available();
169 if available <= REC_HEADER_LEN {
170 return Err(WriteError::ChannelFull);
171 }
172 let aligned_rec_len = REC_HEADER_LEN; //no need to align REC_HEADER)LEN must be align
173 self.write_metadata(read_head_ptr as *mut u64, 0u64, aligned_rec_len >> 3);
174 self.write_offset += aligned_rec_len;
175 Ok(aligned_rec_len)
176 }
177
178 /// Flushes the channel's outstanding memory map modifications to disk. Calling this method explicitly
179 /// it is not encouraged as flushing does occur automatically and comes with a performance penalty.
180 /// It should be used only if for various reasons a writer wants to persist the channel data to the disk
181 /// at a higher rate than is done automatically.
182 ///
183 /// Returns Ok(()) if the operation succeeds.
184 ///
185 /// # Errors
186 ///
187 /// If flushing fails an I/O error is returned.
188 ///
189 /// # Examples
190 ///
191 /// ```
192 /// use kekbit_core::tick::TickUnit::Nanos;
193 /// use kekbit_core::shm::*;
194 /// use kekbit_core::header::Header;
195 /// use kekbit_core::api::Writer;
196 /// use kekbit_codecs::codecs::raw::RawBinDataFormat;
197 ///
198 /// const FOREVER: u64 = 99_999_999_999;
199 /// let writer_id = 1850;
200 /// let channel_id = 42;
201 /// let capacity = 30_000;
202 /// let max_msg_len = 100;
203 /// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
204 /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
205 /// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
206 /// let msg = "There are 10 kinds of people: those who know binary and those who don't";
207 /// let msg_data = msg.as_bytes();
208 /// writer.write(&msg_data).unwrap();
209 /// writer.flush().unwrap();
210 /// ```
211 #[inline]
212 fn flush(&mut self) -> Result<(), std::io::Error> {
213 debug!("Flushing the channel");
214 self.mmap.flush()
215 }
216}
217impl<D: DataFormat> Drop for ShmWriter<D> {
218 /// Marks this channel as `closed`, flushes the changes to the disk, and removes the memory mapping.
219 fn drop(&mut self) {
220 let write_index = self.write_offset;
221 info!("Closing message queue..");
222 unsafe {
223 #[allow(clippy::cast_ptr_alignment)]
224 //we should always have the 8 bytes required by CLOSE as they are acounted in the Footer
225 let write_ptr = self.data_ptr.offset(write_index as isize) as *mut u64;
226 store_atomic_u64(write_ptr, CLOSE, Ordering::Release);
227 info!("Closing message sent")
228 }
229 self.write_offset = self.mmap.len() as u32;
230 if self.mmap.flush().is_ok() {
231 info!("All changes flushed");
232 } else {
233 error!("Flush Failed");
234 }
235 }
236}
237impl<D: DataFormat> ShmWriter<D> {
238 ///Returns the amount of space in this channel still available for write.
239 #[inline]
240 pub fn available(&self) -> u32 {
241 (self.header.capacity() - self.write_offset) & 0xFFFF_FFF8 //rounded down to alignement
242 }
243 ///Returns the amount of data written into this channel.
244 #[inline]
245 pub fn write_offset(&self) -> u32 {
246 self.write_offset
247 }
248
249 ///Returns a reference to the [Header](struct.Header.html) associated with this channel.
250 #[inline]
251 pub fn header(&self) -> &Header {
252 &self.header
253 }
254 #[inline]
255 pub fn data_format(&self) -> &D {
256 &self.df
257 }
258}
259
/// A bounded `std::io::Write` sink which copies bytes straight to a raw memory location.
///
/// At most `max_size` bytes are accepted between two resets. Once a chunk is refused the sink
/// is marked as failed and refuses everything else until `reset` is called.
struct KekWrite {
    // Where the first accepted byte goes.
    write_ptr: *mut u8,
    // Upper limit for `total`.
    max_size: usize,
    // Number of bytes accepted since the last reset.
    total: usize,
    // Set once a chunk did not fit; sticky until the next reset.
    failed: bool,
}

impl KekWrite {
    /// Creates an empty sink which accepts up to `max_size` bytes starting at `write_ptr`.
    #[inline]
    fn new(write_ptr: *mut u8, max_size: usize) -> Self {
        Self {
            write_ptr,
            max_size,
            total: 0,
            failed: false,
        }
    }

    /// Points the sink at a new destination and limit, forgetting all the previous state.
    /// Returns `self`, so the call can be used directly as an argument.
    #[inline]
    fn reset(&mut self, write_ptr: *mut u8, max_size: usize) -> &mut Self {
        *self = Self::new(write_ptr, max_size);
        self
    }
}

impl Write for KekWrite {
    /// Copies the whole of `data` after the bytes already accepted, or nothing at all.
    ///
    /// Returns `Ok(data.len())` if the chunk was accepted. Returns `Ok(0)` if the sink has
    /// already failed or if the chunk does not fit, in which case the sink is marked as failed.
    /// This method never returns an error.
    #[inline]
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        let chunk_len = data.len();
        let accepted = !self.failed && self.total + chunk_len <= self.max_size;
        if !accepted {
            self.failed = true;
            return Ok(0);
        }
        // The bounds check above only compares against `max_size`; whoever created or reset the
        // sink is responsible for `write_ptr` being valid for `max_size` bytes.
        unsafe {
            copy_nonoverlapping(data.as_ptr(), self.write_ptr.add(self.total), chunk_len);
        }
        self.total += chunk_len;
        Ok(chunk_len)
    }

    /// Does nothing, as the bytes are already at their destination when `write` returns.
    #[inline]
    fn flush(&mut self) -> Result<(), std::io::Error> {
        Ok(())
    }
}
310
#[cfg(test)]
mod test {
    use super::*;

    /// Walks a `KekWrite` through accepted writes, a refused write, a reset and the sticky
    /// failure state, checking the counters and the destination buffer after every step.
    #[test]
    fn test_write() {
        let mut raw_data = [0u8; 1000];
        let write_ptr = raw_data.as_mut_ptr();
        let mut kw = KekWrite::new(write_ptr, 20);
        kw.flush().unwrap(); //should never crash as it does nothing

        // Two chunks of 10 bytes fill the 20 bytes limit exactly.
        let ones = [1u8; 10];
        let first = kw.write(&ones).unwrap();
        assert_eq!(kw.total, first);
        assert!(!kw.failed);
        assert!(raw_data[..10].iter().all(|&b| b == 1));
        kw.flush().unwrap(); //should never crash as it does nothing
        let second = kw.write(&ones).unwrap();
        assert_eq!(kw.total, first + second);
        assert!(!kw.failed);
        assert!(raw_data[10..20].iter().all(|&b| b == 1));

        // A third chunk goes over the limit and is refused.
        let third = kw.write(&ones).unwrap();
        assert_eq!(0, third);
        assert!(kw.failed);

        // A reset clears the failure and starts again from the beginning of the buffer.
        kw.reset(write_ptr, 15);
        assert!(!kw.failed);
        let twos = [2u8; 10];
        let fourth = kw.write(&twos).unwrap();
        assert_eq!(kw.total, fourth);
        assert!(!kw.failed);
        assert!(raw_data[..10].iter().all(|&b| b == 2));
        assert_eq!(kw.total, 10);
        let fifth = kw.write(&twos).unwrap();
        assert_eq!(0, fifth);
        assert!(kw.failed);
        assert_eq!(kw.total, 10);

        //once it fails it will never recover, even if it has enough space
        let sixth = kw.write(&twos[0..3]).unwrap();
        assert_eq!(0, sixth);
        assert!(kw.failed);
        assert_eq!(kw.total, 10);
    }
}