//! raft_log/raft_log.rs — RaftLog: Write-Ahead-Log storage for the Raft
//! consensus protocol.

1use std::collections::BTreeMap;
2use std::io;
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6use codeq::OffsetSize;
7use codeq::error_context_ext::ErrorContextExt;
8use log::info;
9
10use crate::ChunkId;
11use crate::Config;
12use crate::Types;
13use crate::WALRecord;
14use crate::api::raft_log_writer::RaftLogWriter;
15use crate::api::state_machine::StateMachine;
16use crate::api::wal::WAL;
17use crate::chunk::Chunk;
18use crate::chunk::closed_chunk::ClosedChunk;
19use crate::chunk::open_chunk::OpenChunk;
20use crate::errors::LogIndexNotFound;
21use crate::errors::RaftLogStateError;
22use crate::file_lock::FileLock;
23use crate::num::format_pad_u64;
24use crate::raft_log::access_state::AccessStat;
25use crate::raft_log::dump::RefDump;
26use crate::raft_log::dump_raft_log::DumpRaftLog;
27use crate::raft_log::stat::ChunkStat;
28use crate::raft_log::stat::Stat;
29use crate::raft_log::state_machine::RaftLogStateMachine;
30use crate::raft_log::state_machine::raft_log_state::RaftLogState;
31use crate::raft_log::wal::RaftLogWAL;
32use crate::types::Segment;
33
/// RaftLog is a Write-Ahead-Log implementation for the Raft consensus protocol.
///
/// It provides persistent storage for Raft log entries and state, with the
/// following features:
/// - Append-only log storage with chunk-based organization
/// - In-memory caching of log payloads
/// - Exclusive file locking for thread-safe operations
/// - Support for log truncation and purging
/// - Statistics tracking for monitoring
#[derive(Debug)]
pub struct RaftLog<T: Types> {
    /// Shared configuration (directory, chunk sizing, file-name format).
    pub(crate) config: Arc<Config>,

    /// Acquire the dir exclusive lock when writing to the log.
    ///
    /// Held (not otherwise read) for the lifetime of this instance so that
    /// no second process can open the same directory; dropped on close.
    _dir_lock: FileLock,

    /// The write-ahead log: one open chunk receiving appends plus a map of
    /// closed (immutable) chunks.
    pub(crate) wal: RaftLogWAL<T>,

    /// In-memory state rebuilt by replaying WAL records: log-id index,
    /// payload cache, and the latest `RaftLogState`.
    pub(crate) state_machine: RaftLogStateMachine<T>,

    /// The chunk paths that are no longer needed because all logs in them are
    /// purged. But removing them must be postponed until the purge record
    /// is flushed to disk.
    removed_chunks: Vec<String>,

    /// Atomic cache hit/miss counters updated by `read()`.
    access_stat: AccessStat,
}
61
62impl<T: Types> RaftLogWriter<T> for RaftLog<T> {
63    fn save_user_data(
64        &mut self,
65        user_data: Option<T::UserData>,
66    ) -> Result<Segment, io::Error> {
67        let mut state = self.log_state().clone();
68        state.user_data = user_data;
69        let record = WALRecord::State(state);
70        self.append_and_apply(&record)
71    }
72
73    fn save_vote(&mut self, vote: T::Vote) -> Result<Segment, io::Error> {
74        let record = WALRecord::SaveVote(vote.clone());
75        self.append_and_apply(&record)
76    }
77
78    fn append<I>(&mut self, entries: I) -> Result<Segment, io::Error>
79    where I: IntoIterator<Item = (T::LogId, T::LogPayload)> {
80        for (log_id, payload) in entries {
81            let record = WALRecord::Append(log_id, payload);
82            self.append_and_apply(&record)?;
83        }
84        Ok(self.wal.last_segment())
85    }
86
87    /// Truncate at `index`, keep the record before `index`.
88    fn truncate(&mut self, index: u64) -> Result<Segment, io::Error> {
89        let purged = self.log_state().purged.as_ref();
90
91        let log_id = if index == T::next_log_index(purged) {
92            purged.cloned()
93        } else {
94            let log_id = self.get_log_id(index - 1)?;
95            Some(log_id)
96        };
97
98        let record = WALRecord::TruncateAfter(log_id);
99        self.append_and_apply(&record)
100    }
101
102    fn purge(&mut self, upto: T::LogId) -> Result<Segment, io::Error> {
103        // NOTE that only when the purge record is committed, the chunk file can
104        // be removed.
105
106        let purged = self.log_state().purged.as_ref();
107
108        info!(
109            "RaftLog purge upto: {:?}; current purged: {:?}",
110            upto, purged
111        );
112
113        if T::log_index(&upto) < T::next_log_index(purged) {
114            return Ok(self.wal.last_segment());
115        }
116
117        let record = WALRecord::PurgeUpto(upto.clone());
118        let res = self.append_and_apply(&record)?;
119
120        // Buffer the chunk ids to remove.
121        // After the purge record is flushed to disk,
122        // remove them in the FlushWorker
123
124        while let Some((_chunk_id, closed)) = self.wal.closed.first_key_value()
125        {
126            if closed.state.last.as_ref() > Some(&upto) {
127                break;
128            }
129            let (chunk_id, _r) = self.wal.closed.pop_first().unwrap();
130            let path = self.config.chunk_path(chunk_id);
131            info!(
132                "RaftLog: scheduled to remove chunk after next flush: {}",
133                path
134            );
135            self.removed_chunks.push(path);
136        }
137
138        Ok(res)
139    }
140
141    fn commit(&mut self, log_id: T::LogId) -> Result<Segment, io::Error> {
142        let record = WALRecord::Commit(log_id);
143        self.append_and_apply(&record)
144    }
145
146    fn flush(
147        &mut self,
148        callback: Option<T::Callback>,
149    ) -> Result<(), io::Error> {
150        self.wal.send_flush(callback)?;
151
152        if !self.removed_chunks.is_empty() {
153            let chunk_ids = self.removed_chunks.drain(..).collect::<Vec<_>>();
154            self.wal.send_remove_chunks(chunk_ids)?;
155        }
156
157        Ok(())
158    }
159}
160
impl<T: Types> RaftLog<T> {
    /// Dump the RaftLog data for debugging purposes.
    ///
    /// Returns a `DumpRaftLog` struct containing a complete snapshot of the
    /// RaftLog state.
    pub fn dump_data(&self) -> DumpRaftLog<T> {
        let logs = self.state_machine.log.values().cloned().collect::<Vec<_>>();
        let cache =
            self.state_machine.payload_cache.read().unwrap().cache.clone();
        let chunks = self.wal.closed.clone();

        DumpRaftLog {
            state: self.state_machine.log_state.clone(),
            logs,
            cache,
            chunks,
            // Hit/miss counters are not part of the snapshot; zeroed here.
            cache_hit: 0,
            cache_miss: 0,
        }
    }

    /// Dump the WAL data in this Raft-log for debugging purposes.
    ///
    /// This method returns a reference type `RefDump` containing the RaftLog
    /// configuration and the RaftLog instance itself.
    pub fn dump(&self) -> RefDump<'_, T> {
        RefDump {
            config: self.config.clone(),
            raft_log: self,
        }
    }

    /// Get a reference to the RaftLog configuration.
    pub fn config(&self) -> &Config {
        self.config.as_ref()
    }

    /// Opens a RaftLog at the specified directory.
    ///
    /// This operation:
    /// 1. Acquires an exclusive lock on the directory
    /// 2. Loads existing chunks in order
    /// 3. Replays WAL records to rebuild the state
    /// 4. Creates a new open chunk for future writes
    ///
    /// # Errors
    /// Returns an error if:
    /// - Directory operations fail
    /// - There are gaps between chunk offsets
    /// - WAL records are invalid
    pub fn open(config: Arc<Config>) -> Result<Self, io::Error> {
        let dir_lock = FileLock::new(config.clone())
            .context(|| format!("open RaftLog in '{}'", config.dir))?;

        let chunk_ids = Self::load_chunk_ids(&config)?;

        let mut sm = RaftLogStateMachine::new(&config);
        let mut closed = BTreeMap::new();
        // End offset of the previously replayed chunk; used both to detect
        // gaps and as the start offset of a fresh open chunk.
        let mut prev_end_offset = None;
        // Last log id seen after replaying the previous chunk; marks the
        // cache eviction boundary before the next chunk is replayed.
        let mut last_log_id = None;

        for chunk_id in chunk_ids.iter().copied() {
            // Only the last chunk(open chunk) needs to keep all log payload in
            // cache. Therefore, payloads in previous chunks are marked as
            // evictable.
            sm.payload_cache.write().unwrap().set_last_evictable(last_log_id);

            Self::ensure_consecutive_chunks(prev_end_offset, chunk_id)?;

            let (chunk, records) = Chunk::open(config.clone(), chunk_id)?;

            // Replay each record with the byte segment it occupies; segment
            // bounds come from consecutive entries of `global_offsets`.
            for (i, record) in records.into_iter().enumerate() {
                let start = chunk.global_offsets[i];
                let end = chunk.global_offsets[i + 1];
                let seg = Segment::new(start, end - start);
                sm.apply(&record, chunk_id, seg)?;
            }

            prev_end_offset = Some(chunk.last_segment().end().0);
            last_log_id = sm.log_state.last.clone();

            // Snapshot the state as of the end of this chunk alongside it.
            closed.insert(
                chunk_id,
                ClosedChunk::new(chunk, sm.log_state.clone()),
            );
        }

        // Prefer reusing the last closed chunk as the open chunk if it is
        // healthy; otherwise start a new chunk at the next global offset.
        let open = Self::reopen_last_closed(&mut closed);

        let open = if let Some(open) = open {
            open
        } else {
            OpenChunk::create(
                config.clone(),
                ChunkId(prev_end_offset.unwrap_or_default()),
                WALRecord::State(sm.log_state.clone()),
            )?
        };

        let cache = sm.payload_cache.clone();

        let wal = RaftLogWAL::new(config.clone(), closed, open, cache);

        let s = Self {
            config,
            _dir_lock: dir_lock,
            state_machine: sm,
            wal,
            access_stat: Default::default(),
            removed_chunks: vec![],
        };

        Ok(s)
    }

    /// Verifies that two chunks are consecutive by checking their end/start
    /// offsets.
    ///
    /// This function ensures that there are no gaps between chunks in the WAL.
    /// A gap would indicate data loss or corruption.
    ///
    /// # Arguments
    ///
    /// * `prev_end_offset` - The end offset of the previous chunk, if any
    /// * `chunk_id` - The ID of the current chunk to verify
    fn ensure_consecutive_chunks(
        prev_end_offset: Option<u64>,
        chunk_id: ChunkId,
    ) -> Result<(), io::Error> {
        // The first chunk has no predecessor; nothing to verify.
        let Some(prev_end) = prev_end_offset else {
            return Ok(());
        };

        if prev_end != chunk_id.offset() {
            let message = format!(
                "Gap between chunks: {} -> {}; Can not open, \
                        fix this error and re-open",
                format_pad_u64(prev_end),
                format_pad_u64(chunk_id.offset()),
            );
            return Err(io::Error::new(io::ErrorKind::InvalidData, message));
        }

        Ok(())
    }

    /// If there is a healthy last chunk, re-open it.
    ///
    /// Healthy means the data is complete and the chunk is not truncated.
    /// If reused, the closed chunk will be removed from `closed_chunks`
    fn reopen_last_closed(
        closed_chunks: &mut BTreeMap<ChunkId, ClosedChunk<T>>,
    ) -> Option<OpenChunk<T>> {
        // If the chunk is truncated, it is not healthy, do not re-open it.
        // (Scoped so the shared borrow ends before `pop_last` below.)
        {
            let (_chunk_id, closed) = closed_chunks.iter().last()?;

            if closed.chunk.truncated.is_some() {
                return None;
            }
        }

        let (_chunk_id, last) = closed_chunks.pop_last().unwrap();
        let open = OpenChunk::new(last.chunk);
        Some(open)
    }

    /// Scan the log directory and return the ids of all chunk files,
    /// sorted ascending by offset.
    ///
    /// The lock file is skipped; files whose names do not parse as chunk
    /// file names are ignored with a warning.
    pub fn load_chunk_ids(config: &Config) -> Result<Vec<ChunkId>, io::Error> {
        let path = &config.dir;
        let entries = std::fs::read_dir(path)?;
        let mut chunk_ids = vec![];
        for entry in entries {
            let entry = entry?;
            let file_name = entry.file_name();

            let fn_str = file_name.to_string_lossy();
            if fn_str == FileLock::LOCK_FILE_NAME {
                continue;
            }

            let res = Config::parse_chunk_file_name(&fn_str);

            match res {
                Ok(offset) => {
                    chunk_ids.push(ChunkId(offset));
                }
                Err(err) => {
                    // Unrecognized files are tolerated, not fatal.
                    log::warn!(
                        "Ignore invalid WAL file name: '{}': {}",
                        fn_str,
                        err
                    );
                    continue;
                }
            };
        }

        // Directory iteration order is unspecified; sort by chunk offset.
        chunk_ids.sort();

        Ok(chunk_ids)
    }

    /// Update the RaftLog state.
    ///
    /// This method updates the RaftLog state with a new state and appends it
    /// to the WAL.
    pub fn update_state(
        &mut self,
        state: RaftLogState<T>,
    ) -> Result<Segment, io::Error> {
        let record = WALRecord::State(state);
        self.append_and_apply(&record)
    }

    /// Reads log entries in the specified index range.
    ///
    /// Returns an iterator over log entries, attempting to serve them from
    /// cache first, falling back to disk reads if necessary.
    ///
    /// The range is half-open: `from..to`. Errors from disk reads are
    /// yielded per-item rather than failing the whole iteration.
    pub fn read(
        &self,
        from: u64,
        to: u64,
    ) -> impl Iterator<Item = Result<(T::LogId, T::LogPayload), io::Error>> + '_
    {
        self.state_machine.log.range(from..to).map(|(_, log_data)| {
            let log_id = log_data.log_id.clone();

            // The read lock is released before the potential disk read below.
            let payload =
                self.state_machine.payload_cache.read().unwrap().get(&log_id);

            let payload = if let Some(payload) = payload {
                self.access_stat.cache_hit.fetch_add(1, Ordering::Relaxed);
                payload
            } else {
                self.access_stat.cache_miss.fetch_add(1, Ordering::Relaxed);
                self.wal.load_log_payload(log_data)?
            };

            Ok((log_id, payload))
        })
    }

    /// Get a reference to the latest RaftLog state.
    ///
    /// The state is the latest state of the RaftLog, even if the corresponding
    /// WAL record is not committed yet.
    pub fn log_state(&self) -> &RaftLogState<T> {
        &self.state_machine.log_state
    }

    /// Mutable access to the log state; currently only used by tests or
    /// internal tooling (hence `dead_code` is allowed).
    #[allow(dead_code)]
    pub(crate) fn log_state_mut(&mut self) -> &mut RaftLogState<T> {
        &mut self.state_machine.log_state
    }

    /// Get a reference to the RaftLog statistics.
    ///
    /// This method returns a `Stat` struct containing statistics about the
    /// RaftLog, including:
    /// - The number of closed chunks
    /// - The open chunk statistics
    pub fn stat(&self) -> Stat<T> {
        let closed =
            self.wal.closed.values().map(|c| c.stat()).collect::<Vec<_>>();

        let open = &self.wal.open;
        let open_stat = ChunkStat {
            chunk_id: open.chunk.chunk_id(),
            records_count: open.chunk.records_count() as u64,
            global_start: open.chunk.global_start(),
            global_end: open.chunk.global_end(),
            size: open.chunk.chunk_size(),
            log_state: self.log_state().clone(),
        };
        let cache = self.state_machine.payload_cache.read().unwrap();

        Stat {
            closed_chunks: closed,
            open_chunk: open_stat,

            payload_cache_last_evictable: cache.last_evictable().cloned(),
            payload_cache_item_count: cache.item_count() as u64,
            payload_cache_max_item: cache.max_items() as u64,
            payload_cache_size: cache.total_size() as u64,
            payload_cache_capacity: cache.capacity() as u64,

            payload_cache_miss: self
                .access_stat
                .cache_miss
                .load(Ordering::Relaxed),
            payload_cache_hit: self
                .access_stat
                .cache_hit
                .load(Ordering::Relaxed),
        }
    }

    /// Get a reference to the access statistics.
    ///
    /// This method returns a reference to the `AccessStat` struct, which
    /// contains statistics about the access patterns of the RaftLog.
    pub fn access_stat(&self) -> &AccessStat {
        &self.access_stat
    }

    /// Block until the FlushWorker has processed all queued requests.
    ///
    /// After this returns, all file writes, syncs, and cache eviction boundary
    /// updates issued before this call are complete. Note that the payload
    /// cache item count may still be non-deterministic because eviction is
    /// lazy; call `drain_cache_evictable()` afterwards to normalize it.
    pub fn wait_worker_idle(&self) {
        self.wal.wait_worker_idle();
    }

    /// Drain all evictable entries from the payload cache.
    ///
    /// Call after `wait_worker_idle()` to normalize the cache to a
    /// deterministic state. See `PayloadCache::drain_evictable` for details.
    pub fn drain_cache_evictable(&self) {
        self.state_machine.payload_cache.write().unwrap().drain_evictable();
    }

    /// Look up the log id stored at `index`.
    ///
    /// Returns a `LogIndexNotFound` error (converted into
    /// `RaftLogStateError`) if no entry exists at that index.
    fn get_log_id(&self, index: u64) -> Result<T::LogId, RaftLogStateError<T>> {
        let entry = self
            .state_machine
            .log
            .get(&index)
            .ok_or_else(|| LogIndexNotFound::new(index))?;
        Ok(entry.log_id.clone())
    }

    /// Append `rec` to the WAL, apply it to the in-memory state machine,
    /// and close the open chunk if it has become full.
    ///
    /// Returns the WAL segment the record was written to.
    fn append_and_apply(
        &mut self,
        rec: &WALRecord<T>,
    ) -> Result<Segment, io::Error> {
        WAL::append(&mut self.wal, rec)?;
        StateMachine::apply(
            &mut self.state_machine,
            rec,
            self.wal.open.chunk.chunk_id(),
            self.wal.last_segment(),
        )?;

        // A full open chunk is closed with a snapshot of the current state,
        // matching what `open()` stores for replayed closed chunks.
        self.wal
            .try_close_full_chunk(|| self.state_machine.log_state.clone())?;

        Ok(self.wal.last_segment())
    }

    /// Returns the current size of the log on disk in bytes.
    ///
    /// This includes all closed chunks and the open chunk, measuring from the
    /// start of the earliest chunk to the end of the open chunk.
    pub fn on_disk_size(&self) -> u64 {
        let end = self.wal.open.chunk.global_end();
        let open_start = self.wal.open.chunk.global_start();
        // With no closed chunks, the log spans only the open chunk.
        let first_closed_start = self
            .wal
            .closed
            .first_key_value()
            .map(|(_, v)| v.chunk.global_start())
            .unwrap_or(open_start);

        end - first_closed_start
    }
}