forest/libp2p/chain_exchange/
message.rs1use std::convert::TryFrom;
5
6use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
7use crate::message::SignedMessage;
8use crate::shim::message::Message;
9use crate::shim::policy::policy_constants::CHAIN_FINALITY;
10use anyhow::Context as _;
11use cid::Cid;
12use fvm_ipld_encoding::tuple::*;
13use nunny::Vec as NonEmpty;
14use serde::{Deserialize, Deserializer, Serialize, Serializer};
15
16pub const HEADERS: u64 = 0b01;
18pub const MESSAGES: u64 = 0b10;
20
21#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
24pub struct ChainExchangeRequest {
25 pub start: NonEmpty<Cid>,
27 pub request_len: u64,
29 pub options: u64,
31}
32
33impl ChainExchangeRequest {
34 pub fn include_blocks(&self) -> bool {
36 self.options & HEADERS > 0
37 }
38
39 pub fn include_messages(&self) -> bool {
42 self.options & MESSAGES > 0
43 }
44
45 pub fn is_options_valid(&self) -> bool {
47 self.include_blocks() || self.include_messages()
48 }
49
50 pub fn is_request_len_valid(&self) -> bool {
55 self.request_len > 0 && self.request_len <= CHAIN_FINALITY as u64
56 }
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Copy)]
61#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
62pub enum ChainExchangeResponseStatus {
63 Success,
65 PartialResponse,
68 BlockNotFound,
70 GoAway,
72 InternalError,
74 BadRequest,
76 Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
78}
79
80impl Serialize for ChainExchangeResponseStatus {
81 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
82 where
83 S: Serializer,
84 {
85 use ChainExchangeResponseStatus::*;
86 let code: i32 = match self {
87 Success => 0,
88 PartialResponse => 101,
89 BlockNotFound => 201,
90 GoAway => 202,
91 InternalError => 203,
92 BadRequest => 204,
93 Other(i) => *i,
94 };
95 code.serialize(serializer)
96 }
97}
98
99impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
100 fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 let code: i32 = Deserialize::deserialize(deserializer)?;
105
106 use ChainExchangeResponseStatus::*;
107 let status = match code {
108 0 => Success,
109 101 => PartialResponse,
110 201 => BlockNotFound,
111 202 => GoAway,
112 203 => InternalError,
113 204 => BadRequest,
114 x => Other(x),
115 };
116 Ok(status)
117 }
118}
119
120#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
122pub struct ChainExchangeResponse {
123 pub status: ChainExchangeResponseStatus,
125 pub message: String,
127 pub chain: Vec<TipsetBundle>,
129}
130
131impl ChainExchangeResponse {
132 pub fn go_away(message: impl Into<String>) -> Self {
135 Self {
136 chain: Default::default(),
137 status: ChainExchangeResponseStatus::GoAway,
138 message: message.into(),
139 }
140 }
141
142 pub fn into_result<T>(self) -> anyhow::Result<Vec<T>>
147 where
148 T: TryFrom<TipsetBundle>,
149 <T as TryFrom<TipsetBundle>>::Error: Into<anyhow::Error>,
150 {
151 if self.status != ChainExchangeResponseStatus::Success
152 && self.status != ChainExchangeResponseStatus::PartialResponse
153 {
154 anyhow::bail!("Status {:?}: {}", self.status, self.message);
155 }
156
157 self.chain
158 .into_iter()
159 .map(|i| {
160 T::try_from(i).map_err(|e| e.into().context("failed to convert from tipset bundle"))
161 })
162 .collect()
163 }
164}
165#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
167pub struct CompactedMessages {
168 pub bls_msgs: Vec<Message>,
170 pub bls_msg_includes: Vec<Vec<u64>>,
173
174 pub secp_msgs: Vec<SignedMessage>,
176 pub secp_msg_includes: Vec<Vec<u64>>,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
182pub struct TipsetBundle {
183 pub blocks: Vec<CachingBlockHeader>,
185
186 pub messages: Option<CompactedMessages>,
188}
189
190impl TryFrom<TipsetBundle> for Tipset {
191 type Error = anyhow::Error;
192
193 fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
194 Ok(Tipset::new(tsb.blocks)?)
195 }
196}
197
198impl TryFrom<TipsetBundle> for CompactedMessages {
199 type Error = anyhow::Error;
200
201 fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
202 tsb.messages.context("Request contained no messages")
203 }
204}
205
206impl TryFrom<TipsetBundle> for FullTipset {
207 type Error = anyhow::Error;
208
209 fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
210 fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
211 }
212}
213
214impl TryFrom<&TipsetBundle> for FullTipset {
215 type Error = anyhow::Error;
216
217 fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
218 fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
219 }
220}
221
222fn fts_from_bundle_parts(
225 headers: Vec<CachingBlockHeader>,
226 messages: Option<&CompactedMessages>,
227) -> anyhow::Result<FullTipset> {
228 let CompactedMessages {
229 bls_msgs,
230 bls_msg_includes,
231 secp_msg_includes,
232 secp_msgs,
233 } = messages.context("Tipset bundle did not contain message bundle")?;
234
235 if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
236 anyhow::bail!(
237 "Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}",
238 headers.len(),
239 bls_msg_includes.len(),
240 secp_msg_includes.len()
241 );
242 }
243 let zipped = headers
244 .into_iter()
245 .zip(bls_msg_includes.iter())
246 .zip(secp_msg_includes.iter());
247
248 fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> anyhow::Result<Vec<T>> {
249 indexes
250 .iter()
251 .map(|idx| {
252 values
253 .get(*idx as usize)
254 .cloned()
255 .context("Invalid message index")
256 })
257 .collect()
258 }
259
260 let blocks = zipped
261 .enumerate()
262 .map(|(i, ((header, bls_msg_include), secp_msg_include))| {
263 let message_count = bls_msg_include.len() + secp_msg_include.len();
264 if message_count > BLOCK_MESSAGE_LIMIT {
265 anyhow::bail!(
266 "Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
267 );
268 }
269 let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
270 let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
271
272 Ok(Block {
273 header,
274 bls_messages,
275 secp_messages,
276 })
277 })
278 .collect::<Result<Vec<_>, _>>()?;
279
280 Ok(FullTipset::new(blocks)?)
281}
282
283#[cfg(test)]
284mod tests {
285 use quickcheck_macros::quickcheck;
286 use serde_json;
287
288 use super::*;
289
290 #[quickcheck]
291 fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
292 let serialized = serde_json::to_string(&status).unwrap();
293 let parsed = serde_json::from_str(&serialized).unwrap();
294 assert_eq!(status, parsed);
295 }
296}