1pub mod reader;
3use reader::ShmReader;
4pub mod writer;
5use crate::header::Header;
6use log::{error, info};
7use memmap::MmapOptions;
8
9use crate::api::ChannelError;
10use crate::api::ChannelError::*;
11
12use crate::utils::FOOTER_LEN;
13use kekbit_codecs::codecs::DataFormat;
14use std::fs::OpenOptions;
15use std::fs::{remove_file, DirBuilder};
16use std::path::Path;
17use std::result::Result;
18use writer::ShmWriter;
19pub fn shm_reader(root_path: &Path, channel_id: u64) -> Result<ShmReader, ChannelError> {
51 let kek_file_path = storage_path(root_path, channel_id).into_path_buf();
52 let kek_lock_path = kek_file_path.with_extension("lock");
53 if !kek_file_path.exists() {
54 return Err(StorageNotFound {
55 file_name: kek_file_path.to_str().unwrap().to_string(),
56 });
57 }
58 if kek_lock_path.exists() {
59 return Err(StorageNotReady {
60 file_name: kek_file_path.to_str().unwrap().to_string(),
61 });
62 }
63
64 let kek_file = OpenOptions::new()
65 .write(true)
66 .read(true)
67 .open(&kek_file_path)
68 .or_else(|err| {
69 Err(CouldNotAccessStorage {
70 file_name: err.to_string(),
71 })
72 })?;
73
74 info!("Kekbit file {:?} opened for read.", kek_file);
75 let mmap =
76 unsafe { MmapOptions::new().map_mut(&kek_file) }.or_else(|err| Err(MemoryMappingFailed { reason: err.to_string() }))?;
77 ShmReader::new(mmap)
78}
79
80pub fn try_shm_reader(root_path: &Path, channel_id: u64, duration_millis: u64, tries: u64) -> Result<ShmReader, ChannelError> {
121 assert!(tries > 0);
122 let interval = duration_millis / tries;
123 let sleep_duration = std::time::Duration::from_millis(interval);
124 let mut reader_res = shm_reader(root_path, channel_id);
125 let mut tries_left = tries - 1;
126 while reader_res.is_err() && tries_left > 0 {
127 std::thread::sleep(sleep_duration);
128 reader_res = shm_reader(root_path, channel_id);
129 tries_left -= 1;
130 }
131 reader_res
132}
133
134pub fn shm_writer<D: DataFormat>(root_path: &Path, header: &Header, df: D) -> Result<ShmWriter<D>, ChannelError> {
167 let kek_file_path = storage_path(root_path, header.channel_id()).into_path_buf();
168 if kek_file_path.exists() {
169 return Err(StorageAlreadyExists {
170 file_name: kek_file_path.to_str().unwrap().to_string(),
171 });
172 }
173 let mut builder = DirBuilder::new();
174 builder.recursive(true);
175 builder.create(&kek_file_path.parent().unwrap()).or_else(|err| {
176 Err(CouldNotAccessStorage {
177 file_name: err.to_string(),
178 })
179 })?;
180 let kek_lock_path = kek_file_path.with_extension("lock");
181 OpenOptions::new()
182 .write(true)
183 .create(true)
184 .open(&kek_lock_path)
185 .or_else(|err| {
186 Err(CouldNotAccessStorage {
187 file_name: err.to_string(),
188 })
189 })?;
190 info!("Kekbit lock {:?} created", kek_lock_path);
191 let kek_file = OpenOptions::new()
192 .write(true)
193 .read(true)
194 .create(true)
195 .open(&kek_file_path)
196 .or_else(|err| {
197 Err(CouldNotAccessStorage {
198 file_name: err.to_string(),
199 })
200 })?;
201 let total_len = (header.capacity() + header.len() as u32 + FOOTER_LEN) as u64;
202 kek_file.set_len(total_len).or_else(|err| {
203 Err(CouldNotAccessStorage {
204 file_name: err.to_string(),
205 })
206 })?;
207 info!("Kekbit channel store {:?} created.", kek_file);
208 let mut mmap =
209 unsafe { MmapOptions::new().map_mut(&kek_file) }.or_else(|err| Err(MemoryMappingFailed { reason: err.to_string() }))?;
210 let buf = &mut mmap[..];
211 header.write_to(buf);
212 mmap.flush().or_else(|err| Err(AccessError { reason: err.to_string() }))?;
213 info!("Kekbit channel with store {:?} succesfully initialized", kek_file_path);
214 let res = ShmWriter::new(mmap, df);
215 if res.is_err() {
216 error!("Kekbit writer creation error . The file {:?} will be removed!", kek_file_path);
217 remove_file(&kek_file_path).expect("Could not remove kekbit file");
218 }
219 remove_file(&kek_lock_path).expect("Could not remove kekbit lock file");
220 info!("Kekbit lock file {:?} removed", kek_lock_path);
221 res
222}
223
/// Returns the path of the file which stores the channel `channel_id` under `root_path`.
///
/// The 64 bit channel id is split into four 16 bit groups, most significant first, each
/// rendered as four lowercase hex digits. The two upper groups name the folder and the
/// two lower groups name the file: `<root_path>/hhhh_hhhh/llll_llll.kekbit`.
///
/// # Arguments
///
/// * `root_path` - Folder under which the channel files are stored.
/// * `channel_id` - Identifier of the channel.
#[inline]
pub fn storage_path(root_path: &Path, channel_id: u64) -> Box<Path> {
    // Extracts the 16 bit group which starts at bit `shift`.
    let group = |shift: u32| (channel_id >> shift) as u16;
    let folder_name = format!("{:04x}_{:04x}", group(48), group(32));
    // The hex name never contains a dot, so the extension can be appended directly.
    let file_name = format!("{:04x}_{:04x}.kekbit", group(16), group(0));
    root_path.join(folder_name).join(file_name).into_boxed_path()
}
241
#[cfg(test)]
mod test {
    use super::*;
    use crate::api::{InvalidPosition, Reader, Writer};
    use crate::tick::TickUnit::Nanos;
    use crate::utils::{align, REC_HEADER_LEN};
    use assert_matches::*;
    use kekbit_codecs::codecs::raw::RawBinDataFormat;
    use std::sync::{Arc, Once};
    use tempdir::TempDir;

    /// Value passed as the fifth argument of every test header.
    const FOREVER: u64 = 99_999_999_999;

    /// Sentence whose whitespace separated words are written as individual messages.
    const SENTENCE: &str = "There are 10 kinds of people: those who know binary and those who don't";

    /// Guards the logger so that it is installed once per test process.
    static INIT_LOG: Once = Once::new();

    /// Installs the logger the first time it is called; later calls do nothing.
    fn init_logging() {
        INIT_LOG.call_once(|| {
            simple_logger::init().unwrap();
        });
    }

    /// Header shared by most tests. The readers in this module open channel 1000,
    /// so the second argument is presumably the channel id - TODO confirm.
    fn small_header() -> Header {
        Header::new(100, 1000, 10000, 1000, FOREVER, Nanos)
    }

    /// A reader opened on a freshly created channel sees the header the writer was built with.
    #[test]
    fn check_max_len() {
        let header = Header::new(100, 1000, 300_000, 1000, FOREVER, Nanos);
        let tmp = TempDir::new("kektest").unwrap();
        let writer = shm_writer(tmp.path(), &header, RawBinDataFormat).unwrap();
        let reader = shm_reader(tmp.path(), 1000).unwrap();
        assert_eq!(writer.header(), reader.header());
    }

    /// Writes the sentence word by word, then reads everything back with a single `read` call.
    #[test]
    fn read_than_write() {
        init_logging();
        let header = small_header();
        let tmp = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(tmp.path(), &header, RawBinDataFormat).unwrap();
        let mut msg_count = 0;
        // The expected offset starts at 8 - presumably the size of a record the writer
        // emits at creation; TODO confirm.
        let mut bytes_written = 8;
        for word in SENTENCE.split_whitespace() {
            let payload = word.as_bytes();
            let written = writer.write(&payload).unwrap();
            assert_eq!(written, align(payload.len() as u32 + REC_HEADER_LEN));
            bytes_written += written;
            msg_count += 1;
        }
        assert_eq!(writer.write_offset(), bytes_written);
        writer.flush().unwrap();

        let mut reader = shm_reader(tmp.path(), 1000).unwrap();
        assert_eq!(reader.position(), 0);
        let mut collected = StrMsgsAppender::default();
        // More messages are requested than were written.
        let bytes_read = reader
            .read(&mut |_, msg| collected.on_message(msg), msg_count + 10 as u16)
            .unwrap();
        assert_eq!(collected.txt, SENTENCE);
        assert_eq!(bytes_written, bytes_read);
        assert_eq!(reader.position(), bytes_read);
    }

    /// Rebuilds a text out of the messages it receives, separating them with single spaces.
    #[derive(Default, Debug)]
    struct StrMsgsAppender {
        txt: String,
    }

    impl StrMsgsAppender {
        /// Appends `buf`, which must be valid UTF-8, to the collected text.
        pub fn on_message(&mut self, buf: &[u8]) {
            let word = std::str::from_utf8(buf).unwrap();
            if !self.txt.is_empty() {
                self.txt.push(' ');
            }
            self.txt.push_str(word);
        }
    }

    /// Reads the records one at a time and checks the position reported for each of them.
    #[test]
    fn check_position() {
        init_logging();
        let header = small_header();
        let tmp = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(tmp.path(), &header, RawBinDataFormat).unwrap();
        let mut msg_count = 0;
        for word in SENTENCE.split_whitespace() {
            let payload = word.as_bytes();
            let written = writer.write(&payload).unwrap();
            assert_eq!(written, align(payload.len() as u32 + REC_HEADER_LEN));
            msg_count += 1;
        }

        let mut reader = shm_reader(tmp.path(), 1000).unwrap();
        let mut read_bytes = 0;
        let mut last_msg_size = 0;
        for _ in 0..msg_count {
            // Every record must be reported at the offset reached by the previous reads.
            last_msg_size = reader.read(&mut |pos, _msg| assert_eq!(pos, read_bytes), 1).unwrap();
            read_bytes += last_msg_size;
        }
        assert_eq!(reader.position(), writer.write_offset() - last_msg_size);
    }

    /// Moves the reader back to positions it was given before and reads the records again.
    #[test]
    fn check_move_to() {
        init_logging();
        let header = small_header();
        let tmp = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(tmp.path(), &header, RawBinDataFormat).unwrap();
        let mut msg_count = 0;
        for word in SENTENCE.split_whitespace() {
            let payload = word.as_bytes();
            let written = writer.write(&payload).unwrap();
            assert_eq!(written, align(payload.len() as u32 + REC_HEADER_LEN));
            msg_count += 1;
        }

        let mut reader = shm_reader(tmp.path(), 1000).unwrap();
        // Position 8 is presumably where the first written message starts - TODO confirm.
        reader.move_to(8).unwrap();
        let mut msg_read = 0;
        let mut last_pos = 0;
        for _ in 0..msg_count {
            reader
                .read(
                    &mut |pos, _| {
                        msg_read += 1;
                        last_pos = pos
                    },
                    1,
                )
                .unwrap();
            // Go back to the record which was just read and read it a second time.
            reader.move_to(last_pos).unwrap();
            reader.read(&mut |_, _| msg_read += 1, 1).unwrap();
        }
        assert_eq!(msg_read, 2 * msg_count);

        // One more pass, starting again from position 8.
        reader.move_to(8).unwrap();
        for _ in 0..msg_count {
            reader.read(&mut |_, _| msg_read += 1, 1).unwrap();
        }
        assert_eq!(msg_read, 3 * msg_count);
    }

    /// Checks which positions `move_to` accepts and which error it reports for the others.
    #[test]
    fn check_invalid_move_to() {
        init_logging();
        let header = small_header();
        let tmp = TempDir::new("kektest").unwrap();
        let mut writer = shm_writer(tmp.path(), &header, RawBinDataFormat).unwrap();
        for word in SENTENCE.split_whitespace() {
            let payload = word.as_bytes();
            writer.write(&payload).unwrap();
        }

        let mut reader = shm_reader(tmp.path(), 1000).unwrap();
        // The calls below are order sensitive: keep the sequence exactly as it is.
        reader.move_to(8).unwrap();
        assert_matches!(reader.move_to(4), Err(InvalidPosition::Unaligned { position: 4 }));
        assert_matches!(
            reader.move_to(45680),
            Err(InvalidPosition::Unavailable { position: 45680 })
        );
        assert_matches!(reader.move_to(999), Err(InvalidPosition::Unaligned { position: 999 }));
        assert!(reader.move_to(24).is_ok());
        assert!(reader.move_to(56).is_ok());
        assert_matches!(reader.move_to(64), Err(InvalidPosition::Unaligned { position: 64 }));
        assert!(reader.move_to(72).is_ok());
        assert_matches!(reader.move_to(832), Err(InvalidPosition::Unavailable { position: 832 }));
    }

    /// Checks the folder and file names derived from a channel id, for both the channel
    /// file and its lock file.
    #[test]
    fn check_path_to_storage() {
        let dir = TempDir::new("kektest").unwrap();
        let root_path = dir.path();
        // (channel id, expected folder name, expected file name without extension)
        let cases: [(u64, &str, &str); 4] = [
            (0, "0000_0000", "0000_0000"),
            (0xAAAA_BBBB_CCCC_DDDD, "aaaa_bbbb", "cccc_dddd"),
            (0xBBBB_CCCC_0001, "0000_bbbb", "cccc_0001"),
            (0xAAAA_00BB_000C_0DDD, "aaaa_00bb", "000c_0ddd"),
        ];
        for &(channel_id, folder, stem) in cases.iter() {
            let path = storage_path(root_path, channel_id).into_path_buf();
            assert_eq!(path, root_path.join(folder).join(format!("{}.kekbit", stem)));
            assert_eq!(
                path.with_extension("lock"),
                root_path.join(folder).join(format!("{}.lock", stem))
            );
        }
    }

    /// Exercises `try_shm_reader` on channels which never become available.
    ///
    /// NOTE(review): the background thread polls channel 999 while the writer below is
    /// created from the shared header, whose second argument is 1000. The assertion in the
    /// thread therefore expects an error; confirm that this is what the test is meant to cover.
    #[test]
    fn try_to_create_reader() {
        init_logging();
        let shared_dir = Arc::new(TempDir::new("kektest").unwrap());
        let never_reader = try_shm_reader(shared_dir.path(), 999_999, 300, 30);
        assert!(never_reader.is_err());

        let channel_id = 999;
        let writer_dir = Arc::clone(&shared_dir);
        let handle = std::thread::spawn(move || {
            let good_reader = try_shm_reader(shared_dir.path(), channel_id, 1000, 20);
            assert!(good_reader.is_err());
        });
        let header = small_header();
        shm_writer(writer_dir.path(), &header, RawBinDataFormat).unwrap();
        handle.join().unwrap();
    }
}