rivven_core/storage/
log_manager.rs1use super::segment::Segment;
2use crate::{Message, Result};
3use std::fs;
4use std::path::PathBuf;
5
6#[derive(Debug)]
7pub struct LogManager {
8 dir: PathBuf,
9 segments: Vec<Segment>,
10 active_segment_index: usize,
11 max_segment_size: u64,
12}
13
14impl LogManager {
15 pub async fn new(
16 base_dir: PathBuf,
17 topic: &str,
18 partition: u32,
19 max_segment_size: u64,
20 ) -> Result<Self> {
21 let dir = base_dir
22 .join(topic)
23 .join(format!("partition-{}", partition));
24 fs::create_dir_all(&dir)?;
25
26 let mut segments = Vec::new();
27 let mut paths: Vec<_> = fs::read_dir(&dir)?
30 .filter_map(|entry| entry.ok())
31 .map(|entry| entry.path())
32 .filter(|path| path.extension().is_some_and(|ext| ext == "log"))
33 .collect();
34
35 paths.sort();
36
37 if paths.is_empty() {
38 segments.push(Segment::new(&dir, 0)?);
40 } else {
41 for path in paths {
42 let filename = path.file_stem().unwrap().to_str().unwrap();
43 if let Ok(base_offset) = filename.parse::<u64>() {
45 segments.push(Segment::new(&dir, base_offset)?);
46 }
47 }
48 }
49
50 if segments.is_empty() {
51 segments.push(Segment::new(&dir, 0)?);
52 }
53
54 let active_segment_index = segments.len() - 1;
55
56 Ok(Self {
57 dir,
58 segments,
59 active_segment_index,
60 max_segment_size,
61 })
62 }
63
64 pub async fn append(&mut self, offset: u64, message: Message) -> Result<u64> {
65 let segment = &mut self.segments[self.active_segment_index];
66
67 if segment.size() >= self.max_segment_size {
69 let new_segment = Segment::new(&self.dir, offset)?;
70 self.segments.push(new_segment);
71 self.active_segment_index += 1;
72 }
73
74 let segment = &mut self.segments[self.active_segment_index];
75 segment.append(offset, message).await
76 }
77
78 pub async fn read(&self, offset: u64, max_bytes: usize) -> Result<Vec<Message>> {
79 let mut messages = Vec::new();
80 let mut bytes_collected = 0;
81
82 let start_segment_idx = self
84 .segments
85 .partition_point(|seg| seg.base_offset() <= offset)
86 .saturating_sub(1);
87
88 for segment in self.segments.iter().skip(start_segment_idx) {
89 if bytes_collected >= max_bytes {
91 break;
92 }
93
94 let batch = segment.read(offset, max_bytes - bytes_collected).await?;
96
97 for msg in batch {
98 if msg.offset < offset {
99 continue;
100 }
101
102 if messages.len() < 1000 && bytes_collected < max_bytes {
103 let size = 8 + msg.key.as_ref().map(|k| k.len()).unwrap_or(0) + msg.value.len();
105 bytes_collected += size;
106
107 messages.push(msg);
108 }
109 }
110 }
111
112 Ok(messages)
113 }
114
115 pub fn earliest_offset(&self) -> u64 {
116 self.segments.first().map(|s| s.base_offset()).unwrap_or(0)
117 }
118
119 pub async fn recover_next_offset(&self) -> Result<u64> {
120 if let Some(last_segment) = self.segments.last() {
121 if let Some(last_offset) = last_segment.recover_last_offset().await? {
122 return Ok(last_offset + 1);
123 }
124 return Ok(last_segment.base_offset());
128 }
129 Ok(0)
130 }
131
132 pub async fn flush(&self) -> Result<()> {
134 for segment in &self.segments {
135 segment.flush().await?;
136 }
137 Ok(())
138 }
139
140 pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
144 for segment in &self.segments {
146 if let Some((_min_ts, max_ts)) = segment.timestamp_bounds().await? {
148 if max_ts < target_timestamp {
150 continue;
151 }
152 if let Some(offset) = segment.find_offset_for_timestamp(target_timestamp).await? {
154 return Ok(Some(offset));
155 }
156 }
157 }
158 Ok(None)
159 }
160}