kekbit_core/
shm.rs

1//! Defines operations to create readers and writers backed by a memory mapped channel.
2pub mod reader;
3use reader::ShmReader;
4pub mod writer;
5use crate::header::Header;
6use log::{error, info};
7use memmap::MmapOptions;
8
9use crate::api::ChannelError;
10use crate::api::ChannelError::*;
11
12use crate::utils::FOOTER_LEN;
13use kekbit_codecs::codecs::DataFormat;
14use std::fs::OpenOptions;
15use std::fs::{remove_file, DirBuilder};
16use std::path::Path;
17use std::result::Result;
18use writer::ShmWriter;
19/// Creates a kekbit reader associated to a memory mapped channel.
20///
21/// Returns a ready to use reader which points to the beginning of a kekbit channel if succeeds, or an error if the operation fails.
22///
23/// # Arguments
24///
25/// * `root_path` - The path to the folder where all the channels will be stored grouped by writer's id.
26/// * `writer_id` - The id of the writer which created the channel.
27/// * `channel_id` - The channel identifier.
28///
29/// # Errors
30///
31/// Various [errors](enum.ChannelError.html) may occur if the operation fails.
32///
33/// # Examples
34///
35/// ```
36/// # use kekbit_core::tick::TickUnit::Nanos;
37/// # use kekbit_core::header::Header;
38/// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
39/// use kekbit_core::shm::*;
40/// # const FOREVER: u64 = 99_999_999_999;
41/// let writer_id = 1850;
42/// let channel_id = 42;
43/// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
44/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
45/// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
46/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
47/// println!("{:?}", reader.header());
48///
49/// ```
50pub fn shm_reader(root_path: &Path, channel_id: u64) -> Result<ShmReader, ChannelError> {
51    let kek_file_path = storage_path(root_path, channel_id).into_path_buf();
52    let kek_lock_path = kek_file_path.with_extension("lock");
53    if !kek_file_path.exists() {
54        return Err(StorageNotFound {
55            file_name: kek_file_path.to_str().unwrap().to_string(),
56        });
57    }
58    if kek_lock_path.exists() {
59        return Err(StorageNotReady {
60            file_name: kek_file_path.to_str().unwrap().to_string(),
61        });
62    }
63
64    let kek_file = OpenOptions::new()
65        .write(true)
66        .read(true)
67        .open(&kek_file_path)
68        .or_else(|err| {
69            Err(CouldNotAccessStorage {
70                file_name: err.to_string(),
71            })
72        })?;
73
74    info!("Kekbit file {:?} opened for read.", kek_file);
75    let mmap =
76        unsafe { MmapOptions::new().map_mut(&kek_file) }.or_else(|err| Err(MemoryMappingFailed { reason: err.to_string() }))?;
77    ShmReader::new(mmap)
78}
79
80/// Tries multiple times to create a kekbit reader associated to a memory mapped channel.
81/// This function will basically call [shm_reader](fn.shm_reader.html) up to *tries* time unless
82/// it succeeds. Between two tries the function will spin/sleep for a about ```duration_millis/tries```
83/// milliseconds so potentially could be blocking.
84/// This should be the preferred method to create a *reader* when you are willing to wait until the channel is available.
85///
86///
87/// Returns a ready to use reader which points to the beginning of a kekbit channel if succeeds, or the error *returned by the last try* if it fails.
88///
89/// # Arguments
90///
91/// * `root_path` - The path to the folder where all the channels will be stored grouped by writer's id.
92/// * `writer_id` - The id of the writer which created the channel.
93/// * `channel_id` - The channel identifier.
94/// * `duration_millis` - How long it should try in milliseconds
95/// * `tries` - How many times it will try during the given time duration
96///
97/// # Errors
98///
99/// Various [errors](enum.ChannelError.html) may occur if the operation fails.
100///
101/// # Examples
102///
103/// ```
104/// # use kekbit_core::tick::TickUnit::Nanos;
105/// # use kekbit_core::header::Header;
106/// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
107/// use kekbit_core::shm::*;
108/// # const FOREVER: u64 = 99_999_999_999;
109/// let writer_id = 1850;
110/// let channel_id = 42;
111/// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
112/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
113/// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
114/// let duration = 1000;
115/// let tries = 10;
116/// let reader = try_shm_reader(&test_tmp_dir.path(), channel_id, duration, tries).unwrap();
117/// println!("{:?}", reader.header());
118///
119/// ```
120pub fn try_shm_reader(root_path: &Path, channel_id: u64, duration_millis: u64, tries: u64) -> Result<ShmReader, ChannelError> {
121    assert!(tries > 0);
122    let interval = duration_millis / tries;
123    let sleep_duration = std::time::Duration::from_millis(interval);
124    let mut reader_res = shm_reader(root_path, channel_id);
125    let mut tries_left = tries - 1;
126    while reader_res.is_err() && tries_left > 0 {
127        std::thread::sleep(sleep_duration);
128        reader_res = shm_reader(root_path, channel_id);
129        tries_left -= 1;
130    }
131    reader_res
132}
133
134/// Creates a file backed memory mapped  kekbit channel and a writer associate with it.
135///
136/// Returns a ready to use writer to the new created channel or an error if the operation fails.
137///
138/// # Arguments
139///
140/// * `root_path` - The path to the folder where all the channels will be stored grouped by writers id.
141/// * `header` - a structure of type [Header](struct.Header.html) which contains the complete information required to create a channel.
142///
143/// # Errors
144///
145/// Various [errors](enum.ChannelError.html) may occur if the operation fails.
146///
147/// # Examples
148///
149/// ```
150/// use kekbit_core::tick::TickUnit::Nanos;
151/// use kekbit_core::shm::*;
152/// use kekbit_core::header::Header;
153/// use kekbit_core::api::Writer;
154/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
155///
156/// const FOREVER: u64 = 99_999_999_999;
157/// let writer_id = 1850;
158/// let channel_id = 42;
159/// let capacity = 3000;
160/// let max_msg_len = 100;
161/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
162/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
163/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
164/// writer.heartbeat().unwrap();
165/// ```
166pub fn shm_writer<D: DataFormat>(root_path: &Path, header: &Header, df: D) -> Result<ShmWriter<D>, ChannelError> {
167    let kek_file_path = storage_path(root_path, header.channel_id()).into_path_buf();
168    if kek_file_path.exists() {
169        return Err(StorageAlreadyExists {
170            file_name: kek_file_path.to_str().unwrap().to_string(),
171        });
172    }
173    let mut builder = DirBuilder::new();
174    builder.recursive(true);
175    builder.create(&kek_file_path.parent().unwrap()).or_else(|err| {
176        Err(CouldNotAccessStorage {
177            file_name: err.to_string(),
178        })
179    })?;
180    let kek_lock_path = kek_file_path.with_extension("lock");
181    OpenOptions::new()
182        .write(true)
183        .create(true)
184        .open(&kek_lock_path)
185        .or_else(|err| {
186            Err(CouldNotAccessStorage {
187                file_name: err.to_string(),
188            })
189        })?;
190    info!("Kekbit lock {:?} created", kek_lock_path);
191    let kek_file = OpenOptions::new()
192        .write(true)
193        .read(true)
194        .create(true)
195        .open(&kek_file_path)
196        .or_else(|err| {
197            Err(CouldNotAccessStorage {
198                file_name: err.to_string(),
199            })
200        })?;
201    let total_len = (header.capacity() + header.len() as u32 + FOOTER_LEN) as u64;
202    kek_file.set_len(total_len).or_else(|err| {
203        Err(CouldNotAccessStorage {
204            file_name: err.to_string(),
205        })
206    })?;
207    info!("Kekbit channel store {:?} created.", kek_file);
208    let mut mmap =
209        unsafe { MmapOptions::new().map_mut(&kek_file) }.or_else(|err| Err(MemoryMappingFailed { reason: err.to_string() }))?;
210    let buf = &mut mmap[..];
211    header.write_to(buf);
212    mmap.flush().or_else(|err| Err(AccessError { reason: err.to_string() }))?;
213    info!("Kekbit channel with store {:?} succesfully initialized", kek_file_path);
214    let res = ShmWriter::new(mmap, df);
215    if res.is_err() {
216        error!("Kekbit writer creation error . The file {:?} will be removed!", kek_file_path);
217        remove_file(&kek_file_path).expect("Could not remove kekbit file");
218    }
219    remove_file(&kek_lock_path).expect("Could not remove kekbit lock file");
220    info!("Kekbit lock file {:?} removed", kek_lock_path);
221    res
222}
223
/// Returns the path to the file associated with a channel inside a kekbit root folder.
///
/// The 64 bit channel id is split into four 16 bit groups, each rendered as four lowercase
/// hexadecimal digits. The two most significant groups name the folder and the two least
/// significant ones name the file: `<root_path>/<hhhh_hhhh>/<llll_llll>.kekbit`.
///
/// # Arguments
///
///  * `root_path` - Path to the kekbit root folder, a folder where channels are stored. Multiple such
///   folders may exist in a system.
///  * `channel_id` - Channel for which the file path will be returned
///
#[inline]
pub fn storage_path(root_path: &Path, channel_id: u64) -> Box<Path> {
    // Extracts the 16 bit group which starts at the given bit offset.
    let group = |shift: u32| (channel_id >> shift) as u16;
    let folder_name = format!("{:04x}_{:04x}", group(48), group(32));
    let file_stem = format!("{:04x}_{:04x}", group(16), group(0));
    root_path
        .join(folder_name)
        .join(file_stem)
        .with_extension("kekbit")
        .into_boxed_path()
}
241
#[cfg(test)]
mod test {
    //! Tests for the reader/writer factory functions and for the storage path layout.
    //! Every test works inside its own temporary directory.
    use super::*;
    use crate::api::{InvalidPosition, Reader, Writer};
    use crate::tick::TickUnit::Nanos;
    use crate::utils::{align, REC_HEADER_LEN};
    use assert_matches::*;
    use kekbit_codecs::codecs::raw::RawBinDataFormat;
    use std::sync::Arc;
    use std::sync::Once;
    use tempdir::TempDir;

    // Value passed as the fifth argument of `Header::new` in every test below.
    const FOREVER: u64 = 99_999_999_999;

    // Guards the logger initialization so it runs only once, whichever test gets there first.
    static INIT_LOG: Once = Once::new();

    /// A reader opened on a freshly created channel sees the same header as its writer.
    /// Despite its name, this test only compares the two headers.
    #[test]
    fn check_max_len() {
        let header = Header::new(100, 1000, 300_000, 1000, FOREVER, Nanos);
        let test_tmp_dir = TempDir::new("kektest").unwrap();
        let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
        let reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
        assert_eq!(writer.header(), reader.header());
    }

    /// Writes a sentence word by word, then reads everything back with a single `read` call
    /// and checks the text, the byte counts and the final reader position.
    #[test]
    fn read_than_write() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
        let header = Header::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let test_tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
        let txt = "There are 10 kinds of people: those who know binary and those who don't";
        let msgs = txt.split_whitespace();
        let mut msg_count = 0;
        let mut bytes_written = 8; //account for the initial heartbeat
        for m in msgs {
            let to_wr = m.as_bytes();
            let len = to_wr.len() as u32;
            let size = writer.write(&to_wr).unwrap();
            // Each write must report the aligned size of payload plus record header.
            assert_eq!(size, align(len + REC_HEADER_LEN));
            bytes_written += size;
            msg_count += 1;
        }
        assert_eq!(writer.write_offset(), bytes_written);
        writer.flush().unwrap(); //not really necessary
        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
        assert_eq!(reader.position(), 0);
        let mut res_msg = StrMsgsAppender::default();
        // `as` binds tighter than `+`: the limit is `msg_count + (10 as u16)`, i.e. more
        // messages than were written, so one call is allowed to consume all of them.
        let bytes_read = reader
            .read(&mut |_, msg| res_msg.on_message(msg), msg_count + 10 as u16)
            .unwrap();
        assert_eq!(res_msg.txt, txt);
        assert_eq!(bytes_written, bytes_read);
        assert_eq!(reader.position(), bytes_read);
    }

    /// Test helper which rebuilds a text from the messages it receives,
    /// separating consecutive messages with a single space.
    #[derive(Default, Debug)]
    struct StrMsgsAppender {
        txt: String,
    }

    impl StrMsgsAppender {
        /// Appends `buf`, which must be valid UTF-8, to the text collected so far.
        pub fn on_message(&mut self, buf: &[u8]) {
            let msg_str = std::str::from_utf8(&buf).unwrap();
            if !self.txt.is_empty() {
                self.txt.push_str(" ");
            }
            self.txt.push_str(msg_str);
        }
    }

    /// Reads the channel with a limit of one message per `read` call and checks that the
    /// position handed to the callback equals the number of bytes read by the previous calls.
    #[test]
    fn check_position() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
        let header = Header::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let test_tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
        let txt = "There are 10 kinds of people: those who know binary and those who don't";
        let msgs = txt.split_whitespace();
        let mut msg_count = 0;
        for m in msgs {
            let to_wr = m.as_bytes();
            let len = to_wr.len() as u32;
            let size = writer.write(&to_wr).unwrap();
            assert_eq!(size, align(len + REC_HEADER_LEN));
            msg_count += 1;
        }
        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
        let mut read_bytes = 0;
        let mut last_msg_size = 0;
        for _i in 0..msg_count {
            last_msg_size = reader.read(&mut |pos, _msg| assert_eq!(pos, read_bytes), 1).unwrap();
            read_bytes += last_msg_size;
        }
        // NOTE(review): the expected position is one read short of the write offset; this
        // presumably comes from how the initial heartbeat is counted by `read` - confirm.
        assert_eq!(reader.position(), writer.write_offset() - last_msg_size);
    }

    /// Checks that `move_to` lets the reader go back to a position reported by the read
    /// callback, and back to the first record, and read the same messages again.
    #[test]
    fn check_move_to() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
        let header = Header::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let test_tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
        let txt = "There are 10 kinds of people: those who know binary and those who don't";
        let msgs = txt.split_whitespace();
        let mut msg_count = 0;
        for m in msgs {
            let to_wr = m.as_bytes();
            let len = to_wr.len() as u32;
            let size = writer.write(&to_wr).unwrap();
            assert_eq!(size, align(len + REC_HEADER_LEN));
            msg_count += 1;
        }
        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
        reader.move_to(8).unwrap(); //skip  heartbeat
        let mut msg_read = 0;
        let mut last_pos = 0;
        for _i in 0..msg_count {
            //let's read every message twice...
            reader
                .read(
                    &mut |pos, _| {
                        msg_read += 1;
                        last_pos = pos
                    },
                    1,
                )
                .unwrap();
            // Rewind to the message just delivered and read it a second time.
            reader.move_to(last_pos).unwrap();
            reader.read(&mut |_, _| msg_read += 1, 1).unwrap();
        }
        assert_eq!(msg_read, 2 * msg_count);
        reader.move_to(8).unwrap(); //now let's read them again
        for _i in 0..msg_count {
            reader
                .read(
                    &mut |_, _| {
                        msg_read += 1;
                    },
                    1,
                )
                .unwrap();
        }
        assert_eq!(msg_read, 3 * msg_count);
    }

    /// Checks which positions `move_to` accepts and which error it reports for the rejected
    /// ones. The offsets used here depend on the record sizes produced by the words of `txt`.
    #[test]
    fn check_invalid_move_to() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
        let header = Header::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let test_tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
        let txt = "There are 10 kinds of people: those who know binary and those who don't";
        let msgs = txt.split_whitespace();
        for m in msgs {
            let to_wr = m.as_bytes();
            writer.write(&to_wr).unwrap();
        }
        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
        reader.move_to(8).unwrap(); //skip  heartbeat
        assert_matches!(reader.move_to(4), Err(InvalidPosition::Unaligned { position: 4 }));
        assert_matches!(reader.move_to(45680), Err(InvalidPosition::Unavailable { position: 45680 })); //too big
        assert_matches!(reader.move_to(999), Err(InvalidPosition::Unaligned { position: 999 })); //unaligned
        assert!(reader.move_to(24).is_ok());
        assert!(reader.move_to(56).is_ok());
        assert_matches!(reader.move_to(64), Err(InvalidPosition::Unaligned { position: 64 })); //between records
        assert!(reader.move_to(72).is_ok());
        assert_matches!(reader.move_to(832), Err(InvalidPosition::Unavailable { position: 832 }));
        //too big
    }

    /// Checks the folder and file names derived from a channel id, including the zero padding
    /// of every 16 bit group, and the name of the matching lock file.
    #[test]
    fn check_path_to_storage() {
        let dir = TempDir::new("kektest").unwrap();
        let root_path = dir.path();
        let channel_id_0: u64 = 0;
        let path = storage_path(root_path, channel_id_0).into_path_buf();
        assert_eq!(path, root_path.join("0000_0000").join("0000_0000.kekbit"));
        assert_eq!(
            path.with_extension("lock"),
            root_path.join("0000_0000").join("0000_0000.lock")
        );

        let channel_id_1: u64 = 0xAAAA_BBBB_CCCC_DDDD;
        let path = storage_path(root_path, channel_id_1).into_path_buf();
        assert_eq!(path, root_path.join("aaaa_bbbb").join("cccc_dddd.kekbit"));
        assert_eq!(
            path.with_extension("lock"),
            root_path.join("aaaa_bbbb").join("cccc_dddd.lock")
        );
        // Only 48 bits are set: the most significant group must be padded with zeros.
        let channel_id_2: u64 = 0xBBBB_CCCC_0001;
        let path = storage_path(root_path, channel_id_2).into_path_buf();
        assert_eq!(path, root_path.join("0000_bbbb").join("cccc_0001.kekbit"));
        assert_eq!(
            path.with_extension("lock"),
            root_path.join("0000_bbbb").join("cccc_0001.lock")
        );
        // Leading zeros inside the individual groups must be preserved.
        let channel_id_3: u64 = 0xAAAA_00BB_000C_0DDD;
        let path = storage_path(root_path, channel_id_3).into_path_buf();
        assert_eq!(path, root_path.join("aaaa_00bb").join("000c_0ddd.kekbit"));
        assert_eq!(
            path.with_extension("lock"),
            root_path.join("aaaa_00bb").join("000c_0ddd.lock")
        );
    }

    /// Checks that `try_shm_reader` gives up with an error when the requested channel never
    /// shows up, both on the current thread and on a thread running next to a writer.
    #[test]
    fn try_to_create_reader() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
        let test_tmp_dir = Arc::new(TempDir::new("kektest").unwrap());
        // Nothing has been created yet, so all 30 tries within 300 ms must fail.
        let never_reader = try_shm_reader(&test_tmp_dir.path(), 999_999, 300, 30);
        assert!(never_reader.is_err());
        let channel_id = 999;
        let root_dir = test_tmp_dir.clone();
        // NOTE(review): this thread polls channel 999 while the header below creates channel
        // 1000 (the other tests open the channel built from the same header arguments with id
        // 1000), so the reader can only fail. The name `good_reader` suggests a successful
        // attempt may have been intended - confirm before relying on this test.
        let handle = std::thread::spawn(move || {
            let good_reader = try_shm_reader(&test_tmp_dir.path(), channel_id, 1000, 20);
            assert!(good_reader.is_err());
        });
        let header = Header::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        shm_writer(&root_dir.path(), &header, RawBinDataFormat).unwrap();
        // Joining propagates a failed assertion of the spawned thread into this test.
        handle.join().unwrap();
    }
}