#[cfg(feature = "v2_experimental")]
use crate::backend::native::{
graph_file::buffers::WriteBuffer,
types::{NativeBackendError, NativeResult},
};
#[cfg(feature = "v2_experimental")]
use memmap2::{MmapMut, MmapOptions};
#[cfg(feature = "v2_experimental")]
use std::io::{Seek, SeekFrom, Write};
/// Stateless manager for the graph file's mutable memory mapping.
///
/// Every method is an associated function operating on caller-owned state
/// (`File`, `WriteBuffer`, `Option<MmapMut>`); the type itself holds no data.
pub struct MemoryMappingManager;

impl MemoryMappingManager {
    /// Lazily creates the mutable memory mapping for `file` when `mmap` is
    /// still `None`; a no-op otherwise.
    ///
    /// # Errors
    /// Returns an error if the file handle cannot be cloned or the mapping
    /// cannot be created.
    #[cfg(feature = "v2_experimental")]
    pub fn ensure_mmap_initialized(
        file: &std::fs::File,
        mmap: &mut Option<MmapMut>,
    ) -> NativeResult<()> {
        if mmap.is_none() {
            // FIX: the original branched on the file size but executed the
            // identical mapping call in both arms; memmap2 supports
            // zero-length files, so one unconditional call suffices (and the
            // now-pointless `metadata()` syscall is gone).
            // SAFETY: the map is backed by a clone of `file`; callers must
            // not truncate the file while the mapping is alive.
            *mmap = unsafe { Some(MmapOptions::new().map_mut(&file.try_clone()?)?) };
        }
        Ok(())
    }

    /// Ensures both the file and the memory mapping span at least `min_len`
    /// bytes, growing the file and remapping as needed.
    ///
    /// A thread-local depth counter caps re-entrancy at two levels so that a
    /// remap triggered from within a remap cannot recurse indefinitely.
    ///
    /// # Errors
    /// Fails if the re-entrancy limit is exceeded, the mapping cannot be
    /// (re)created, or any underlying file operation fails.
    #[cfg(feature = "v2_experimental")]
    pub fn ensure_mmap_covers(
        file: &mut std::fs::File,
        write_buffer: &mut WriteBuffer,
        mmap: &mut Option<MmapMut>,
        min_len: u64,
    ) -> NativeResult<()> {
        thread_local! {
            static MMAP_ENSURE_DEPTH: std::cell::RefCell<u32> = const { std::cell::RefCell::new(0) };
        }
        MMAP_ENSURE_DEPTH.with(|d| {
            let mut depth = d.borrow_mut();
            if *depth >= 2 {
                return Err(NativeBackendError::CorruptNodeRecord {
                    node_id: -1,
                    reason: format!("ensure_mmap_covers recursion depth exceeded: {}", *depth),
                });
            }
            *depth += 1;
            Ok(())
        })?;
        let depth = MMAP_ENSURE_DEPTH.with(|d| *d.borrow());
        // FIX: the original decremented the depth counter only on the success
        // path; any `?` early-return between increment and decrement leaked
        // the counter, making later calls on this thread fail spuriously.
        // Run the fallible body in a helper and decrement unconditionally.
        let result = Self::ensure_mmap_covers_impl(file, write_buffer, mmap, min_len, depth);
        MMAP_ENSURE_DEPTH.with(|d| {
            let mut depth = d.borrow_mut();
            *depth = depth.saturating_sub(1);
        });
        result
    }

    /// Fallible body of [`Self::ensure_mmap_covers`]; separated so the caller
    /// can always unwind the re-entrancy counter regardless of errors.
    #[cfg(feature = "v2_experimental")]
    fn ensure_mmap_covers_impl(
        file: &mut std::fs::File,
        write_buffer: &mut WriteBuffer,
        mmap: &mut Option<MmapMut>,
        min_len: u64,
        depth: u32,
    ) -> NativeResult<()> {
        Self::ensure_mmap_initialized(file, mmap)?;
        // Grow the backing file first; a mapping can never exceed it.
        let current_file_size = file.metadata()?.len();
        if min_len > current_file_size {
            file.set_len(min_len)?;
            file.flush()?;
        }
        let current_mmap_size = mmap
            .as_ref()
            .ok_or_else(|| NativeBackendError::InvalidState {
                context: "Memory mapping not initialized in ensure_mmap_covers".to_string(),
                source: None,
            })?
            .len() as u64;
        if min_len > current_mmap_size {
            // Only the outermost call flushes buffered writes; a nested call
            // would otherwise re-enter the flush path.
            if depth == 1 {
                Self::flush_write_buffer(file, write_buffer)?;
            }
            // SAFETY: same contract as in `ensure_mmap_initialized`.
            *mmap = unsafe { Some(MmapOptions::new().map_mut(&file.try_clone()?)?) };
        }
        Ok(())
    }

    /// Drains `write_buffer` and writes every pending `(offset, bytes)` pair
    /// to `file` in ascending offset order, then flushes the file.
    #[cfg(feature = "v2_experimental")]
    fn flush_write_buffer(
        file: &mut std::fs::File,
        write_buffer: &mut WriteBuffer,
    ) -> NativeResult<()> {
        let mut pending: Vec<_> = write_buffer.flush().into_iter().collect();
        // Ascending offsets keep the seek pattern sequential.
        pending.sort_by_key(|(offset, _)| *offset);
        for (offset, data) in pending {
            file.seek(SeekFrom::Start(offset))?;
            file.write_all(&data)?;
        }
        file.flush()?;
        Ok(())
    }

    /// Copies `buffer.len()` bytes starting at `offset` from the mapping into
    /// `buffer`.
    ///
    /// # Errors
    /// Fails if the mapping is not initialized or the requested range does
    /// not fit inside it.
    #[cfg(feature = "v2_experimental")]
    pub fn mmap_read_bytes(
        mmap: &Option<MmapMut>,
        offset: u64,
        buffer: &mut [u8],
    ) -> NativeResult<()> {
        let mmap = mmap
            .as_ref()
            .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: "mmap not initialized".to_string(),
            })?;
        // FIX: checked arithmetic — `offset as usize + len` could wrap (and
        // `offset as usize` could truncate on 32-bit targets), letting an
        // out-of-range request slip past the bounds check.
        let range = usize::try_from(offset)
            .ok()
            .and_then(|start| start.checked_add(buffer.len()).map(|end| start..end));
        match range {
            Some(range) if range.end <= mmap.len() => {
                buffer.copy_from_slice(&mmap[range]);
                Ok(())
            }
            _ => Err(NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: format!(
                    "Read beyond mmap region: offset={}, len={}, mmap_size={}",
                    offset,
                    buffer.len(),
                    mmap.len()
                ),
            }),
        }
    }

    /// Writes `data` into the mapping at `offset`, growing the file and the
    /// mapping first (via `FileManager::mmap_ensure_size`) when necessary.
    ///
    /// # Errors
    /// Fails if the required size overflows `u64`, the mapping is not
    /// initialized, the range does not fit, or resizing fails.
    #[cfg(feature = "v2_experimental")]
    pub fn mmap_write_bytes(
        file: &mut std::fs::File,
        file_path: &std::path::Path,
        write_buffer: &mut WriteBuffer,
        mmap: &mut Option<MmapMut>,
        offset: u64,
        data: &[u8],
    ) -> NativeResult<()> {
        let _ = write_buffer; // kept for interface compatibility with callers
        // FIX: `offset + data.len() as u64` could overflow in release builds;
        // fail explicitly instead of wrapping to a tiny required size.
        let required =
            offset
                .checked_add(data.len() as u64)
                .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
                    node_id: -1,
                    reason: format!(
                        "Write range overflows u64: offset={}, len={}",
                        offset,
                        data.len()
                    ),
                })?;
        super::file_management::FileManager::mmap_ensure_size(file, file_path, required, mmap)?;
        let mmap_ref = mmap
            .as_mut()
            .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: "mmap not initialized".to_string(),
            })?;
        // Same checked-arithmetic bounds check as `mmap_read_bytes`.
        let range = usize::try_from(offset)
            .ok()
            .and_then(|start| start.checked_add(data.len()).map(|end| start..end));
        match range {
            Some(range) if range.end <= mmap_ref.len() => {
                mmap_ref[range].copy_from_slice(data);
                Ok(())
            }
            _ => Err(NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: format!(
                    "Write beyond mmap region: offset={}, len={}, mmap_size={}",
                    offset,
                    data.len(),
                    mmap_ref.len()
                ),
            }),
        }
    }

    /// Returns `true` when a mapping is currently established.
    #[cfg(feature = "v2_experimental")]
    pub fn is_mmap_available(mmap: &Option<MmapMut>) -> bool {
        mmap.is_some()
    }

    /// Returns the mapping's length in bytes, or `None` when unmapped.
    #[cfg(feature = "v2_experimental")]
    pub fn get_mmap_size(mmap: &Option<MmapMut>) -> Option<usize> {
        mmap.as_ref().map(|m| m.len())
    }

    /// Flushes buffered writes and recreates the mapping so it reflects the
    /// file's current length. No-op when no mapping exists yet.
    #[cfg(feature = "v2_experimental")]
    pub fn refresh_mmap(
        file: &mut std::fs::File,
        write_buffer: &mut WriteBuffer,
        mmap: &mut Option<MmapMut>,
    ) -> NativeResult<()> {
        if mmap.is_some() {
            Self::flush_write_buffer(file, write_buffer)?;
            // SAFETY: same contract as in `ensure_mmap_initialized`.
            *mmap = unsafe { Some(MmapOptions::new().map_mut(&file.try_clone()?)?) };
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    // All imports are feature-gated so they are not flagged as unused when
    // `v2_experimental` is disabled (every test below is gated out then).
    #[cfg(feature = "v2_experimental")]
    use super::*;
    // FIX: `read_exact` requires the `Read` trait, which was never imported,
    // breaking compilation of `test_mmap_write_bytes`.
    #[cfg(feature = "v2_experimental")]
    use std::io::Read;
    #[cfg(feature = "v2_experimental")]
    use tempfile::tempfile;

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_ensure_mmap_initialized() {
        let mut temp_file = tempfile().unwrap();
        let mut mmap: Option<MmapMut> = None;
        let payload = b"test data for mmap";
        temp_file.write_all(payload).unwrap();
        temp_file.flush().unwrap();
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        assert!(mmap.is_some());
        // The mapping covers exactly the written payload.
        assert_eq!(mmap.as_ref().unwrap().len(), payload.len());
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_ensure_mmap_initialized_empty_file() {
        let temp_file = tempfile().unwrap();
        let mut mmap: Option<MmapMut> = None;
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        assert!(mmap.is_some());
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_ensure_mmap_covers() {
        let mut temp_file = tempfile().unwrap();
        let mut write_buffer = WriteBuffer::new(10);
        let mut mmap: Option<MmapMut> = None;
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        MemoryMappingManager::ensure_mmap_covers(
            &mut temp_file,
            &mut write_buffer,
            &mut mmap,
            2048,
        )
        .unwrap();
        // Both the mapping and the backing file must have grown.
        assert!(mmap.as_ref().unwrap().len() >= 2048);
        assert!(temp_file.metadata().unwrap().len() >= 2048);
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_mmap_read_bytes() {
        let mut temp_file = tempfile().unwrap();
        let mut mmap: Option<MmapMut> = None;
        let test_data = b"memory mapping test data";
        temp_file.write_all(test_data).unwrap();
        temp_file.flush().unwrap();
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        let mut buffer = vec![0u8; test_data.len()];
        MemoryMappingManager::mmap_read_bytes(&mmap, 0, &mut buffer).unwrap();
        assert_eq!(buffer, test_data);
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_mmap_read_bytes_beyond_bounds() {
        let mut temp_file = tempfile().unwrap();
        let mut mmap: Option<MmapMut> = None;
        temp_file.write_all(b"small").unwrap();
        temp_file.flush().unwrap();
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        // A 100-byte read from a 5-byte mapping must be rejected.
        let mut buffer = vec![0u8; 100];
        let result = MemoryMappingManager::mmap_read_bytes(&mmap, 0, &mut buffer);
        assert!(result.is_err());
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_mmap_write_bytes() {
        let mut temp_file = tempfile().unwrap();
        let file_path = std::path::PathBuf::from("test");
        let mut write_buffer = WriteBuffer::new(10);
        let mut mmap: Option<MmapMut> = None;
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        let test_data = b"mmap write test";
        MemoryMappingManager::mmap_write_bytes(
            &mut temp_file,
            &file_path,
            &mut write_buffer,
            &mut mmap,
            0,
            test_data,
        )
        .unwrap();
        // Verify the write is visible through the plain file handle.
        let mut buffer = vec![0u8; test_data.len()];
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        temp_file.read_exact(&mut buffer).unwrap();
        assert_eq!(buffer, test_data);
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_mmap_operations_helpers() {
        let temp_file = tempfile().unwrap();
        let mut mmap: Option<MmapMut> = None;
        assert!(!MemoryMappingManager::is_mmap_available(&mmap));
        assert_eq!(MemoryMappingManager::get_mmap_size(&mmap), None);
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        assert!(MemoryMappingManager::is_mmap_available(&mmap));
        // FIX: the original asserted `size >= 0` on a usize, which is always
        // true; compare against the actual backing-file length instead.
        assert_eq!(
            MemoryMappingManager::get_mmap_size(&mmap),
            Some(temp_file.metadata().unwrap().len() as usize)
        );
    }

    #[cfg(feature = "v2_experimental")]
    #[test]
    fn test_refresh_mmap() {
        let mut temp_file = tempfile().unwrap();
        let mut write_buffer = WriteBuffer::new(10);
        let mut mmap: Option<MmapMut> = None;
        MemoryMappingManager::ensure_mmap_initialized(&temp_file, &mut mmap).unwrap();
        let initial_size = mmap.as_ref().unwrap().len();
        temp_file.seek(SeekFrom::End(0)).unwrap();
        temp_file.write_all(b"additional data").unwrap();
        temp_file.flush().unwrap();
        MemoryMappingManager::refresh_mmap(&mut temp_file, &mut write_buffer, &mut mmap).unwrap();
        // The refreshed mapping must reflect the appended data.
        assert!(mmap.as_ref().unwrap().len() > initial_size);
    }
}