rivven_core/storage/
segment.rs1use crate::{Error, Message, Result};
2use bytes::{BufMut, BytesMut};
3use crc32fast::Hasher;
4use memmap2::Mmap;
5use std::fs::{File, OpenOptions};
6use std::io::{Seek, SeekFrom, Write};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
/// Size in bytes of one on-disk index entry: a big-endian `u32` relative offset
/// followed by a big-endian `u64` byte position in the log file.
const INDEX_ENTRY_SIZE: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u64>();
/// File extension used for a segment's log file.
const LOG_SUFFIX: &str = "log";
/// File extension used for a segment's index file.
const INDEX_SUFFIX: &str = "index";
14
15#[derive(Debug)]
18pub struct Segment {
19 base_offset: u64,
20 log_path: PathBuf,
21 index_path: PathBuf,
22 log_file: Arc<Mutex<File>>,
23 current_size: u64,
24 index_buffer: Vec<(u32, u64)>, }
26
27impl Segment {
28 pub fn new(dir: &Path, base_offset: u64) -> Result<Self> {
29 let log_path = dir.join(format!("{:020}.{}", base_offset, LOG_SUFFIX));
30 let index_path = dir.join(format!("{:020}.{}", base_offset, INDEX_SUFFIX));
31
32 let mut log_file = OpenOptions::new()
34 .read(true)
35 .create(true)
36 .append(true)
37 .open(&log_path)?;
38
39 let current_size = log_file.seek(SeekFrom::End(0))?;
40
41 let index_file = OpenOptions::new()
43 .read(true)
44 .write(true)
45 .create(true)
46 .truncate(false) .open(&index_path)?;
48
49 let mut segment = Self {
50 base_offset,
51 log_path,
52 index_path,
53 log_file: Arc::new(Mutex::new(log_file)),
54 current_size,
55 index_buffer: Vec::new(),
56 };
57
58 if index_file.metadata()?.len() > 0 {
60 segment.load_index(&index_file)?;
61 }
62
63 Ok(segment)
64 }
65
66 fn load_index(&mut self, file: &File) -> Result<()> {
67 let len = file.metadata()?.len();
68 let count = len as usize / INDEX_ENTRY_SIZE;
69 let mmap = unsafe { Mmap::map(file)? };
72
73 let mut cursor = 0;
74 for _ in 0..count {
75 if cursor + INDEX_ENTRY_SIZE > mmap.len() {
76 break;
77 }
78
79 let rel_offset_bytes: [u8; 4] = mmap[cursor..cursor + 4].try_into().unwrap();
80 let pos_bytes: [u8; 8] = mmap[cursor + 4..cursor + 12].try_into().unwrap();
81
82 self.index_buffer.push((
83 u32::from_be_bytes(rel_offset_bytes),
84 u64::from_be_bytes(pos_bytes),
85 ));
86
87 cursor += INDEX_ENTRY_SIZE;
88 }
89
90 Ok(())
91 }
92
93 pub async fn append(&mut self, offset: u64, mut message: Message) -> Result<u64> {
95 if offset < self.base_offset {
96 return Err(Error::Other(format!(
97 "Offset {} is smaller than segment base offset {}",
98 offset, self.base_offset
99 )));
100 }
101
102 message.offset = offset;
104 let bytes = message.to_bytes()?;
105 let len = bytes.len() as u32;
106
107 let mut hasher = Hasher::new();
109 hasher.update(&bytes);
110 let crc = hasher.finalize();
111
112 let mut frame = BytesMut::with_capacity(8 + bytes.len());
114 frame.put_u32(crc);
115 frame.put_u32(len);
116 frame.put_slice(&bytes);
117
118 {
120 let mut file = self.log_file.lock().await;
121 file.write_all(&frame)?;
122 }
123 let position = self.current_size;
124 self.current_size += frame.len() as u64;
127
128 let relative_offset = (offset - self.base_offset) as u32;
133 self.append_index(relative_offset, position)?;
134
135 Ok(position)
136 }
137
138 fn append_index(&mut self, relative_offset: u32, position: u64) -> Result<()> {
139 let mut file = OpenOptions::new()
140 .append(true)
141 .create(true)
142 .open(&self.index_path)?;
143
144 let mut buf = BytesMut::with_capacity(12);
145 buf.put_u32(relative_offset);
146 buf.put_u64(position);
147 file.write_all(&buf)?;
148
149 self.index_buffer.push((relative_offset, position));
150 Ok(())
151 }
152
153 pub async fn flush(&self) -> Result<()> {
155 let file = self.log_file.lock().await;
156 file.sync_all()?;
157 Ok(())
158 }
159
160 pub async fn read(&self, offset: u64, max_bytes: usize) -> Result<Vec<Message>> {
162 if offset < self.base_offset {
163 return Ok(Vec::new()); }
165
166 let relative_offset = (offset - self.base_offset).try_into().unwrap_or(u32::MAX);
168 let mut start_pos = 0;
169
170 if let Some(idx) = self
172 .index_buffer
173 .partition_point(|&(off, _)| off <= relative_offset)
174 .checked_sub(1)
175 {
176 start_pos = self.index_buffer[idx].1;
177 }
178
179 let file = File::open(&self.log_path)?;
181 let file_len = file.metadata()?.len();
182 if file_len == 0 {
183 return Ok(Vec::new());
184 }
185
186 let mmap = unsafe { Mmap::map(&file)? };
189
190 if start_pos >= mmap.len() as u64 {
191 return Ok(Vec::new());
192 }
193
194 let mut current_pos = start_pos as usize;
195 let mut messages = Vec::new();
196 let mut bytes_read = 0;
197
198 while current_pos < mmap.len() && bytes_read < max_bytes {
199 if current_pos + 8 > mmap.len() {
201 break;
202 }
203
204 let slice = &mmap[current_pos..];
205 let _cursor = std::io::Cursor::new(slice);
206
207 let crc_bytes: [u8; 4] = slice[0..4].try_into().unwrap();
210 let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
211 let stored_crc = u32::from_be_bytes(crc_bytes);
212 let msg_len = u32::from_be_bytes(len_bytes) as usize;
213
214 if current_pos + 8 + msg_len > mmap.len() {
215 break; }
217
218 let payload = &slice[8..8 + msg_len];
220 let mut hasher = Hasher::new();
221 hasher.update(payload);
222 let computed_crc = hasher.finalize();
223
224 if computed_crc != stored_crc {
225 return Err(Error::Other(format!(
226 "CRC mismatch at position {}",
227 current_pos
228 )));
229 }
230
231 let msg = Message::from_bytes(payload)?;
233 if msg.offset >= offset {
234 messages.push(msg);
235 bytes_read += 8 + msg_len;
236 }
237
238 current_pos += 8 + msg_len;
239 }
240
241 Ok(messages)
242 }
243
244 pub fn size(&self) -> u64 {
245 self.current_size
246 }
247
248 pub fn base_offset(&self) -> u64 {
249 self.base_offset
250 }
251
252 pub async fn recover_last_offset(&self) -> Result<Option<u64>> {
253 let mut start_pos = 0;
254 if let Some((_, pos)) = self.index_buffer.last() {
255 start_pos = *pos;
256 }
257
258 let file = File::open(&self.log_path)?;
259 let len = file.metadata()?.len();
260 if len == 0 {
261 return Ok(None);
262 }
263
264 let mmap = unsafe { Mmap::map(&file)? };
267
268 if start_pos >= mmap.len() as u64 {
269 return Ok(None);
270 }
271
272 let mut current_pos = start_pos as usize;
273 let mut last_offset = None;
274
275 while current_pos < mmap.len() {
276 if current_pos + 8 > mmap.len() {
277 break;
278 }
279
280 let slice = &mmap[current_pos..];
281 let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
282 let msg_len = u32::from_be_bytes(len_bytes) as usize;
283
284 if current_pos + 8 + msg_len > mmap.len() {
285 break;
286 }
287
288 let payload = &slice[8..8 + msg_len];
289 if let Ok(msg) = Message::from_bytes(payload) {
290 last_offset = Some(msg.offset);
291 }
292
293 current_pos += 8 + msg_len;
294 }
295
296 Ok(last_offset)
297 }
298
299 pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
303 let file = File::open(&self.log_path)?;
304 let len = file.metadata()?.len();
305 if len == 0 {
306 return Ok(None);
307 }
308
309 let mmap = unsafe { Mmap::map(&file)? };
312 let mut current_pos = 0usize;
313
314 while current_pos < mmap.len() {
315 if current_pos + 8 > mmap.len() {
316 break;
317 }
318
319 let slice = &mmap[current_pos..];
320 let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
321 let msg_len = u32::from_be_bytes(len_bytes) as usize;
322
323 if current_pos + 8 + msg_len > mmap.len() {
324 break;
325 }
326
327 let payload = &slice[8..8 + msg_len];
328 if let Ok(msg) = Message::from_bytes(payload) {
329 let msg_timestamp = msg.timestamp.timestamp_millis();
330 if msg_timestamp >= target_timestamp {
331 return Ok(Some(msg.offset));
332 }
333 }
334
335 current_pos += 8 + msg_len;
336 }
337
338 Ok(None)
339 }
340
341 pub async fn timestamp_bounds(&self) -> Result<Option<(i64, i64)>> {
345 let file = File::open(&self.log_path)?;
346 let len = file.metadata()?.len();
347 if len == 0 {
348 return Ok(None);
349 }
350
351 let mmap = unsafe { Mmap::map(&file)? };
354 let mut current_pos = 0usize;
355 let mut min_ts: Option<i64> = None;
356 let mut max_ts: Option<i64> = None;
357
358 while current_pos < mmap.len() {
359 if current_pos + 8 > mmap.len() {
360 break;
361 }
362
363 let slice = &mmap[current_pos..];
364 let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
365 let msg_len = u32::from_be_bytes(len_bytes) as usize;
366
367 if current_pos + 8 + msg_len > mmap.len() {
368 break;
369 }
370
371 let payload = &slice[8..8 + msg_len];
372 if let Ok(msg) = Message::from_bytes(payload) {
373 let ts = msg.timestamp.timestamp_millis();
374 min_ts = Some(min_ts.map_or(ts, |m| m.min(ts)));
375 max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
376 }
377
378 current_pos += 8 + msg_len;
379 }
380
381 match (min_ts, max_ts) {
382 (Some(min), Some(max)) => Ok(Some((min, max))),
383 _ => Ok(None),
384 }
385 }
386}