kekbit/
core.rs

1//! Provides the components and functions required to work with memory mapped data channels.
2mod handlers;
3mod metadata;
4mod reader;
5mod tick;
6mod utils;
7mod version;
8mod writer;
9
10pub use handlers::*;
11pub use metadata::*;
12pub use reader::*;
13pub use tick::*;
14pub use writer::*;
15
16use log::{error, info};
17use memmap::MmapOptions;
18
19use crate::api::ChannelError;
20use crate::api::ChannelError::*;
21use crate::api::Handler;
22
23use crate::core::utils::FOOTER_LEN;
24use std::fs::OpenOptions;
25use std::fs::{remove_file, DirBuilder};
26use std::path::Path;
27use std::result::Result;
28/// Creates a kekbit reader associated to a memory mapped channel.
29///
30/// Returns a ready to use reader which points to the beginning of a kekbit channel if succeeds, or an error if the operation fails.
31///
32/// # Arguments
33///
34/// * `root_path` - The path to the folder where all the channels will be stored grouped by writer's id.
35/// * `writer_id` - The id of the writer which created the channel.
36/// * `channel_id` - The channel identifier.
37///
38/// # Errors
39///
40/// Various [errors](../api/enum.ChannelError.html) may occur if the operation fails.
41///
42/// # Examples
43///
44/// ```
45/// # use kekbit::core::TickUnit::Nanos;
46/// use kekbit::core::*;
47/// use kekbit::api::*;
48/// # const FOREVER: u64 = 99_999_999_999;
49/// let writer_id = 1850;
50/// let channel_id = 42;
51/// # let metadata = Metadata::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
52/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
53/// # let writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
54/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
55/// println!("{:?}", reader.metadata());
56///
57/// ```
58pub fn shm_reader(root_path: &Path, channel_id: u64) -> Result<ShmReader, ChannelError> {
59    let kek_file_path = storage_path(root_path, channel_id).into_path_buf();
60    let kek_lock_path = kek_file_path.with_extension("lock");
61    if !kek_file_path.exists() {
62        return Err(StorageNotFound {
63            file_name: kek_file_path.to_str().unwrap().to_string(),
64        });
65    }
66    if kek_lock_path.exists() {
67        return Err(StorageNotReady {
68            file_name: kek_file_path.to_str().unwrap().to_string(),
69        });
70    }
71    let kek_file = OpenOptions::new()
72        .write(true)
73        .read(true)
74        .open(&kek_file_path)
75        .map_err(|err| CouldNotAccessStorage {
76            file_name: err.to_string(),
77        })?;
78
79    info!("Kekbit file {:?} opened for read.", kek_file);
80    let mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.map_err(|err| MemoryMappingFailed { reason: err.to_string() })?;
81    ShmReader::new(mmap)
82}
83
84/// Tries multiple times to create a kekbit reader associated to a memory mapped channel.
85/// This function will basically call [shm_reader](fn.shm_reader.html) up to *tries* time unless
86/// it succeeds. Between two tries the function will spin/sleep for a about ```duration_millis/tries```
87/// milliseconds so potentially could be blocking.
88/// This should be the preferred method to create a *reader* when you are willing to wait until the channel is available.
89///
90///
91/// Returns a ready to use reader which points to the beginning of a kekbit channel if succeeds, or the error *returned by the last try* if it fails.
92///
93/// # Arguments
94///
95/// * `root_path` - The path to the folder where all the channels will be stored grouped by writer's id.
96/// * `writer_id` - The id of the writer which created the channel.
97/// * `channel_id` - The channel identifier.
98/// * `duration_millis` - How long it should try in milliseconds
99/// * `tries` - How many times it will try during the given time duration
100///
101/// # Errors
102///
103/// Various [errors](enum.ChannelError.html) may occur if the operation fails.
104///
105/// # Examples
106///
107/// ```
108/// # use kekbit::core::TickUnit::Nanos;
109/// use kekbit::core::*;
110/// use kekbit::api::*;
111/// # const FOREVER: u64 = 99_999_999_999;
112/// let writer_id = 1850;
113/// let channel_id = 42;
114/// # let metadata = Metadata::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
115/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
116/// # let writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
117/// let duration = 1000;
118/// let tries = 10;
119/// let reader = try_shm_reader(&test_tmp_dir.path(), channel_id, duration, tries).unwrap();
120/// println!("{:?}", reader.metadata());
121///
122/// ```
123pub fn try_shm_reader(root_path: &Path, channel_id: u64, duration_millis: u64, tries: u64) -> Result<ShmReader, ChannelError> {
124    assert!(tries > 0);
125    let interval = duration_millis / tries;
126    let sleep_duration = std::time::Duration::from_millis(interval);
127    let mut reader_res = shm_reader(root_path, channel_id);
128    let mut tries_left = tries;
129    while reader_res.is_err() && tries_left > 0 {
130        std::thread::sleep(sleep_duration);
131        reader_res = shm_reader(root_path, channel_id);
132        tries_left -= 1;
133    }
134    reader_res
135}
136//This method should be removed as soon as metadata is exposed in the reader trait
137/// Decorates a [ShmReader](struct.ShmReader.html) with a timeout functionality.
138///
139/// # Examples
140///
141/// ```
142/// # use kekbit::core::TickUnit::Millis;
143/// use kekbit::core::*;
144/// use kekbit::api::*;
145/// const TWO_SECS: u64 = 2000;
146/// let writer_id = 1850;
147/// let channel_id = 42;
148/// # let metadata = Metadata::new(writer_id, channel_id, 300_000, 1000, TWO_SECS, Millis);
149/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
150/// # let writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
151/// let reader = shm_reader(&test_tmp_dir.path(), channel_id).unwrap();
152/// let timeout_reader = shm_timeout_reader(reader);
153/// ```
154#[inline]
155pub fn shm_timeout_reader(reader: ShmReader) -> TimeoutReader<ShmReader> {
156    reader.into()
157}
158
159/// Creates a file backed memory mapped  kekbit channel and a writer associate with it.
160///
161/// Returns a ready to use writer to the new created channel or an error if the operation fails.
162///
163/// # Arguments
164///
165/// * `root_path` - The path to the folder where all the channels will be stored grouped by writers id.
166/// * `metadata` - a structure of type [Metadata](struct.Metadata.html) which contains the complete information required to create a channel.
167///
168/// # Errors
169///
170/// Various [errors](enum.ChannelError.html) may occur if the operation fails.
171///
172/// # Examples
173///
174/// ```
175/// use kekbit::core::TickUnit::Nanos;
176/// use kekbit::core::*;
177/// use kekbit::api::*;
178///
179/// const FOREVER: u64 = 99_999_999_999;
180/// let writer_id = 1850;
181/// let channel_id = 42;
182/// let capacity = 3000;
183/// let max_msg_len = 100;
184/// let metadata = Metadata::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
185/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
186/// let mut writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
187/// ```
188pub fn shm_writer<H: Handler>(root_path: &Path, metadata: &Metadata, rec_handler: H) -> Result<ShmWriter<H>, ChannelError> {
189    let kek_file_path = storage_path(root_path, metadata.channel_id()).into_path_buf();
190    if kek_file_path.exists() {
191        return Err(StorageAlreadyExists {
192            file_name: kek_file_path.to_str().unwrap().to_string(),
193        });
194    }
195    let mut builder = DirBuilder::new();
196    builder.recursive(true);
197    builder
198        .create(&kek_file_path.parent().unwrap())
199        .map_err(|err| CouldNotAccessStorage {
200            file_name: err.to_string(),
201        })?;
202    let kek_lock_path = kek_file_path.with_extension("lock");
203    OpenOptions::new()
204        .write(true)
205        .create(true)
206        .open(&kek_lock_path)
207        .map_err(|err| CouldNotAccessStorage {
208            file_name: err.to_string(),
209        })?;
210    info!("Kekbit lock {:?} created", kek_lock_path);
211    let kek_file = OpenOptions::new()
212        .write(true)
213        .read(true)
214        .create(true)
215        .open(&kek_file_path)
216        .map_err(|err| CouldNotAccessStorage {
217            file_name: err.to_string(),
218        })?;
219    let total_len = (metadata.capacity() + metadata.len() as u32 + FOOTER_LEN) as u64;
220    kek_file.set_len(total_len).map_err(|err| CouldNotAccessStorage {
221        file_name: err.to_string(),
222    })?;
223    info!("Kekbit channel store {:?} created.", kek_file);
224    let mut mmap =
225        unsafe { MmapOptions::new().map_mut(&kek_file) }.map_err(|err| MemoryMappingFailed { reason: err.to_string() })?;
226    let buf = &mut mmap[..];
227    metadata.write_to(buf);
228    mmap.flush().map_err(|err| AccessError { reason: err.to_string() })?;
229    info!("Kekbit channel with store {:?} succesfully initialized", kek_file_path);
230    let res = ShmWriter::new(mmap, rec_handler);
231    if res.is_err() {
232        error!("Kekbit writer creation error . The file {:?} will be removed!", kek_file_path);
233        remove_file(&kek_file_path).expect("Could not remove kekbit file");
234    }
235    remove_file(&kek_lock_path).expect("Could not remove kekbit lock file");
236    info!("Kekbit lock file {:?} removed", kek_lock_path);
237    res
238}
239
240/// Returns the path to the file associated with a channel inside a kekbit root folder.
241///
242/// # Arguments
243///
244///  * `root_path` - Path to the kekbit root folder, a folder where channels are stored. Multiple such
245///   folders may exist in a system.  
246///  * `channel_id` - Channel for which the file path will be returned
247///
248#[inline]
249pub fn storage_path(root_path: &Path, channel_id: u64) -> Box<Path> {
250    let high_val: u32 = (channel_id >> 32) as u32;
251    let low_val = (channel_id & 0x0000_0000_FFFF_FFFF) as u32;
252    let channel_folder = format!("{:04x}_{:04x}", high_val >> 16, high_val & 0x0000_FFFF);
253    let channel_file = format!("{:04x}_{:04x}", low_val >> 16, low_val & 0x0000_FFFF);
254    let dir_path = root_path.join(channel_folder).join(channel_file);
255    dir_path.with_extension("kekbit").into_boxed_path()
256}
257
258#[cfg(test)]
259mod test {
260    use super::tick::TickUnit::Nanos;
261    use super::utils::{align, REC_HEADER_LEN};
262    use super::*;
263    use crate::api::EncoderHandler;
264    use crate::api::ReadError;
265    use crate::api::ReadError::Timeout;
266    use crate::api::Reader;
267    use crate::api::Writer;
268    use crate::core::TickUnit::Millis;
269    use simple_logger::SimpleLogger;
270    use std::sync::Arc;
271    use std::sync::Once;
272    use tempdir::TempDir;
273    const FOREVER: u64 = 99_999_999_999;
274    static INIT_LOG: Once = Once::new();
275
276    #[test]
277    fn check_max_len() {
278        let metadata = Metadata::new(100, 1000, 300_000, 1000, FOREVER, Nanos);
279        let test_tmp_dir = TempDir::new("kektest").unwrap();
280        let writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
281        let reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
282        assert_eq!(writer.metadata(), reader.metadata());
283    }
284
285    #[test]
286    fn write_than_read() {
287        INIT_LOG.call_once(|| {
288            SimpleLogger::new().init().unwrap();
289        });
290        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
291        let test_tmp_dir = TempDir::new("kektest").unwrap();
292        let mut writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
293        let txt = "There are 10 kinds of people: those who know binary and those who don't";
294        let msgs = txt.split_whitespace();
295        let mut msg_count = 0;
296        let mut bytes_written = 0;
297        for m in msgs {
298            let to_wr = m.as_bytes();
299            let len = to_wr.len() as u32;
300            let size = writer.write(&to_wr).unwrap();
301            assert_eq!(size, align(len + REC_HEADER_LEN));
302            bytes_written += size;
303            msg_count += 1;
304        }
305        assert_eq!(writer.write_offset(), bytes_written);
306        writer.flush().unwrap(); //not really necessary
307        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
308        assert_eq!(reader.position(), 0);
309        let mut msg_iter = reader.try_iter();
310        let mut res_txt = String::new();
311        for read_res in &mut msg_iter {
312            match read_res {
313                ReadResult::Record(msg) => {
314                    let msg_str = std::str::from_utf8(&msg).unwrap();
315                    if !res_txt.is_empty() {
316                        res_txt.push(' ');
317                    }
318                    res_txt.push_str(msg_str);
319                    msg_count -= 1;
320                }
321                ReadResult::Nothing => {
322                    assert!(msg_count == 0);
323                    break;
324                }
325                ReadResult::Failed(err) => match err {
326                    ReadError::Closed => break,
327                    _ => {
328                        panic!("Unexpected read error {:?}", err);
329                    }
330                },
331            }
332        }
333        assert_eq!(res_txt, txt);
334        assert_eq!(bytes_written, reader.position());
335    }
336
337    #[test]
338    fn try_iterator_hint_size() {
339        INIT_LOG.call_once(|| {
340            SimpleLogger::new().init().unwrap();
341        });
342        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
343        let test_tmp_dir = TempDir::new("kektest").unwrap();
344        let mut msg_count = 0;
345        {
346            let mut writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
347            let txt = "There are 10 kinds of people: those who know binary and those who don't";
348            let msgs = txt.split_whitespace();
349            for m in msgs {
350                let to_wr = m.as_bytes();
351                let len = to_wr.len() as u32;
352                let size = writer.write(&to_wr).unwrap();
353                assert_eq!(size, align(len + REC_HEADER_LEN));
354                msg_count += 1;
355            }
356        }
357        let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
358        assert!(reader.exhausted().is_none());
359        let mut read_iter = reader.try_iter();
360        let sh1 = read_iter.size_hint();
361        assert_eq!(sh1.0, 0);
362        assert!(sh1.1.is_none());
363        read_iter.next().unwrap();
364        let sh2 = read_iter.size_hint();
365        assert_eq!(sh2.0, 0);
366        assert!(sh2.1.is_none());
367        //consume it
368        let mut total = 0;
369        for _msg in &mut read_iter {
370            total += 1
371        }
372        assert_eq!(total, msg_count);
373        let sh3 = read_iter.size_hint();
374        assert_eq!(sh3.0, 0);
375        assert!(sh3.1.unwrap() == 0);
376        assert!(read_iter.next().is_none());
377        assert!(reader.exhausted().is_some());
378        assert_eq!(reader.exhausted().unwrap(), ReadError::Closed);
379    }
380
381    #[test]
382    fn check_path_to_storage() {
383        let dir = TempDir::new("kektest").unwrap();
384        let root_path = dir.path();
385        let channel_id_0: u64 = 0;
386        let path = storage_path(root_path, channel_id_0).into_path_buf();
387        assert_eq!(path, root_path.join("0000_0000").join("0000_0000.kekbit"));
388        assert_eq!(
389            path.with_extension("lock"),
390            root_path.join("0000_0000").join("0000_0000.lock")
391        );
392
393        let channel_id_1: u64 = 0xAAAA_BBBB_CCCC_DDDD;
394        let path = storage_path(root_path, channel_id_1).into_path_buf();
395        assert_eq!(path, root_path.join("aaaa_bbbb").join("cccc_dddd.kekbit"));
396        assert_eq!(
397            path.with_extension("lock"),
398            root_path.join("aaaa_bbbb").join("cccc_dddd.lock")
399        );
400        let channel_id_2: u64 = 0xBBBB_CCCC_0001;
401        let path = storage_path(root_path, channel_id_2).into_path_buf();
402        assert_eq!(path, root_path.join("0000_bbbb").join("cccc_0001.kekbit"));
403        assert_eq!(
404            path.with_extension("lock"),
405            root_path.join("0000_bbbb").join("cccc_0001.lock")
406        );
407        let channel_id_3: u64 = 0xAAAA_00BB_000C_0DDD;
408        let path = storage_path(root_path, channel_id_3).into_path_buf();
409        assert_eq!(path, root_path.join("aaaa_00bb").join("000c_0ddd.kekbit"));
410        assert_eq!(
411            path.with_extension("lock"),
412            root_path.join("aaaa_00bb").join("000c_0ddd.lock")
413        );
414    }
415
416    #[test]
417    fn try_to_create_reader() {
418        INIT_LOG.call_once(|| {
419            SimpleLogger::new().init().unwrap();
420        });
421        let test_tmp_dir = Arc::new(TempDir::new("kektest").unwrap());
422        let never_reader = try_shm_reader(&test_tmp_dir.path(), 999_999, 300, 30);
423        assert!(never_reader.is_err());
424        let channel_id = 999;
425        let root_dir = test_tmp_dir.clone();
426        let handle = std::thread::spawn(move || {
427            let good_reader = try_shm_reader(&test_tmp_dir.path(), channel_id, 1000, 20);
428            assert!(good_reader.is_err());
429        });
430        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
431        shm_writer(&root_dir.path(), &metadata, EncoderHandler::default()).unwrap();
432        handle.join().unwrap();
433    }
434    use assert_matches::assert_matches;
435    #[test]
436    fn read_with_timeout() {
437        INIT_LOG.call_once(|| {
438            SimpleLogger::new().init().unwrap();
439        });
440        let timeout = 50;
441        let metadata = Metadata::new(100, 1000, 10000, 1000, timeout, Millis);
442        let test_tmp_dir = TempDir::new("kektest").unwrap();
443        let mut writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
444        let txt = "Just a bad day";
445        writer.write(&txt.as_bytes()).unwrap();
446        let reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
447        let mut timeout_reader = shm_timeout_reader(reader);
448        let mut msg_iter = timeout_reader.try_iter();
449        assert_matches!(msg_iter.next(), Some(ReadResult::Record(_)));
450        assert_matches!(msg_iter.next(), Some(ReadResult::Nothing));
451        let sleep_duration = std::time::Duration::from_millis(timeout + 10);
452        std::thread::sleep(sleep_duration);
453        assert_matches!(msg_iter.next(), Some(ReadResult::Failed(Timeout(_))));
454        assert_matches!(msg_iter.next(), None);
455        writer.flush().unwrap(); //not really necessary
456    }
457}