aeron_rs/utils/
log_buffers.rs1use std::ffi::OsString;
2use std::path::Path;
3
4use crate::concurrent::atomic_buffer::AtomicBuffer;
5use crate::concurrent::logbuffer::log_buffer_descriptor::{
6 self, check_page_size, check_term_length, page_size, term_length, PARTITION_COUNT,
7};
8use crate::log;
9use crate::utils::errors::AeronError;
10use crate::utils::memory_mapped_file::MemoryMappedFile;
11use crate::utils::types::Index;
12
13#[allow(dead_code)]
14pub struct LogBuffers {
15 memory_mapped_file: Option<MemoryMappedFile>,
16 buffers: [AtomicBuffer; log_buffer_descriptor::PARTITION_COUNT as usize + 1],
17}
18
19impl LogBuffers {
20 pub unsafe fn new(address: *mut u8, log_length: isize, term_length: i32) -> Self {
25 assert_eq!(log_buffer_descriptor::PARTITION_COUNT, 3);
26
27 Self {
28 memory_mapped_file: None,
29 buffers: [
30 AtomicBuffer::new(address, term_length),
31 AtomicBuffer::new(address.offset(term_length as isize), term_length),
32 AtomicBuffer::new(address.offset(2 * term_length as isize), term_length),
33 AtomicBuffer::new(
34 address.offset(log_length - log_buffer_descriptor::LOG_META_DATA_LENGTH as isize),
35 log_buffer_descriptor::LOG_META_DATA_LENGTH,
36 ),
37 ],
38 }
39 }
40
41 pub(crate) fn from_existing<P: std::fmt::Display + AsRef<Path> + Into<OsString>>(
42 file_path: P,
43 pre_touch: bool,
44 ) -> Result<Self, AeronError> {
45 assert_eq!(log_buffer_descriptor::PARTITION_COUNT, 3);
46
47 log!(trace, "from_existing: file_path {}, pre_touch {}", &file_path, pre_touch);
48
49 let log_len = MemoryMappedFile::get_file_size(&file_path)?;
50
51 let memory_mapped_file = MemoryMappedFile::map_existing(file_path, false).expect("todo");
52
53 let meta_buffer = memory_mapped_file.atomic_buffer(
54 (log_len as Index) - log_buffer_descriptor::LOG_META_DATA_LENGTH,
55 log_buffer_descriptor::LOG_META_DATA_LENGTH,
56 );
57
58 let term_length = term_length(&meta_buffer) as Index;
59 let page_size = page_size(&meta_buffer);
60
61 check_term_length(term_length)?;
62 check_page_size(page_size)?;
63
64 let mut buffers: Vec<AtomicBuffer> = Vec::with_capacity((PARTITION_COUNT + 1) as usize);
65
66 for i in 0..PARTITION_COUNT {
67 let buffer = memory_mapped_file.atomic_buffer(i * term_length, term_length);
68
69 if pre_touch {
70 let mut offset = 0;
71 while offset < term_length {
72 let _ignored = buffer.get::<i32>(offset);
73 offset += page_size;
74 }
75 }
76
77 buffers.push(buffer)
78 }
79
80 buffers.push(meta_buffer);
81
82 log!(trace, "from_existing: file mapped successfully, term_length {}", term_length);
83
84 Ok(Self {
85 memory_mapped_file: Some(memory_mapped_file),
86 buffers: [
87 #[allow(clippy::get_first)]
88 *buffers.get(0).expect("Log buffers get(0) failed"),
89 *buffers.get(1).expect("Log buffers get(1) failed"),
90 *buffers.get(2).expect("Log buffers get(2) failed"),
91 *buffers.get(3).expect("Log buffers get(3) failed"),
92 ],
93 })
94 }
95
96 pub fn atomic_buffer(&self, index: Index) -> AtomicBuffer {
97 self.buffers[index as usize]
98 }
99}