use crate::backend::native::{
graph_file::buffers::WriteBuffer,
types::NativeResult,
};
#[cfg(feature = "v2_experimental")]
use crate::backend::native::NativeBackendError;
#[cfg(feature = "v2_experimental")]
use memmap2::{MmapMut, MmapOptions};
use std::io::{Read, Seek, SeekFrom, Write};
/// Stateless namespace for the low-level file I/O helpers used by the native
/// graph backend; all functionality lives in associated functions.
pub struct IOOperationsManager;
impl IOOperationsManager {
    /// Reads exactly `buffer.len()` bytes from `file` starting at `offset`.
    ///
    /// # Errors
    /// Fails if the seek fails or the file ends before the buffer is filled
    /// (`read_exact` semantics).
    pub fn read_bytes_std(
        file: &mut std::fs::File,
        offset: u64,
        buffer: &mut [u8],
    ) -> NativeResult<()> {
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(buffer)?;
        Ok(())
    }

    /// Writes all of `data` to `file` at `offset`. Does not flush.
    pub fn write_bytes_std(file: &mut std::fs::File, offset: u64, data: &[u8]) -> NativeResult<()> {
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;
        Ok(())
    }

    /// Writes `data` at `offset` through the `GraphFile`'s underlying file
    /// handle, bypassing its buffering layers, and flushes immediately.
    pub fn write_bytes_direct(
        graph_file: &mut crate::backend::native::graph_file::GraphFile,
        offset: u64,
        data: &[u8],
    ) -> NativeResult<()> {
        // `Seek`/`SeekFrom`/`Write` are already imported at module scope; the
        // former function-local `use` was redundant and has been removed.
        let file = graph_file.file_mut();
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;
        file.flush()?;
        Ok(())
    }

    /// Reads `buffer.len()` bytes at `offset`.
    ///
    /// Currently identical to [`Self::read_bytes_std`] (no read-ahead is
    /// performed yet); kept as a separate entry point so a real read-ahead
    /// strategy can be added later without touching call sites.
    pub fn read_with_ahead(
        file: &mut std::fs::File,
        offset: u64,
        buffer: &mut [u8],
    ) -> NativeResult<()> {
        Self::read_bytes_std(file, offset, buffer)
    }

    /// Drains `write_buffer` and writes every pending operation to `file` in
    /// ascending-offset order, then flushes.
    ///
    /// Returns the total number of payload bytes written.
    pub fn flush_write_buffer(
        file: &mut std::fs::File,
        write_buffer: &mut WriteBuffer,
    ) -> NativeResult<usize> {
        let mut sorted_ops: Vec<_> = write_buffer.flush().into_iter().collect();
        // Sort by offset: the buffer's iteration order is not guaranteed, and
        // ascending offsets keep the disk access pattern mostly sequential.
        sorted_ops.sort_by_key(|(offset, _)| *offset);
        let mut bytes_written = 0;
        for (offset, data) in sorted_ops {
            file.seek(SeekFrom::Start(offset))?;
            file.write_all(&data)?;
            bytes_written += data.len();
        }
        file.flush()?;
        Ok(bytes_written)
    }

    /// Invalidates the read buffer. Currently a deliberate no-op; the
    /// parameter is kept so call sites stay stable once invalidation logic
    /// is introduced.
    pub fn invalidate_read_buffer(
        _read_buffer: &mut crate::backend::native::graph_file::buffers::ReadBuffer,
    ) {
    }

    /// Grows `file` to at least `required_size` bytes; never shrinks it.
    pub fn ensure_file_len_at_least(
        file: &mut std::fs::File,
        required_size: u64,
    ) -> NativeResult<()> {
        if file.metadata()?.len() < required_size {
            file.set_len(required_size)?;
        }
        Ok(())
    }

    /// Reads from the exclusive memory map into `buffer`.
    ///
    /// # Errors
    /// Fails if the map is absent or the requested range falls outside it.
    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_mmap"))]
    pub fn read_bytes_mmap_exclusive(
        mmap: Option<&MmapMut>,
        offset: u64,
        buffer: &mut [u8],
    ) -> NativeResult<()> {
        // ok_or_else: don't allocate the error string on the happy path.
        let mmap = mmap.ok_or_else(|| NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: "mmap not initialized in exclusive mmap mode".to_string(),
        })?;
        let start = offset as usize;
        // checked_add guards against wrap-around on pathological offsets,
        // which the plain `+` bounds check would miss in release builds.
        let end = start
            .checked_add(buffer.len())
            .filter(|&end| end <= mmap.len())
            .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: format!(
                    "Read beyond mmap region: offset={}, len={}, mmap_size={}",
                    offset,
                    buffer.len(),
                    mmap.len()
                ),
            })?;
        buffer.copy_from_slice(&mmap[start..end]);
        Ok(())
    }

    /// Writes `data` into the exclusive memory map at `offset` and flushes
    /// the map to disk.
    ///
    /// # Errors
    /// Fails if the map is absent, the target range falls outside it, or the
    /// flush fails.
    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_mmap"))]
    pub fn write_bytes_mmap_exclusive(
        mmap: Option<&mut MmapMut>,
        offset: u64,
        data: &[u8],
    ) -> NativeResult<()> {
        let mmap = mmap.ok_or_else(|| NativeBackendError::CorruptNodeRecord {
            node_id: -1,
            reason: "mmap not initialized in exclusive mmap mode".to_string(),
        })?;
        let start = offset as usize;
        // checked_add: reject overflowing ranges instead of wrapping.
        let end = start
            .checked_add(data.len())
            .filter(|&end| end <= mmap.len())
            .ok_or_else(|| NativeBackendError::CorruptNodeRecord {
                node_id: -1,
                reason: format!(
                    "mmap write out of bounds: offset={}, len={}, mmap_len={}",
                    offset,
                    data.len(),
                    mmap.len()
                ),
            })?;
        mmap[start..end].copy_from_slice(data);
        // Eager flush: exclusive-mmap mode trades write throughput for
        // durability of every write.
        mmap.flush()?;
        Ok(())
    }

    /// Drops any pending buffered writes without flushing them to disk.
    ///
    /// Exclusive-std mode writes straight through to the file, so buffered
    /// operations are stale; set `WRITEBUF_DEBUG` to log when this happens.
    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_std"))]
    fn discard_pending_ops(write_buffer: &mut WriteBuffer) {
        if write_buffer.operations.is_empty() {
            return;
        }
        let ops_count = write_buffer.operations.len();
        if std::env::var("WRITEBUF_DEBUG").is_ok() {
            println!(
                "[WRITEBUF_DEBUG] EXCLUSIVE_STD: CLEARING {} pending ops without flush",
                ops_count
            );
        }
        write_buffer.operations.clear();
    }

    /// Reads `buffer.len()` bytes at `offset`, first discarding any pending
    /// buffered writes (exclusive-std mode bypasses the write buffer).
    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_std"))]
    pub fn read_bytes_std_exclusive(
        file: &mut std::fs::File,
        offset: u64,
        buffer: &mut [u8],
        write_buffer: &mut WriteBuffer,
    ) -> NativeResult<()> {
        Self::discard_pending_ops(write_buffer);
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(buffer)?;
        Ok(())
    }

    /// Writes `data` at `offset` directly, first discarding any pending
    /// buffered writes (exclusive-std mode bypasses the write buffer).
    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_std"))]
    pub fn write_bytes_std_exclusive(
        file: &mut std::fs::File,
        offset: u64,
        data: &[u8],
        write_buffer: &mut WriteBuffer,
    ) -> NativeResult<()> {
        Self::discard_pending_ops(write_buffer);
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;
        Ok(())
    }

    /// Buffers a write; if the buffer rejects it (e.g. full), drains every
    /// pending operation to `file` in ascending-offset order — consistent
    /// with [`Self::flush_write_buffer`] — and then writes `data` directly.
    pub fn write_buffered_bytes_std(
        file: &mut std::fs::File,
        data: &[u8],
        offset: u64,
        write_buffer: &mut WriteBuffer,
    ) -> NativeResult<()> {
        if write_buffer.add(offset, data.to_vec()) {
            return Ok(());
        }
        // Sort so the flush order is deterministic; the buffer's own
        // iteration order is not guaranteed.
        let mut pending: Vec<_> = write_buffer.flush().into_iter().collect();
        pending.sort_by_key(|(op_offset, _)| *op_offset);
        for (op_offset, op_data) in pending {
            file.seek(SeekFrom::Start(op_offset))?;
            file.write_all(&op_data)?;
        }
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(data)?;
        Ok(())
    }

    /// Reads through the `GraphFile`'s own read path.
    pub fn read_bytes(
        graph_file: &mut crate::backend::native::graph_file::GraphFile,
        offset: u64,
        buffer: &mut [u8],
    ) -> NativeResult<()> {
        graph_file.read_bytes(offset, buffer)
    }

    /// Writes through the `GraphFile`'s own write path.
    pub fn write_bytes(
        graph_file: &mut crate::backend::native::graph_file::GraphFile,
        offset: u64,
        data: &[u8],
    ) -> NativeResult<()> {
        graph_file.write_bytes(offset, data)
    }

    /// Syncs the `GraphFile` to stable storage.
    pub fn flush(
        graph_file: &mut crate::backend::native::graph_file::GraphFile,
    ) -> NativeResult<()> {
        graph_file.sync()
    }

    /// "Prefetch" currently just ensures the file covers `offset + length`.
    pub fn prefetch(
        graph_file: &mut crate::backend::native::graph_file::GraphFile,
        offset: u64,
        length: u64,
    ) -> NativeResult<()> {
        // saturating_add: plain `+` would panic in debug builds and wrap in
        // release builds (silently skipping the grow) on pathological input.
        let required_size = offset.saturating_add(length);
        graph_file.ensure_file_len_at_least(required_size)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Seek, SeekFrom, Write};
    use tempfile::tempfile;

    #[test]
    fn test_read_write_bytes_std() {
        let mut temp_file = tempfile().unwrap();
        let test_data = b"Hello, I/O Operations!";
        IOOperationsManager::write_bytes_std(&mut temp_file, 0, test_data).unwrap();
        let mut buffer = vec![0u8; test_data.len()];
        IOOperationsManager::read_bytes_std(&mut temp_file, 0, &mut buffer).unwrap();
        assert_eq!(buffer, test_data);
    }

    #[test]
    fn test_write_bytes_direct() {
        // NOTE(review): this exercises write_bytes_std, not write_bytes_direct,
        // because write_bytes_direct requires a GraphFile this unit test does
        // not construct — consider renaming or adding a GraphFile fixture.
        let mut temp_file = tempfile().unwrap();
        let test_data = b"Direct write test";
        IOOperationsManager::write_bytes_std(&mut temp_file, 0, test_data).unwrap();
        let mut buffer = vec![0u8; test_data.len()];
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        temp_file.read_exact(&mut buffer).unwrap();
        assert_eq!(buffer, test_data);
    }

    #[test]
    fn test_read_with_ahead() {
        let mut temp_file = tempfile().unwrap();
        let test_data = b"Read-ahead test data";
        temp_file.seek(SeekFrom::Start(0)).unwrap();
        temp_file.write_all(test_data).unwrap();
        let mut buffer = vec![0u8; test_data.len()];
        IOOperationsManager::read_with_ahead(&mut temp_file, 0, &mut buffer).unwrap();
        assert_eq!(buffer, test_data);
    }

    #[test]
    fn test_ensure_file_len_at_least() {
        let mut temp_file = tempfile().unwrap();
        IOOperationsManager::ensure_file_len_at_least(&mut temp_file, 1024).unwrap();
        let metadata = temp_file.metadata().unwrap();
        assert!(metadata.len() >= 1024);
    }

    #[test]
    fn test_flush_write_buffer() {
        let mut temp_file = tempfile().unwrap();
        let mut write_buffer = WriteBuffer::new(10);
        write_buffer.add(100, b"data1".to_vec());
        write_buffer.add(110, b"data2".to_vec());
        let bytes_written =
            IOOperationsManager::flush_write_buffer(&mut temp_file, &mut write_buffer).unwrap();
        // "data1" + "data2" = 10 payload bytes; the buffer must be drained.
        assert_eq!(bytes_written, 10);
        assert!(write_buffer.operations.is_empty());
    }

    #[cfg(all(feature = "v2_experimental", feature = "v2_io_exclusive_std"))]
    #[test]
    fn test_std_exclusive_operations() {
        let mut temp_file = tempfile().unwrap();
        let mut write_buffer = WriteBuffer::new(10);
        let test_data = b"Exclusive std mode test";
        IOOperationsManager::write_bytes_std_exclusive(
            &mut temp_file,
            0,
            test_data,
            &mut write_buffer,
        )
        .unwrap();
        let mut buffer = vec![0u8; test_data.len()];
        IOOperationsManager::read_bytes_std_exclusive(
            &mut temp_file,
            0,
            &mut buffer,
            &mut write_buffer,
        )
        .unwrap();
        assert_eq!(buffer, test_data);
    }
}