use super::manager::MemoryResourceManager;
#[cfg(any(feature = "v2_io_exclusive_mmap", feature = "v2_io_exclusive_std"))]
use super::types::MemoryIOMode;
use crate::backend::native::{
graph_file::buffers::ReadBuffer,
graph_file::file_ops::FileOperations,
types::{NativeBackendError, NativeResult},
};
use std::io::{Read, Seek, SeekFrom};
impl<'a> MemoryResourceManager<'a> {
/// Reads `buffer.len()` bytes at `offset`, routed through whichever I/O mode
/// `current_io_mode()` reports as active.
///
/// - Memory-mapped mode (`v2_io_exclusive_mmap`): served straight from the mapping.
/// - Exclusive-std mode (`v2_io_exclusive_std`): pending buffered writes are
///   cleared first, then the file is read directly with a sync.
/// - Any other mode (and when the exclusive feature gates are compiled out):
///   served through the read cache with read-ahead.
///
/// `file_size_fn` is only consulted on the buffered path (read-ahead sizing).
///
/// # Errors
/// Propagates I/O failures and `CorruptNodeRecord` errors from the selected path.
pub fn memory_aware_read<F>(
&mut self,
file: &mut std::fs::File,
offset: u64,
buffer: &mut [u8],
file_size_fn: F,
) -> NativeResult<()>
where
F: FnOnce() -> NativeResult<u64>,
{
match self.current_io_mode() {
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_mmap"))]
MemoryIOMode::MemoryMapped => {
self.read_from_mmap(offset, buffer)?;
}
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_std"))]
MemoryIOMode::ExclusiveStd => {
// Clear (not flush) the write buffer before bypassing it.
// NOTE(review): "safely" suggests the pending ops are discarded without
// being persisted — confirm against clear_write_buffer_safely().
self.clear_write_buffer_safely();
self.direct_read_with_sync(file, offset, buffer)?;
}
_ => {
// Default fallback: cached read with read-ahead.
self.buffered_read(file, offset, buffer, file_size_fn)?;
}
}
Ok(())
}
/// Writes `data` at `offset`, routed through the active I/O mode, after
/// rejecting writes into the protected header region
/// (`validate_header_region_protection`).
///
/// `file_size_fn` is only used by the memory-mapped path, hence the
/// `#[allow(unused_variables)]` for builds where that feature is compiled out.
///
/// # Errors
/// Fails when the header-region check rejects `offset`, and propagates I/O
/// failures and `CorruptNodeRecord` errors from the selected path.
#[allow(unused_variables)] pub fn memory_aware_write<F>(
&mut self,
file: &mut std::fs::File,
offset: u64,
data: &[u8],
file_size_fn: F,
) -> NativeResult<()>
where
F: FnOnce() -> NativeResult<u64>,
{
self.validate_header_region_protection(offset)?;
match self.current_io_mode() {
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_mmap"))]
MemoryIOMode::MemoryMapped => {
self.write_to_mmap(offset, data, file_size_fn)?;
}
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_std"))]
MemoryIOMode::ExclusiveStd => {
// Clear pending buffered ops so the direct write cannot be
// reordered against stale buffered data.
self.clear_write_buffer_safely();
self.direct_write_with_sync(file, offset, data)?;
}
_ => {
// Default fallback: small writes are coalesced in the write buffer.
self.buffered_write(file, offset, data)?;
}
}
Ok(())
}
/// Copies `buffer.len()` bytes at `offset` out of the active memory mapping.
///
/// # Errors
/// `CorruptNodeRecord` when no mapping is initialized or the requested range
/// extends past the end of the mapping.
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_mmap"))]
fn read_from_mmap(&self, offset: u64, buffer: &mut [u8]) -> NativeResult<()> {
    // `ok_or_else` so the error String is only built on the failure path.
    let mmap = self
        .mmap
        .as_ref()
        .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: "Memory mapping not initialized".to_string(),
        })?;
    // Validate in u64 space with overflow detection: the original
    // `offset as usize + buffer.len()` could truncate the offset on 32-bit
    // targets and wrap on addition, defeating the bounds check.
    let in_bounds = offset
        .checked_add(buffer.len() as u64)
        .is_some_and(|end| end <= mmap.len() as u64);
    if !in_bounds {
        return Err(NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: format!(
                "Read beyond mmap region: offset={}, len={}, mmap_size={}",
                offset,
                buffer.len(),
                mmap.len()
            ),
        });
    }
    // Safe to narrow: the range was proven to fit inside the mapping.
    let start = offset as usize;
    let end = start + buffer.len();
    buffer.copy_from_slice(&mmap[start..end]);
    Ok(())
}
/// Writes `data` at `offset` through the active memory mapping and flushes
/// the mapping to disk.
///
/// `file_size_fn` is still invoked so any error it reports is propagated,
/// but its result is otherwise unused — see the NOTE at the end.
///
/// # Errors
/// `CorruptNodeRecord` when the mapping is missing, the write range overflows
/// `u64`, or it falls outside the mapping; I/O errors from `flush()`.
#[cfg(all(feature = "native-v2", feature = "v2_io_exclusive_mmap"))]
fn write_to_mmap<F>(&mut self, offset: u64, data: &[u8], file_size_fn: F) -> NativeResult<()>
where
    F: FnOnce() -> NativeResult<u64>,
{
    // Checked end-offset computation: the original unchecked `offset + len`
    // would panic in debug builds and silently wrap in release builds.
    let end_offset = offset
        .checked_add(data.len() as u64)
        .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: format!(
                "Write range overflows u64: offset={}, len={}",
                offset,
                data.len()
            ),
        })?;
    self.ensure_mmap_covers(end_offset)?;
    let mmap = self
        .mmap
        .as_mut()
        .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: "Memory mapping not initialized".to_string(),
        })?;
    // Compare in u64 space so the check cannot truncate on 32-bit targets.
    if end_offset > mmap.len() as u64 {
        return Err(NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: format!(
                "Write beyond mmap region: offset={}, len={}, mmap_len={}",
                offset,
                data.len(),
                mmap.len()
            ),
        });
    }
    // Safe to narrow: the range was proven to fit inside the mapping.
    let start = offset as usize;
    let end = start + data.len();
    mmap[start..end].copy_from_slice(data);
    mmap.flush()?;
    // NOTE(review): the original compared `end_offset` against the current
    // file size and did nothing with the result — growing the file /
    // remapping appears unimplemented. The call is preserved so errors from
    // `file_size_fn` still propagate exactly as before. TODO: confirm whether
    // a file-extension step is missing here.
    let _current_file_size = file_size_fn()?;
    Ok(())
}
/// Verifies that the active memory mapping (if any) extends to at least
/// `required_offset` bytes.
///
/// When no mapping is initialized this is a no-op returning `Ok(())` — the
/// caller's own "not initialized" check handles that case (original behavior).
///
/// # Errors
/// `CorruptNodeRecord` when the mapping exists but is too small.
#[cfg(feature = "native-v2")]
fn ensure_mmap_covers(&self, required_offset: u64) -> NativeResult<()> {
    if let Some(mmap) = &self.mmap {
        // Compare in u64 space: the original `required_offset as usize`
        // would truncate large offsets on 32-bit targets and pass the check.
        if required_offset > mmap.len() as u64 {
            return Err(NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: format!(
                    "Memory mapping too small: required_offset={}, mmap_size={}",
                    required_offset,
                    mmap.len()
                ),
            });
        }
    }
    Ok(())
}
/// Fills `buffer` straight from the file at `offset`, bypassing the read
/// cache, then issues a `sync_all()`.
///
/// NOTE(review): syncing after a pure read flushes no data written by this
/// call; presumably it acts as a durability barrier for earlier writes —
/// confirm the intent.
fn direct_read_with_sync(
    &mut self,
    file: &mut std::fs::File,
    offset: u64,
    buffer: &mut [u8],
) -> NativeResult<()> {
    FileOperations::read_bytes_direct(file, offset, buffer)?;
    file.sync_all().map_err(Into::into)
}
fn direct_write_with_sync(
&mut self,
file: &mut std::fs::File,
offset: u64,
data: &[u8],
) -> NativeResult<()> {
use std::io::{Seek, SeekFrom, Write};
file.seek(SeekFrom::Start(offset))?;
file.write_all(data)?;
file.sync_all()?;
Ok(())
}
/// Cached read path: flushes pending buffered writes (invalidating the read
/// cache), then serves the request from the read cache or, on a miss, from
/// the file with read-ahead.
///
/// # Errors
/// Propagates flush, read-ahead, and `file_size_fn` failures.
fn buffered_read<F>(
    &mut self,
    file: &mut std::fs::File,
    offset: u64,
    buffer: &mut [u8],
    file_size_fn: F,
) -> NativeResult<()>
where
    F: FnOnce() -> NativeResult<u64>,
{
    // Pending writes must reach the file before reading, and the read cache
    // must be invalidated since its contents may now be stale.
    let has_pending_writes = !self.write_buffer.operations.is_empty();
    if has_pending_writes {
        self.flush_write_buffer(file)?;
        self.read_buffer.offset = 0;
        self.read_buffer.size = 0;
    }
    // Cache hit: nothing more to do.
    if self.read_buffer.read(offset, buffer) {
        return Ok(());
    }
    // Cache miss: go to the file and populate the cache with read-ahead.
    self.read_with_ahead(file, offset, buffer, file_size_fn)
}
/// Cached write path: small non-node-slot writes are coalesced into the write
/// buffer; everything else flushes the buffer and writes directly with sync.
///
/// # Errors
/// Propagates flush and direct-write I/O failures.
fn buffered_write(
    &mut self,
    file: &mut std::fs::File,
    offset: u64,
    data: &[u8],
) -> NativeResult<()> {
    // Named constants for the magic numbers in the original layout test.
    // NOTE(review): values inferred from the code only — presumably the node
    // region starts after a 1 KiB header and slots are 4 KiB; confirm against
    // the file-format definition.
    const NODE_REGION_START: u64 = 0x400;
    const NODE_SLOT_SIZE: u64 = 4096;
    const MAX_BUFFERED_WRITE_LEN: usize = 256;

    // A full, slot-aligned node record is never coalesced.
    let is_node_slot = offset >= NODE_REGION_START
        && (offset - NODE_REGION_START) % NODE_SLOT_SIZE == 0
        && data.len() as u64 == NODE_SLOT_SIZE;
    // Small writes go to the buffer when it accepts them (`add` may refuse,
    // e.g. when full — then we fall through to the direct path).
    if !is_node_slot
        && data.len() <= MAX_BUFFERED_WRITE_LEN
        && self.write_buffer.add(offset, data.to_vec())
    {
        return Ok(());
    }
    self.flush_write_buffer(file)?;
    self.direct_write_with_sync(file, offset, data)
}
fn read_with_ahead<F>(
&mut self,
file: &mut std::fs::File,
offset: u64,
buffer: &mut [u8],
file_size_fn: F,
) -> NativeResult<()>
where
F: FnOnce() -> NativeResult<u64>,
{
use crate::backend::native::types::NativeBackendError;
let optimal_capacity = ReadBuffer::adaptive_capacity(buffer.len());
if optimal_capacity != self.read_buffer.capacity {
*self.read_buffer = ReadBuffer::with_capacity(optimal_capacity);
}
let read_ahead_size = std::cmp::max(buffer.len(), optimal_capacity);
let file_size = file_size_fn()?;
let remaining_bytes = file_size.saturating_sub(offset);
if remaining_bytes == 0 {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: "Attempted read beyond end of file".to_string(),
});
}
let adjusted_read_size = std::cmp::min(read_ahead_size, remaining_bytes as usize);
file.seek(SeekFrom::Start(offset))?;
file.read_exact(&mut self.read_buffer.data[..adjusted_read_size])?;
self.read_buffer.offset = offset;
self.read_buffer.size = adjusted_read_size;
if buffer.len() <= adjusted_read_size {
buffer.copy_from_slice(&self.read_buffer.data[..buffer.len()]);
} else {
buffer[..adjusted_read_size]
.copy_from_slice(&self.read_buffer.data[..adjusted_read_size]);
}
Ok(())
}
}