arc_malachitebft_engine/wal/
entry.rs1use std::io::{self, Read, Write};
2
3use byteorder::{ReadBytesExt, WriteBytesExt, BE};
4
5use malachitebft_codec::Codec;
6use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg};
7use malachitebft_core_types::{Context, Round, Timeout};
8
9pub trait WalCodec<Ctx>
14where
15 Ctx: Context,
16 Self: Codec<SignedConsensusMsg<Ctx>>,
17 Self: Codec<ProposedValue<Ctx>>,
18{
19}
20
21impl<Ctx, C> WalCodec<Ctx> for C
22where
23 Ctx: Context,
24 C: Codec<SignedConsensusMsg<Ctx>>,
25 C: Codec<ProposedValue<Ctx>>,
26{
27}
28
29pub use malachitebft_core_consensus::WalEntry;
30
// One-byte discriminants written as the first byte of every encoded WAL entry.
// `decode_entry` rejects any other value with an "invalid tag" error.

/// Tag for an entry holding a signed consensus message.
const TAG_CONSENSUS: u8 = 0x01;

/// Tag for an entry holding a timeout.
const TAG_TIMEOUT: u8 = 0x02;

/// Tag for an entry holding a proposed value.
///
/// Note that `0x03` is not assigned to any entry kind in this file.
const TAG_PROPOSED_VALUE: u8 = 0x04;
34
35pub fn encode_entry<Ctx, C, W>(entry: &WalEntry<Ctx>, codec: &C, buf: W) -> io::Result<()>
36where
37 Ctx: Context,
38 C: WalCodec<Ctx>,
39 W: Write,
40{
41 match entry {
42 WalEntry::ConsensusMsg(msg) => encode_consensus_msg(TAG_CONSENSUS, msg, codec, buf),
43 WalEntry::Timeout(timeout) => encode_timeout(TAG_TIMEOUT, timeout, buf),
44 WalEntry::ProposedValue(value) => {
45 encode_proposed_value(TAG_PROPOSED_VALUE, value, codec, buf)
46 }
47 }
48}
49
50pub fn decode_entry<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<WalEntry<Ctx>>
51where
52 Ctx: Context,
53 C: WalCodec<Ctx>,
54 R: Read,
55{
56 let tag = buf.read_u8()?;
57
58 match tag {
59 TAG_CONSENSUS => decode_consensus_msg(codec, buf).map(WalEntry::ConsensusMsg),
60 TAG_TIMEOUT => decode_timeout(buf).map(WalEntry::Timeout),
61 TAG_PROPOSED_VALUE => decode_proposed_value(codec, buf).map(WalEntry::ProposedValue),
62 _ => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid tag")),
63 }
64}
65
66fn encode_consensus_msg<Ctx, C, W>(
68 tag: u8,
69 msg: &SignedConsensusMsg<Ctx>,
70 codec: &C,
71 mut buf: W,
72) -> io::Result<()>
73where
74 Ctx: Context,
75 C: WalCodec<Ctx>,
76 W: Write,
77{
78 let bytes = codec.encode(msg).map_err(|e| {
79 io::Error::new(
80 io::ErrorKind::InvalidData,
81 format!("failed to encode consensus message: {e}"),
82 )
83 })?;
84
85 buf.write_u8(tag)?;
87
88 buf.write_u64::<BE>(bytes.len() as u64)?;
90
91 buf.write_all(&bytes)?;
93
94 Ok(())
95}
96
97fn decode_consensus_msg<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<SignedConsensusMsg<Ctx>>
98where
99 Ctx: Context,
100 C: WalCodec<Ctx>,
101 R: Read,
102{
103 let len = buf.read_u64::<BE>()?;
104 let mut bytes = vec![0; len as usize];
105 buf.read_exact(&mut bytes)?;
106
107 codec.decode(bytes.into()).map_err(|e| {
108 io::Error::new(
109 io::ErrorKind::InvalidData,
110 format!("failed to decode consensus msg: {e}"),
111 )
112 })
113}
114
115fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
117 use malachitebft_core_types::TimeoutKind;
118
119 let step = match timeout.kind {
120 TimeoutKind::Propose => 1,
121 TimeoutKind::Prevote => 2,
122 TimeoutKind::Precommit => 3,
123
124 TimeoutKind::Rebroadcast => 7,
129 TimeoutKind::FinalizeHeight(_) => {
130 panic!("FinalizeHeight timeout should not be written to WAL")
132 }
133 };
134
135 buf.write_u8(tag)?;
136 buf.write_u8(step)?;
137 buf.write_i64::<BE>(timeout.round.as_i64())?;
138
139 Ok(())
140}
141
142fn decode_timeout(mut buf: impl Read) -> io::Result<Timeout> {
143 use malachitebft_core_types::TimeoutKind;
144
145 let step = match buf.read_u8()? {
146 1 => TimeoutKind::Propose,
147 2 => TimeoutKind::Prevote,
148 3 => TimeoutKind::Precommit,
149
150 4 => {
153 return Err(io::Error::new(
154 io::ErrorKind::InvalidData,
155 "commit timeouts are no longer supported, ignoring",
156 ))
157 }
158
159 5 | 6 => {
162 return Err(io::Error::new(
163 io::ErrorKind::InvalidData,
164 "prevote/precommit time limit timeouts are no longer supported, ignoring",
165 ))
166 }
167
168 7 => TimeoutKind::Rebroadcast,
171
172 8 => {
174 return Err(io::Error::new(
175 io::ErrorKind::InvalidData,
176 "FinalizeHeight timeouts are not persisted to WAL, ignoring",
177 ))
178 }
179
180 _ => {
181 return Err(io::Error::new(
182 io::ErrorKind::InvalidData,
183 "invalid timeout step",
184 ))
185 }
186 };
187
188 let round = Round::from(buf.read_i64::<BE>()?);
189
190 Ok(Timeout::new(round, step))
191}
192
193fn encode_proposed_value<Ctx, C, W>(
195 tag: u8,
196 value: &ProposedValue<Ctx>,
197 codec: &C,
198 mut buf: W,
199) -> io::Result<()>
200where
201 Ctx: Context,
202 C: WalCodec<Ctx>,
203 W: Write,
204{
205 let bytes = codec.encode(value).map_err(|e| {
206 io::Error::new(
207 io::ErrorKind::InvalidData,
208 format!("failed to encode consensus message: {e}"),
209 )
210 })?;
211
212 buf.write_u8(tag)?;
214
215 buf.write_u64::<BE>(bytes.len() as u64)?;
217
218 buf.write_all(&bytes)?;
220
221 Ok(())
222}
223
224fn decode_proposed_value<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<ProposedValue<Ctx>>
225where
226 Ctx: Context,
227 C: WalCodec<Ctx>,
228 R: Read,
229{
230 let len = buf.read_u64::<BE>()?;
231 let mut bytes = vec![0; len as usize];
232 buf.read_exact(&mut bytes)?;
233
234 codec.decode(bytes.into()).map_err(|e| {
235 io::Error::new(
236 io::ErrorKind::InvalidData,
237 format!("failed to decode proposed value: {e}"),
238 )
239 })
240}