aeron_rs/utils/
log_buffers.rs

1use std::ffi::OsString;
2use std::path::Path;
3
4use crate::concurrent::atomic_buffer::AtomicBuffer;
5use crate::concurrent::logbuffer::log_buffer_descriptor::{
6    self, check_page_size, check_term_length, page_size, term_length, PARTITION_COUNT,
7};
8use crate::log;
9use crate::utils::errors::AeronError;
10use crate::utils::memory_mapped_file::MemoryMappedFile;
11use crate::utils::types::Index;
12
13#[allow(dead_code)]
14pub struct LogBuffers {
15    memory_mapped_file: Option<MemoryMappedFile>,
16    buffers: [AtomicBuffer; log_buffer_descriptor::PARTITION_COUNT as usize + 1],
17}
18
19impl LogBuffers {
20    /// # Safety
21    ///
22    /// LogBuffer is created internally by Publication and Image and not designed to
23    /// be created by application level code.
24    pub unsafe fn new(address: *mut u8, log_length: isize, term_length: i32) -> Self {
25        assert_eq!(log_buffer_descriptor::PARTITION_COUNT, 3);
26
27        Self {
28            memory_mapped_file: None,
29            buffers: [
30                AtomicBuffer::new(address, term_length),
31                AtomicBuffer::new(address.offset(term_length as isize), term_length),
32                AtomicBuffer::new(address.offset(2 * term_length as isize), term_length),
33                AtomicBuffer::new(
34                    address.offset(log_length - log_buffer_descriptor::LOG_META_DATA_LENGTH as isize),
35                    log_buffer_descriptor::LOG_META_DATA_LENGTH,
36                ),
37            ],
38        }
39    }
40
41    pub(crate) fn from_existing<P: std::fmt::Display + AsRef<Path> + Into<OsString>>(
42        file_path: P,
43        pre_touch: bool,
44    ) -> Result<Self, AeronError> {
45        assert_eq!(log_buffer_descriptor::PARTITION_COUNT, 3);
46
47        log!(trace, "from_existing: file_path {}, pre_touch {}", &file_path, pre_touch);
48
49        let log_len = MemoryMappedFile::get_file_size(&file_path)?;
50
51        let memory_mapped_file = MemoryMappedFile::map_existing(file_path, false).expect("todo");
52
53        let meta_buffer = memory_mapped_file.atomic_buffer(
54            (log_len as Index) - log_buffer_descriptor::LOG_META_DATA_LENGTH,
55            log_buffer_descriptor::LOG_META_DATA_LENGTH,
56        );
57
58        let term_length = term_length(&meta_buffer) as Index;
59        let page_size = page_size(&meta_buffer);
60
61        check_term_length(term_length)?;
62        check_page_size(page_size)?;
63
64        let mut buffers: Vec<AtomicBuffer> = Vec::with_capacity((PARTITION_COUNT + 1) as usize);
65
66        for i in 0..PARTITION_COUNT {
67            let buffer = memory_mapped_file.atomic_buffer(i * term_length, term_length);
68
69            if pre_touch {
70                let mut offset = 0;
71                while offset < term_length {
72                    let _ignored = buffer.get::<i32>(offset);
73                    offset += page_size;
74                }
75            }
76
77            buffers.push(buffer)
78        }
79
80        buffers.push(meta_buffer);
81
82        log!(trace, "from_existing: file mapped successfully, term_length {}", term_length);
83
84        Ok(Self {
85            memory_mapped_file: Some(memory_mapped_file),
86            buffers: [
87                #[allow(clippy::get_first)]
88                *buffers.get(0).expect("Log buffers get(0) failed"),
89                *buffers.get(1).expect("Log buffers get(1) failed"),
90                *buffers.get(2).expect("Log buffers get(2) failed"),
91                *buffers.get(3).expect("Log buffers get(3) failed"),
92            ],
93        })
94    }
95
96    pub fn atomic_buffer(&self, index: Index) -> AtomicBuffer {
97        self.buffers[index as usize]
98    }
99}