Skip to main content

rivven_core/storage/log_manager.rs

1use super::segment::Segment;
2use crate::{Message, Result};
3use std::fs;
4use std::path::PathBuf;
5
6#[derive(Debug)]
7pub struct LogManager {
8    dir: PathBuf,
9    segments: Vec<Segment>,
10    active_segment_index: usize,
11    max_segment_size: u64,
12}
13
14impl LogManager {
15    pub async fn new(
16        base_dir: PathBuf,
17        topic: &str,
18        partition: u32,
19        max_segment_size: u64,
20    ) -> Result<Self> {
21        let dir = base_dir
22            .join(topic)
23            .join(format!("partition-{}", partition));
24        fs::create_dir_all(&dir)?;
25
26        let mut segments = Vec::new();
27        // Load existing segments
28        // Simply look for .log files and sort them
29        let mut paths: Vec<_> = fs::read_dir(&dir)?
30            .filter_map(|entry| entry.ok())
31            .map(|entry| entry.path())
32            .filter(|path| path.extension().is_some_and(|ext| ext == "log"))
33            .collect();
34
35        paths.sort();
36
37        if paths.is_empty() {
38            // Create initial segment
39            segments.push(Segment::new(&dir, 0)?);
40        } else {
41            for path in paths {
42                let filename = path.file_stem().unwrap().to_str().unwrap();
43                // Assumes filename is just the offset (e.g. "00000000000000000000")
44                if let Ok(base_offset) = filename.parse::<u64>() {
45                    segments.push(Segment::new(&dir, base_offset)?);
46                }
47            }
48        }
49
50        if segments.is_empty() {
51            segments.push(Segment::new(&dir, 0)?);
52        }
53
54        let active_segment_index = segments.len() - 1;
55
56        Ok(Self {
57            dir,
58            segments,
59            active_segment_index,
60            max_segment_size,
61        })
62    }
63
64    pub async fn append(&mut self, offset: u64, message: Message) -> Result<u64> {
65        let segment = &mut self.segments[self.active_segment_index];
66
67        // Check if we need to roll
68        if segment.size() >= self.max_segment_size {
69            let new_segment = Segment::new(&self.dir, offset)?;
70            self.segments.push(new_segment);
71            self.active_segment_index += 1;
72        }
73
74        let segment = &mut self.segments[self.active_segment_index];
75        segment.append(offset, message).await
76    }
77
78    pub async fn read(&self, offset: u64, max_bytes: usize) -> Result<Vec<Message>> {
79        let mut messages = Vec::new();
80        let mut bytes_collected = 0;
81
82        // Optimized: Find first segment where base_offset <= offset
83        let start_segment_idx = self
84            .segments
85            .partition_point(|seg| seg.base_offset() <= offset)
86            .saturating_sub(1);
87
88        for segment in self.segments.iter().skip(start_segment_idx) {
89            // If we have enough data, stop
90            if bytes_collected >= max_bytes {
91                break;
92            }
93
94            // Read from segment
95            let batch = segment.read(offset, max_bytes - bytes_collected).await?;
96
97            for msg in batch {
98                if msg.offset < offset {
99                    continue;
100                }
101
102                if messages.len() < 1000 && bytes_collected < max_bytes {
103                    // Estimate size (header + key + val)
104                    let size = 8 + msg.key.as_ref().map(|k| k.len()).unwrap_or(0) + msg.value.len();
105                    bytes_collected += size;
106
107                    messages.push(msg);
108                }
109            }
110        }
111
112        Ok(messages)
113    }
114
115    pub fn earliest_offset(&self) -> u64 {
116        self.segments.first().map(|s| s.base_offset()).unwrap_or(0)
117    }
118
119    pub async fn recover_next_offset(&self) -> Result<u64> {
120        if let Some(last_segment) = self.segments.last() {
121            if let Some(last_offset) = last_segment.recover_last_offset().await? {
122                return Ok(last_offset + 1);
123            }
124            // If last segment is empty, check `base_offset` of it?
125            // Usually base_offset is the next offset of previous segment.
126            // But if it's completely empty (newly created), next offset is `base_offset`.
127            return Ok(last_segment.base_offset());
128        }
129        Ok(0)
130    }
131
132    /// Flush all segments to disk ensuring durability
133    pub async fn flush(&self) -> Result<()> {
134        for segment in &self.segments {
135            segment.flush().await?;
136        }
137        Ok(())
138    }
139
140    /// Find the first offset with timestamp >= target_timestamp (milliseconds since epoch)
141    /// Scans through segments to find the earliest message matching the timestamp.
142    /// Returns None if no matching offset is found.
143    pub async fn find_offset_for_timestamp(&self, target_timestamp: i64) -> Result<Option<u64>> {
144        // Scan segments from oldest to newest
145        for segment in &self.segments {
146            // Check timestamp bounds to skip segments that are entirely before target
147            if let Some((_min_ts, max_ts)) = segment.timestamp_bounds().await? {
148                // If the entire segment is before our target, skip it
149                if max_ts < target_timestamp {
150                    continue;
151                }
152                // If the segment might contain our target, search it
153                if let Some(offset) = segment.find_offset_for_timestamp(target_timestamp).await? {
154                    return Ok(Some(offset));
155                }
156            }
157        }
158        Ok(None)
159    }
160}