Skip to main content

arc_malachitebft_engine/wal/
entry.rs

1use std::io::{self, Read, Write};
2
3use byteorder::{ReadBytesExt, WriteBytesExt, BE};
4
5use malachitebft_codec::Codec;
6use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg};
7use malachitebft_core_types::{Context, Round, Timeout};
8
9/// Codec for encoding and decoding WAL entries.
10///
11/// This trait is automatically implemented for any type that implements:
12/// - [`Codec<SignedConsensusMsg<Ctx>>`]
13pub trait WalCodec<Ctx>
14where
15    Ctx: Context,
16    Self: Codec<SignedConsensusMsg<Ctx>>,
17    Self: Codec<ProposedValue<Ctx>>,
18{
19}
20
21impl<Ctx, C> WalCodec<Ctx> for C
22where
23    Ctx: Context,
24    C: Codec<SignedConsensusMsg<Ctx>>,
25    C: Codec<ProposedValue<Ctx>>,
26{
27}
28
29pub use malachitebft_core_consensus::WalEntry;
30
31const TAG_CONSENSUS: u8 = 0x01;
32const TAG_TIMEOUT: u8 = 0x02;
33const TAG_PROPOSED_VALUE: u8 = 0x04;
34
35pub fn encode_entry<Ctx, C, W>(entry: &WalEntry<Ctx>, codec: &C, buf: W) -> io::Result<()>
36where
37    Ctx: Context,
38    C: WalCodec<Ctx>,
39    W: Write,
40{
41    match entry {
42        WalEntry::ConsensusMsg(msg) => encode_consensus_msg(TAG_CONSENSUS, msg, codec, buf),
43        WalEntry::Timeout(timeout) => encode_timeout(TAG_TIMEOUT, timeout, buf),
44        WalEntry::ProposedValue(value) => {
45            encode_proposed_value(TAG_PROPOSED_VALUE, value, codec, buf)
46        }
47    }
48}
49
50pub fn decode_entry<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<WalEntry<Ctx>>
51where
52    Ctx: Context,
53    C: WalCodec<Ctx>,
54    R: Read,
55{
56    let tag = buf.read_u8()?;
57
58    match tag {
59        TAG_CONSENSUS => decode_consensus_msg(codec, buf).map(WalEntry::ConsensusMsg),
60        TAG_TIMEOUT => decode_timeout(buf).map(WalEntry::Timeout),
61        TAG_PROPOSED_VALUE => decode_proposed_value(codec, buf).map(WalEntry::ProposedValue),
62        _ => Err(io::Error::new(io::ErrorKind::InvalidData, "invalid tag")),
63    }
64}
65
66// Consensus message helpers
67fn encode_consensus_msg<Ctx, C, W>(
68    tag: u8,
69    msg: &SignedConsensusMsg<Ctx>,
70    codec: &C,
71    mut buf: W,
72) -> io::Result<()>
73where
74    Ctx: Context,
75    C: WalCodec<Ctx>,
76    W: Write,
77{
78    let bytes = codec.encode(msg).map_err(|e| {
79        io::Error::new(
80            io::ErrorKind::InvalidData,
81            format!("failed to encode consensus message: {e}"),
82        )
83    })?;
84
85    // Write tag
86    buf.write_u8(tag)?;
87
88    // Write encoded length
89    buf.write_u64::<BE>(bytes.len() as u64)?;
90
91    // Write encoded bytes
92    buf.write_all(&bytes)?;
93
94    Ok(())
95}
96
97fn decode_consensus_msg<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<SignedConsensusMsg<Ctx>>
98where
99    Ctx: Context,
100    C: WalCodec<Ctx>,
101    R: Read,
102{
103    let len = buf.read_u64::<BE>()?;
104    let mut bytes = vec![0; len as usize];
105    buf.read_exact(&mut bytes)?;
106
107    codec.decode(bytes.into()).map_err(|e| {
108        io::Error::new(
109            io::ErrorKind::InvalidData,
110            format!("failed to decode consensus msg: {e}"),
111        )
112    })
113}
114
115// Timeout helpers
116fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
117    use malachitebft_core_types::TimeoutKind;
118
119    let step = match timeout.kind {
120        TimeoutKind::Propose => 1,
121        TimeoutKind::Prevote => 2,
122        TimeoutKind::Precommit => 3,
123
124        // NOTE: Commit, prevote and precommit time limit timeouts have been removed.
125
126        // Consensus will typically not want to store these timeouts in the WAL,
127        // but we still need to handle them here.
128        TimeoutKind::Rebroadcast => 7,
129        TimeoutKind::FinalizeHeight(_) => {
130            // FinalizeHeight timeouts are not persisted to WAL
131            panic!("FinalizeHeight timeout should not be written to WAL")
132        }
133    };
134
135    buf.write_u8(tag)?;
136    buf.write_u8(step)?;
137    buf.write_i64::<BE>(timeout.round.as_i64())?;
138
139    Ok(())
140}
141
142fn decode_timeout(mut buf: impl Read) -> io::Result<Timeout> {
143    use malachitebft_core_types::TimeoutKind;
144
145    let step = match buf.read_u8()? {
146        1 => TimeoutKind::Propose,
147        2 => TimeoutKind::Prevote,
148        3 => TimeoutKind::Precommit,
149
150        // Commit timeouts have been removed in PR #976,
151        // but we still need to handle them here in order to decode old WAL entries.
152        4 => {
153            return Err(io::Error::new(
154                io::ErrorKind::InvalidData,
155                "commit timeouts are no longer supported, ignoring",
156            ))
157        }
158
159        // Prevote/precommit rebroadcast timeouts have been removed in PR #1037,
160        // but we still need to handle them here in order to decode old WAL entries.
161        5 | 6 => {
162            return Err(io::Error::new(
163                io::ErrorKind::InvalidData,
164                "prevote/precommit time limit timeouts are no longer supported, ignoring",
165            ))
166        }
167
168        // Consensus will typically not want to store these timeouts in the WAL,
169        // but we still need to handle them here.
170        7 => TimeoutKind::Rebroadcast,
171
172        // FinalizeHeight timeouts were never actually persisted
173        8 => {
174            return Err(io::Error::new(
175                io::ErrorKind::InvalidData,
176                "FinalizeHeight timeouts are not persisted to WAL, ignoring",
177            ))
178        }
179
180        _ => {
181            return Err(io::Error::new(
182                io::ErrorKind::InvalidData,
183                "invalid timeout step",
184            ))
185        }
186    };
187
188    let round = Round::from(buf.read_i64::<BE>()?);
189
190    Ok(Timeout::new(round, step))
191}
192
193// Proposed value helpers
194fn encode_proposed_value<Ctx, C, W>(
195    tag: u8,
196    value: &ProposedValue<Ctx>,
197    codec: &C,
198    mut buf: W,
199) -> io::Result<()>
200where
201    Ctx: Context,
202    C: WalCodec<Ctx>,
203    W: Write,
204{
205    let bytes = codec.encode(value).map_err(|e| {
206        io::Error::new(
207            io::ErrorKind::InvalidData,
208            format!("failed to encode consensus message: {e}"),
209        )
210    })?;
211
212    // Write tag
213    buf.write_u8(tag)?;
214
215    // Write encoded length
216    buf.write_u64::<BE>(bytes.len() as u64)?;
217
218    // Write encoded bytes
219    buf.write_all(&bytes)?;
220
221    Ok(())
222}
223
224fn decode_proposed_value<Ctx, C, R>(codec: &C, mut buf: R) -> io::Result<ProposedValue<Ctx>>
225where
226    Ctx: Context,
227    C: WalCodec<Ctx>,
228    R: Read,
229{
230    let len = buf.read_u64::<BE>()?;
231    let mut bytes = vec![0; len as usize];
232    buf.read_exact(&mut bytes)?;
233
234    codec.decode(bytes.into()).map_err(|e| {
235        io::Error::new(
236            io::ErrorKind::InvalidData,
237            format!("failed to decode proposed value: {e}"),
238        )
239    })
240}