#![allow(dead_code)]
use crate::protocol::{Message, Offset, PartitionId};
use crate::Result;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
pub struct MemoryMappedStorage {
config: MMapStorageConfig,
segments:
Arc<RwLock<std::collections::HashMap<(String, PartitionId), Arc<PartitionMMapSegment>>>>,
next_offsets: Arc<RwLock<std::collections::HashMap<(String, PartitionId), AtomicU64>>>,
total_writes: AtomicU64,
total_reads: AtomicU64,
bytes_written: AtomicU64,
bytes_read: AtomicU64,
}
/// Settings for a [`MemoryMappedStorage`].
#[derive(Clone, Debug)]
pub struct MMapStorageConfig {
    /// Directory that holds the segment files; it is created when the storage is constructed.
    pub data_directory: PathBuf,
    /// Size of each memory-mapped segment, in MiB (`segment_size_mb * 1024 * 1024` bytes).
    pub segment_size_mb: usize,
    /// Intended upper bound on segments per partition.
    /// NOTE(review): not read by `MemoryMappedStorage` in this module (no segment rotation yet).
    pub max_segments_per_partition: usize,
    /// Requests direct I/O.
    /// NOTE(review): not read by `MemoryMappedStorage` in this module.
    pub enable_direct_io: bool,
    /// Flush the mapping to disk after every append.
    pub sync_on_write: bool,
    /// Pre-size segment files when they are created.
    /// NOTE(review): a mapping requires the file to cover the whole segment, so confirm how
    /// the storage treats `false` before relying on it.
    pub preallocate_segments: bool,
}

impl Default for MMapStorageConfig {
    /// Uses `./data` with 256 MiB segments and at most 1000 segments per partition.
    /// Direct I/O is requested, appends are not synced individually, and segment files are
    /// pre-sized.
    fn default() -> Self {
        Self {
            data_directory: PathBuf::from("./data"),
            segment_size_mb: 256,
            max_segments_per_partition: 1000,
            enable_direct_io: true,
            sync_on_write: false,
            preallocate_segments: true,
        }
    }
}
/// Segment state for one `(topic, partition)`, shared behind an `Arc`.
struct PartitionMMapSegment {
    /// Segment that receives appends; its mutex is taken by both writers and readers.
    current_segment: Arc<Mutex<MMapSegment>>,
    /// Further segments of this partition.
    /// NOTE(review): never populated in this module (segment rotation is not implemented).
    segments: Vec<Arc<Mutex<MMapSegment>>>,
    /// The `(topic, partition)` this state belongs to.
    partition_key: (String, PartitionId),
    /// Capacity of each segment, in bytes.
    segment_size: usize,
    /// NOTE(review): initialised to 0 and not updated anywhere in this module.
    write_position: AtomicUsize,
    /// NOTE(review): initialised to 0 and not updated anywhere in this module.
    total_messages: AtomicU64,
}

impl Clone for PartitionMMapSegment {
    /// Produces a copy that shares the underlying segments through the same `Arc`s.
    ///
    /// The two counters are snapshotted into fresh atomics, so later updates to one copy's
    /// counters are not visible through the other.
    fn clone(&self) -> Self {
        let position_snapshot = self.write_position.load(Ordering::Relaxed);
        let messages_snapshot = self.total_messages.load(Ordering::Relaxed);
        PartitionMMapSegment {
            current_segment: Arc::clone(&self.current_segment),
            segments: self.segments.iter().map(Arc::clone).collect(),
            partition_key: self.partition_key.clone(),
            segment_size: self.segment_size,
            write_position: AtomicUsize::new(position_snapshot),
            total_messages: AtomicU64::new(messages_snapshot),
        }
    }
}
/// A single memory-mapped segment file.
struct MMapSegment {
    /// Writable mapping over the first `max_size` bytes of the file.
    mmap: MmapMut,
    /// Open handle to the backing file, kept alive next to the mapping.
    file: File,
    /// Index of this segment within its partition; it is part of the file name.
    segment_id: u64,
    /// Path of the backing file.
    file_path: PathBuf,
    /// Number of leading bytes of `mmap` already holding records; the next append starts here.
    write_offset: usize,
    /// Capacity of the mapping, in bytes.
    max_size: usize,
}
impl MemoryMappedStorage {
pub fn new() -> Result<Self> {
Self::with_config(MMapStorageConfig::default())
}
pub fn with_config(config: MMapStorageConfig) -> Result<Self> {
std::fs::create_dir_all(&config.data_directory)?;
Ok(Self {
config,
segments: Arc::new(RwLock::new(std::collections::HashMap::new())),
next_offsets: Arc::new(RwLock::new(std::collections::HashMap::new())),
total_writes: AtomicU64::new(0),
total_reads: AtomicU64::new(0),
bytes_written: AtomicU64::new(0),
bytes_read: AtomicU64::new(0),
})
}
pub fn append_messages_zero_copy(
&self,
topic: &str,
partition: PartitionId,
messages: Vec<Message>,
) -> Result<Offset> {
if messages.is_empty() {
return Ok(0);
}
let key = (topic.to_string(), partition);
let message_count = messages.len() as u64;
let partition_segment = {
let mut segments = self.segments.write();
if !segments.contains_key(&key) {
let segment = self.create_partition_segment(&key)?;
segments.insert(key.clone(), segment.clone());
segment
} else {
segments.get(&key).unwrap().clone()
}
};
let base_offset = {
let mut offsets = self.next_offsets.write();
let offset_counter = offsets
.entry(key.clone())
.or_insert_with(|| AtomicU64::new(0));
offset_counter.fetch_add(message_count, Ordering::SeqCst)
};
let serialized_data = self.serialize_messages_batch(&messages, base_offset)?;
self.write_to_mmap(&partition_segment, &serialized_data)?;
self.total_writes
.fetch_add(message_count, Ordering::Relaxed);
self.bytes_written
.fetch_add(serialized_data.len() as u64, Ordering::Relaxed);
Ok(base_offset)
}
pub fn fetch_messages_zero_copy(
&self,
topic: &str,
partition: PartitionId,
offset: Offset,
max_bytes: u32,
) -> Result<Vec<(Offset, Message)>> {
let key = (topic.to_string(), partition);
let segments = self.segments.read();
let Some(partition_segment) = segments.get(&key) else {
return Ok(Vec::new());
};
let messages = self.read_from_mmap(partition_segment, offset, max_bytes)?;
self.total_reads.fetch_add(1, Ordering::Relaxed);
Ok(messages)
}
pub fn append_messages_zero_copy_arc(
&self,
topic: &str,
partition: PartitionId,
messages_arc: Arc<Vec<Message>>,
) -> Result<Offset> {
if messages_arc.is_empty() {
return Ok(0);
}
let key = (topic.to_string(), partition);
let message_count = messages_arc.len() as u64;
let partition_segment = {
let mut segments = self.segments.write();
if !segments.contains_key(&key) {
let segment = self.create_partition_segment(&key)?;
segments.insert(key.clone(), segment.clone());
segment
} else {
segments.get(&key).unwrap().clone()
}
};
let base_offset = {
let mut offsets = self.next_offsets.write();
let offset_counter = offsets
.entry(key.clone())
.or_insert_with(|| AtomicU64::new(0));
offset_counter.fetch_add(message_count, Ordering::SeqCst)
};
let serialized_data = self.serialize_messages_batch_arc(&messages_arc, base_offset)?;
self.write_to_mmap(&partition_segment, &serialized_data)?;
self.total_writes
.fetch_add(message_count, Ordering::Relaxed);
self.bytes_written
.fetch_add(serialized_data.len() as u64, Ordering::Relaxed);
Ok(base_offset)
}
fn create_partition_segment(
&self,
key: &(String, PartitionId),
) -> Result<Arc<PartitionMMapSegment>> {
let segment_size = self.config.segment_size_mb * 1024 * 1024;
let segment = self.create_mmap_segment(key, 0, segment_size)?;
Ok(Arc::new(PartitionMMapSegment {
current_segment: Arc::new(Mutex::new(segment)),
segments: Vec::new(),
partition_key: key.clone(),
segment_size,
write_position: AtomicUsize::new(0),
total_messages: AtomicU64::new(0),
}))
}
fn create_mmap_segment(
&self,
key: &(String, PartitionId),
segment_id: u64,
size: usize,
) -> Result<MMapSegment> {
let file_name = format!("{}_partition_{}_segment_{}.log", key.0, key.1, segment_id);
let file_path = self.config.data_directory.join(file_name);
let mut file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&file_path)?;
if self.config.preallocate_segments {
file.seek(SeekFrom::Start((size - 1) as u64))?;
file.write_all(&[0])?;
file.flush()?;
}
let mmap = unsafe { MmapOptions::new().len(size).map_mut(&file)? };
Ok(MMapSegment {
mmap,
file,
segment_id,
file_path,
write_offset: 0,
max_size: size,
})
}
fn serialize_messages_batch(
&self,
messages: &[Message],
base_offset: Offset,
) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
for (i, message) in messages.iter().enumerate() {
let offset = base_offset + i as u64;
buffer.extend_from_slice(&offset.to_le_bytes());
let key_bytes = message.key.as_ref().map(|k| k.as_ref()).unwrap_or(&[]);
buffer.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(key_bytes);
let value_bytes = message.value.as_ref();
buffer.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(value_bytes);
buffer.extend_from_slice(&message.timestamp.to_le_bytes());
let crc =
crc32fast::hash(&buffer[buffer.len() - key_bytes.len() - value_bytes.len() - 20..]);
buffer.extend_from_slice(&crc.to_le_bytes());
}
Ok(buffer)
}
fn serialize_messages_batch_arc(
&self,
messages_arc: &Arc<Vec<Message>>,
base_offset: Offset,
) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
for (i, message) in messages_arc.iter().enumerate() {
let offset = base_offset + i as u64;
buffer.extend_from_slice(&offset.to_le_bytes());
let key_bytes = message.key.as_ref().map(|k| k.as_ref()).unwrap_or(&[]);
buffer.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(key_bytes);
let value_bytes = message.value.as_ref();
buffer.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(value_bytes);
buffer.extend_from_slice(&message.timestamp.to_le_bytes());
let crc =
crc32fast::hash(&buffer[buffer.len() - key_bytes.len() - value_bytes.len() - 20..]);
buffer.extend_from_slice(&crc.to_le_bytes());
}
Ok(buffer)
}
fn write_to_mmap(&self, partition_segment: &PartitionMMapSegment, data: &[u8]) -> Result<()> {
let mut segment = partition_segment.current_segment.lock();
if segment.write_offset + data.len() > segment.max_size {
return Err(std::io::Error::new(
std::io::ErrorKind::OutOfMemory,
"Segment full - rotation not implemented yet",
)
.into());
}
let write_start = segment.write_offset;
let write_end = write_start + data.len();
segment.mmap[write_start..write_end].copy_from_slice(data);
segment.write_offset = write_end;
if self.config.sync_on_write {
segment.mmap.flush()?;
}
Ok(())
}
fn read_from_mmap(
&self,
partition_segment: &PartitionMMapSegment,
offset: Offset,
max_bytes: u32,
) -> Result<Vec<(Offset, Message)>> {
let segment = partition_segment.current_segment.lock();
let mut messages = Vec::new();
let mut bytes_read = 0usize;
let mut read_offset = 0usize;
while read_offset < segment.write_offset && bytes_read < max_bytes as usize {
if let Some((msg_offset, message, msg_size)) =
self.deserialize_message_at(&segment.mmap, read_offset)?
{
if msg_offset >= offset {
messages.push((msg_offset, message));
bytes_read += msg_size;
if messages.len() >= 10000 {
break;
}
}
read_offset += msg_size;
} else {
break;
}
}
self.bytes_read
.fetch_add(bytes_read as u64, Ordering::Relaxed);
Ok(messages)
}
fn deserialize_message_at(
&self,
mmap: &[u8],
offset: usize,
) -> Result<Option<(Offset, Message, usize)>> {
if offset + 8 > mmap.len() {
return Ok(None);
}
let msg_offset = u64::from_le_bytes([
mmap[offset],
mmap[offset + 1],
mmap[offset + 2],
mmap[offset + 3],
mmap[offset + 4],
mmap[offset + 5],
mmap[offset + 6],
mmap[offset + 7],
]);
let mut pos = offset + 8;
if pos + 4 > mmap.len() {
return Ok(None);
}
let key_len =
u32::from_le_bytes([mmap[pos], mmap[pos + 1], mmap[pos + 2], mmap[pos + 3]]) as usize;
pos += 4;
if pos + key_len > mmap.len() {
return Ok(None);
}
let key = if key_len > 0 {
Some(bytes::Bytes::copy_from_slice(&mmap[pos..pos + key_len]))
} else {
None
};
pos += key_len;
if pos + 4 > mmap.len() {
return Ok(None);
}
let value_len =
u32::from_le_bytes([mmap[pos], mmap[pos + 1], mmap[pos + 2], mmap[pos + 3]]) as usize;
pos += 4;
if pos + value_len > mmap.len() {
return Ok(None);
}
let value = bytes::Bytes::copy_from_slice(&mmap[pos..pos + value_len]);
pos += value_len;
if pos + 8 > mmap.len() {
return Ok(None);
}
let timestamp = u64::from_le_bytes([
mmap[pos],
mmap[pos + 1],
mmap[pos + 2],
mmap[pos + 3],
mmap[pos + 4],
mmap[pos + 5],
mmap[pos + 6],
mmap[pos + 7],
]);
pos += 8;
if pos + 4 > mmap.len() {
return Ok(None);
}
let _stored_crc =
u32::from_le_bytes([mmap[pos], mmap[pos + 1], mmap[pos + 2], mmap[pos + 3]]);
pos += 4;
let message = Message {
key,
value,
timestamp,
headers: HashMap::new(),
};
let total_size = pos - offset;
Ok(Some((msg_offset, message, total_size)))
}
pub fn get_stats(&self) -> MMapStorageStats {
MMapStorageStats {
total_segments: {
let segments = self.segments.read();
segments.len()
},
total_writes: self.total_writes.load(Ordering::Relaxed),
total_reads: self.total_reads.load(Ordering::Relaxed),
bytes_written: self.bytes_written.load(Ordering::Relaxed),
bytes_read: self.bytes_read.load(Ordering::Relaxed),
segment_size_mb: self.config.segment_size_mb,
}
}
}
/// Counter snapshot produced by [`MemoryMappedStorage::get_stats`].
#[derive(Debug, Clone)]
pub struct MMapStorageStats {
    /// Number of `(topic, partition)` entries that have a segment.
    pub total_segments: usize,
    /// Messages appended.
    pub total_writes: u64,
    /// Fetch calls that found their partition.
    pub total_reads: u64,
    /// Encoded bytes written into segments.
    pub bytes_written: u64,
    /// Encoded bytes of records returned by fetches.
    pub bytes_read: u64,
    /// Configured segment size, in MiB.
    pub segment_size_mb: usize,
}

impl MMapStorageStats {
    /// Renders a one-line human-readable summary.
    ///
    /// Byte counts are shown in decimal megabytes (1 MB = 1,000,000 bytes) with one decimal
    /// place. `segment_size_mb` is not included.
    pub fn report(&self) -> String {
        let to_megabytes = |byte_count: u64| byte_count as f64 / 1_000_000.0;
        let written_mb = to_megabytes(self.bytes_written);
        let read_mb = to_megabytes(self.bytes_read);
        format!(
            "MMap Storage - Segments: {}, Writes: {}, Reads: {}, Data: {:.1}MB written / {:.1}MB read",
            self.total_segments, self.total_writes, self.total_reads, written_mb, read_mb
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tempfile::tempdir;

    /// Builds a storage in a fresh temporary directory with 1 MiB segments, so tests do not
    /// create 256 MiB segment files.
    ///
    /// Bind the returned `TempDir` to a named variable (e.g. `_dir`, not `_`) so it outlives
    /// the storage.
    fn test_storage() -> (tempfile::TempDir, MemoryMappedStorage) {
        let temp_dir = tempdir().unwrap();
        let config = MMapStorageConfig {
            data_directory: temp_dir.path().to_path_buf(),
            segment_size_mb: 1,
            ..Default::default()
        };
        let storage = MemoryMappedStorage::with_config(config).unwrap();
        (temp_dir, storage)
    }

    /// Builds a header-less message.
    fn message(key: Option<&'static str>, value: &'static str, timestamp: u64) -> Message {
        Message {
            key: key.map(Bytes::from),
            value: Bytes::from(value),
            timestamp,
            headers: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn test_mmap_storage_basic_operations() {
        let (_dir, storage) = test_storage();
        let offset = storage
            .append_messages_zero_copy(
                "test_topic",
                0,
                vec![message(Some("test_key"), "test_value", 1234567890)],
            )
            .unwrap();
        assert_eq!(offset, 0);
        let fetched = storage
            .fetch_messages_zero_copy("test_topic", 0, 0, 1024)
            .unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].0, 0);
        assert_eq!(fetched[0].1.key, Some(Bytes::from("test_key")));
        assert_eq!(fetched[0].1.value, "test_value");
        assert_eq!(fetched[0].1.timestamp, 1234567890);

        let stats = storage.get_stats();
        assert_eq!(stats.total_segments, 1);
        assert_eq!(stats.total_writes, 1);
        assert_eq!(stats.total_reads, 1);
        assert!(stats.bytes_written > 0);
    }

    #[test]
    fn test_offsets_are_consecutive_across_batches() {
        let (_dir, storage) = test_storage();
        let first = storage
            .append_messages_zero_copy("t", 0, vec![message(None, "a", 1), message(None, "b", 2)])
            .unwrap();
        let second = storage
            .append_messages_zero_copy_arc("t", 0, Arc::new(vec![message(None, "c", 3)]))
            .unwrap();
        assert_eq!((first, second), (0, 2));

        let fetched = storage.fetch_messages_zero_copy("t", 0, 1, 1024).unwrap();
        let offsets: Vec<Offset> = fetched.iter().map(|(offset, _)| *offset).collect();
        assert_eq!(offsets, vec![1, 2]);
        assert_eq!(fetched[0].1.value, "b");
        assert!(fetched[0].1.key.is_none());
        assert_eq!(fetched[1].1.value, "c");
    }

    #[test]
    fn test_partitions_are_independent() {
        let (_dir, storage) = test_storage();
        storage
            .append_messages_zero_copy("t", 0, vec![message(None, "p0", 1)])
            .unwrap();
        let other_partition = storage
            .append_messages_zero_copy("t", 1, vec![message(None, "p1", 1)])
            .unwrap();
        assert_eq!(other_partition, 0);
        assert!(storage.fetch_messages_zero_copy("t", 2, 0, 1024).unwrap().is_empty());
        assert!(storage.fetch_messages_zero_copy("other", 0, 0, 1024).unwrap().is_empty());
        assert_eq!(storage.get_stats().total_segments, 2);
    }

    #[test]
    fn test_empty_batch_is_a_no_op() {
        let (_dir, storage) = test_storage();
        assert_eq!(storage.append_messages_zero_copy("t", 0, Vec::new()).unwrap(), 0);
        assert_eq!(storage.get_stats().total_writes, 0);
        assert!(storage.fetch_messages_zero_copy("t", 0, 0, 1024).unwrap().is_empty());
    }

    #[test]
    fn test_fetch_respects_byte_budget() {
        let (_dir, storage) = test_storage();
        storage
            .append_messages_zero_copy(
                "t",
                0,
                vec![message(None, "a", 1), message(None, "b", 2), message(None, "c", 3)],
            )
            .unwrap();
        // The record that crosses the budget is still returned, so a tiny budget yields one.
        assert_eq!(storage.fetch_messages_zero_copy("t", 0, 0, 1).unwrap().len(), 1);
        assert!(storage.fetch_messages_zero_copy("t", 0, 0, 0).unwrap().is_empty());
        assert_eq!(storage.fetch_messages_zero_copy("t", 0, 0, 1024).unwrap().len(), 3);
    }

    #[test]
    fn test_batch_larger_than_segment_is_rejected() {
        let (_dir, storage) = test_storage();
        let oversized = Message {
            key: None,
            value: Bytes::from(vec![0u8; 2 * 1024 * 1024]),
            timestamp: 0,
            headers: std::collections::HashMap::new(),
        };
        assert!(storage.append_messages_zero_copy("t", 0, vec![oversized]).is_err());
        assert!(storage.fetch_messages_zero_copy("t", 0, 0, 1024).unwrap().is_empty());
    }

    #[test]
    fn test_concurrent_appends_assign_every_offset_once() {
        let (_dir, storage) = test_storage();
        std::thread::scope(|scope| {
            for worker in 0..4u64 {
                let storage = &storage;
                scope.spawn(move || {
                    for i in 0..25u64 {
                        storage
                            .append_messages_zero_copy(
                                "t",
                                0,
                                vec![message(None, "v", worker * 100 + i)],
                            )
                            .unwrap();
                    }
                });
            }
        });
        let mut offsets: Vec<Offset> = storage
            .fetch_messages_zero_copy("t", 0, 0, u32::MAX)
            .unwrap()
            .into_iter()
            .map(|(offset, _)| offset)
            .collect();
        offsets.sort_unstable();
        assert_eq!(offsets, (0..100).collect::<Vec<Offset>>());
    }
}