1use std::fmt;
2use std::fmt::Formatter;
3use std::io;
4
5use byteorder::BigEndian;
6use byteorder::ReadBytesExt;
7use byteorder::WriteBytesExt;
8use codeq::config::CodeqConfig;
9use display_more::DisplayOptionExt;
10
11use crate::api::types::Types;
12use crate::raft_log::state_machine::raft_log_state::RaftLogState;
13use crate::types::Checksum;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum WALRecord<T: Types> {
20 SaveVote(T::Vote),
22
23 Append(T::LogId, T::LogPayload),
25
26 Commit(T::LogId),
28
29 TruncateAfter(Option<T::LogId>),
31
32 PurgeUpto(T::LogId),
34
35 State(RaftLogState<T>),
37}
38
39impl<T: Types> WALRecord<T> {
40 pub(crate) fn record_type(&self) -> u32 {
43 match self {
44 WALRecord::SaveVote(_) => 0,
45 WALRecord::Append(_, _) => 1,
46 WALRecord::Commit(_) => 2,
47 WALRecord::TruncateAfter(_) => 3,
48 WALRecord::PurgeUpto(_) => 4,
49 WALRecord::State(_) => 5,
50 }
51 }
52}
53
54impl<T> fmt::Display for WALRecord<T>
55where
56 T: Types,
57 T::Vote: fmt::Display,
58 T::LogId: fmt::Display,
59 T::LogPayload: fmt::Display,
60 RaftLogState<T>: fmt::Display,
61{
62 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63 match self {
64 WALRecord::SaveVote(vote) => {
65 write!(f, "SaveVote({})", vote)
66 }
67 WALRecord::Append(log_id, payload) => {
68 write!(f, "Append(log_id: {}, payload: {})", log_id, payload)
69 }
70 WALRecord::Commit(log_id) => {
71 write!(f, "Commit({})", log_id)
72 }
73 WALRecord::TruncateAfter(option_log_id) => {
74 write!(f, "TruncateAfter({})", option_log_id.display())
75 }
76 WALRecord::PurgeUpto(log_id) => {
77 write!(f, "PurgeUpto({})", log_id)
78 }
79 WALRecord::State(raft_log_state) => {
80 write!(f, "RaftLogState({})", raft_log_state)
81 }
82 }
83 }
84}
85
86impl<T: Types> codeq::Encode for WALRecord<T> {
92 fn encode<W: io::Write>(&self, mut w: W) -> Result<usize, io::Error> {
93 let mut n = 0;
94 let mut cw = Checksum::new_writer(&mut w);
95
96 {
98 let typ = self.record_type();
99 cw.write_u32::<BigEndian>(typ)?;
100 n += 4;
101 }
102
103 n += match self {
105 WALRecord::SaveVote(vote) => vote.encode(&mut cw)?,
106 WALRecord::Append(log_id, payload) => {
107 log_id.encode(&mut cw)? + payload.encode(&mut cw)?
108 }
109 WALRecord::Commit(log_id) => log_id.encode(&mut cw)?,
110 WALRecord::TruncateAfter(log_id) => log_id.encode(&mut cw)?,
111 WALRecord::PurgeUpto(log_id) => log_id.encode(&mut cw)?,
112 WALRecord::State(state) => state.encode(&mut cw)?,
113 };
114
115 n += cw.write_checksum()?;
117
118 Ok(n)
119 }
120}
121
122impl<T: Types> codeq::Decode for WALRecord<T> {
125 fn decode<R: io::Read>(r: R) -> Result<Self, io::Error> {
126 let mut cr = Checksum::new_reader(r);
127
128 let record_type = cr.read_u32::<BigEndian>()?;
129
130 let rec = match record_type {
131 0 => Self::SaveVote(codeq::Decode::decode(&mut cr)?),
132 1 => Self::Append(
133 T::LogId::decode(&mut cr)?,
134 T::LogPayload::decode(&mut cr)?,
135 ),
136 2 => Self::Commit(T::LogId::decode(&mut cr)?),
137 3 => Self::TruncateAfter(Option::<T::LogId>::decode(&mut cr)?),
138 4 => Self::PurgeUpto(T::LogId::decode(&mut cr)?),
139 5 => Self::State(RaftLogState::decode(&mut cr)?),
140
141 _ => {
142 return Err(io::Error::new(
143 io::ErrorKind::InvalidData,
144 format!("Unknown record type: {}", record_type),
145 ));
146 }
147 };
148
149 cr.verify_checksum(|| "Record::decode()")?;
150
151 Ok(rec)
152 }
153}
154
#[cfg(test)]
mod tests {
    use std::io;

    use codeq::testing::test_codec;
    use display_more::DisplaySliceExt;

    use crate::raft_log::state_machine::raft_log_state::RaftLogState;
    use crate::raft_log::wal::wal_record::WALRecord;
    use crate::testing::ss;
    use crate::testing::TestDisplayTypes;
    use crate::testing::TestTypes;

    // Each codec test pins the exact byte image of one variant and hands it,
    // together with the record, to `test_codec`.
    //
    // Every fixture starts with the 4-byte big-endian type tag and ends with the
    // trailer written by `write_checksum()`; the bytes in between are the
    // variant's payload.

    #[test]
    fn test_record_codec_vote() -> io::Result<()> {
        let record = WALRecord::<TestTypes>::SaveVote((1, 2));

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 0, // type tag
            0, 0, 0, 0, 0, 0, 0, 1, // payload: 1
            0, 0, 0, 0, 0, 0, 0, 2, // payload: 2
            0, 0, 0, 0, 246, 160, 238, 226, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    #[test]
    fn test_record_codec_append() -> io::Result<()> {
        let record = WALRecord::<TestTypes>::Append((1, 2), ss("hello"));

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 1, // type tag
            0, 0, 0, 0, 0, 0, 0, 1, // log id: 1
            0, 0, 0, 0, 0, 0, 0, 2, // log id: 2
            0, 0, 0, 5, // presumably the payload length
            104, 101, 108, 108, 111, // "hello"
            0, 0, 0, 0, 167, 17, 197, 69, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    #[test]
    fn test_record_codec_commit() -> io::Result<()> {
        let record = WALRecord::<TestTypes>::Commit((1, 2));

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 2, // type tag
            0, 0, 0, 0, 0, 0, 0, 1, // log id: 1
            0, 0, 0, 0, 0, 0, 0, 2, // log id: 2
            0, 0, 0, 0, 34, 156, 126, 37, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    #[test]
    fn test_record_codec_truncate_after() -> io::Result<()> {
        let record = WALRecord::<TestTypes>::TruncateAfter(Some((1, 2)));

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 3, // type tag
            1, // presumably the `Some` marker of the optional log id
            0, 0, 0, 0, 0, 0, 0, 1, // log id: 1
            0, 0, 0, 0, 0, 0, 0, 2, // log id: 2
            0, 0, 0, 0, 213, 81, 166, 197, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    #[test]
    fn test_record_codec_purge_upto() -> io::Result<()> {
        let record = WALRecord::<TestTypes>::PurgeUpto((1, 2));

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 4, // type tag
            0, 0, 0, 0, 0, 0, 0, 1, // log id: 1
            0, 0, 0, 0, 0, 0, 0, 2, // log id: 2
            0, 0, 0, 0, 133, 168, 201, 45, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    #[test]
    fn test_record_codec_state() -> io::Result<()> {
        let state = RaftLogState {
            vote: Some((1, 2)),
            last: Some((2, 3)),
            committed: Some((4, 5)),
            purged: Some((6, 7)),
            user_data: Some(ss("hello")),
        };
        let record = WALRecord::<TestTypes>::State(state);

        #[rustfmt::skip]
        let encoded = vec![
            0, 0, 0, 5, // type tag
            1, // NOTE(review): leading byte of the state encoding; meaning not visible here
            1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, // presumably vote: Some((1, 2))
            1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, // presumably last: Some((2, 3))
            1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5, // presumably committed: Some((4, 5))
            1, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, // presumably purged: Some((6, 7))
            1, 0, 0, 0, 5, 104, 101, 108, 108, 111, // presumably user_data: Some("hello")
            0, 0, 0, 0, 121, 106, 111, 41, // checksum trailer
        ];

        test_codec(&encoded, &record)
    }

    // One record per variant, rendered as a slice, pins the `Display` output of
    // every arm at once.
    #[test]
    fn test_wal_record_display() {
        type Rec = WALRecord<TestDisplayTypes>;

        let records = [
            Rec::SaveVote(1),
            Rec::Append(3u64, "hello".to_string()),
            Rec::Commit(5u64),
            Rec::TruncateAfter(Some(7u64)),
            Rec::PurgeUpto(9u64),
            Rec::State(RaftLogState {
                vote: Some(1),
                last: Some(3),
                committed: Some(4),
                purged: Some(6),
                user_data: Some("hello".to_string()),
            }),
        ];

        let got = records.display_n(1000).to_string();

        let want =
            "[SaveVote(1),Append(log_id: 3, payload: hello),Commit(5),TruncateAfter(7),PurgeUpto(9),RaftLogState(RaftLogState(vote: 1, last: 3, committed: 4, purged: 6, user_data: hello))]";

        assert_eq!(want, got);
    }
}