// kekbit_core/shm/writer.rs

use crate::api::ChannelError::AccessError;
use crate::api::{ChannelError, WriteError, Writer};
use crate::header::Header;
use crate::utils::{align, store_atomic_u64, CLOSE, REC_HEADER_LEN, WATERMARK};
use kekbit_codecs::codecs::DataFormat;
use kekbit_codecs::codecs::Encodable;
use log::{debug, error, info};
use memmap::MmapMut;
use std::cmp::min;
use std::io::Write;
use std::ptr::copy_nonoverlapping;
use std::result::Result;
use std::sync::atomic::Ordering;

15/// An implementation of the [Writer](trait.Writer.html) which access a persistent channel through
16/// memory mapping, and uses a specific [DataFormat](../codecs/trait.DataFormat.html). A `ShmWriter` must be created using the [shm_writer](fn.shm_writer.html) function.
17/// Any `ShmWriter` exclusively holds the channel is bound to, and it is *not thread safe*.
18/// If multiple threads must write into a channel they should be externally synchronized.
19///
20/// # Examples
21///
22/// ```
23/// use kekbit_core::tick::TickUnit::Nanos;
24/// use kekbit_core::shm::*;
25/// use kekbit_core::header::Header;
26/// use kekbit_core::api::Writer;
27/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
28///
29/// const FOREVER: u64 = 99_999_999_999;
30/// let writer_id = 1850;
31/// let channel_id = 42;
32/// let capacity = 3000;
33/// let max_msg_len = 100;
34/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
35/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
36/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
37/// writer.heartbeat().unwrap();
38/// ```
39pub struct ShmWriter<D: DataFormat> {
40    header: Header,
41    data_ptr: *mut u8,
42    write_offset: u32,
43    mmap: MmapMut,
44    df: D,
45    write: KekWrite,
46}
47
48impl<D: DataFormat> ShmWriter<D> {
49    #[allow(clippy::cast_ptr_alignment)]
50    pub(super) fn new(mut mmap: MmapMut, df: D) -> Result<ShmWriter<D>, ChannelError> {
51        let buf = &mut mmap[..];
52        let header = Header::read(buf)?;
53        let header_ptr = buf.as_ptr() as *mut u64;
54        let head_len = header.len();
55        let data_ptr = unsafe { header_ptr.add(head_len) } as *mut u8;
56        let write = KekWrite::new(data_ptr, header.max_msg_len() as usize);
57        let mut writer = ShmWriter {
58            header,
59            data_ptr,
60            write_offset: 0,
61            mmap,
62            df,
63            write,
64        };
65        info!(
66            "Kekbit channel writer created. Size is {}MB. Max msg size {}KB",
67            writer.header.capacity() / 1_000_000,
68            writer.header.max_msg_len() / 1_000
69        );
70        //sent the very first original heart bear
71        match writer.heartbeat() {
72            Ok(_) => {
73                info!("Initial hearbeat successfully sent!");
74                Ok(writer)
75            }
76            Err(we) => Err(AccessError {
77                reason: format!("Initial heartbeat failed!. Reason {:?}", we),
78            }),
79        }
80    }
81
82    #[inline(always)]
83    fn write_metadata(&mut self, write_ptr: *mut u64, len: u64, aligned_rec_len: u32) {
84        unsafe {
85            //we should always have the 8 bytes required by WATERMARK as they are acounted in the Footer
86            store_atomic_u64(write_ptr.add(aligned_rec_len as usize), WATERMARK, Ordering::Release);
87        }
88        store_atomic_u64(write_ptr, len, Ordering::Release);
89    }
90}
91
92impl<D: DataFormat> Writer<D> for ShmWriter<D> {
93    /// Writes a message into the channel. This operation will encode the data directly into  channel.
94    /// While this is a non blocking operation, only one write should be executed at any given time.
95    ///
96    /// Returns the total amount of bytes wrote into the channel which includes, the size of the message,
97    /// the size of the message header and the amount of padding add to that message.
98    ///
99    /// # Arguments
100    ///
101    /// * `data` - The  data which to encode and  write into the channel.
102    ///
103    /// # Errors
104    ///
105    /// Two kinds of [failures](enum.WriteError.html) may occur. One if the encoding operation failed, the other if the channel
106    /// rejected the message for reasons such data is too large or no space is available in the channel.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use kekbit_core::tick::TickUnit::Nanos;
112    /// use kekbit_core::shm::*;
113    /// use kekbit_core::header::Header;
114    /// use kekbit_core::api::Writer;
115    /// use kekbit_codecs::codecs::raw::RawBinDataFormat;
116    ///
117    /// const FOREVER: u64 = 99_999_999_999;
118    /// let writer_id = 1850;
119    /// let channel_id = 42;
120    /// let capacity = 30_000;
121    /// let max_msg_len = 100;
122    /// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
123    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
124    /// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
125    /// let msg = "There are 10 kinds of people: those who know binary and those who don't";
126    /// let msg_data = msg.as_bytes();
127    /// writer.write(&msg_data).unwrap();
128    /// ```
129    ///
130    #[allow(clippy::cast_ptr_alignment)]
131    fn write(&mut self, data: &impl Encodable<D>) -> Result<u32, WriteError> {
132        let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
133        let write_ptr = unsafe { read_head_ptr.add(REC_HEADER_LEN as usize) };
134        let available = self.available();
135        if available <= REC_HEADER_LEN {
136            return Err(WriteError::ChannelFull);
137        }
138        let len = min(self.header.max_msg_len(), available - REC_HEADER_LEN) as usize;
139        let write_res = data.encode_to(&self.df, self.write.reset(write_ptr, len));
140        match write_res {
141            Ok(0) => Err(WriteError::NoSpaceForRecord),
142            Ok(_) => {
143                if !self.write.failed {
144                    let aligned_rec_len = align(self.write.total as u32 + REC_HEADER_LEN);
145                    self.write_metadata(read_head_ptr as *mut u64, self.write.total as u64, aligned_rec_len >> 3);
146                    self.write_offset += aligned_rec_len;
147                    Ok(aligned_rec_len)
148                } else {
149                    Err(WriteError::NoSpaceForRecord)
150                }
151            }
152            Err(io_err) => Err(WriteError::EncodingError(io_err)),
153        }
154    }
155    ///Push a heartbeat message into the channel. Hearbeats are zero sized messages which do not need encoding.
156    ///Reader should never activate callbacks for heartbeat messsages.
157    ///
158    /// Returns RecordHeaderLen, 8 in the current version if the operation succeeds.
159    ///
160    /// # Errors
161    ///
162    /// If the operation fails a *ChannelFull* error will be returned, which signals that the channel will not accept any new messages.
163    ///
164    #[allow(clippy::cast_ptr_alignment)]
165    #[inline]
166    fn heartbeat(&mut self) -> Result<u32, WriteError> {
167        let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
168        let available = self.available();
169        if available <= REC_HEADER_LEN {
170            return Err(WriteError::ChannelFull);
171        }
172        let aligned_rec_len = REC_HEADER_LEN; //no need to align REC_HEADER)LEN must be align
173        self.write_metadata(read_head_ptr as *mut u64, 0u64, aligned_rec_len >> 3);
174        self.write_offset += aligned_rec_len;
175        Ok(aligned_rec_len)
176    }
177
178    /// Flushes the channel's outstanding memory map modifications to disk. Calling  this method explicitly
179    /// it is not encouraged as flushing does occur automatically and comes with a performance penalty.
180    /// It should be used only if for various reasons a writer wants to persist the channel data to the disk
181    /// at a higher rate than is done automatically.
182    ///
183    /// Returns Ok(()) if the operation succeeds.
184    ///
185    /// # Errors
186    ///
187    /// If flushing fails an I/O error is returned.
188    ///
189    /// # Examples
190    ///
191    /// ```
192    /// use kekbit_core::tick::TickUnit::Nanos;
193    /// use kekbit_core::shm::*;
194    /// use kekbit_core::header::Header;
195    /// use kekbit_core::api::Writer;
196    /// use kekbit_codecs::codecs::raw::RawBinDataFormat;
197    ///
198    /// const FOREVER: u64 = 99_999_999_999;
199    /// let writer_id = 1850;
200    /// let channel_id = 42;
201    /// let capacity = 30_000;
202    /// let max_msg_len = 100;
203    /// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
204    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
205    /// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
206    /// let msg = "There are 10 kinds of people: those who know binary and those who don't";
207    /// let msg_data = msg.as_bytes();
208    /// writer.write(&msg_data).unwrap();
209    /// writer.flush().unwrap();
210    /// ```
211    #[inline]
212    fn flush(&mut self) -> Result<(), std::io::Error> {
213        debug!("Flushing the channel");
214        self.mmap.flush()
215    }
216}
217impl<D: DataFormat> Drop for ShmWriter<D> {
218    /// Marks this channel as `closed`, flushes the changes to the disk, and removes the memory mapping.
219    fn drop(&mut self) {
220        let write_index = self.write_offset;
221        info!("Closing message queue..");
222        unsafe {
223            #[allow(clippy::cast_ptr_alignment)]
224            //we should always have the 8 bytes required by CLOSE as they are acounted in the Footer
225            let write_ptr = self.data_ptr.offset(write_index as isize) as *mut u64;
226            store_atomic_u64(write_ptr, CLOSE, Ordering::Release);
227            info!("Closing message sent")
228        }
229        self.write_offset = self.mmap.len() as u32;
230        if self.mmap.flush().is_ok() {
231            info!("All changes flushed");
232        } else {
233            error!("Flush Failed");
234        }
235    }
236}
237impl<D: DataFormat> ShmWriter<D> {
238    ///Returns the amount of space in this channel still available for write.
239    #[inline]
240    pub fn available(&self) -> u32 {
241        (self.header.capacity() - self.write_offset) & 0xFFFF_FFF8 //rounded down to alignement
242    }
243    ///Returns the amount of data written into this channel.
244    #[inline]
245    pub fn write_offset(&self) -> u32 {
246        self.write_offset
247    }
248
249    ///Returns a reference to the [Header](struct.Header.html) associated with this channel.
250    #[inline]
251    pub fn header(&self) -> &Header {
252        &self.header
253    }
254    #[inline]
255    pub fn data_format(&self) -> &D {
256        &self.df
257    }
258}
259
/// A bounded `std::io::Write` sink which copies bytes to a raw memory location.
///
/// At most `max_size` bytes are accepted. The first write which would go past that limit is
/// rejected as a whole and latches the `failed` flag; from then on every write is rejected
/// until `reset` is called.
struct KekWrite {
    /// Where the first byte is copied to.
    write_ptr: *mut u8,
    /// Maximum number of bytes which may be copied starting at `write_ptr`.
    max_size: usize,
    /// Number of bytes copied so far.
    total: usize,
    /// Set once a write has been rejected for lack of space.
    failed: bool,
}

impl KekWrite {
    /// Creates an empty sink which copies up to `max_size` bytes starting at `write_ptr`.
    #[inline]
    fn new(write_ptr: *mut u8, max_size: usize) -> Self {
        KekWrite {
            write_ptr,
            max_size,
            total: 0,
            failed: false,
        }
    }
    /// Re-targets the sink to `write_ptr` with a new `max_size`, clearing the byte count and
    /// the failure flag. Returns `self` so the call can be used in place of the sink.
    #[inline]
    fn reset(&mut self, write_ptr: *mut u8, max_size: usize) -> &mut Self {
        *self = KekWrite::new(write_ptr, max_size);
        self
    }
}

impl Write for KekWrite {
    /// Copies `data` behind the bytes already written, provided the whole slice fits.
    ///
    /// Returns `Ok(data.len())` when the slice was copied. Returns `Ok(0)` without copying
    /// anything when the sink has already failed or when `data` does not fit; in the latter
    /// case the sink is marked as failed. An error is never returned.
    #[inline]
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        if self.failed {
            return Ok(0);
        }
        let data_len = data.len();
        // `checked_add` keeps the bound check sound even if the sum does not fit in `usize`.
        match self.total.checked_add(data_len) {
            Some(new_total) if new_total <= self.max_size => {
                // SAFETY: relies on whoever called `new`/`reset` having provided a pointer
                // valid for writes of `max_size` bytes; the check above keeps the copy inside
                // that range, and `data` is a borrowed slice distinct from the destination.
                unsafe {
                    copy_nonoverlapping(data.as_ptr(), self.write_ptr.add(self.total), data_len);
                }
                self.total = new_total;
                Ok(data_len)
            }
            _ => {
                self.failed = true;
                Ok(0)
            }
        }
    }
    /// Does nothing: bytes are copied to their destination as soon as they are written.
    #[inline]
    fn flush(&mut self) -> Result<(), std::io::Error> {
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;

    /// Walks a `KekWrite` through filling up, overflowing, being reset, and overflowing again.
    #[test]
    fn test_write() {
        let mut raw_data: [u8; 1000] = [0; 1000];
        let write_ptr = raw_data.as_mut_ptr();
        let mut kw = KekWrite::new(write_ptr, 20);
        kw.flush().unwrap(); // flushing does nothing, so it must never fail

        // Two writes of 10 bytes fill the 20 bytes limit exactly.
        let ones = [1u8; 10];
        let first = kw.write(&ones).unwrap();
        assert_eq!(kw.total, first);
        assert!(!kw.failed);
        assert!(raw_data[..10].iter().all(|&byte| byte == 1));
        kw.flush().unwrap(); // flushing does nothing, so it must never fail
        let second = kw.write(&ones).unwrap();
        assert_eq!(kw.total, first + second);
        assert!(!kw.failed);
        assert!(raw_data[10..20].iter().all(|&byte| byte == 1));

        // A third write goes past the limit and is rejected.
        let third = kw.write(&ones).unwrap();
        assert_eq!(0, third);
        assert!(kw.failed);

        // After a reset the writer starts again from the beginning, with a new limit.
        kw.reset(write_ptr, 15);
        assert!(!kw.failed);
        let twos = [2u8; 10];
        let fourth = kw.write(&twos).unwrap();
        assert_eq!(kw.total, fourth);
        assert!(!kw.failed);
        assert!(raw_data[..10].iter().all(|&byte| byte == 2));
        assert_eq!(kw.total, 10);
        let fifth = kw.write(&twos).unwrap();
        assert_eq!(0, fifth);
        assert!(kw.failed);
        assert_eq!(kw.total, 10);

        // Once it fails it never recovers, even if there is enough space left.
        let sixth = kw.write(&twos[..3]).unwrap();
        assert_eq!(0, sixth);
        assert!(kw.failed);
        assert_eq!(kw.total, 10);
    }
}