raft_log/raft_log/wal/
wal_record.rs

1use std::fmt;
2use std::fmt::Formatter;
3use std::io;
4
5use byteorder::BigEndian;
6use byteorder::ReadBytesExt;
7use byteorder::WriteBytesExt;
8use codeq::config::CodeqConfig;
9use display_more::DisplayOptionExt;
10
11use crate::api::types::Types;
12use crate::raft_log::state_machine::raft_log_state::RaftLogState;
13use crate::types::Checksum;
14
15/// WALRecord represents different types of records that can be written to the
16/// Write-Ahead Log (WAL).
17/// Each variant corresponds to a specific operation in the Raft protocol.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum WALRecord<T: Types> {
20    /// Save vote change.
21    SaveVote(T::Vote),
22
23    /// Append new log entry.
24    Append(T::LogId, T::LogPayload),
25
26    /// Save committed log id.
27    Commit(T::LogId),
28
29    /// Truncate log entries after the specified log id.
30    TruncateAfter(Option<T::LogId>),
31
32    /// Purge log entries up to (and including) the specified log id.
33    PurgeUpto(T::LogId),
34
35    /// Save a snapshot of the complete state of the Raft log.
36    State(RaftLogState<T>),
37}
38
39impl<T: Types> WALRecord<T> {
40    /// Returns the numeric type identifier for this record
41    /// Used during encoding and decoding.
42    pub(crate) fn record_type(&self) -> u32 {
43        match self {
44            WALRecord::SaveVote(_) => 0,
45            WALRecord::Append(_, _) => 1,
46            WALRecord::Commit(_) => 2,
47            WALRecord::TruncateAfter(_) => 3,
48            WALRecord::PurgeUpto(_) => 4,
49            WALRecord::State(_) => 5,
50        }
51    }
52}
53
54impl<T> fmt::Display for WALRecord<T>
55where
56    T: Types,
57    T::Vote: fmt::Display,
58    T::LogId: fmt::Display,
59    T::LogPayload: fmt::Display,
60    RaftLogState<T>: fmt::Display,
61{
62    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63        match self {
64            WALRecord::SaveVote(vote) => {
65                write!(f, "SaveVote({})", vote)
66            }
67            WALRecord::Append(log_id, payload) => {
68                write!(f, "Append(log_id: {}, payload: {})", log_id, payload)
69            }
70            WALRecord::Commit(log_id) => {
71                write!(f, "Commit({})", log_id)
72            }
73            WALRecord::TruncateAfter(option_log_id) => {
74                write!(f, "TruncateAfter({})", option_log_id.display())
75            }
76            WALRecord::PurgeUpto(log_id) => {
77                write!(f, "PurgeUpto({})", log_id)
78            }
79            WALRecord::State(raft_log_state) => {
80                write!(f, "RaftLogState({})", raft_log_state)
81            }
82        }
83    }
84}
85
86/// Implements encoding for WALRecord
87/// Each record is encoded as:
88/// - 4 bytes: record type
89/// - variable bytes: record payload
90/// - 4 bytes: checksum
91impl<T: Types> codeq::Encode for WALRecord<T> {
92    fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
93        let mut n = 0;
94        let mut cw = Checksum::new_writer(&mut w);
95
96        // record type
97        {
98            let typ = self.record_type();
99            cw.write_u32::<BigEndian>(typ)?;
100            n += 4;
101        }
102
103        // record payload
104        n += match self {
105            WALRecord::SaveVote(vote) => vote.encode(&mut cw)?,
106            WALRecord::Append(log_id, payload) => {
107                log_id.encode(&mut cw)? + payload.encode(&mut cw)?
108            }
109            WALRecord::Commit(log_id) => log_id.encode(&mut cw)?,
110            WALRecord::TruncateAfter(log_id) => log_id.encode(&mut cw)?,
111            WALRecord::PurgeUpto(log_id) => log_id.encode(&mut cw)?,
112            WALRecord::State(state) => state.encode(&mut cw)?,
113        };
114
115        // checksum
116        n += cw.write_checksum()?;
117
118        Ok(n)
119    }
120}
121
122/// Implements decoding for WALRecord
123/// Reads the record type, payload, and verifies the checksum
124impl<T: Types> codeq::Decode for WALRecord<T> {
125    fn decode<R: io::Read>(r: R) -> Result<Self, io::Error> {
126        let mut cr = Checksum::new_reader(r);
127
128        let record_type = cr.read_u32::<BigEndian>()?;
129
130        let rec = match record_type {
131            0 => Self::SaveVote(codeq::Decode::decode(&mut cr)?),
132            1 => Self::Append(
133                T::LogId::decode(&mut cr)?,
134                T::LogPayload::decode(&mut cr)?,
135            ),
136            2 => Self::Commit(T::LogId::decode(&mut cr)?),
137            3 => Self::TruncateAfter(Option::<T::LogId>::decode(&mut cr)?),
138            4 => Self::PurgeUpto(T::LogId::decode(&mut cr)?),
139            5 => Self::State(RaftLogState::decode(&mut cr)?),
140
141            _ => {
142                return Err(io::Error::new(
143                    io::ErrorKind::InvalidData,
144                    format!("Unknown record type: {}", record_type),
145                ));
146            }
147        };
148
149        cr.verify_checksum(|| "Record::decode()")?;
150
151        Ok(rec)
152    }
153}
154
#[cfg(test)]
mod tests {
    use std::io;

    use codeq::testing::test_codec;
    use display_more::DisplaySliceExt;

    use super::WALRecord;
    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
    use crate::testing::ss;
    use crate::testing::TestDisplayTypes;
    use crate::testing::TestTypes;

    /// Record type used by the codec tests.
    type Record = WALRecord<TestTypes>;

    /// Record type used by the `Display` test.
    type DisplayRecord = WALRecord<TestDisplayTypes>;

    /// `SaveVote` against its expected byte sequence.
    #[test]
    fn test_record_codec_vote() -> io::Result<()> {
        let record = Record::SaveVote((1, 2));

        let encoded = vec![
            0, 0, 0, 0, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // vote: first field
            0, 0, 0, 0, 0, 0, 0, 2, // vote: second field
            0, 0, 0, 0, 246, 160, 238, 226, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// `Append` against its expected byte sequence.
    #[test]
    fn test_record_codec_append() -> io::Result<()> {
        let record = Record::Append((1, 2), ss("hello"));

        let encoded = vec![
            0, 0, 0, 1, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id: first field
            0, 0, 0, 0, 0, 0, 0, 2, // log_id: second field
            0, 0, 0, 5, // payload length
            104, 101, 108, 108, 111, // payload: "hello"
            0, 0, 0, 0, 167, 17, 197, 69, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// `Commit` against its expected byte sequence.
    #[test]
    fn test_record_codec_commit() -> io::Result<()> {
        let record = Record::Commit((1, 2));

        let encoded = vec![
            0, 0, 0, 2, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id: first field
            0, 0, 0, 0, 0, 0, 0, 2, // log_id: second field
            0, 0, 0, 0, 34, 156, 126, 37, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// `TruncateAfter(Some(..))` against its expected byte sequence.
    #[test]
    fn test_record_codec_truncate_after() -> io::Result<()> {
        let record = Record::TruncateAfter(Some((1, 2)));

        let encoded = vec![
            0, 0, 0, 3, // record type
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 1, // log_id: first field
            0, 0, 0, 0, 0, 0, 0, 2, // log_id: second field
            0, 0, 0, 0, 213, 81, 166, 197, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// `PurgeUpto` against its expected byte sequence.
    #[test]
    fn test_record_codec_purge_upto() -> io::Result<()> {
        let record = Record::PurgeUpto((1, 2));

        let encoded = vec![
            0, 0, 0, 4, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id: first field
            0, 0, 0, 0, 0, 0, 0, 2, // log_id: second field
            0, 0, 0, 0, 133, 168, 201, 45, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// `State` with every optional field populated, against its expected
    /// byte sequence.
    #[test]
    fn test_record_codec_state() -> io::Result<()> {
        let state = RaftLogState {
            vote: Some((1, 2)),
            last: Some((2, 3)),
            committed: Some((4, 5)),
            purged: Some((6, 7)),
            user_data: Some(ss("hello")),
        };
        let record = Record::State(state);

        let encoded = vec![
            0, 0, 0, 5, // record type
            1, // version
            1, // vote: Some
            0, 0, 0, 0, 0, 0, 0, 1, // vote: first field
            0, 0, 0, 0, 0, 0, 0, 2, // vote: second field
            1, // last: Some
            0, 0, 0, 0, 0, 0, 0, 2, // last: first field
            0, 0, 0, 0, 0, 0, 0, 3, // last: second field
            1, // committed: Some
            0, 0, 0, 0, 0, 0, 0, 4, // committed: first field
            0, 0, 0, 0, 0, 0, 0, 5, // committed: second field
            1, // purged: Some
            0, 0, 0, 0, 0, 0, 0, 6, // purged: first field
            0, 0, 0, 0, 0, 0, 0, 7, // purged: second field
            1, // user_data: Some
            0, 0, 0, 5, // user_data length
            104, 101, 108, 108, 111, // user_data: "hello"
            0, 0, 0, 0, 121, 106, 111, 41, // checksum
        ];

        test_codec(&encoded, &record)
    }

    /// One record of every variant, rendered as a slice.
    #[test]
    fn test_wal_record_display() {
        let state = RaftLogState {
            vote: Some(1),
            last: Some(3),
            committed: Some(4),
            purged: Some(6),
            user_data: Some("hello".to_string()),
        };

        let records = [
            DisplayRecord::SaveVote(1),
            DisplayRecord::Append(3u64, "hello".to_string()),
            DisplayRecord::Commit(5u64),
            DisplayRecord::TruncateAfter(Some(7u64)),
            DisplayRecord::PurgeUpto(9u64),
            DisplayRecord::State(state),
        ];

        let got = records.display_n(1000).to_string();

        let want =
        "[SaveVote(1),Append(log_id: 3, payload: hello),Commit(5),TruncateAfter(7),PurgeUpto(9),RaftLogState(RaftLogState(vote: 1, last: 3, committed: 4, purged: 6, user_data: hello))]";

        assert_eq!(want, got);
    }
}