pub mod high_performance;
pub mod immediate_optimizations;
pub mod index;
pub mod log;
pub mod segment;
pub mod tests;
use segment::{SegmentConfig, SegmentManager};
use tracing::{debug, error, info, trace, warn};
use crate::protocol::{Message, Offset, PartitionId, TopicName};
use crate::{FluxmqError, Result};
use crossbeam::channel;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Work items sent from [`HybridStorage`] to its background persistence worker over the
/// `persistence_tx` channel.
#[derive(Debug, Clone)]
enum PersistenceCommand {
/// Write `messages` to the on-disk segments of `topic`/`partition`.
Append {
topic: TopicName,
partition: PartitionId,
messages: Vec<Message>,
/// Offset reported by the in-memory append. It is carried along with the batch but is
/// not used when the worker writes the messages to disk.
#[allow(dead_code)] offset: Offset,
},
/// Several appends submitted as one command. Nothing visible in this file constructs
/// this variant, hence the `dead_code` allowance.
#[allow(dead_code)] BatchAppend {
batch: Vec<(TopicName, PartitionId, Vec<Message>, Offset)>,
},
/// Write any buffered appends, then flush `topic`/`partition` to disk.
Flush {
topic: TopicName,
partition: PartitionId,
},
}
/// Event broadcast to subscribers (obtained via `HybridStorage::subscribe_to_messages`)
/// when messages are appended through [`HybridStorage`].
#[derive(Debug, Clone)]
pub struct MessageNotification {
/// Topic the messages were appended to.
pub topic: TopicName,
/// Partition the messages were appended to.
pub partition: PartitionId,
/// Offsets associated with the append, as `(start, end)`.
/// NOTE(review): whether `end` is inclusive or exclusive is decided by
/// `HybridStorage::append_messages` — confirm there before relying on it.
pub offset_range: (Offset, Offset),
/// Number of messages in the append.
pub message_count: usize,
}
/// Purely in-memory message store: topic name -> partition id -> `(offset, message)` list.
///
/// Every operation goes through a single `RwLock` over the whole topic map, so an append
/// to any partition excludes readers of all partitions while it runs.
#[derive(Debug)]
pub struct InMemoryStorage {
topics: Arc<RwLock<HashMap<TopicName, Topic>>>,
}
/// The partitions of one topic, keyed by partition id. Partitions are created lazily the
/// first time `InMemoryStorage::append_messages` writes to them.
#[derive(Debug)]
pub struct Topic {
partitions: HashMap<PartitionId, Partition>,
}
/// Messages of a single partition held in memory.
#[derive(Debug)]
pub struct Partition {
// `(offset, message)` pairs appended with consecutive, increasing offsets;
// `InMemoryStorage::fetch_messages` binary-searches this list, so that order is relied on.
messages: Vec<(Offset, Message)>,
// Offset the next appended message will receive; reported by `get_latest_offset`.
next_offset: Offset,
}
impl InMemoryStorage {
    /// Creates an empty store with no topics.
    pub fn new() -> Self {
        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Appends `messages` to `topic`/`partition` and returns the offset assigned to the
    /// first of them.
    ///
    /// The topic and partition are created on first use. Offsets are consecutive and are
    /// assigned while holding the store's write lock, so the messages of one call are
    /// never interleaved with those of another.
    ///
    /// An empty `messages` stores nothing and returns the partition's next offset (0 if
    /// the partition does not exist yet). It never reports an offset that is already
    /// occupied by a stored message.
    pub fn append_messages(
        &self,
        topic: &str,
        partition: PartitionId,
        messages: Vec<Message>,
    ) -> Result<Offset> {
        if messages.is_empty() {
            return Ok(self.get_latest_offset(topic, partition).unwrap_or(0));
        }
        let mut topics = self.topics.write();
        let partition_data = topics
            .entry(topic.to_string())
            .or_insert_with(Topic::new)
            .partitions
            .entry(partition)
            .or_insert_with(Partition::new);
        let base_offset = partition_data.next_offset;
        partition_data.messages.reserve(messages.len());
        let mut next_offset = base_offset;
        for message in messages {
            partition_data.messages.push((next_offset, message));
            next_offset += 1;
        }
        partition_data.next_offset = next_offset;
        Ok(base_offset)
    }

    /// Returns messages of `topic`/`partition` starting at the first stored offset that is
    /// `>= offset`.
    ///
    /// Collection stops before the message that would push the summed key + value size
    /// past `max_bytes`. The first message is always returned, so a single oversized
    /// message cannot stall a consumer. At most `MAX_FETCH_MESSAGES` messages are returned.
    /// Unknown topics or partitions and offsets past the end yield an empty vector.
    pub fn fetch_messages(
        &self,
        topic: &str,
        partition: PartitionId,
        offset: Offset,
        max_bytes: u32,
    ) -> Result<Vec<(Offset, Message)>> {
        // Hard cap on messages per fetch, independent of `max_bytes`.
        const MAX_FETCH_MESSAGES: usize = 10_000;
        // Upper bound on the result's up-front allocation.
        const INITIAL_CAPACITY: usize = 1024;

        let topics = self.topics.read();
        let partition_data = match topics
            .get(topic)
            .and_then(|topic_data| topic_data.partitions.get(&partition))
        {
            Some(p) => p,
            None => return Ok(Vec::new()),
        };
        // Offsets are stored in ascending order, so this is the index of the first
        // message at or after `offset` (or `len()` if there is none).
        let start_idx = partition_data
            .messages
            .partition_point(|(msg_offset, _)| *msg_offset < offset);
        let available = &partition_data.messages[start_idx..];

        let mut result = Vec::with_capacity(available.len().min(INITIAL_CAPACITY));
        let max_bytes = max_bytes as usize;
        let mut total_bytes = 0usize;
        for (msg_offset, message) in available {
            let message_size =
                message.value.len() + message.key.as_ref().map_or(0, |k| k.len());
            if total_bytes + message_size > max_bytes && !result.is_empty() {
                break;
            }
            result.push((*msg_offset, message.clone()));
            total_bytes += message_size;
            if result.len() >= MAX_FETCH_MESSAGES {
                break;
            }
        }
        Ok(result)
    }

    /// Names of all topics that have received at least one message.
    pub fn get_topics(&self) -> Vec<TopicName> {
        let topics = self.topics.read();
        topics.keys().cloned().collect()
    }

    /// Partition ids known for `topic`; empty if the topic does not exist.
    pub fn get_partitions(&self, topic: &str) -> Vec<PartitionId> {
        let topics = self.topics.read();
        match topics.get(topic) {
            Some(topic_data) => topic_data.partitions.keys().cloned().collect(),
            None => vec![],
        }
    }

    /// Offset the next appended message will receive, or `None` if the partition is
    /// unknown.
    pub fn get_latest_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
        let topics = self.topics.read();
        topics
            .get(topic)?
            .partitions
            .get(&partition)
            .map(|p| p.next_offset)
    }

    /// Offset of the first stored message whose timestamp is `>= timestamp`.
    ///
    /// If no message qualifies, returns the partition's next offset. Returns `None` if
    /// the partition is unknown. The scan is linear and does not rely on timestamps
    /// being ordered.
    pub fn get_offset_by_timestamp(
        &self,
        topic: &str,
        partition: PartitionId,
        timestamp: u64,
    ) -> Option<Offset> {
        let topics = self.topics.read();
        let partition_data = topics.get(topic)?.partitions.get(&partition)?;
        let found = partition_data
            .messages
            .iter()
            .find(|(_, message)| message.timestamp >= timestamp)
            .map(|(offset, _)| *offset);
        Some(found.unwrap_or(partition_data.next_offset))
    }

    /// Offset of the oldest stored message (0 if the partition holds none), or `None` if
    /// the partition is unknown.
    pub fn get_earliest_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
        let topics = self.topics.read();
        let partition_data = topics.get(topic)?.partitions.get(&partition)?;
        Some(
            partition_data
                .messages
                .first()
                .map_or(0, |(offset, _)| *offset),
        )
    }
}
impl Topic {
fn new() -> Self {
Self {
partitions: HashMap::new(),
}
}
}
impl Partition {
fn new() -> Self {
Self {
messages: Vec::new(),
next_offset: 0,
}
}
}
/// Message store that serves reads from an in-memory copy and persists appends to
/// per-partition segment files in the background.
#[derive(Debug)]
pub struct HybridStorage {
    /// In-memory copy used for all reads and offset lookups.
    memory: Arc<InMemoryStorage>,
    /// Root data directory; partitions live under `<base_dir>/<topic>/partition-<id>`.
    base_dir: String,
    /// Segment managers for partitions that have been written or recovered, keyed by
    /// `(topic, partition)`. Shared with the persistence worker.
    segments:
        Arc<RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>>,
    /// Sending side of the command queue consumed by the persistence worker.
    persistence_tx: channel::Sender<PersistenceCommand>,
    /// Broadcasts append notifications to subscribers.
    message_notifier: broadcast::Sender<MessageNotification>,
}
impl HybridStorage {
pub fn new<P: AsRef<str>>(base_dir: P) -> Result<Self> {
let base_dir = base_dir.as_ref().to_string();
std::fs::create_dir_all(&base_dir)?;
let (tx, rx) = channel::unbounded();
let (message_notifier, _) = broadcast::channel(1024);
let memory = Arc::new(InMemoryStorage::new());
let segments = Arc::new(RwLock::new(HashMap::new()));
let segments_clone = Arc::clone(&segments);
let base_dir_clone = base_dir.clone();
tokio::spawn(async move {
Self::persistence_worker(rx, segments_clone, base_dir_clone).await;
});
Ok(Self {
memory,
base_dir,
segments,
persistence_tx: tx,
message_notifier,
})
}
async fn persistence_worker(
rx: channel::Receiver<PersistenceCommand>,
segments: Arc<
RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>,
>,
base_dir: String,
) {
const BATCH_TIMEOUT: u64 = 5; const MAX_BATCH_SIZE: usize = 1000;
let mut pending_batch = Vec::new();
let mut batch_timer =
tokio::time::interval(std::time::Duration::from_millis(BATCH_TIMEOUT));
loop {
tokio::select! {
result = tokio::task::spawn_blocking({
let rx_clone = rx.clone();
move || rx_clone.try_recv()
}) => {
match result {
Ok(Ok(cmd)) => {
match cmd {
PersistenceCommand::Append {
topic,
partition,
messages,
offset,
} => {
pending_batch.push((topic, partition, messages, offset));
if pending_batch.len() >= MAX_BATCH_SIZE {
Self::process_batch(&segments, &base_dir, &mut pending_batch);
}
}
PersistenceCommand::BatchAppend { batch } => {
for (topic, partition, messages, _offset) in batch {
if let Err(e) = Self::persist_messages(&segments, &base_dir, &topic, partition, &messages) {
error!("Failed to persist batch messages: {}", e);
}
}
}
PersistenceCommand::Flush { topic, partition } => {
if !pending_batch.is_empty() {
Self::process_batch(&segments, &base_dir, &mut pending_batch);
}
if let Err(e) = Self::flush_partition_internal(&segments, &base_dir, &topic, partition) {
error!("Failed to flush partition: {}", e);
}
}
}
}
Ok(Err(crossbeam::channel::TryRecvError::Empty)) => {
}
Ok(Err(crossbeam::channel::TryRecvError::Disconnected)) => {
break; }
Err(_) => {
error!("Persistence worker spawn_blocking failed");
break;
}
}
}
_ = batch_timer.tick() => {
if !pending_batch.is_empty() {
Self::process_batch(&segments, &base_dir, &mut pending_batch);
}
}
}
}
}
fn process_batch(
segments: &Arc<
RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>,
>,
base_dir: &str,
batch: &mut Vec<(TopicName, PartitionId, Vec<Message>, Offset)>,
) {
for (topic, partition, messages, _offset) in batch.drain(..) {
if let Err(e) = Self::persist_messages(segments, base_dir, &topic, partition, &messages)
{
error!(
"Failed to persist batch messages for {}:{}: {}",
topic, partition, e
);
}
}
}
fn persist_messages(
segments: &Arc<
RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>,
>,
base_dir: &str,
topic: &str,
partition: PartitionId,
messages: &[Message],
) -> Result<()> {
let manager_arc =
Self::get_or_create_segment_manager_sync(segments, base_dir, topic, partition)?;
let mut manager = manager_arc.lock();
manager.append(messages)?;
Ok(())
}
fn flush_partition_internal(
segments: &Arc<
RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>,
>,
base_dir: &str,
topic: &str,
partition: PartitionId,
) -> Result<()> {
let manager_arc =
Self::get_or_create_segment_manager_sync(segments, base_dir, topic, partition)?;
let mut manager = manager_arc.lock();
manager.flush()?;
Ok(())
}
fn get_or_create_segment_manager_sync(
segments: &Arc<
RwLock<HashMap<(TopicName, PartitionId), Arc<parking_lot::Mutex<SegmentManager>>>>,
>,
base_dir: &str,
topic: &str,
partition: PartitionId,
) -> Result<Arc<parking_lot::Mutex<SegmentManager>>> {
let key = (topic.to_string(), partition);
{
let segments_map = segments.read();
if let Some(manager) = segments_map.get(&key) {
return Ok(Arc::clone(manager));
}
}
let partition_dir = std::path::PathBuf::from(base_dir)
.join(topic)
.join(format!("partition-{}", partition));
let config = SegmentConfig {
base_dir: partition_dir,
max_segment_size: 1024 * 1024 * 1024, segment_prefix: "segment".to_string(),
};
let manager = Arc::new(parking_lot::Mutex::new(SegmentManager::new(config)?));
{
let mut segments_map = segments.write();
segments_map.insert(key, Arc::clone(&manager));
}
Ok(manager)
}
pub fn append_messages(
&self,
topic: &str,
partition: PartitionId,
messages: Vec<Message>,
) -> Result<Offset> {
let start_offset = self.get_latest_offset(topic, partition).unwrap_or(0);
let message_count = messages.len();
let end_offset = self
.memory
.append_messages(topic, partition, messages.clone())?;
let notification = MessageNotification {
topic: topic.to_string(),
partition,
offset_range: (start_offset, end_offset),
message_count,
};
let _ = self.message_notifier.send(notification);
if let Err(_) = self.persistence_tx.send(PersistenceCommand::Append {
topic: topic.to_string(),
partition,
messages,
offset: end_offset,
}) {
warn!("Persistence channel full, skipping disk write");
}
Ok(end_offset)
}
pub fn fetch_messages(
&self,
topic: &str,
partition: PartitionId,
offset: Offset,
max_bytes: u32,
) -> Result<Vec<(Offset, Message)>> {
self.memory
.fetch_messages(topic, partition, offset, max_bytes)
}
pub fn get_topics(&self) -> Vec<TopicName> {
self.memory.get_topics()
}
pub fn get_partitions(&self, topic: &str) -> Vec<PartitionId> {
self.memory.get_partitions(topic)
}
pub fn get_latest_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
self.memory.get_latest_offset(topic, partition)
}
pub fn get_offset_by_timestamp(
&self,
topic: &str,
partition: PartitionId,
timestamp: u64,
) -> Option<Offset> {
self.memory
.get_offset_by_timestamp(topic, partition, timestamp)
}
pub fn get_earliest_offset(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
self.memory.get_earliest_offset(topic, partition)
}
pub fn flush_partition(&self, topic: &str, partition: PartitionId) -> Result<()> {
let _ = self.persistence_tx.send(PersistenceCommand::Flush {
topic: topic.to_string(),
partition,
});
Ok(())
}
pub async fn load_from_disk(&self) -> Result<()> {
info!("Starting recovery from disk storage at: {}", self.base_dir);
let base_path = std::path::Path::new(&self.base_dir);
if !base_path.exists() {
info!("No existing data directory found, starting with fresh state");
return Ok(());
}
let mut total_topics = 0;
let mut total_partitions = 0;
let mut total_messages = 0;
for topic_entry in std::fs::read_dir(base_path)? {
let topic_entry = topic_entry?;
let topic_path = topic_entry.path();
if !topic_path.is_dir() {
continue;
}
let topic_name = topic_path
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| {
FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid topic directory name",
))
})?;
total_topics += 1;
info!("Loading topic: {}", topic_name);
for partition_entry in std::fs::read_dir(&topic_path)? {
let partition_entry = partition_entry?;
let partition_path = partition_entry.path();
if !partition_path.is_dir() {
continue;
}
let partition_dir_name = partition_path
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| {
FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid partition directory name",
))
})?;
if !partition_dir_name.starts_with("partition-") {
continue;
}
let partition_id: PartitionId = partition_dir_name
.strip_prefix("partition-")
.and_then(|s| s.parse().ok())
.ok_or_else(|| {
FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid partition ID in directory: {}", partition_dir_name),
))
})?;
total_partitions += 1;
info!("Loading partition: {}-{}", topic_name, partition_id);
let messages_loaded = self
.load_partition_from_disk(topic_name, partition_id, &partition_path)
.await?;
total_messages += messages_loaded;
debug!(
"Loaded {} messages for {}-{}",
messages_loaded, topic_name, partition_id
);
}
}
info!(
"Recovery completed: {} topics, {} partitions, {} messages loaded",
total_topics, total_partitions, total_messages
);
Ok(())
}
async fn load_partition_from_disk(
&self,
topic: &str,
partition: PartitionId,
partition_path: &std::path::Path,
) -> Result<u64> {
let config = SegmentConfig {
base_dir: partition_path.to_path_buf(),
max_segment_size: 1024 * 1024 * 1024, segment_prefix: "segment".to_string(),
};
let segment_manager = SegmentManager::new(config)?;
let mut total_messages = 0u64;
let mut current_offset = 0;
const BATCH_SIZE: usize = 1024 * 1024;
loop {
let log_entries = segment_manager.read(current_offset, BATCH_SIZE)?;
if log_entries.is_empty() {
break;
}
let mut messages = Vec::new();
let mut max_offset = current_offset;
for entry in log_entries {
messages.push(entry.to_message());
max_offset = entry.offset + 1;
total_messages += 1;
}
if !messages.is_empty() {
let _base_offset = self.memory.append_messages(topic, partition, messages)?;
trace!(
"Loaded batch of {} messages for {}-{}",
max_offset - current_offset,
topic,
partition
);
}
current_offset = max_offset;
if total_messages % 1000 == 0 {
tokio::task::yield_now().await;
}
}
if total_messages > 0 {
let key = (topic.to_string(), partition);
let manager_arc = Arc::new(parking_lot::Mutex::new(segment_manager));
let mut segments_map = self.segments.write();
segments_map.insert(key, manager_arc);
}
Ok(total_messages)
}
pub fn subscribe_to_messages(&self) -> broadcast::Receiver<MessageNotification> {
self.message_notifier.subscribe()
}
pub fn get_subscriber_count(&self) -> usize {
self.message_notifier.receiver_count()
}
}