Skip to main content

rivven_core/storage/
segment.rs

1use crate::{Error, Message, Result};
2use bytes::{BufMut, BytesMut};
3use crc32fast::Hasher;
4use memmap2::Mmap;
5use std::fs::{File, OpenOptions};
6use std::io::{Seek, SeekFrom, Write};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
/// Size in bytes of one on-disk index entry: a 4-byte relative offset
/// followed by an 8-byte position in the log file.
const INDEX_ENTRY_SIZE: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u64>();
/// File extension of a segment's data file.
const LOG_SUFFIX: &str = "log";
/// File extension of a segment's index file.
const INDEX_SUFFIX: &str = "index";
14
15/// Represents a segment of the log on disk
16/// A segment consists of a .log file (data) and a .index file (sparse index)
17#[derive(Debug)]
18pub struct Segment {
19    base_offset: u64,
20    log_path: PathBuf,
21    index_path: PathBuf,
22    log_file: Arc<Mutex<File>>,
23    current_size: u64,
24    index_buffer: Vec<(u32, u64)>, // Relative offset -> Position
25}
26
27impl Segment {
28    pub fn new(dir: &Path, base_offset: u64) -> Result<Self> {
29        let log_path = dir.join(format!("{:020}.{}", base_offset, LOG_SUFFIX));
30        let index_path = dir.join(format!("{:020}.{}", base_offset, INDEX_SUFFIX));
31
32        // Open or create log file
33        let mut log_file = OpenOptions::new()
34            .read(true)
35            .create(true)
36            .append(true)
37            .open(&log_path)?;
38
39        let current_size = log_file.seek(SeekFrom::End(0))?;
40
41        // Open or create index file
42        let index_file = OpenOptions::new()
43            .read(true)
44            .write(true)
45            .create(true)
46            .truncate(false) // Preserve existing data
47            .open(&index_path)?;
48
49        let mut segment = Self {
50            base_offset,
51            log_path,
52            index_path,
53            log_file: Arc::new(Mutex::new(log_file)),
54            current_size,
55            index_buffer: Vec::new(),
56        };
57
58        // Load index if exists
59        if index_file.metadata()?.len() > 0 {
60            segment.load_index(&index_file)?;
61        }
62
63        Ok(segment)
64    }
65
66    fn load_index(&mut self, file: &File) -> Result<()> {
67        let len = file.metadata()?.len();
68        let count = len as usize / INDEX_ENTRY_SIZE;
69        // SAFETY: The file is opened for reading and remains valid for the mmap lifetime.
70        // The mmap is read-only and we only access within its bounds.
71        let mmap = unsafe { Mmap::map(file)? };
72
73        let mut cursor = 0;
74        for _ in 0..count {
75            if cursor + INDEX_ENTRY_SIZE > mmap.len() {
76                break;
77            }
78
79            let rel_offset_bytes: [u8; 4] = mmap[cursor..cursor + 4].try_into().unwrap();
80            let pos_bytes: [u8; 8] = mmap[cursor + 4..cursor + 12].try_into().unwrap();
81
82            self.index_buffer.push((
83                u32::from_be_bytes(rel_offset_bytes),
84                u64::from_be_bytes(pos_bytes),
85            ));
86
87            cursor += INDEX_ENTRY_SIZE;
88        }
89
90        Ok(())
91    }
92
93    /// Append a message to the segment
94    pub async fn append(&mut self, offset: u64, mut message: Message) -> Result<u64> {
95        if offset < self.base_offset {
96            return Err(Error::Other(format!(
97                "Offset {} is smaller than segment base offset {}",
98                offset, self.base_offset
99            )));
100        }
101
102        // 1. Serialize message
103        message.offset = offset;
104        let bytes = message.to_bytes()?;
105        let len = bytes.len() as u32;
106
107        // 2. Calculate CRC
108        let mut hasher = Hasher::new();
109        hasher.update(&bytes);
110        let crc = hasher.finalize();
111
112        // 3. Prepare frame: [CRC: 4][Len: 4][Payload: N]
113        let mut frame = BytesMut::with_capacity(8 + bytes.len());
114        frame.put_u32(crc);
115        frame.put_u32(len);
116        frame.put_slice(&bytes);
117
118        // 4. Write to disk (Holding lock)
119        {
120            let mut file = self.log_file.lock().await;
121            file.write_all(&frame)?;
122        }
123        let position = self.current_size;
124        // file.sync_data()?; // Optional: Call sync for durability (slow) or rely on OS cache
125
126        self.current_size += frame.len() as u64;
127
128        // 5. Update Index (every 4KB or so)
129
130        // Simple strategy: Index every message for now for exact lookup in concept
131        // optimized: only index if position - last_index_position > 4096
132        let relative_offset = (offset - self.base_offset) as u32;
133        self.append_index(relative_offset, position)?;
134
135        Ok(position)
136    }
137
138    fn append_index(&mut self, relative_offset: u32, position: u64) -> Result<()> {
139        let mut file = OpenOptions::new()
140            .append(true)
141            .create(true)
142            .open(&self.index_path)?;
143
144        let mut buf = BytesMut::with_capacity(12);
145        buf.put_u32(relative_offset);
146        buf.put_u64(position);
147        file.write_all(&buf)?;
148
149        self.index_buffer.push((relative_offset, position));
150        Ok(())
151    }
152
153    /// Flush segment data to disk ensuring durability
154    pub async fn flush(&self) -> Result<()> {
155        let file = self.log_file.lock().await;
156        file.sync_all()?;
157        Ok(())
158    }
159
160    /// Read a batch of messages starting from a given offset
161    pub async fn read(&self, offset: u64, max_bytes: usize) -> Result<Vec<Message>> {
162        if offset < self.base_offset {
163            return Ok(Vec::new()); // Or error? LogManager should handle this
164        }
165
166        // 1. Find position from index
167        let relative_offset = (offset - self.base_offset).try_into().unwrap_or(u32::MAX);
168        let mut start_pos = 0;
169
170        // Binary search for the closest index entry <= relative_offset
171        if let Some(idx) = self
172            .index_buffer
173            .partition_point(|&(off, _)| off <= relative_offset)
174            .checked_sub(1)
175        {
176            start_pos = self.index_buffer[idx].1;
177        }
178
179        // 2. Mmap the file for reading (Zero clone from kernel cache context)
180        let file = File::open(&self.log_path)?;
181        let file_len = file.metadata()?.len();
182        if file_len == 0 {
183            return Ok(Vec::new());
184        }
185
186        // SAFETY: File is opened read-only and remains valid for mmap lifetime.
187        // We check bounds before all slice accesses below.
188        let mmap = unsafe { Mmap::map(&file)? };
189
190        if start_pos >= mmap.len() as u64 {
191            return Ok(Vec::new());
192        }
193
194        let mut current_pos = start_pos as usize;
195        let mut messages = Vec::new();
196        let mut bytes_read = 0;
197
198        while current_pos < mmap.len() && bytes_read < max_bytes {
199            // Check headers
200            if current_pos + 8 > mmap.len() {
201                break;
202            }
203
204            let slice = &mmap[current_pos..];
205            let _cursor = std::io::Cursor::new(slice);
206
207            // Read CRC and Len
208            // Using converting methods
209            let crc_bytes: [u8; 4] = slice[0..4].try_into().unwrap();
210            let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
211            let stored_crc = u32::from_be_bytes(crc_bytes);
212            let msg_len = u32::from_be_bytes(len_bytes) as usize;
213
214            if current_pos + 8 + msg_len > mmap.len() {
215                break; // Incomplete message
216            }
217
218            // Verify CRC
219            let payload = &slice[8..8 + msg_len];
220            let mut hasher = Hasher::new();
221            hasher.update(payload);
222            let computed_crc = hasher.finalize();
223
224            if computed_crc != stored_crc {
225                return Err(Error::Other(format!(
226                    "CRC mismatch at position {}",
227                    current_pos
228                )));
229            }
230
231            // Deserialize
232            let msg = Message::from_bytes(payload)?;
233            if msg.offset >= offset {
234                messages.push(msg);
235                bytes_read += 8 + msg_len;
236            }
237
238            current_pos += 8 + msg_len;
239        }
240
241        Ok(messages)
242    }
243
244    pub fn size(&self) -> u64 {
245        self.current_size
246    }
247
248    pub fn base_offset(&self) -> u64 {
249        self.base_offset
250    }
251
252    pub async fn recover_last_offset(&self) -> Result<Option<u64>> {
253        let mut start_pos = 0;
254        if let Some((_, pos)) = self.index_buffer.last() {
255            start_pos = *pos;
256        }
257
258        let file = File::open(&self.log_path)?;
259        let len = file.metadata()?.len();
260        if len == 0 {
261            return Ok(None);
262        }
263
264        // SAFETY: File is opened read-only, checked non-empty, and remains valid.
265        // We check bounds before all slice accesses.
266        let mmap = unsafe { Mmap::map(&file)? };
267
268        if start_pos >= mmap.len() as u64 {
269            return Ok(None);
270        }
271
272        let mut current_pos = start_pos as usize;
273        let mut last_offset = None;
274
275        while current_pos < mmap.len() {
276            if current_pos + 8 > mmap.len() {
277                break;
278            }
279
280            let slice = &mmap[current_pos..];
281            let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
282            let msg_len = u32::from_be_bytes(len_bytes) as usize;
283
284            if current_pos + 8 + msg_len > mmap.len() {
285                break;
286            }
287
288            let payload = &slice[8..8 + msg_len];
289            if let Ok(msg) = Message::from_bytes(payload) {
290                last_offset = Some(msg.offset);
291            }
292
293            current_pos += 8 + msg_len;
294        }
295
296        Ok(last_offset)
297    }
298
299    /// Find the first offset with timestamp >= target_timestamp
300    /// Uses linear scan through the segment (timestamps may not be monotonic due to clock skew)
301    /// Returns None if no matching offset is found
302    pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
303        let file = File::open(&self.log_path)?;
304        let len = file.metadata()?.len();
305        if len == 0 {
306            return Ok(None);
307        }
308
309        // SAFETY: File is opened read-only, checked non-empty, and remains valid.
310        // We check bounds before all slice accesses.
311        let mmap = unsafe { Mmap::map(&file)? };
312        let mut current_pos = 0usize;
313
314        while current_pos < mmap.len() {
315            if current_pos + 8 > mmap.len() {
316                break;
317            }
318
319            let slice = &mmap[current_pos..];
320            let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
321            let msg_len = u32::from_be_bytes(len_bytes) as usize;
322
323            if current_pos + 8 + msg_len > mmap.len() {
324                break;
325            }
326
327            let payload = &slice[8..8 + msg_len];
328            if let Ok(msg) = Message::from_bytes(payload) {
329                let msg_timestamp = msg.timestamp.timestamp_millis();
330                if msg_timestamp >= target_timestamp {
331                    return Ok(Some(msg.offset));
332                }
333            }
334
335            current_pos += 8 + msg_len;
336        }
337
338        Ok(None)
339    }
340
341    /// Get the timestamp range of messages in this segment
342    /// Returns (min_timestamp, max_timestamp) in milliseconds since epoch
343    /// Useful for quickly determining if a segment might contain a target timestamp
344    pub async fn timestamp_bounds(&self) -> Result<Option<(i64, i64)>> {
345        let file = File::open(&self.log_path)?;
346        let len = file.metadata()?.len();
347        if len == 0 {
348            return Ok(None);
349        }
350
351        // SAFETY: File is opened read-only, checked non-empty, and remains valid.
352        // We check bounds before all slice accesses.
353        let mmap = unsafe { Mmap::map(&file)? };
354        let mut current_pos = 0usize;
355        let mut min_ts: Option<i64> = None;
356        let mut max_ts: Option<i64> = None;
357
358        while current_pos < mmap.len() {
359            if current_pos + 8 > mmap.len() {
360                break;
361            }
362
363            let slice = &mmap[current_pos..];
364            let len_bytes: [u8; 4] = slice[4..8].try_into().unwrap();
365            let msg_len = u32::from_be_bytes(len_bytes) as usize;
366
367            if current_pos + 8 + msg_len > mmap.len() {
368                break;
369            }
370
371            let payload = &slice[8..8 + msg_len];
372            if let Ok(msg) = Message::from_bytes(payload) {
373                let ts = msg.timestamp.timestamp_millis();
374                min_ts = Some(min_ts.map_or(ts, |m| m.min(ts)));
375                max_ts = Some(max_ts.map_or(ts, |m| m.max(ts)));
376            }
377
378            current_pos += 8 + msg_len;
379        }
380
381        match (min_ts, max_ts) {
382            (Some(min), Some(max)) => Ok(Some((min, max))),
383            _ => Ok(None),
384        }
385    }
386}