use std::convert::TryFrom;
use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
use crate::message::SignedMessage;
use crate::shim::message::Message;
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_encoding::tuple::*;
use nunny::Vec as NonEmpty;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub const HEADERS: u64 = 0b01;
pub const MESSAGES: u64 = 0b10;
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeRequest {
pub start: NonEmpty<Cid>,
pub request_len: u64,
pub options: u64,
}
impl ChainExchangeRequest {
pub fn include_blocks(&self) -> bool {
self.options & HEADERS > 0
}
pub fn include_messages(&self) -> bool {
self.options & MESSAGES > 0
}
pub fn is_options_valid(&self) -> bool {
self.include_blocks() || self.include_messages()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub enum ChainExchangeResponseStatus {
Success,
PartialResponse,
BlockNotFound,
GoAway,
InternalError,
BadRequest,
Other(#[cfg_attr(test, arbitrary(gen(|_| 1)))] i32),
}
impl Serialize for ChainExchangeResponseStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use ChainExchangeResponseStatus::*;
let code: i32 = match self {
Success => 0,
PartialResponse => 101,
BlockNotFound => 201,
GoAway => 202,
InternalError => 203,
BadRequest => 204,
Other(i) => *i,
};
code.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for ChainExchangeResponseStatus {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
where
D: Deserializer<'de>,
{
let code: i32 = Deserialize::deserialize(deserializer)?;
use ChainExchangeResponseStatus::*;
let status = match code {
0 => Success,
101 => PartialResponse,
201 => BlockNotFound,
202 => GoAway,
203 => InternalError,
204 => BadRequest,
x => Other(x),
};
Ok(status)
}
}
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct ChainExchangeResponse {
pub status: ChainExchangeResponseStatus,
pub message: String,
pub chain: Vec<TipsetBundle>,
}
impl ChainExchangeResponse {
pub fn into_result<T>(self) -> anyhow::Result<Vec<T>>
where
T: TryFrom<TipsetBundle>,
<T as TryFrom<TipsetBundle>>::Error: Into<anyhow::Error>,
{
if self.status != ChainExchangeResponseStatus::Success
&& self.status != ChainExchangeResponseStatus::PartialResponse
{
anyhow::bail!("Status {:?}: {}", self.status, self.message);
}
self.chain
.into_iter()
.map(|i| {
T::try_from(i).map_err(|e| e.into().context("failed to convert from tipset bundle"))
})
.collect()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct CompactedMessages {
pub bls_msgs: Vec<Message>,
pub bls_msg_includes: Vec<Vec<u64>>,
pub secp_msgs: Vec<SignedMessage>,
pub secp_msg_includes: Vec<Vec<u64>>,
}
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple, Default)]
pub struct TipsetBundle {
pub blocks: Vec<CachingBlockHeader>,
pub messages: Option<CompactedMessages>,
}
impl TryFrom<TipsetBundle> for Tipset {
type Error = anyhow::Error;
fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
Ok(Tipset::new(tsb.blocks)?)
}
}
impl TryFrom<TipsetBundle> for CompactedMessages {
type Error = anyhow::Error;
fn try_from(tsb: TipsetBundle) -> Result<Self, Self::Error> {
tsb.messages.context("Request contained no messages")
}
}
impl TryFrom<TipsetBundle> for FullTipset {
type Error = anyhow::Error;
fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
}
}
impl TryFrom<&TipsetBundle> for FullTipset {
type Error = anyhow::Error;
fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
}
}
fn fts_from_bundle_parts(
headers: Vec<CachingBlockHeader>,
messages: Option<&CompactedMessages>,
) -> anyhow::Result<FullTipset> {
let CompactedMessages {
bls_msgs,
bls_msg_includes,
secp_msg_includes,
secp_msgs,
} = messages.context("Tipset bundle did not contain message bundle")?;
if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
anyhow::bail!(
"Invalid formed Tipset bundle, lengths of includes does not match blocks. Header len: {}, bls_msg len: {}, secp_msg len: {}",
headers.len(),
bls_msg_includes.len(),
secp_msg_includes.len()
);
}
let zipped = headers
.into_iter()
.zip(bls_msg_includes.iter())
.zip(secp_msg_includes.iter());
fn values_from_indexes<T: Clone>(indexes: &[u64], values: &[T]) -> anyhow::Result<Vec<T>> {
indexes
.iter()
.map(|idx| {
values
.get(*idx as usize)
.cloned()
.context("Invalid message index")
})
.collect()
}
let blocks = zipped
.enumerate()
.map(|(i, ((header, bls_msg_include), secp_msg_include))| {
let message_count = bls_msg_include.len() + secp_msg_include.len();
if message_count > BLOCK_MESSAGE_LIMIT {
anyhow::bail!(
"Block {i} in bundle has too many messages ({message_count} > {BLOCK_MESSAGE_LIMIT})"
);
}
let bls_messages = values_from_indexes(bls_msg_include, bls_msgs)?;
let secp_messages = values_from_indexes(secp_msg_include, secp_msgs)?;
Ok(Block {
header,
bls_messages,
secp_messages,
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(FullTipset::new(blocks)?)
}
#[cfg(test)]
mod tests {
use quickcheck_macros::quickcheck;
use serde_json;
use super::*;
#[quickcheck]
fn chain_exchange_response_status_roundtrip(status: ChainExchangeResponseStatus) {
let serialized = serde_json::to_string(&status).unwrap();
let parsed = serde_json::from_str(&serialized).unwrap();
assert_eq!(status, parsed);
}
}