use crate::protocol::{Message, Offset};
use crate::storage::log::{Log, LogEntry};
use crate::{FluxmqError, Result};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
/// Settings that control where a `SegmentManager` stores its segment files
/// and when it starts a new one.
#[derive(Clone, Debug)]
pub struct SegmentConfig {
    /// Threshold compared against the active segment's size plus the
    /// estimated size of an incoming batch (presumably bytes — confirm
    /// against `Log::size`).
    pub max_segment_size: u64,
    /// Directory that holds the segment files.
    pub base_dir: PathBuf,
    /// Prefix for segment names.
    /// NOTE(review): nothing in this file reads this field; segment files
    /// are named from the base offset alone.
    pub segment_prefix: String,
}
impl Default for SegmentConfig {
fn default() -> Self {
Self {
max_segment_size: 1024 * 1024 * 1024, base_dir: PathBuf::from("data"),
segment_prefix: "segment".to_string(),
}
}
}
/// Owns a set of log segments living in one directory and routes appends
/// and reads to the right segment.
#[derive(Debug)]
pub struct SegmentManager {
    /// Directory, size limit and naming settings supplied at construction.
    config: SegmentConfig,
    /// Loaded segments keyed by their base offset (ordered ascending).
    segments: BTreeMap<Offset, Log>,
    /// Key into `segments` of the segment that receives appends.
    active_segment_offset: Offset,
    /// Offset reported as the base of the next appended batch.
    next_offset: Offset,
}
impl SegmentManager {
pub fn new(config: SegmentConfig) -> Result<Self> {
std::fs::create_dir_all(&config.base_dir)?;
let mut manager = Self {
config,
segments: BTreeMap::new(),
active_segment_offset: 0,
next_offset: 0,
};
manager.load_existing_segments()?;
if manager.segments.is_empty() {
manager.create_new_segment(0)?;
}
Ok(manager)
}
pub fn from_directory<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
let config = SegmentConfig {
base_dir: base_dir.as_ref().to_path_buf(),
..Default::default()
};
Self::new(config)
}
pub fn append(&mut self, messages: &[Message]) -> Result<Offset> {
let base_offset = self.next_offset;
if self.should_roll_segment(messages)? {
self.roll_segment()?;
}
let active_segment = self
.segments
.get_mut(&self.active_segment_offset)
.ok_or_else(|| {
FluxmqError::Storage(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Active segment not found",
))
})?;
let _segment_base_offset = active_segment.append(messages)?;
self.next_offset = active_segment.next_offset();
Ok(base_offset)
}
/// Reads entries starting at `offset`, collecting at most `max_bytes` worth
/// of serialized entries.
///
/// Segments are visited in ascending base-offset order, starting from the
/// one chosen by `find_segment_for_offset`. Entries with an offset below
/// `offset` are ignored. Reading stops at the first entry that does not fit
/// into the remaining byte budget, so the returned entries never skip over
/// an entry that was too large.
///
/// # Errors
///
/// Propagates any error returned by a segment's `read`.
pub fn read(&self, offset: Offset, max_bytes: usize) -> Result<Vec<LogEntry>> {
    let start_segment = self.find_segment_for_offset(offset);
    let mut entries = Vec::new();
    let mut remaining_bytes = max_bytes;

    'segments: for (&seg_offset, segment) in self.segments.range(start_segment..) {
        if remaining_bytes == 0 {
            break;
        }
        // Never ask a segment for an offset below its own base.
        let segment_entries = segment.read(offset.max(seg_offset), remaining_bytes)?;
        for entry in segment_entries {
            if entry.offset < offset {
                continue;
            }
            let entry_size = entry.serialized_size();
            if entry_size > remaining_bytes {
                // Leaving only the inner loop would let a smaller entry from
                // a later segment slip in and create a gap in the result, so
                // the whole read ends here.
                break 'segments;
            }
            remaining_bytes -= entry_size;
            entries.push(entry);
        }
        // NOTE(review): if `Log::read` itself truncates its result to the
        // byte budget, moving on to the next segment could still skip
        // entries — confirm its contract.
    }
    Ok(entries)
}
/// Returns the offset that `append` will report for its next batch.
pub fn next_offset(&self) -> Offset {
    self.next_offset
}
/// Returns the base offsets of all tracked segments in ascending order.
pub fn segment_offsets(&self) -> Vec<Offset> {
    let mut bases = Vec::with_capacity(self.segments.len());
    bases.extend(self.segments.keys().copied());
    bases
}
/// Returns how many segments are currently tracked.
pub fn segment_count(&self) -> usize {
    self.segments.len()
}
/// Returns the sum of `size()` over every tracked segment.
///
/// # Errors
///
/// Stops at and returns the first error reported by a segment's `size()`.
pub fn total_size(&self) -> Result<u64> {
    self.segments
        .values()
        .try_fold(0, |running_total, segment| -> Result<u64> {
            Ok(running_total + segment.size()?)
        })
}
/// Flushes every tracked segment in ascending base-offset order.
///
/// # Errors
///
/// Stops at and returns the first flush error; later segments are not
/// flushed in that case.
pub fn flush(&mut self) -> Result<()> {
    self.segments
        .values_mut()
        .try_for_each(|segment| -> Result<()> {
            segment.flush()?;
            Ok(())
        })
}
/// Deletes the oldest segments so that at most `retain_segments` remain.
///
/// Segments are removed in ascending base-offset order. Returns the base
/// offsets of the segments that were removed; the list is empty when the
/// manager already holds `retain_segments` or fewer.
///
/// A segment's file is deleted *before* the segment is dropped from the
/// in-memory map, so a failed deletion leaves the manager still tracking
/// the file. A file that is already gone counts as removed, which keeps a
/// later cleanup from failing on the same segment forever.
///
/// Nothing here protects the active segment: with a small enough
/// `retain_segments` it is removed like any other.
///
/// # Errors
///
/// Returns the first file-deletion error other than `NotFound`. Segments
/// deleted before the failure stay deleted.
pub fn cleanup(&mut self, retain_segments: usize) -> Result<Vec<Offset>> {
    let excess = self.segments.len().saturating_sub(retain_segments);
    if excess == 0 {
        return Ok(Vec::new());
    }

    // Snapshot the keys first; the map is mutated while walking them.
    let offsets_to_remove: Vec<Offset> = self.segments.keys().take(excess).copied().collect();
    let mut removed_offsets = Vec::with_capacity(offsets_to_remove.len());

    for offset in offsets_to_remove {
        if let Some(segment) = self.segments.get(&offset) {
            match std::fs::remove_file(&segment.path) {
                Ok(()) => {}
                // Already missing on disk: the goal is met, carry on.
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => return Err(err.into()),
            }
            self.segments.remove(&offset);
            removed_offsets.push(offset);
        }
    }
    Ok(removed_offsets)
}
fn load_existing_segments(&mut self) -> Result<()> {
let entries = std::fs::read_dir(&self.config.base_dir)?;
let mut segment_files = Vec::new();
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("log") {
if let Some(filename) = path.file_stem().and_then(|s| s.to_str()) {
if let Ok(base_offset) = filename.parse::<Offset>() {
segment_files.push((base_offset, path));
}
}
}
}
segment_files.sort_by_key(|(offset, _)| *offset);
for (base_offset, path) in segment_files {
match Log::open(&path, base_offset) {
Ok(segment) => {
self.next_offset = self.next_offset.max(segment.next_offset());
self.segments.insert(base_offset, segment);
self.active_segment_offset = base_offset;
}
Err(e) => {
eprintln!("Warning: Failed to load segment {}: {}", path.display(), e);
}
}
}
Ok(())
}
/// Creates a segment with the given base offset, registers it, and makes it
/// the active segment.
///
/// The file is placed in `config.base_dir` and named after the base offset,
/// zero-padded to 20 digits, with a `.log` extension.
///
/// # Errors
///
/// Propagates the error from `Log::create`; no state is changed in that
/// case.
fn create_new_segment(&mut self, base_offset: Offset) -> Result<()> {
    let path = self
        .config
        .base_dir
        .join(format!("{:020}.log", base_offset));
    let segment = Log::create(&path, base_offset)?;

    self.active_segment_offset = base_offset;
    self.segments.insert(base_offset, segment);
    Ok(())
}
/// Decides whether a new segment must be started before appending
/// `messages`.
///
/// Returns `true` when there is no active segment in the map, or when the
/// active segment's current size plus the estimated size of the batch
/// exceeds `config.max_segment_size`.
///
/// Returns `false` when the active segment is still empty
/// (`next_offset == active_segment_offset`). Rolling then would create a
/// segment with the very same base offset — and therefore the same file —
/// as the one being replaced, which gains nothing for a batch that is
/// larger than the limit on its own.
///
/// # Errors
///
/// Propagates the error from the active segment's `size()`.
fn should_roll_segment(&self, messages: &[Message]) -> Result<bool> {
    let segment = match self.segments.get(&self.active_segment_offset) {
        Some(segment) => segment,
        // No active segment to append to: one has to be created.
        None => return Ok(true),
    };

    // A new segment would start at `next_offset`; if the active one already
    // starts there it holds no entries yet, so keep using it.
    if self.next_offset == self.active_segment_offset {
        return Ok(false);
    }

    let current_size = segment.size()?;
    let estimated_new_size: usize = messages
        .iter()
        .map(|m| {
            let key_len = m.key.as_ref().map_or(0, |k| k.len());
            // 20 and 4 are presumably fixed per-entry overhead —
            // NOTE(review): confirm against the log entry layout.
            20 + key_len + m.value.len() + 4
        })
        .sum();

    Ok(current_size + estimated_new_size as u64 > self.config.max_segment_size)
}
/// Starts a new active segment whose base offset is the current
/// `next_offset`.
///
/// # Errors
///
/// Propagates the error from `create_new_segment`.
fn roll_segment(&mut self) -> Result<()> {
    self.create_new_segment(self.next_offset)
}
/// Returns the largest segment base offset that is `<= offset`, or 0 when
/// no segment starts at or before `offset`.
///
/// The 0 fallback need not be a key in the map; `read` only uses the
/// result as the lower bound of a range query.
fn find_segment_for_offset(&self, offset: Offset) -> Offset {
    // Ordered-map range lookup instead of scanning every key from the back.
    self.segments
        .range(..=offset)
        .next_back()
        .map(|(&base, _)| base)
        .unwrap_or(0)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tempfile::tempdir;

    /// Builds a config rooted at `dir` with the given size limit and default
    /// values for everything else.
    fn config_in(dir: &std::path::Path, max_segment_size: u64) -> SegmentConfig {
        SegmentConfig {
            base_dir: dir.to_path_buf(),
            max_segment_size,
            ..Default::default()
        }
    }

    /// A manager opened on an empty directory has one segment and starts at
    /// offset 0.
    #[test]
    fn test_segment_manager_creation() {
        let temp_dir = tempdir().unwrap();
        let manager = SegmentManager::new(config_in(temp_dir.path(), 1024)).unwrap();

        assert_eq!(manager.segment_count(), 1);
        assert_eq!(manager.next_offset(), 0);
    }

    /// A keyed two-message batch is appended at offset 0 and read back in
    /// order.
    #[test]
    fn test_segment_append_and_read() {
        let temp_dir = tempdir().unwrap();
        let mut manager = SegmentManager::new(config_in(temp_dir.path(), 1024)).unwrap();

        let batch = vec![
            Message::new("message 1").with_key("key1"),
            Message::new("message 2").with_key("key2"),
        ];
        let base_offset = manager.append(&batch).unwrap();
        assert_eq!(base_offset, 0);
        assert_eq!(manager.next_offset(), 2);

        let entries = manager.read(0, 1024).unwrap();
        assert_eq!(entries.len(), 2);

        assert_eq!(entries[0].offset, 0);
        assert_eq!(entries[0].value.as_ref(), b"message 1");

        assert_eq!(entries[1].offset, 1);
        assert_eq!(entries[1].value.as_ref(), b"message 2");
    }

    /// With a 100 limit, ten long messages spread over several segments and
    /// are still read back contiguously.
    #[test]
    fn test_segment_rolling() {
        let temp_dir = tempdir().unwrap();
        let mut manager = SegmentManager::new(config_in(temp_dir.path(), 100)).unwrap();
        assert_eq!(manager.segment_count(), 1);

        for i in 0..10 {
            let batch = vec![Message::new(format!(
                "This is a longer message {} that should trigger segment rolling",
                i
            ))];
            manager.append(&batch).unwrap();
        }
        assert!(manager.segment_count() > 1);

        let entries = manager.read(0, 4096).unwrap();
        assert_eq!(entries.len(), 10);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.offset, i as u64);
        }
    }

    /// Data written by one manager is visible to a second manager opened on
    /// the same directory.
    #[test]
    fn test_segment_recovery() {
        let temp_dir = tempdir().unwrap();
        let config = config_in(temp_dir.path(), 200);

        // First manager lives only inside this scope.
        {
            let mut writer = SegmentManager::new(config.clone()).unwrap();
            for i in 0..6 {
                writer
                    .append(&[Message::new(format!("message {}", i))])
                    .unwrap();
            }
        }

        let manager = SegmentManager::new(config).unwrap();
        assert_eq!(manager.next_offset(), 6);

        let entries = manager.read(0, 1024).unwrap();
        assert_eq!(entries.len(), 6);
        for (i, entry) in entries.iter().enumerate() {
            assert_eq!(entry.offset, i as u64);
            assert_eq!(entry.value, Bytes::from(format!("message {}", i)));
        }
    }

    /// Cleanup keeps the requested number of segments and the newest data
    /// stays readable.
    #[test]
    fn test_segment_cleanup() {
        let temp_dir = tempdir().unwrap();
        let mut manager = SegmentManager::new(config_in(temp_dir.path(), 50)).unwrap();

        for i in 0..20 {
            manager
                .append(&[Message::new(format!("message {}", i))])
                .unwrap();
        }

        let initial_count = manager.segment_count();
        assert!(initial_count > 3);

        let removed = manager.cleanup(3).unwrap();
        assert_eq!(manager.segment_count(), 3);
        assert_eq!(removed.len(), initial_count - 3);

        let entries = manager.read(15, 1024).unwrap();
        assert!(!entries.is_empty());
    }

    /// Reads can start in the middle of the log and honour a small byte
    /// budget.
    #[test]
    fn test_segment_read_from_offset() {
        let temp_dir = tempdir().unwrap();
        let mut manager = SegmentManager::new(config_in(temp_dir.path(), 100)).unwrap();

        for i in 0..10 {
            manager
                .append(&[Message::new(format!("message {}", i))])
                .unwrap();
        }

        let from_five = manager.read(5, 1024).unwrap();
        assert_eq!(from_five[0].offset, 5);
        assert_eq!(from_five.len(), 5);

        let from_eight = manager.read(8, 60).unwrap();
        assert!(!from_eight.is_empty());
        assert_eq!(from_eight[0].offset, 8);
    }
}