#![allow(dead_code)]
use crate::protocol::{Message, Offset, PartitionId};
use crate::Result;
use bytes::Bytes;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
/// A single fixed-capacity log segment backed by a read-write memory-mapped file.
///
/// Bytes are appended at `write_position`; readers may only access bytes below
/// `committed_position`, which is advanced by `MappedStorageSegment::commit_writes`.
#[derive(Debug)]
pub struct MappedStorageSegment {
    /// Writable mapping of the segment file; its length equals `capacity`.
    mmap: MmapMut,
    /// Path of the backing file, kept for diagnostics/statistics.
    file_path: String,
    /// Identifier of this segment.
    segment_id: u64,
    /// Size of the mapping in bytes; writes never go past this.
    capacity: usize,
    /// Offset of the next byte to be written.
    write_position: AtomicUsize,
    /// Reads must end at or before this offset.
    committed_position: AtomicUsize,
    /// Number of write operations performed.
    writes: AtomicU64,
    /// Number of read operations performed.
    reads: AtomicU64,
    /// Total bytes written.
    bytes_written: AtomicU64,
    /// Total bytes handed out by reads.
    bytes_read: AtomicU64,
}
impl MappedStorageSegment {
    /// Opens (creating if necessary) the file at `file_path`, resizes it to exactly
    /// `capacity` bytes and maps the whole file read-write.
    ///
    /// The returned segment is always logically empty (`write_position` and
    /// `committed_position` start at 0), even when the file already existed. Any earlier
    /// contents will be overwritten by later writes.
    ///
    /// # Errors
    /// Returns `FluxmqError::Storage` carrying the underlying I/O error if the file cannot
    /// be opened, resized or mapped.
    pub fn create(file_path: String, segment_id: u64, capacity: usize) -> Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            // Existing bytes are kept; `set_len` below fixes the size either way.
            .truncate(false)
            .open(&file_path)
            .map_err(crate::FluxmqError::Storage)?;
        file.set_len(capacity as u64)
            .map_err(crate::FluxmqError::Storage)?;

        // SAFETY: `file` was just sized to `capacity`, so the mapping only covers valid file
        // bytes. NOTE(review): memory maps are only sound while nothing else truncates or
        // rewrites the file behind the mapping; nothing in this module does, but external
        // processes touching the data directory could.
        let mmap = unsafe { MmapOptions::new().len(capacity).map_mut(&file) }
            .map_err(crate::FluxmqError::Storage)?;

        Ok(Self {
            mmap,
            file_path,
            segment_id,
            capacity,
            write_position: AtomicUsize::new(0),
            committed_position: AtomicUsize::new(0),
            writes: AtomicU64::new(0),
            reads: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        })
    }

    /// Appends `data` at the current write position and returns the offset it was
    /// written at.
    ///
    /// The bytes are not readable through `read_zero_copy` until `commit_writes`
    /// publishes them.
    ///
    /// # Errors
    /// Returns an `OutOfMemory` storage error ("Segment full") if `data` does not fit in
    /// the remaining capacity. Nothing is written in that case.
    pub fn write_zero_copy(&mut self, data: &[u8]) -> Result<usize> {
        let start = *self.write_position.get_mut();
        let end = match start.checked_add(data.len()) {
            Some(end) if end <= self.capacity => end,
            _ => {
                return Err(crate::FluxmqError::Storage(std::io::Error::new(
                    std::io::ErrorKind::OutOfMemory,
                    "Segment full",
                )))
            }
        };
        // `&mut self` guarantees exclusive access, so a safe, bounds-checked copy suffices.
        self.mmap[start..end].copy_from_slice(data);
        *self.write_position.get_mut() = end;
        self.writes.fetch_add(1, Ordering::Relaxed);
        self.bytes_written
            .fetch_add(data.len() as u64, Ordering::Relaxed);
        Ok(start)
    }

    /// Borrows `length` bytes starting at `offset` directly from the mapping (no copy).
    ///
    /// # Errors
    /// Returns an `UnexpectedEof` storage error if the requested range does not lie
    /// entirely below the committed position, including when `offset + length` overflows.
    pub fn read_zero_copy(&self, offset: usize, length: usize) -> Result<&[u8]> {
        let committed = self.committed_position.load(Ordering::Acquire);
        // `checked_add` prevents a wrapped sum from slipping past the bounds check.
        if !matches!(offset.checked_add(length), Some(end) if end <= committed) {
            return Err(crate::FluxmqError::Storage(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "Read beyond committed data",
            )));
        }
        // SAFETY: `offset + length <= committed`, and the committed position is only ever
        // set from write positions that were checked against `capacity`, which is the
        // mapping's length, so the range is in bounds. Building the slice from a raw
        // pointer borrows only this range rather than the whole mapping (as `Deref` would).
        let data = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr().add(offset), length) };
        self.reads.fetch_add(1, Ordering::Relaxed);
        self.bytes_read.fetch_add(length as u64, Ordering::Relaxed);
        Ok(data)
    }

    /// Flushes the mapping to disk, then marks everything written so far as committed.
    /// Returns the write position that was committed.
    ///
    /// The committed position only ever moves forward, so racing commits cannot regress it.
    ///
    /// # Errors
    /// Propagates a flush failure. The committed position is left unchanged in that case.
    pub fn commit_writes(&self) -> Result<usize> {
        let write_pos = self.write_position.load(Ordering::Acquire);
        // Flush before publishing, so "committed" never covers bytes that failed to sync.
        self.mmap.flush().map_err(crate::FluxmqError::Storage)?;
        self.committed_position
            .fetch_max(write_pos, Ordering::Release);
        Ok(write_pos)
    }

    /// Returns a point-in-time snapshot of this segment's positions and counters.
    pub fn get_stats(&self) -> MappedSegmentStats {
        MappedSegmentStats {
            segment_id: self.segment_id,
            file_path: self.file_path.clone(),
            capacity: self.capacity,
            write_position: self.write_position.load(Ordering::Relaxed),
            committed_position: self.committed_position.load(Ordering::Relaxed),
            writes: self.writes.load(Ordering::Relaxed),
            reads: self.reads.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
        }
    }
}
/// Snapshot of one mapped segment's positions and I/O counters.
#[derive(Clone, Debug)]
pub struct MappedSegmentStats {
    /// Identifier of the segment.
    pub segment_id: u64,
    /// Path of the backing file.
    pub file_path: String,
    /// Total size of the segment in bytes.
    pub capacity: usize,
    /// Offset of the next byte to be written.
    pub write_position: usize,
    /// Offset up to which reads are allowed.
    pub committed_position: usize,
    /// Number of write operations.
    pub writes: u64,
    /// Number of read operations.
    pub reads: u64,
    /// Total bytes written.
    pub bytes_written: u64,
    /// Total bytes read.
    pub bytes_read: u64,
}
impl MappedSegmentStats {
    /// Ratio of `write_position` to `capacity`.
    ///
    /// A zero-capacity segment reports `0.0` instead of dividing by zero.
    pub fn utilization(&self) -> f64 {
        match self.capacity {
            0 => 0.0,
            capacity => self.write_position as f64 / capacity as f64,
        }
    }
}
/// Message store that appends serialized records to memory-mapped segment files and
/// locates them through an in-memory index.
pub struct ZeroCopyMessageStorage {
    /// Segments per `(topic, partition)`, in creation order; the last entry is the one
    /// new appends go to.
    segments: Arc<RwLock<HashMap<(String, PartitionId), Vec<Arc<MappedStorageSegment>>>>>,
    /// Capacity in bytes given to each newly created segment.
    segment_size: usize,
    /// Configured segment limit per partition.
    /// NOTE(review): not read anywhere in this module.
    max_segments_per_partition: usize,
    /// `(topic, partition, offset)` -> `(segment id, byte position in segment, record length)`.
    message_index: Arc<RwLock<HashMap<(String, PartitionId, Offset), (u64, usize, usize)>>>,
    /// Number of successful appends.
    total_messages: AtomicU64,
    /// Number of segments created.
    total_segments: AtomicU64,
    /// Number of successful single-message reads.
    zero_copy_reads: AtomicU64,
    /// Number of successful appends (mirrors `total_messages`).
    zero_copy_writes: AtomicU64,
    /// Next identifier handed to a new segment.
    next_segment_id: AtomicU64,
}
impl ZeroCopyMessageStorage {
pub fn new() -> Self {
Self {
segments: Arc::new(RwLock::new(HashMap::new())),
segment_size: 64 * 1024 * 1024, max_segments_per_partition: 1000,
message_index: Arc::new(RwLock::new(HashMap::new())),
total_messages: AtomicU64::new(0),
total_segments: AtomicU64::new(0),
zero_copy_reads: AtomicU64::new(0),
zero_copy_writes: AtomicU64::new(0),
next_segment_id: AtomicU64::new(1),
}
}
pub fn append_message_zero_copy(
&self,
topic: &str,
partition: PartitionId,
offset: Offset,
message: &Message,
) -> Result<()> {
let serialized = self.serialize_message(message)?;
let segment = self.get_or_create_segment(topic, partition)?;
{
let mut index = self.message_index.write();
index.insert(
(topic.to_string(), partition, offset),
(segment.segment_id, 0, serialized.len()), );
}
self.total_messages.fetch_add(1, Ordering::Relaxed);
self.zero_copy_writes.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn read_message_zero_copy(
&self,
topic: &str,
partition: PartitionId,
offset: Offset,
) -> Result<Message> {
let (segment_id, segment_offset, length) = {
let index = self.message_index.read();
index
.get(&(topic.to_string(), partition, offset))
.cloned()
.ok_or_else(|| {
crate::FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Message not found",
))
})?
};
let segment = self.find_segment(topic, partition, segment_id)?;
let data = segment.read_zero_copy(segment_offset, length)?;
let message = self.deserialize_message(data)?;
self.zero_copy_reads.fetch_add(1, Ordering::Relaxed);
Ok(message)
}
pub fn read_message_range_zero_copy(
&self,
topic: &str,
partition: PartitionId,
start_offset: Offset,
max_messages: usize,
) -> Result<Vec<(Offset, Message)>> {
let mut messages = Vec::with_capacity(max_messages);
let mut current_offset = start_offset;
for _ in 0..max_messages {
match self.read_message_zero_copy(topic, partition, current_offset) {
Ok(message) => {
messages.push((current_offset, message));
current_offset += 1;
}
Err(_) => break, }
}
Ok(messages)
}
fn get_or_create_segment(
&self,
topic: &str,
partition: PartitionId,
) -> Result<Arc<MappedStorageSegment>> {
let key = (topic.to_string(), partition);
{
let segments = self.segments.read();
if let Some(partition_segments) = segments.get(&key) {
if let Some(segment) = partition_segments.last() {
if segment.write_position.load(Ordering::Relaxed) + 1024 < segment.capacity {
return Ok(segment.clone());
}
}
}
}
let segment_id = self.next_segment_id.fetch_add(1, Ordering::Relaxed);
let file_path = format!(
"./data/{}/partition-{}/segment-{:010}.log",
topic, partition, segment_id
);
if let Some(parent) = std::path::Path::new(&file_path).parent() {
std::fs::create_dir_all(parent).map_err(|e| crate::FluxmqError::Storage(e))?;
}
let segment = Arc::new(MappedStorageSegment::create(
file_path,
segment_id,
self.segment_size,
)?);
{
let mut segments = self.segments.write();
segments
.entry(key)
.or_insert_with(Vec::new)
.push(segment.clone());
}
self.total_segments.fetch_add(1, Ordering::Relaxed);
Ok(segment)
}
fn find_segment(
&self,
topic: &str,
partition: PartitionId,
segment_id: u64,
) -> Result<Arc<MappedStorageSegment>> {
let key = (topic.to_string(), partition);
let segments = self.segments.read();
if let Some(partition_segments) = segments.get(&key) {
for segment in partition_segments {
if segment.segment_id == segment_id {
return Ok(segment.clone());
}
}
}
Err(crate::FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Segment not found",
)))
}
fn serialize_message(&self, message: &Message) -> Result<Vec<u8>> {
let mut buf = Vec::new();
let key_bytes = message.key.as_ref().map(|k| k.as_ref()).unwrap_or(&[]);
let value_bytes = message.value.as_ref();
buf.extend_from_slice(&(key_bytes.len() as u32).to_be_bytes());
buf.extend_from_slice(key_bytes);
buf.extend_from_slice(&(value_bytes.len() as u32).to_be_bytes());
buf.extend_from_slice(value_bytes);
buf.extend_from_slice(&message.timestamp.to_be_bytes());
Ok(buf)
}
fn deserialize_message(&self, data: &[u8]) -> Result<Message> {
if data.len() < 16 {
return Err(crate::FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid message data",
)));
}
let mut offset = 0;
let key_len = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
let key = if key_len > 0 {
Some(Bytes::copy_from_slice(&data[offset..offset + key_len]))
} else {
None
};
offset += key_len;
let value_len = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
let value = Bytes::copy_from_slice(&data[offset..offset + value_len]);
offset += value_len;
let timestamp = u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]);
Ok(Message {
key,
value,
timestamp,
headers: std::collections::HashMap::new(),
})
}
pub fn commit_all_writes(&self) -> Result<()> {
let segments = self.segments.read();
for partition_segments in segments.values() {
for segment in partition_segments {
segment.commit_writes()?;
}
}
Ok(())
}
pub fn get_zero_copy_stats(&self) -> ZeroCopyStorageStats {
let segments_guard = self.segments.read();
let mut segment_stats = Vec::new();
for partition_segments in segments_guard.values() {
for segment in partition_segments {
segment_stats.push(segment.get_stats());
}
}
ZeroCopyStorageStats {
total_messages: self.total_messages.load(Ordering::Relaxed),
total_segments: self.total_segments.load(Ordering::Relaxed),
zero_copy_reads: self.zero_copy_reads.load(Ordering::Relaxed),
zero_copy_writes: self.zero_copy_writes.load(Ordering::Relaxed),
segment_stats,
index_size: self.message_index.read().len(),
}
}
}
/// Store-wide counters together with a snapshot of every segment's stats.
#[derive(Clone, Debug)]
pub struct ZeroCopyStorageStats {
    /// Number of messages appended.
    pub total_messages: u64,
    /// Number of segments created.
    pub total_segments: u64,
    /// Number of single-message reads served.
    pub zero_copy_reads: u64,
    /// Number of appends performed.
    pub zero_copy_writes: u64,
    /// Per-segment snapshots.
    pub segment_stats: Vec<MappedSegmentStats>,
    /// Number of entries in the offset index.
    pub index_size: usize,
}
impl ZeroCopyStorageStats {
    /// Renders a multi-line, human-readable summary of the counters and storage usage.
    ///
    /// Utilization is total written bytes over total capacity across all segments.
    /// Sizes are shown in decimal megabytes (1 MB = 1,000,000 bytes).
    pub fn report(&self) -> String {
        const BYTES_PER_MB: f64 = 1_000_000.0;

        let (total_used, total_capacity) = self
            .segment_stats
            .iter()
            .fold((0usize, 0usize), |(used, capacity), segment| {
                (used + segment.write_position, capacity + segment.capacity)
            });

        let avg_utilization = match total_capacity {
            0 => 0.0,
            capacity => total_used as f64 / capacity as f64 * 100.0,
        };
        let used_mb = total_used as f64 / BYTES_PER_MB;
        let allocated_mb = total_capacity as f64 / BYTES_PER_MB;

        format!(
            "Zero-Copy Storage Stats:\n\
            Messages: {} total\n\
            Segments: {} (avg {:.1}% utilization)\n\
            Zero-Copy Ops: {} reads, {} writes\n\
            Index Size: {} entries\n\
            Storage: {:.1} MB used / {:.1} MB allocated",
            self.total_messages,
            self.total_segments,
            avg_utilization,
            self.zero_copy_reads,
            self.zero_copy_writes,
            self.index_size,
            used_mb,
            allocated_mb
        )
    }
}