1mod handlers;
3mod metadata;
4mod reader;
5mod tick;
6mod utils;
7mod version;
8mod writer;
9
10pub use handlers::*;
11pub use metadata::*;
12pub use reader::*;
13pub use tick::*;
14pub use writer::*;
15
16use log::{error, info};
17use memmap::MmapOptions;
18
19use crate::api::ChannelError;
20use crate::api::ChannelError::*;
21use crate::api::Handler;
22
23use crate::core::utils::FOOTER_LEN;
24use std::fs::OpenOptions;
25use std::fs::{remove_file, DirBuilder};
26use std::path::Path;
27use std::result::Result;
28pub fn shm_reader(root_path: &Path, channel_id: u64) -> Result<ShmReader, ChannelError> {
59 let kek_file_path = storage_path(root_path, channel_id).into_path_buf();
60 let kek_lock_path = kek_file_path.with_extension("lock");
61 if !kek_file_path.exists() {
62 return Err(StorageNotFound {
63 file_name: kek_file_path.to_str().unwrap().to_string(),
64 });
65 }
66 if kek_lock_path.exists() {
67 return Err(StorageNotReady {
68 file_name: kek_file_path.to_str().unwrap().to_string(),
69 });
70 }
71 let kek_file = OpenOptions::new()
72 .write(true)
73 .read(true)
74 .open(&kek_file_path)
75 .map_err(|err| CouldNotAccessStorage {
76 file_name: err.to_string(),
77 })?;
78
79 info!("Kekbit file {:?} opened for read.", kek_file);
80 let mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.map_err(|err| MemoryMappingFailed { reason: err.to_string() })?;
81 ShmReader::new(mmap)
82}
83
84pub fn try_shm_reader(root_path: &Path, channel_id: u64, duration_millis: u64, tries: u64) -> Result<ShmReader, ChannelError> {
124 assert!(tries > 0);
125 let interval = duration_millis / tries;
126 let sleep_duration = std::time::Duration::from_millis(interval);
127 let mut reader_res = shm_reader(root_path, channel_id);
128 let mut tries_left = tries;
129 while reader_res.is_err() && tries_left > 0 {
130 std::thread::sleep(sleep_duration);
131 reader_res = shm_reader(root_path, channel_id);
132 tries_left -= 1;
133 }
134 reader_res
135}
136#[inline]
155pub fn shm_timeout_reader(reader: ShmReader) -> TimeoutReader<ShmReader> {
156 reader.into()
157}
158
159pub fn shm_writer<H: Handler>(root_path: &Path, metadata: &Metadata, rec_handler: H) -> Result<ShmWriter<H>, ChannelError> {
189 let kek_file_path = storage_path(root_path, metadata.channel_id()).into_path_buf();
190 if kek_file_path.exists() {
191 return Err(StorageAlreadyExists {
192 file_name: kek_file_path.to_str().unwrap().to_string(),
193 });
194 }
195 let mut builder = DirBuilder::new();
196 builder.recursive(true);
197 builder
198 .create(&kek_file_path.parent().unwrap())
199 .map_err(|err| CouldNotAccessStorage {
200 file_name: err.to_string(),
201 })?;
202 let kek_lock_path = kek_file_path.with_extension("lock");
203 OpenOptions::new()
204 .write(true)
205 .create(true)
206 .open(&kek_lock_path)
207 .map_err(|err| CouldNotAccessStorage {
208 file_name: err.to_string(),
209 })?;
210 info!("Kekbit lock {:?} created", kek_lock_path);
211 let kek_file = OpenOptions::new()
212 .write(true)
213 .read(true)
214 .create(true)
215 .open(&kek_file_path)
216 .map_err(|err| CouldNotAccessStorage {
217 file_name: err.to_string(),
218 })?;
219 let total_len = (metadata.capacity() + metadata.len() as u32 + FOOTER_LEN) as u64;
220 kek_file.set_len(total_len).map_err(|err| CouldNotAccessStorage {
221 file_name: err.to_string(),
222 })?;
223 info!("Kekbit channel store {:?} created.", kek_file);
224 let mut mmap =
225 unsafe { MmapOptions::new().map_mut(&kek_file) }.map_err(|err| MemoryMappingFailed { reason: err.to_string() })?;
226 let buf = &mut mmap[..];
227 metadata.write_to(buf);
228 mmap.flush().map_err(|err| AccessError { reason: err.to_string() })?;
229 info!("Kekbit channel with store {:?} succesfully initialized", kek_file_path);
230 let res = ShmWriter::new(mmap, rec_handler);
231 if res.is_err() {
232 error!("Kekbit writer creation error . The file {:?} will be removed!", kek_file_path);
233 remove_file(&kek_file_path).expect("Could not remove kekbit file");
234 }
235 remove_file(&kek_lock_path).expect("Could not remove kekbit lock file");
236 info!("Kekbit lock file {:?} removed", kek_lock_path);
237 res
238}
239
/// Computes the location of the storage file of a channel.
///
/// The 64-bit channel id is split into four 16-bit groups, each rendered as four
/// lowercase hexadecimal digits. The two most significant groups name a folder
/// under `root_path`, the two least significant ones name the file, which gets the
/// `kekbit` extension: `<root_path>/<hhhh>_<hhhh>/<llll>_<llll>.kekbit`.
///
/// # Arguments
///
/// * `root_path` - Folder under which the channel storage files live.
/// * `channel_id` - Identifier of the channel.
#[inline]
pub fn storage_path(root_path: &Path, channel_id: u64) -> Box<Path> {
    // Yields the 16-bit group of the id which starts at bit `shift`.
    let group = |shift: u32| (channel_id >> shift) & 0xFFFF;
    let folder_name = format!("{:04x}_{:04x}", group(48), group(32));
    let file_stem = format!("{:04x}_{:04x}", group(16), group(0));
    let mut full_path = root_path.join(folder_name);
    full_path.push(file_stem);
    full_path.set_extension("kekbit");
    full_path.into_boxed_path()
}
257
#[cfg(test)]
mod test {
    use super::tick::TickUnit::Nanos;
    use super::utils::{align, REC_HEADER_LEN};
    use super::*;
    use crate::api::EncoderHandler;
    use crate::api::ReadError;
    use crate::api::ReadError::Timeout;
    use crate::api::Reader;
    use crate::api::Writer;
    use crate::core::TickUnit::Millis;
    use assert_matches::assert_matches;
    use simple_logger::SimpleLogger;
    use std::sync::Arc;
    use std::sync::Once;
    use tempdir::TempDir;

    /// Timeout value handed to `Metadata::new` by the tests which must not time out.
    const FOREVER: u64 = 99_999_999_999;
    /// Makes sure the logger is installed only once, whatever test runs first.
    static INIT_LOG: Once = Once::new();
    /// Text whose words are written as individual records.
    const SAMPLE_TEXT: &str = "There are 10 kinds of people: those who know binary and those who don't";

    /// Installs the logger on first use; later calls do nothing.
    fn init_logger() {
        INIT_LOG.call_once(|| {
            SimpleLogger::new().init().unwrap();
        });
    }

    /// A reader opened on a freshly created channel sees the metadata of the writer.
    #[test]
    fn check_max_len() {
        let metadata = Metadata::new(100, 1000, 300_000, 1000, FOREVER, Nanos);
        let tmp_dir = TempDir::new("kektest").unwrap();
        let writer = shm_writer(&tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
        let reader = shm_reader(&tmp_dir.path(), 1000).unwrap();
        assert_eq!(writer.metadata(), reader.metadata());
    }

    /// Records written to a channel are read back in order and the reader ends up
    /// at the offset reached by the writer.
    #[test]
    fn write_than_read() {
        init_logger();
        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
        // Number of records written and not yet read back.
        let mut pending = 0;
        let mut bytes_written = 0;
        for word in SAMPLE_TEXT.split_whitespace() {
            let payload = word.as_bytes();
            let written = writer.write(&payload).unwrap();
            assert_eq!(written, align(payload.len() as u32 + REC_HEADER_LEN));
            bytes_written += written;
            pending += 1;
        }
        assert_eq!(writer.write_offset(), bytes_written);
        writer.flush().unwrap();

        let mut reader = shm_reader(&tmp_dir.path(), 1000).unwrap();
        assert_eq!(reader.position(), 0);
        let mut records = reader.try_iter();
        let mut read_back = String::new();
        for outcome in &mut records {
            match outcome {
                ReadResult::Record(msg) => {
                    let word = std::str::from_utf8(&msg).unwrap();
                    if !read_back.is_empty() {
                        read_back.push(' ');
                    }
                    read_back.push_str(word);
                    pending -= 1;
                }
                ReadResult::Nothing => {
                    // Nothing more to read: every written record must have been seen.
                    assert!(pending == 0);
                    break;
                }
                ReadResult::Failed(ReadError::Closed) => break,
                ReadResult::Failed(err) => panic!("Unexpected read error {:?}", err),
            }
        }
        assert_eq!(read_back, SAMPLE_TEXT);
        assert_eq!(bytes_written, reader.position());
    }

    /// Checks the size hints of the non-blocking iterator before, during and after
    /// the iteration over a channel whose writer has been dropped.
    #[test]
    fn try_iterator_hint_size() {
        init_logger();
        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        let tmp_dir = TempDir::new("kektest").unwrap();
        let mut msg_count = 0;
        {
            // The writer goes out of scope at the end of this block, before any read.
            let mut writer = shm_writer(&tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
            for word in SAMPLE_TEXT.split_whitespace() {
                let payload = word.as_bytes();
                let written = writer.write(&payload).unwrap();
                assert_eq!(written, align(payload.len() as u32 + REC_HEADER_LEN));
                msg_count += 1;
            }
        }
        let mut reader = shm_reader(&tmp_dir.path(), 1000).unwrap();
        assert!(reader.exhausted().is_none());
        let mut read_iter = reader.try_iter();
        assert_eq!(read_iter.size_hint(), (0, None));
        read_iter.next().unwrap();
        assert_eq!(read_iter.size_hint(), (0, None));
        // Everything the iterator still yields after the first item is counted.
        let total = read_iter.by_ref().count();
        assert_eq!(total, msg_count);
        assert_eq!(read_iter.size_hint(), (0, Some(0)));
        assert!(read_iter.next().is_none());
        assert!(reader.exhausted().is_some());
        assert_eq!(reader.exhausted().unwrap(), ReadError::Closed);
    }

    /// The storage and lock paths are derived from the channel id as expected.
    #[test]
    fn check_path_to_storage() {
        let dir = TempDir::new("kektest").unwrap();
        let root_path = dir.path();
        // (channel id, expected folder name, expected file name without extension)
        let cases: [(u64, &str, &str); 4] = [
            (0, "0000_0000", "0000_0000"),
            (0xAAAA_BBBB_CCCC_DDDD, "aaaa_bbbb", "cccc_dddd"),
            (0xBBBB_CCCC_0001, "0000_bbbb", "cccc_0001"),
            (0xAAAA_00BB_000C_0DDD, "aaaa_00bb", "000c_0ddd"),
        ];
        for &(channel_id, folder, stem) in cases.iter() {
            let path = storage_path(root_path, channel_id).into_path_buf();
            let expected_folder = root_path.join(folder);
            assert_eq!(path, expected_folder.join(format!("{}.kekbit", stem)));
            assert_eq!(path.with_extension("lock"), expected_folder.join(format!("{}.lock", stem)));
        }
    }

    /// `try_shm_reader` gives up with an error when the requested channel never shows up.
    #[test]
    fn try_to_create_reader() {
        init_logger();
        let test_tmp_dir = Arc::new(TempDir::new("kektest").unwrap());
        assert!(try_shm_reader(&test_tmp_dir.path(), 999_999, 300, 30).is_err());
        let channel_id = 999;
        let root_dir = Arc::clone(&test_tmp_dir);
        let handle = std::thread::spawn(move || {
            // Channel 999 is never created, the writer below uses another channel id.
            let good_reader = try_shm_reader(&test_tmp_dir.path(), channel_id, 1000, 20);
            assert!(good_reader.is_err());
        });
        let metadata = Metadata::new(100, 1000, 10000, 1000, FOREVER, Nanos);
        shm_writer(&root_dir.path(), &metadata, EncoderHandler::default()).unwrap();
        handle.join().unwrap();
    }

    /// A timeout reader reports a timeout once the channel timeout has elapsed
    /// without new records, and yields nothing afterwards.
    #[test]
    fn read_with_timeout() {
        init_logger();
        let timeout = 50;
        let metadata = Metadata::new(100, 1000, 10000, 1000, timeout, Millis);
        let tmp_dir = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(&tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
        let txt = "Just a bad day";
        writer.write(&txt.as_bytes()).unwrap();
        let mut timeout_reader = shm_timeout_reader(shm_reader(&tmp_dir.path(), 1000).unwrap());
        let mut msg_iter = timeout_reader.try_iter();
        assert_matches!(msg_iter.next(), Some(ReadResult::Record(_)));
        assert_matches!(msg_iter.next(), Some(ReadResult::Nothing));
        // Wait slightly longer than the channel timeout before reading again.
        std::thread::sleep(std::time::Duration::from_millis(timeout + 10));
        assert_matches!(msg_iter.next(), Some(ReadResult::Failed(Timeout(_))));
        assert_matches!(msg_iter.next(), None);
        writer.flush().unwrap();
    }
}