1use std::io;
2
3use byteorder::BigEndian;
4use byteorder::ReadBytesExt;
5use byteorder::WriteBytesExt;
6use codeq::config::CodeqConfig;
7
8use crate::api::types::Types;
9use crate::raft_log::state_machine::raft_log_state::RaftLogState;
10use crate::types::Checksum;
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub enum WALRecord<T: Types> {
17 SaveVote(T::Vote),
19
20 Append(T::LogId, T::LogPayload),
22
23 Commit(T::LogId),
25
26 TruncateAfter(Option<T::LogId>),
28
29 PurgeUpto(T::LogId),
31
32 State(RaftLogState<T>),
34}
35
36impl<T: Types> WALRecord<T> {
37 pub(crate) fn record_type(&self) -> u32 {
40 match self {
41 WALRecord::SaveVote(_) => 0,
42 WALRecord::Append(_, _) => 1,
43 WALRecord::Commit(_) => 2,
44 WALRecord::TruncateAfter(_) => 3,
45 WALRecord::PurgeUpto(_) => 4,
46 WALRecord::State(_) => 5,
47 }
48 }
49}
50
51impl<T: Types> codeq::Encode for WALRecord<T> {
57 fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
58 let mut n = 0;
59 let mut cw = Checksum::new_writer(&mut w);
60
61 {
63 let typ = self.record_type();
64 cw.write_u32::<BigEndian>(typ)?;
65 n += 4;
66 }
67
68 n += match self {
70 WALRecord::SaveVote(vote) => vote.encode(&mut cw)?,
71 WALRecord::Append(log_id, payload) => {
72 log_id.encode(&mut cw)? + payload.encode(&mut cw)?
73 }
74 WALRecord::Commit(log_id) => log_id.encode(&mut cw)?,
75 WALRecord::TruncateAfter(log_id) => log_id.encode(&mut cw)?,
76 WALRecord::PurgeUpto(log_id) => log_id.encode(&mut cw)?,
77 WALRecord::State(state) => state.encode(&mut cw)?,
78 };
79
80 n += cw.write_checksum()?;
82
83 Ok(n)
84 }
85}
86
87impl<T: Types> codeq::Decode for WALRecord<T> {
90 fn decode<R: io::Read>(r: R) -> Result<Self, io::Error> {
91 let mut cr = Checksum::new_reader(r);
92
93 let record_type = cr.read_u32::<BigEndian>()?;
94
95 let rec = match record_type {
96 0 => Self::SaveVote(codeq::Decode::decode(&mut cr)?),
97 1 => Self::Append(
98 T::LogId::decode(&mut cr)?,
99 T::LogPayload::decode(&mut cr)?,
100 ),
101 2 => Self::Commit(T::LogId::decode(&mut cr)?),
102 3 => Self::TruncateAfter(Option::<T::LogId>::decode(&mut cr)?),
103 4 => Self::PurgeUpto(T::LogId::decode(&mut cr)?),
104 5 => Self::State(RaftLogState::decode(&mut cr)?),
105
106 _ => {
107 return Err(io::Error::new(
108 io::ErrorKind::InvalidData,
109 format!("Unknown record type: {}", record_type),
110 ));
111 }
112 };
113
114 cr.verify_checksum(|| "Record::decode()")?;
115
116 Ok(rec)
117 }
118}
119
#[cfg(test)]
mod tests {
    use std::io;

    use codeq::testing::test_codec;

    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
    use crate::raft_log::wal::wal_record::WALRecord;
    use crate::testing::ss;
    use crate::testing::TestTypes;

    /// The record type under test, instantiated with the test type set.
    type Record = WALRecord<TestTypes>;

    // Each test passes a fixed byte sequence and a record value to
    // `test_codec`. In every fixture the first four bytes are the big-endian
    // `u32` type tag; the last line is the trailer written after the payload.

    #[test]
    fn test_record_codec_vote() -> io::Result<()> {
        let record = Record::SaveVote((1, 2));

        let expected = vec![
            0, 0, 0, 0, // type tag: SaveVote
            0, 0, 0, 0, 0, 0, 0, 1, //
            0, 0, 0, 0, 0, 0, 0, 2, //
            0, 0, 0, 0, 246, 160, 238, 226, // trailer
        ];

        test_codec(&expected, &record)
    }

    #[test]
    fn test_record_codec_append() -> io::Result<()> {
        let record = Record::Append((1, 2), ss("hello"));

        let expected = vec![
            0, 0, 0, 1, // type tag: Append
            0, 0, 0, 0, 0, 0, 0, 1, //
            0, 0, 0, 0, 0, 0, 0, 2, //
            0, 0, 0, 5, //
            104, 101, 108, 108, 111, // b"hello"
            0, 0, 0, 0, 167, 17, 197, 69, // trailer
        ];

        test_codec(&expected, &record)
    }

    #[test]
    fn test_record_codec_commit() -> io::Result<()> {
        let record = Record::Commit((1, 2));

        let expected = vec![
            0, 0, 0, 2, // type tag: Commit
            0, 0, 0, 0, 0, 0, 0, 1, //
            0, 0, 0, 0, 0, 0, 0, 2, //
            0, 0, 0, 0, 34, 156, 126, 37, // trailer
        ];

        test_codec(&expected, &record)
    }

    #[test]
    fn test_record_codec_truncate_after() -> io::Result<()> {
        let record = Record::TruncateAfter(Some((1, 2)));

        let expected = vec![
            0, 0, 0, 3, // type tag: TruncateAfter
            1, //
            0, 0, 0, 0, 0, 0, 0, 1, //
            0, 0, 0, 0, 0, 0, 0, 2, //
            0, 0, 0, 0, 213, 81, 166, 197, // trailer
        ];

        test_codec(&expected, &record)
    }

    #[test]
    fn test_record_codec_purge_upto() -> io::Result<()> {
        let record = Record::PurgeUpto((1, 2));

        let expected = vec![
            0, 0, 0, 4, // type tag: PurgeUpto
            0, 0, 0, 0, 0, 0, 0, 1, //
            0, 0, 0, 0, 0, 0, 0, 2, //
            0, 0, 0, 0, 133, 168, 201, 45, // trailer
        ];

        test_codec(&expected, &record)
    }

    #[test]
    fn test_record_codec_state() -> io::Result<()> {
        let state = RaftLogState {
            vote: Some((1, 2)),
            last: Some((2, 3)),
            committed: Some((4, 5)),
            purged: Some((6, 7)),
            user_data: Some(ss("hello")),
        };
        let record = Record::State(state);

        // NOTE(review): the grouping below is only a visual aid; the exact
        // field layout is defined by `RaftLogState`'s encoder, which is not
        // visible in this file.
        let expected = vec![
            0, 0, 0, 5, // type tag: State
            1, //
            1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, //
            1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, //
            1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5, //
            1, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, //
            1, 0, 0, 0, 5, 104, 101, 108, 108, 111, //
            0, 0, 0, 0, 121, 106, 111, 41, // trailer
        ];

        test_codec(&expected, &record)
    }
}