forest/libp2p/chain_exchange/
message.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{convert::TryFrom, sync::Arc};
5
6use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
7use crate::message::SignedMessage;
8use crate::shim::message::Message;
9use cid::Cid;
10use fvm_ipld_encoding::tuple::*;
11use nunny::Vec as NonEmpty;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13
14/// `ChainExchange` Filecoin header set bit.
15pub const HEADERS: u64 = 0b01;
16/// `ChainExchange` Filecoin messages set bit.
17pub const MESSAGES: u64 = 0b10;
18
19/// The payload that gets sent to another node to request for blocks and
20/// messages.
21#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
22pub struct ChainExchangeRequest {
23    /// The tipset [Cid] to start the request from.
24    pub start: NonEmpty<Cid>,
25    /// The amount of tipsets to request.
26    pub request_len: u64,
27    /// 1 for Block only, 2 for Messages only, 3 for Blocks and Messages.
28    pub options: u64,
29}
30
31impl ChainExchangeRequest {
32    /// If a request has the [HEADERS] bit set and requests Filecoin headers.
33    pub fn include_blocks(&self) -> bool {
34        self.options & HEADERS > 0
35    }
36
37    /// If a request has the [MESSAGES] bit set and requests messages of a
38    /// block.
39    pub fn include_messages(&self) -> bool {
40        self.options & MESSAGES > 0
41    }
42
43    /// If either the [HEADERS] bit or the [MESSAGES] bit is set.
44    pub fn is_options_valid(&self) -> bool {
45        self.include_blocks() || self.include_messages()
46    }
47}
48
49/// Status codes of a `chain_exchange` response.
50#[derive(Clone, Debug, PartialEq, Eq, Copy)]
51#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
52pub enum ChainExchangeResponseStatus {
53    /// All is well.
54    Success,
55    /// We could not fetch all blocks requested (but at least we returned
56    /// the `Head` requested). Not considered an error.
57    PartialResponse,
58    /// Request.Start not found.
59    BlockNotFound,
60    /// Requester is making too many requests.
61    GoAway,
62    /// Internal error occurred.
63    InternalError,
64    /// Request was bad.
65    BadRequest,
66    /// Other undefined response code.
67    Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
68}
69
70impl Serialize for ChainExchangeResponseStatus {
71    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
72    where
73        S: Serializer,
74    {
75        use ChainExchangeResponseStatus::*;
76        let code: i32 = match self {
77            Success => 0,
78            PartialResponse => 101,
79            BlockNotFound => 201,
80            GoAway => 202,
81            InternalError => 203,
82            BadRequest => 204,
83            Other(i) => *i,
84        };
85        code.serialize(serializer)
86    }
87}
88
89impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
90    fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
91    where
92        D: Deserializer<'de>,
93    {
94        let code: i32 = Deserialize::deserialize(deserializer)?;
95
96        use ChainExchangeResponseStatus::*;
97        let status = match code {
98            0 => Success,
99            101 => PartialResponse,
100            201 => BlockNotFound,
101            202 => GoAway,
102            203 => InternalError,
103            204 => BadRequest,
104            x => Other(x),
105        };
106        Ok(status)
107    }
108}
109
110/// The response to a `ChainExchange` request.
111#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
112pub struct ChainExchangeResponse {
113    /// Status code of the response.
114    pub status: ChainExchangeResponseStatus,
115    /// Status message indicating failure reason.
116    pub message: String,
117    /// The tipsets requested.
118    pub chain: Vec<TipsetBundle>,
119}
120
121impl ChainExchangeResponse {
122    /// Converts `chain_exchange` response into result.
123    /// Returns an error if the response status is not `Ok`.
124    /// Tipset bundle is converted into generic return type with `TryFrom` trait
125    /// implementation.
126    pub fn into_result<T>(self) -> Result<Vec<T>, String>
127    where
128        T: TryFrom<TipsetBundle>,
129        <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
130    {
131        if self.status != ChainExchangeResponseStatus::Success
132            && self.status != ChainExchangeResponseStatus::PartialResponse
133        {
134            return Err(format!("Status {:?}: {}", self.status, self.message));
135        }
136
137        self.chain
138            .into_iter()
139            .map(|i| T::try_from(i).map_err(|e| e.to_string()))
140            .collect()
141    }
142}
143/// Contains all BLS and SECP messages and their indexes per block
144#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
145pub struct CompactedMessages {
146    /// Unsigned BLS messages.
147    pub bls_msgs: Vec<Message>,
148    /// Describes which block each message belongs to.
149    /// if `bls_msg_includes[2] = vec![5]` then `TipsetBundle.blocks[2]` contains `bls_msgs[5]`
150    pub bls_msg_includes: Vec<Vec<u64>>,
151
152    /// Signed SECP messages.
153    pub secp_msgs: Vec<SignedMessage>,
154    /// Describes which block each message belongs to.
155    pub secp_msg_includes: Vec<Vec<u64>>,
156}
157
158/// Contains the blocks and messages in a particular tipset
159#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
160pub struct TipsetBundle {
161    /// The blocks in the tipset.
162    pub blocks: Vec<CachingBlockHeader>,
163
164    /// Compressed messages format.
165    pub messages: Option<CompactedMessages>,
166}
167
168impl TryFrom<TipsetBundle> for Tipset {
169    type Error = String;
170
171    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
172        Tipset::new(tsb.blocks).map_err(|e| e.to_string())
173    }
174}
175
176impl TryFrom<TipsetBundle> for Arc<Tipset> {
177    type Error = String;
178
179    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
180        Tipset::try_from(tsb).map(Arc::new)
181    }
182}
183
184impl TryFrom<TipsetBundle> for CompactedMessages {
185    type Error = String;
186
187    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
188        tsb.messages
189            .ok_or_else(|| "Request contained no messages".to_string())
190    }
191}
192
193impl TryFrom<TipsetBundle> for FullTipset {
194    type Error = String;
195
196    fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
197        fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
198    }
199}
200
201impl TryFrom<&TipsetBundle> for FullTipset {
202    type Error = String;
203
204    fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
205        fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
206    }
207}
208
209/// Constructs a [`FullTipset`] from headers and compacted messages from a
210/// bundle.
211fn fts_from_bundle_parts(
212    headers: Vec<CachingBlockHeader>,
213    messages: Option<&CompactedMessages>,
214) -> Result<FullTipset, String> {
215    let CompactedMessages {
216        bls_msgs,
217        bls_msg_includes,
218        secp_msg_includes,
219        secp_msgs,
220    } = messages.ok_or("Tipset bundle did not contain message bundle")?;
221
222    if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
223        return Err(format!(
224            "Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}",
225            headers.len(),
226            bls_msg_includes.len(),
227            secp_msg_includes.len()
228        ));
229    }
230    let zipped = headers
231        .into_iter()
232        .zip(bls_msg_includes.iter())
233        .zip(secp_msg_includes.iter());
234
235    fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> Result<Vec<T>, String> {
236        indexes
237            .iter()
238            .map(|idx| {
239                values
240                    .get(*idx as usize)
241                    .cloned()
242                    .ok_or_else(|| "Invalid message index".to_string())
243            })
244            .collect()
245    }
246
247    let blocks = zipped
248        .enumerate()
249        .map(|(i, ((header, bls_msg_include), secp_msg_include))| {
250            let message_count = bls_msg_include.len() + secp_msg_include.len();
251            if message_count > BLOCK_MESSAGE_LIMIT {
252                return Err(format!(
253                    "Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
254                ));
255            }
256            let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
257            let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
258
259            Ok(Block {
260                header,
261                bls_messages,
262                secp_messages,
263            })
264        })
265        .collect::<Result<Vec<_>, _>>()?;
266
267    FullTipset::new(blocks).map_err(|e| e.to_string())
268}
269
270#[cfg(test)]
271mod tests {
272    use quickcheck_macros::quickcheck;
273    use serde_json;
274
275    use super::*;
276
277    #[quickcheck]
278    fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
279        let serialized = serde_json::to_string(&status).unwrap();
280        let parsed = serde_json::from_str(&serialized).unwrap();
281        assert_eq!(status, parsed);
282    }
283}