raft_log/raft_log/wal/
wal_record.rs

1use std::io;
2
3use byteorder::BigEndian;
4use byteorder::ReadBytesExt;
5use byteorder::WriteBytesExt;
6use codeq::config::CodeqConfig;
7
8use crate::api::types::Types;
9use crate::raft_log::state_machine::raft_log_state::RaftLogState;
10use crate::types::Checksum;
11
12/// WALRecord represents different types of records that can be written to the
13/// Write-Ahead Log (WAL).
14/// Each variant corresponds to a specific operation in the Raft protocol.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub enum WALRecord<T: Types> {
17    /// Save vote change.
18    SaveVote(T::Vote),
19
20    /// Append new log entry.
21    Append(T::LogId, T::LogPayload),
22
23    /// Save committed log id.
24    Commit(T::LogId),
25
26    /// Truncate log entries after the specified log id.
27    TruncateAfter(Option<T::LogId>),
28
29    /// Purge log entries up to (and including) the specified log id.
30    PurgeUpto(T::LogId),
31
32    /// Save a snapshot of the complete state of the Raft log.
33    State(RaftLogState<T>),
34}
35
36impl<T: Types> WALRecord<T> {
37    /// Returns the numeric type identifier for this record
38    /// Used during encoding and decoding.
39    pub(crate) fn record_type(&self) -> u32 {
40        match self {
41            WALRecord::SaveVote(_) => 0,
42            WALRecord::Append(_, _) => 1,
43            WALRecord::Commit(_) => 2,
44            WALRecord::TruncateAfter(_) => 3,
45            WALRecord::PurgeUpto(_) => 4,
46            WALRecord::State(_) => 5,
47        }
48    }
49}
50
51/// Implements encoding for WALRecord
52/// Each record is encoded as:
53/// - 4 bytes: record type
54/// - variable bytes: record payload
55/// - 4 bytes: checksum
56impl<T: Types> codeq::Encode for WALRecord<T> {
57    fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
58        let mut n = 0;
59        let mut cw = Checksum::new_writer(&mut w);
60
61        // record type
62        {
63            let typ = self.record_type();
64            cw.write_u32::<BigEndian>(typ)?;
65            n += 4;
66        }
67
68        // record payload
69        n += match self {
70            WALRecord::SaveVote(vote) => vote.encode(&mut cw)?,
71            WALRecord::Append(log_id, payload) => {
72                log_id.encode(&mut cw)? + payload.encode(&mut cw)?
73            }
74            WALRecord::Commit(log_id) => log_id.encode(&mut cw)?,
75            WALRecord::TruncateAfter(log_id) => log_id.encode(&mut cw)?,
76            WALRecord::PurgeUpto(log_id) => log_id.encode(&mut cw)?,
77            WALRecord::State(state) => state.encode(&mut cw)?,
78        };
79
80        // checksum
81        n += cw.write_checksum()?;
82
83        Ok(n)
84    }
85}
86
87/// Implements decoding for WALRecord
88/// Reads the record type, payload, and verifies the checksum
89impl<T: Types> codeq::Decode for WALRecord<T> {
90    fn decode<R: io::Read>(r: R) -> Result<Self, io::Error> {
91        let mut cr = Checksum::new_reader(r);
92
93        let record_type = cr.read_u32::<BigEndian>()?;
94
95        let rec = match record_type {
96            0 => Self::SaveVote(codeq::Decode::decode(&mut cr)?),
97            1 => Self::Append(
98                T::LogId::decode(&mut cr)?,
99                T::LogPayload::decode(&mut cr)?,
100            ),
101            2 => Self::Commit(T::LogId::decode(&mut cr)?),
102            3 => Self::TruncateAfter(Option::<T::LogId>::decode(&mut cr)?),
103            4 => Self::PurgeUpto(T::LogId::decode(&mut cr)?),
104            5 => Self::State(RaftLogState::decode(&mut cr)?),
105
106            _ => {
107                return Err(io::Error::new(
108                    io::ErrorKind::InvalidData,
109                    format!("Unknown record type: {}", record_type),
110                ));
111            }
112        };
113
114        cr.verify_checksum(|| "Record::decode()")?;
115
116        Ok(rec)
117    }
118}
119
/// Round-trip tests: each test pins the exact byte layout of one record
/// variant and checks it against the record with `test_codec`.
#[cfg(test)]
mod tests {
    use std::io;

    use codeq::testing::test_codec;

    use super::WALRecord;
    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
    use crate::testing::ss;
    use crate::testing::TestTypes;

    /// `SaveVote` is tagged `0` and carries the vote.
    #[test]
    fn test_record_codec_vote() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 0, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // vote.term
            0, 0, 0, 0, 0, 0, 0, 2, // vote.voted_for
            0, 0, 0, 0, 246, 160, 238, 226, // checksum
        ];
        let record = WALRecord::<TestTypes>::SaveVote((1, 2));

        test_codec(&expected, &record)
    }

    /// `Append` is tagged `1` and carries a log id plus a length-prefixed payload.
    #[test]
    fn test_record_codec_append() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 1, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id.term
            0, 0, 0, 0, 0, 0, 0, 2, // log_id.index
            0, 0, 0, 5, // payload.len
            104, 101, 108, 108, 111, // payload: "hello"
            0, 0, 0, 0, 167, 17, 197, 69, // checksum
        ];
        let record = WALRecord::<TestTypes>::Append((1, 2), ss("hello"));

        test_codec(&expected, &record)
    }

    /// `Commit` is tagged `2` and carries the committed log id.
    #[test]
    fn test_record_codec_commit() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 2, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id.term
            0, 0, 0, 0, 0, 0, 0, 2, // log_id.index
            0, 0, 0, 0, 34, 156, 126, 37, // checksum
        ];
        let record = WALRecord::<TestTypes>::Commit((1, 2));

        test_codec(&expected, &record)
    }

    /// `TruncateAfter` is tagged `3` and carries an optional log id.
    #[test]
    fn test_record_codec_truncate_after() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 3, // record type
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 1, // log_id.term
            0, 0, 0, 0, 0, 0, 0, 2, // log_id.index
            0, 0, 0, 0, 213, 81, 166, 197, // checksum
        ];
        let record = WALRecord::<TestTypes>::TruncateAfter(Some((1, 2)));

        test_codec(&expected, &record)
    }

    /// `PurgeUpto` is tagged `4` and carries the log id to purge up to.
    #[test]
    fn test_record_codec_purge_upto() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 4, // record type
            0, 0, 0, 0, 0, 0, 0, 1, // log_id.term
            0, 0, 0, 0, 0, 0, 0, 2, // log_id.index
            0, 0, 0, 0, 133, 168, 201, 45, // checksum
        ];
        let record = WALRecord::<TestTypes>::PurgeUpto((1, 2));

        test_codec(&expected, &record)
    }

    /// `State` is tagged `5` and carries a full `RaftLogState` snapshot.
    #[test]
    fn test_record_codec_state() -> io::Result<()> {
        let expected: Vec<u8> = vec![
            0, 0, 0, 5, // record type
            1, // version
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 1, // vote.term
            0, 0, 0, 0, 0, 0, 0, 2, // vote.voted_for
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 2, // last.term
            0, 0, 0, 0, 0, 0, 0, 3, // last.index
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 4, // committed.term
            0, 0, 0, 0, 0, 0, 0, 5, // committed.index
            1, // Some
            0, 0, 0, 0, 0, 0, 0, 6, // purged.term
            0, 0, 0, 0, 0, 0, 0, 7, // purged.index
            1, // Some
            0, 0, 0, 5, // user_data.len
            104, 101, 108, 108, 111, // user_data: "hello"
            0, 0, 0, 0, 121, 106, 111, 41, // checksum
        ];
        let state = RaftLogState {
            vote: Some((1, 2)),
            last: Some((2, 3)),
            committed: Some((4, 5)),
            purged: Some((6, 7)),
            user_data: Some(ss("hello")),
        };
        let record = WALRecord::<TestTypes>::State(state);

        test_codec(&expected, &record)
    }
}