Skip to main content

forest/libp2p/chain_exchange/
message.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::convert::TryFrom;
5
6use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
7use crate::message::SignedMessage;
8use crate::shim::message::Message;
9use crate::shim::policy::policy_constants::CHAIN_FINALITY;
10use anyhow::Context as _;
11use cid::Cid;
12use fvm_ipld_encoding::tuple::*;
13use nunny::Vec as NonEmpty;
14use serde::{Deserialize, Deserializer, Serialize, Serializer};
15
16/// `ChainExchange` Filecoin header set bit.
17pub const HEADERS: u64 = 0b01;
18/// `ChainExchange` Filecoin messages set bit.
19pub const MESSAGES: u64 = 0b10;
20
21/// The payload that gets sent to another node to request for blocks and
22/// messages.
23#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
24pub struct ChainExchangeRequest {
25    /// The tipset [Cid] to start the request from.
26    pub start: NonEmpty<Cid>,
27    /// The amount of tipsets to request.
28    pub request_len: u64,
29    /// 1 for Block only, 2 for Messages only, 3 for Blocks and Messages.
30    pub options: u64,
31}
32
33impl ChainExchangeRequest {
34    /// If a request has the [HEADERS] bit set and requests Filecoin headers.
35    pub fn include_blocks(&self) -> bool {
36        self.options & HEADERS > 0
37    }
38
39    /// If a request has the [MESSAGES] bit set and requests messages of a
40    /// block.
41    pub fn include_messages(&self) -> bool {
42        self.options & MESSAGES > 0
43    }
44
45    /// If either the [HEADERS] bit or the [MESSAGES] bit is set.
46    pub fn is_options_valid(&self) -> bool {
47        self.include_blocks() || self.include_messages()
48    }
49
50    /// Checks if the request length is within `(0, CHAIN_FINALITY]`, matching
51    /// Lotus's [`MaxRequestLength`].
52    ///
53    /// [`MaxRequestLength`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/protocol.go#L30
54    pub fn is_request_len_valid(&self) -> bool {
55        self.request_len > 0 && self.request_len <= CHAIN_FINALITY as u64
56    }
57}
58
59/// Status codes of a `chain_exchange` response.
60#[derive(Clone, Debug, PartialEq, Eq, Copy)]
61#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
62pub enum ChainExchangeResponseStatus {
63    /// All is well.
64    Success,
65    /// We could not fetch all blocks requested (but at least we returned
66    /// the `Head` requested). Not considered an error.
67    PartialResponse,
68    /// Request.Start not found.
69    BlockNotFound,
70    /// Requester is making too many requests.
71    GoAway,
72    /// Internal error occurred.
73    InternalError,
74    /// Request was bad.
75    BadRequest,
76    /// Other undefined response code.
77    Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
78}
79
80impl Serialize for ChainExchangeResponseStatus {
81    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
82    where
83        S: Serializer,
84    {
85        use ChainExchangeResponseStatus::*;
86        let code: i32 = match self {
87            Success => 0,
88            PartialResponse => 101,
89            BlockNotFound => 201,
90            GoAway => 202,
91            InternalError => 203,
92            BadRequest => 204,
93            Other(i) => *i,
94        };
95        code.serialize(serializer)
96    }
97}
98
99impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
100    fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
101    where
102        D: Deserializer<'de>,
103    {
104        let code: i32 = Deserialize::deserialize(deserializer)?;
105
106        use ChainExchangeResponseStatus::*;
107        let status = match code {
108            0 => Success,
109            101 => PartialResponse,
110            201 => BlockNotFound,
111            202 => GoAway,
112            203 => InternalError,
113            204 => BadRequest,
114            x => Other(x),
115        };
116        Ok(status)
117    }
118}
119
120/// The response to a `ChainExchange` request.
121#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
122pub struct ChainExchangeResponse {
123    /// Status code of the response.
124    pub status: ChainExchangeResponseStatus,
125    /// Status message indicating failure reason.
126    pub message: String,
127    /// The tipsets requested.
128    pub chain: Vec<TipsetBundle>,
129}
130
131impl ChainExchangeResponse {
132    /// Build a [`ChainExchangeResponseStatus::GoAway`] response asking the
133    /// requester to back off (e.g. when concurrent-request caps are reached).
134    pub fn go_away(message: impl Into<String>) -> Self {
135        Self {
136            chain: Default::default(),
137            status: ChainExchangeResponseStatus::GoAway,
138            message: message.into(),
139        }
140    }
141
142    /// Converts `chain_exchange` response into result.
143    /// Returns an error if the response status is not `Ok`.
144    /// Tipset bundle is converted into generic return type with `TryFrom` trait
145    /// implementation.
146    pub fn into_result<T>(self) -> anyhow::Result<Vec<T>>
147    where
148        T: TryFrom<TipsetBundle>,
149        <T as TryFrom<TipsetBundle>>::Error: Into<anyhow::Error>,
150    {
151        if self.status != ChainExchangeResponseStatus::Success
152            && self.status != ChainExchangeResponseStatus::PartialResponse
153        {
154            anyhow::bail!("Status {:?}: {}", self.status, self.message);
155        }
156
157        self.chain
158            .into_iter()
159            .map(|i| {
160                T::try_from(i).map_err(|e| e.into().context("failed to convert from tipset bundle"))
161            })
162            .collect()
163    }
164}
165/// Contains all BLS and SECP messages and their indexes per block
166#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
167pub struct CompactedMessages {
168    /// Unsigned BLS messages.
169    pub bls_msgs: Vec<Message>,
170    /// Describes which block each message belongs to.
171    /// if `bls_msg_includes[2] = vec![5]` then `TipsetBundle.blocks[2]` contains `bls_msgs[5]`
172    pub bls_msg_includes: Vec<Vec<u64>>,
173
174    /// Signed SECP messages.
175    pub secp_msgs: Vec<SignedMessage>,
176    /// Describes which block each message belongs to.
177    pub secp_msg_includes: Vec<Vec<u64>>,
178}
179
180/// Contains the blocks and messages in a particular tipset
181#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
182pub struct TipsetBundle {
183    /// The blocks in the tipset.
184    pub blocks: Vec<CachingBlockHeader>,
185
186    /// Compressed messages format.
187    pub messages: Option<CompactedMessages>,
188}
189
190impl TryFrom<TipsetBundle> for Tipset {
191    type Error = anyhow::Error;
192
193    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
194        Ok(Tipset::new(tsb.blocks)?)
195    }
196}
197
198impl TryFrom<TipsetBundle> for CompactedMessages {
199    type Error = anyhow::Error;
200
201    fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
202        tsb.messages.context("Request contained no messages")
203    }
204}
205
206impl TryFrom<TipsetBundle> for FullTipset {
207    type Error = anyhow::Error;
208
209    fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
210        fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
211    }
212}
213
214impl TryFrom<&TipsetBundle> for FullTipset {
215    type Error = anyhow::Error;
216
217    fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
218        fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
219    }
220}
221
222/// Constructs a [`FullTipset`] from headers and compacted messages from a
223/// bundle.
224fn fts_from_bundle_parts(
225    headers: Vec<CachingBlockHeader>,
226    messages: Option<&CompactedMessages>,
227) -> anyhow::Result<FullTipset> {
228    let CompactedMessages {
229        bls_msgs,
230        bls_msg_includes,
231        secp_msg_includes,
232        secp_msgs,
233    } = messages.context("Tipset bundle did not contain message bundle")?;
234
235    if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
236        anyhow::bail!(
237            "Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}",
238            headers.len(),
239            bls_msg_includes.len(),
240            secp_msg_includes.len()
241        );
242    }
243    let zipped = headers
244        .into_iter()
245        .zip(bls_msg_includes.iter())
246        .zip(secp_msg_includes.iter());
247
248    fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> anyhow::Result<Vec<T>> {
249        indexes
250            .iter()
251            .map(|idx| {
252                values
253                    .get(*idx as usize)
254                    .cloned()
255                    .context("Invalid message index")
256            })
257            .collect()
258    }
259
260    let blocks = zipped
261        .enumerate()
262        .map(|(i, ((header, bls_msg_include), secp_msg_include))| {
263            let message_count = bls_msg_include.len() + secp_msg_include.len();
264            if message_count > BLOCK_MESSAGE_LIMIT {
265                anyhow::bail!(
266                    "Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
267                );
268            }
269            let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
270            let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
271
272            Ok(Block {
273                header,
274                bls_messages,
275                secp_messages,
276            })
277        })
278        .collect::<Result<Vec<_>, _>>()?;
279
280    Ok(FullTipset::new(blocks)?)
281}
282
283#[cfg(test)]
284mod tests {
285    use quickcheck_macros::quickcheck;
286    use serde_json;
287
288    use super::*;
289
290    #[quickcheck]
291    fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
292        let serialized = serde_json::to_string(&status).unwrap();
293        let parsed = serde_json::from_str(&serialized).unwrap();
294        assert_eq!(status, parsed);
295    }
296}