
use std::collections::BTreeMap;
use std::io;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use codeq::error_context_ext::ErrorContextExt;
use codeq::OffsetSize;
use log::info;

use crate::api::raft_log_writer::RaftLogWriter;
use crate::api::state_machine::StateMachine;
use crate::api::wal::WAL;
use crate::chunk::closed_chunk::ClosedChunk;
use crate::chunk::open_chunk::OpenChunk;
use crate::chunk::Chunk;
use crate::errors::LogIndexNotFound;
use crate::errors::RaftLogStateError;
use crate::file_lock::FileLock;
use crate::num::format_pad_u64;
use crate::raft_log::access_state::AccessStat;
use crate::raft_log::dump::RefDump;
use crate::raft_log::dump_raft_log::DumpRaftLog;
use crate::raft_log::stat::ChunkStat;
use crate::raft_log::stat::Stat;
use crate::raft_log::state_machine::raft_log_state::RaftLogState;
use crate::raft_log::state_machine::RaftLogStateMachine;
use crate::raft_log::wal::RaftLogWAL;
use crate::types::Segment;
use crate::ChunkId;
use crate::Config;
use crate::Types;
use crate::WALRecord;

/// RaftLog is a Write-Ahead-Log implementation for the Raft consensus protocol.
///
/// It provides persistent storage for Raft log entries and state, with the
/// following features:
/// - Append-only log storage with chunk-based organization
/// - In-memory caching of log payloads
/// - Exclusive file locking for thread-safe operations
/// - Support for log truncation and purging
/// - Statistics tracking for monitoring
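///
/// # Example
///
/// A minimal usage sketch (not compiled here). `AppTypes`, `build_config`,
/// `log_id()`, `payload()` and `callback` are application-side placeholders
/// for a user-provided `Types` implementation and its values; they are not
/// part of this crate.
///
/// ```ignore
/// let config: Arc<Config> = build_config("/tmp/raft-log");
///
/// // Open the log, append two entries, then ask for a flush.
/// let mut raft_log = RaftLog::<AppTypes>::open(config)?;
/// raft_log.append([(log_id(1, 1), payload("a")), (log_id(1, 2), payload("b"))])?;
/// raft_log.flush(callback)?;
///
/// // Read the entries back; the upper bound is exclusive.
/// for res in raft_log.read(1, 3) {
///     let (log_id, payload) = res?;
///     // ...
/// }
/// ```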
#[derive(Debug)]
pub struct RaftLog<T: Types> {
    pub(crate) config: Arc<Config>,

    /// Exclusive lock on the log directory; held to prevent concurrent writers.
    _dir_lock: FileLock,

    pub(crate) wal: RaftLogWAL<T>,

    pub(crate) state_machine: RaftLogStateMachine<T>,

    /// Paths of chunk files that are no longer needed because all logs in them
    /// are purged. Removing them must be postponed until the purge record is
    /// flushed to disk.
    removed_chunks: Vec<String>,

    access_stat: AccessStat,
}

impl<T: Types> RaftLogWriter<T> for RaftLog<T> {
    fn save_user_data(
        &mut self,
        user_data: Option<T::UserData>,
    ) -> Result<Segment, io::Error> {
        let mut state = self.log_state().clone();
        state.user_data = user_data;
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

    fn save_vote(&mut self, vote: T::Vote) -> Result<Segment, io::Error> {
        let record = WALRecord::SaveVote(vote.clone());
        self.append_and_apply(&record)
    }

    fn append<I>(&mut self, entries: I) -> Result<Segment, io::Error>
    where I: IntoIterator<Item = (T::LogId, T::LogPayload)> {
        for (log_id, payload) in entries {
            let record = WALRecord::Append(log_id, payload);
            self.append_and_apply(&record)?;
        }
        Ok(self.wal.last_segment())
    }

    /// Truncate the log at `index`, keeping only the records before `index`.
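    ///
    /// For example, if the log holds entries at indices 1 through 5,
    /// `truncate(3)` removes entries 3, 4 and 5 and keeps entries 1 and 2.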
    fn truncate(&mut self, index: u64) -> Result<Segment, io::Error> {
        let purged = self.log_state().purged.as_ref();

        let log_id = if index == T::next_log_index(purged) {
            purged.cloned()
        } else {
            let log_id = self.get_log_id(index - 1)?;
            Some(log_id)
        };

        let record = WALRecord::TruncateAfter(log_id);
        self.append_and_apply(&record)
    }

    fn purge(&mut self, upto: T::LogId) -> Result<Segment, io::Error> {
        // NOTE: a chunk file can be removed only after the purge record is
        // committed.

        let purged = self.log_state().purged.as_ref();

        info!(
            "RaftLog purge upto: {:?}; current purged: {:?}",
            upto, purged
        );

        if T::log_index(&upto) < T::next_log_index(purged) {
            return Ok(self.wal.last_segment());
        }

        let record = WALRecord::PurgeUpto(upto.clone());
        let res = self.append_and_apply(&record)?;

        // Buffer the paths of the chunks to remove.
        // After the purge record is flushed to disk,
        // they are removed by the FlushWorker.

        while let Some((_chunk_id, closed)) = self.wal.closed.first_key_value()
        {
            if closed.state.last.as_ref() > Some(&upto) {
                break;
            }
            let (chunk_id, _r) = self.wal.closed.pop_first().unwrap();
            let path = self.config.chunk_path(chunk_id);
            info!(
                "RaftLog: scheduled to remove chunk after next flush: {}",
                path
            );
            self.removed_chunks.push(path);
        }

        Ok(res)
    }

    fn commit(&mut self, log_id: T::LogId) -> Result<Segment, io::Error> {
        let record = WALRecord::Commit(log_id);
        self.append_and_apply(&record)
    }

    fn flush(&mut self, callback: T::Callback) -> Result<(), io::Error> {
        self.wal.send_flush(callback)?;

        if !self.removed_chunks.is_empty() {
            let paths = self.removed_chunks.drain(..).collect::<Vec<_>>();
            self.wal.send_remove_chunks(paths)?;
        }

        Ok(())
    }
}

impl<T: Types> RaftLog<T> {
    /// Dump the RaftLog data for debugging purposes.
    ///
    /// Returns a `DumpRaftLog` struct containing a complete snapshot of the
    /// RaftLog state.
    pub fn dump_data(&self) -> DumpRaftLog<T> {
        let logs = self.state_machine.log.values().cloned().collect::<Vec<_>>();
        let cache =
            self.state_machine.payload_cache.read().unwrap().cache.clone();
        let chunks = self.wal.closed.clone();

        DumpRaftLog {
            state: self.state_machine.log_state.clone(),
            logs,
            cache,
            chunks,
            cache_hit: 0,
            cache_miss: 0,
        }
    }

    /// Dump the WAL data in this Raft-log for debugging purposes.
    ///
    /// This method returns a `RefDump`, a borrowing view that holds the
    /// RaftLog configuration and a reference to the RaftLog itself.
    pub fn dump(&self) -> RefDump<'_, T> {
        RefDump {
            config: self.config.clone(),
            raft_log: self,
        }
    }

    /// Get a reference to the RaftLog configuration.
    pub fn config(&self) -> &Config {
        self.config.as_ref()
    }

    /// Opens a RaftLog at the specified directory.
    ///
    /// This operation:
    /// 1. Acquires an exclusive lock on the directory
    /// 2. Loads existing chunks in order
    /// 3. Replays WAL records to rebuild the state
    /// 4. Creates a new open chunk for future writes
    ///
    /// # Errors
    /// Returns an error if:
    /// - Directory operations fail
    /// - There are gaps between chunk offsets
    /// - WAL records are invalid
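    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); `build_config` and `AppTypes`
    /// are application-side placeholders, not part of this crate:
    ///
    /// ```ignore
    /// let config: Arc<Config> = build_config("/var/lib/my-app/raft-log");
    /// let raft_log = RaftLog::<AppTypes>::open(config)?;
    /// println!("last log id: {:?}", raft_log.log_state().last);
    /// println!("on-disk size: {} bytes", raft_log.on_disk_size());
    /// ```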
    pub fn open(config: Arc<Config>) -> Result<Self, io::Error> {
        let dir_lock = FileLock::new(config.clone())
            .context(|| format!("open RaftLog in '{}'", config.dir))?;

        let chunk_ids = Self::load_chunk_ids(&config)?;

        let mut sm = RaftLogStateMachine::new(&config);
        let mut closed = BTreeMap::new();
        let mut prev_end_offset = None;
        let mut last_log_id = None;

        for chunk_id in chunk_ids.iter().copied() {
            // Only the last chunk (the open chunk) needs to keep all log
            // payloads in cache. Therefore, payloads in previous chunks are
            // marked as evictable.
            sm.payload_cache.write().unwrap().set_last_evictable(last_log_id);

            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;

            let (chunk, records) = Chunk::open(config.clone(), chunk_id)?;

            for (i, record) in records.into_iter().enumerate() {
                let start = chunk.global_offsets[i];
                let end = chunk.global_offsets[i + 1];
                let seg = Segment::new(start, end - start);
                sm.apply(&record, chunk_id, seg)?;
            }

            prev_end_offset = Some(chunk.last_segment().end().0);
            last_log_id = sm.log_state.last.clone();

            closed.insert(
                chunk_id,
                ClosedChunk::new(chunk, sm.log_state.clone()),
            );
        }

        let open = Self::reopen_last_closed(&mut closed);

        let open = if let Some(open) = open {
            open
        } else {
            OpenChunk::create(
                config.clone(),
                ChunkId(prev_end_offset.unwrap_or_default()),
                WALRecord::State(sm.log_state.clone()),
            )?
        };

        let cache = sm.payload_cache.clone();

        let wal = RaftLogWAL::new(config.clone(), closed, open, cache);

        let s = Self {
            config,
            _dir_lock: dir_lock,
            state_machine: sm,
            wal,
            access_stat: Default::default(),
            removed_chunks: vec![],
        };

        Ok(s)
    }

    /// Verifies that two chunks are consecutive by checking their end/start
    /// offsets.
    ///
    /// This function ensures that there are no gaps between chunks in the WAL.
    /// A gap would indicate data loss or corruption.
    ///
    /// # Arguments
    ///
    /// * `prev_end_offset` - The end offset of the previous chunk, if any
    /// * `chunk_id` - The ID of the current chunk to verify
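    ///
    /// For example, if the previous chunk ends at global offset `1024`, the
    /// next chunk must be `ChunkId(1024)`; a chunk id of `2048` would leave a
    /// 1024-byte gap and produce an `InvalidData` error (illustrative values):
    ///
    /// ```ignore
    /// assert!(Self::ensure_consecutive_chunks(Some(1024), ChunkId(1024)).is_ok());
    /// assert!(Self::ensure_consecutive_chunks(Some(1024), ChunkId(2048)).is_err());
    /// ```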
    fn ensure_consecutive_chunks(
        prev_end_offset: Option<u64>,
        chunk_id: ChunkId,
    ) -> Result<(), io::Error> {
        let Some(prev_end) = prev_end_offset else {
            return Ok(());
        };

        if prev_end != chunk_id.offset() {
            let message = format!(
                "Gap between chunks: {} -> {}; Cannot open, \
                        fix this error and re-open",
                format_pad_u64(prev_end),
                format_pad_u64(chunk_id.offset()),
            );
            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
        }

        Ok(())
    }

    /// If there is a healthy last chunk, re-open it.
    ///
    /// Healthy means the data is complete and the chunk is not truncated.
    /// If reused, the closed chunk is removed from `closed_chunks`.
    fn reopen_last_closed(
        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<T>>,
    ) -> Option<OpenChunk<T>> {
        // If the chunk is truncated, it is not healthy, do not re-open it.
        {
            let (_chunk_id, closed) = closed_chunks.iter().last()?;

            if closed.chunk.truncated.is_some() {
                return None;
            }
        }

        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
        let open = OpenChunk::new(last.chunk);
        Some(open)
    }

    pub fn load_chunk_ids(config: &Config) -> Result<Vec<ChunkId>, io::Error> {
        let path = &config.dir;
        let entries = std::fs::read_dir(path)?;
        let mut chunk_ids = vec![];
        for entry in entries {
            let entry = entry?;
            let file_name = entry.file_name();

            let fn_str = file_name.to_string_lossy();
            if fn_str == FileLock::LOCK_FILE_NAME {
                continue;
            }

            let res = Config::parse_chunk_file_name(&fn_str);

            match res {
                Ok(offset) => {
                    chunk_ids.push(ChunkId(offset));
                }
                Err(err) => {
                    log::warn!(
                        "Ignore invalid WAL file name: '{}': {}",
                        fn_str,
                        err
                    );
                    continue;
                }
            };
        }

        chunk_ids.sort();

        Ok(chunk_ids)
    }

    /// Update the RaftLog state.
    ///
    /// Appends the new state as a record to the WAL and applies it to the
    /// in-memory state machine.
    pub fn update_state(
        &mut self,
        state: RaftLogState<T>,
    ) -> Result<Segment, io::Error> {
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

    /// Reads log entries in the half-open index range `[from, to)`.
    ///
    /// Returns an iterator over log entries, attempting to serve them from
    /// the payload cache first and falling back to disk reads if necessary.
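    ///
    /// A usage sketch (not compiled here); the entry indices are illustrative:
    ///
    /// ```ignore
    /// // Collect entries with indices 10, 11 and 12; index 13 is excluded.
    /// let entries: Vec<_> = raft_log.read(10, 13).collect::<Result<_, _>>()?;
    /// ```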
    pub fn read(
        &self,
        from: u64,
        to: u64,
    ) -> impl Iterator<Item = Result<(T::LogId, T::LogPayload), io::Error>> + '_
    {
        self.state_machine.log.range(from..to).map(|(_, log_data)| {
            let log_id = log_data.log_id.clone();

            let payload =
                self.state_machine.payload_cache.read().unwrap().get(&log_id);

            let payload = if let Some(payload) = payload {
                self.access_stat.cache_hit.fetch_add(1, Ordering::Relaxed);
                payload
            } else {
                self.access_stat.cache_miss.fetch_add(1, Ordering::Relaxed);
                self.wal.load_log_payload(log_data)?
            };

            Ok((log_id, payload))
        })
    }

    /// Get a reference to the latest RaftLog state.
    ///
    /// The state is the latest state of the RaftLog, even if the corresponding
    /// WAL record is not committed yet.
    pub fn log_state(&self) -> &RaftLogState<T> {
        &self.state_machine.log_state
    }

    #[allow(dead_code)]
    pub(crate) fn log_state_mut(&mut self) -> &mut RaftLogState<T> {
        &mut self.state_machine.log_state
    }

    /// Get a reference to the RaftLog statistics.
    ///
    /// This method returns a `Stat` struct containing statistics about the
    /// RaftLog, including:
    /// - Statistics for each closed chunk
    /// - The open chunk statistics
    /// - Payload cache usage and hit/miss counters
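    ///
    /// A usage sketch (not compiled here):
    ///
    /// ```ignore
    /// let stat = raft_log.stat();
    /// println!(
    ///     "closed chunks: {}, cache hit: {}, cache miss: {}",
    ///     stat.closed_chunks.len(),
    ///     stat.payload_cache_hit,
    ///     stat.payload_cache_miss,
    /// );
    /// ```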
    pub fn stat(&self) -> Stat<T> {
        let closed =
            self.wal.closed.values().map(|c| c.stat()).collect::<Vec<_>>();

        let open = &self.wal.open;
        let open_stat = ChunkStat {
            chunk_id: open.chunk.chunk_id(),
            records_count: open.chunk.records_count() as u64,
            global_start: open.chunk.global_start(),
            global_end: open.chunk.global_end(),
            size: open.chunk.chunk_size(),
            log_state: self.log_state().clone(),
        };
        let cache = self.state_machine.payload_cache.read().unwrap();

        Stat {
            closed_chunks: closed,
            open_chunk: open_stat,

            payload_cache_last_evictable: cache.last_evictable().cloned(),
            payload_cache_item_count: cache.item_count() as u64,
            payload_cache_max_item: cache.max_items() as u64,
            payload_cache_size: cache.total_size() as u64,
            payload_cache_capacity: cache.capacity() as u64,

            payload_cache_miss: self
                .access_stat
                .cache_miss
                .load(Ordering::Relaxed),
            payload_cache_hit: self
                .access_stat
                .cache_hit
                .load(Ordering::Relaxed),
        }
    }

    /// Get a reference to the access statistics.
    ///
    /// This method returns a reference to the `AccessStat` struct, which
    /// contains statistics about the access patterns of the RaftLog.
    pub fn access_stat(&self) -> &AccessStat {
        &self.access_stat
    }

    fn get_log_id(&self, index: u64) -> Result<T::LogId, RaftLogStateError<T>> {
        let entry = self
            .state_machine
            .log
            .get(&index)
            .ok_or_else(|| LogIndexNotFound::new(index))?;
        Ok(entry.log_id.clone())
    }

    fn append_and_apply(
        &mut self,
        rec: &WALRecord<T>,
    ) -> Result<Segment, io::Error> {
        WAL::append(&mut self.wal, rec)?;
        StateMachine::apply(
            &mut self.state_machine,
            rec,
            self.wal.open.chunk.chunk_id(),
            self.wal.last_segment(),
        )?;

        self.wal
            .try_close_full_chunk(|| self.state_machine.log_state.clone())?;

        Ok(self.wal.last_segment())
    }

    /// Returns the current size of the log on disk in bytes.
    ///
    /// This includes all closed chunks and the open chunk, measuring from the
    /// start of the earliest chunk to the end of the open chunk.
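    ///
    /// For example, if the earliest closed chunk starts at global offset 0 and
    /// the open chunk ends at global offset 4096, this returns 4096.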
    pub fn on_disk_size(&self) -> u64 {
        let end = self.wal.open.chunk.global_end();
        let open_start = self.wal.open.chunk.global_start();
        let first_closed_start = self
            .wal
            .closed
            .first_key_value()
            .map(|(_, v)| v.chunk.global_start())
            .unwrap_or(open_start);

        end - first_closed_start
    }
}