forest/libp2p/chain_exchange/
message.rs1use std::{convert::TryFrom, sync::Arc};
5
6use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
7use crate::message::SignedMessage;
8use crate::shim::message::Message;
9use cid::Cid;
10use fvm_ipld_encoding::tuple::*;
11use nunny::Vec as NonEmpty;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13
14pub const HEADERS: u64 = 0b01;
16pub const MESSAGES: u64 = 0b10;
18
19#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
22pub struct ChainExchangeRequest {
23 pub start: NonEmpty<Cid>,
25 pub request_len: u64,
27 pub options: u64,
29}
30
31impl ChainExchangeRequest {
32 pub fn include_blocks(&self) -> bool {
34 self.options & HEADERS > 0
35 }
36
37 pub fn include_messages(&self) -> bool {
40 self.options & MESSAGES > 0
41 }
42
43 pub fn is_options_valid(&self) -> bool {
45 self.include_blocks() || self.include_messages()
46 }
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Copy)]
51#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
52pub enum ChainExchangeResponseStatus {
53 Success,
55 PartialResponse,
58 BlockNotFound,
60 GoAway,
62 InternalError,
64 BadRequest,
66 Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
68}
69
70impl Serialize for ChainExchangeResponseStatus {
71 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
72 where
73 S: Serializer,
74 {
75 use ChainExchangeResponseStatus::*;
76 let code: i32 = match self {
77 Success => 0,
78 PartialResponse => 101,
79 BlockNotFound => 201,
80 GoAway => 202,
81 InternalError => 203,
82 BadRequest => 204,
83 Other(i) => *i,
84 };
85 code.serialize(serializer)
86 }
87}
88
89impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
90 fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
91 where
92 D: Deserializer<'de>,
93 {
94 let code: i32 = Deserialize::deserialize(deserializer)?;
95
96 use ChainExchangeResponseStatus::*;
97 let status = match code {
98 0 => Success,
99 101 => PartialResponse,
100 201 => BlockNotFound,
101 202 => GoAway,
102 203 => InternalError,
103 204 => BadRequest,
104 x => Other(x),
105 };
106 Ok(status)
107 }
108}
109
110#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
112pub struct ChainExchangeResponse {
113 pub status: ChainExchangeResponseStatus,
115 pub message: String,
117 pub chain: Vec<TipsetBundle>,
119}
120
121impl ChainExchangeResponse {
122 pub fn into_result<T>(self) -> Result<Vec<T>, String>
127 where
128 T: TryFrom<TipsetBundle>,
129 <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
130 {
131 if self.status != ChainExchangeResponseStatus::Success
132 && self.status != ChainExchangeResponseStatus::PartialResponse
133 {
134 return Err(format!("Status {:?}: {}", self.status, self.message));
135 }
136
137 self.chain
138 .into_iter()
139 .map(|i| T::try_from(i).map_err(|e| e.to_string()))
140 .collect()
141 }
142}
143#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
145pub struct CompactedMessages {
146 pub bls_msgs: Vec<Message>,
148 pub bls_msg_includes: Vec<Vec<u64>>,
151
152 pub secp_msgs: Vec<SignedMessage>,
154 pub secp_msg_includes: Vec<Vec<u64>>,
156}
157
158#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
160pub struct TipsetBundle {
161 pub blocks: Vec<CachingBlockHeader>,
163
164 pub messages: Option<CompactedMessages>,
166}
167
168impl TryFrom<TipsetBundle> for Tipset {
169 type Error = String;
170
171 fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
172 Tipset::new(tsb.blocks).map_err(|e| e.to_string())
173 }
174}
175
176impl TryFrom<TipsetBundle> for Arc<Tipset> {
177 type Error = String;
178
179 fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
180 Tipset::try_from(tsb).map(Arc::new)
181 }
182}
183
184impl TryFrom<TipsetBundle> for CompactedMessages {
185 type Error = String;
186
187 fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
188 tsb.messages
189 .ok_or_else(|| "Request contained no messages".to_string())
190 }
191}
192
193impl TryFrom<TipsetBundle> for FullTipset {
194 type Error = String;
195
196 fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
197 fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
198 }
199}
200
201impl TryFrom<&TipsetBundle> for FullTipset {
202 type Error = String;
203
204 fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
205 fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
206 }
207}
208
209fn fts_from_bundle_parts(
212 headers: Vec<CachingBlockHeader>,
213 messages: Option<&CompactedMessages>,
214) -> Result<FullTipset, String> {
215 let CompactedMessages {
216 bls_msgs,
217 bls_msg_includes,
218 secp_msg_includes,
219 secp_msgs,
220 } = messages.ok_or("Tipset bundle did not contain message bundle")?;
221
222 if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
223 return Err(format!(
224 "Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}",
225 headers.len(),
226 bls_msg_includes.len(),
227 secp_msg_includes.len()
228 ));
229 }
230 let zipped = headers
231 .into_iter()
232 .zip(bls_msg_includes.iter())
233 .zip(secp_msg_includes.iter());
234
235 fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> Result<Vec<T>, String> {
236 indexes
237 .iter()
238 .map(|idx| {
239 values
240 .get(*idx as usize)
241 .cloned()
242 .ok_or_else(|| "Invalid message index".to_string())
243 })
244 .collect()
245 }
246
247 let blocks = zipped
248 .enumerate()
249 .map(|(i, ((header, bls_msg_include), secp_msg_include))| {
250 let message_count = bls_msg_include.len() + secp_msg_include.len();
251 if message_count > BLOCK_MESSAGE_LIMIT {
252 return Err(format!(
253 "Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
254 ));
255 }
256 let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
257 let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
258
259 Ok(Block {
260 header,
261 bls_messages,
262 secp_messages,
263 })
264 })
265 .collect::<Result<Vec<_>, _>>()?;
266
267 FullTipset::new(blocks).map_err(|e| e.to_string())
268}
269
270#[cfg(test)]
271mod tests {
272 use quickcheck_macros::quickcheck;
273 use serde_json;
274
275 use super::*;
276
277 #[quickcheck]
278 fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
279 let serialized = serde_json::to_string(&status).unwrap();
280 let parsed = serde_json::from_str(&serialized).unwrap();
281 assert_eq!(status, parsed);
282 }
283}