use std::{io, num::NonZeroUsize, sync::LazyLock};
use ahash::{HashMap, HashMapExt};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use nonzero_ext::nonzero;
use super::{
ChainExchangeRequest, ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages,
TipsetBundle,
};
use crate::{
blocks::{Tipset, TipsetKey},
chain::{ChainStore, Error as ChainError},
utils::misc::env::env_or_default_logged,
};
static MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES: LazyLock<NonZeroUsize> = LazyLock::new(|| {
env_or_default_logged(
"FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES",
nonzero!(10 * 1024 * 1024_usize),
)
});
struct CountingSink(usize);
impl io::Write for CountingSink {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0 += buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
fn encoded_size<T: serde::Serialize>(value: &T) -> Result<usize, ChainError> {
let mut sink = CountingSink(0);
fvm_ipld_encoding::to_writer(&mut sink, value)?;
Ok(sink.0)
}
pub fn make_chain_exchange_response<DB>(
cs: &ChainStore<DB>,
request: &ChainExchangeRequest,
) -> ChainExchangeResponse
where
DB: Blockstore + Send + Sync + 'static,
{
make_chain_exchange_response_with_cap(
cs,
request,
MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES.get(),
)
}
fn make_chain_exchange_response_with_cap<DB>(
cs: &ChainStore<DB>,
request: &ChainExchangeRequest,
max_bytes: usize,
) -> ChainExchangeResponse
where
DB: Blockstore + Send + Sync + 'static,
{
if !request.is_options_valid() || !request.is_request_len_valid() {
return ChainExchangeResponse {
chain: Default::default(),
status: ChainExchangeResponseStatus::BadRequest,
message: format!("Invalid chain exchange request {request:?}"),
};
}
let inner = move || {
let root = match cs
.chain_index()
.load_tipset(&TipsetKey::from(request.start.clone()))?
{
Some(tipset) => tipset,
None => {
return Ok(ChainExchangeResponse {
status: ChainExchangeResponseStatus::BlockNotFound,
chain: Default::default(),
message: "Start tipset was not found in the database".into(),
});
}
};
let mut chain: Vec<TipsetBundle> = Vec::with_capacity(request.request_len as usize);
let mut accumulated: usize = 0;
for tipset in root.chain(cs.blockstore()).take(request.request_len as _) {
let mut tipset_bundle: TipsetBundle = TipsetBundle::default();
if request.include_messages() {
tipset_bundle.messages = Some(compact_messages(cs.blockstore(), &tipset)?);
}
if request.include_blocks() {
tipset_bundle.blocks = tipset.block_headers().iter().cloned().collect_vec();
}
let bundle_bytes = encoded_size(&tipset_bundle)?;
if !chain.is_empty() && accumulated + bundle_bytes > max_bytes {
break;
}
accumulated += bundle_bytes;
chain.push(tipset_bundle);
}
anyhow::Ok(ChainExchangeResponse {
status: if request.request_len > chain.len() as u64 {
ChainExchangeResponseStatus::PartialResponse
} else {
ChainExchangeResponseStatus::Success
},
chain,
message: "Success".into(),
})
};
match inner() {
Ok(r) => r,
Err(e) => ChainExchangeResponse {
chain: Default::default(),
status: ChainExchangeResponseStatus::InternalError,
message: e.to_string(),
},
}
}
fn compact_messages<DB>(db: &DB, tipset: &Tipset) -> Result<CompactedMessages, ChainError>
where
DB: Blockstore,
{
let mut bls_messages_order = HashMap::new();
let mut secp_messages_order = HashMap::new();
let mut bls_cids_combined: Vec<Cid> = vec![];
let mut secp_cids_combined: Vec<Cid> = vec![];
let mut bls_msg_includes: Vec<Vec<u64>> = vec![];
let mut secp_msg_includes: Vec<Vec<u64>> = vec![];
for block_header in tipset.block_headers().iter() {
let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, block_header)?;
let mut bls_include = Vec::with_capacity(bls_cids.len());
for bls_cid in bls_cids.into_iter() {
let order = match bls_messages_order.get(&bls_cid) {
Some(order) => *order,
None => {
let order = bls_cids_combined.len() as u64;
bls_cids_combined.push(bls_cid);
bls_messages_order.insert(bls_cid, order);
order
}
};
bls_include.push(order);
}
bls_msg_includes.push(bls_include);
let mut secp_include = Vec::with_capacity(secp_cids.len());
for secp_cid in secp_cids.into_iter() {
let order = match secp_messages_order.get(&secp_cid) {
Some(order) => *order,
None => {
let order = secp_cids_combined.len() as u64;
secp_cids_combined.push(secp_cid);
secp_messages_order.insert(secp_cid, order);
order
}
};
secp_include.push(order);
}
secp_msg_includes.push(secp_include);
}
let (bls_msgs, secp_msgs) =
crate::chain::block_messages_from_cids(db, &bls_cids_combined, &secp_cids_combined)?;
Ok(CompactedMessages {
bls_msgs,
bls_msg_includes,
secp_msgs,
secp_msg_includes,
})
}
#[cfg(test)]
mod tests {
use super::{
super::{HEADERS, MESSAGES},
*,
};
use crate::blocks::{CachingBlockHeader, RawBlockHeader};
use crate::db::MemoryDB;
use crate::genesis::EXPORT_SR_40;
use crate::networks::ChainConfig;
use crate::shim::address::Address;
use crate::utils::db::car_util::load_car;
use nunny::Vec as NonEmpty;
use std::{io::Cursor, sync::Arc};
async fn populate_chain_store() -> (NonEmpty<Cid>, ChainStore<MemoryDB>) {
let db = Arc::new(MemoryDB::default());
let header = load_car(&db, Cursor::new(EXPORT_SR_40)).await.unwrap();
let gen_block = CachingBlockHeader::new(RawBlockHeader {
miner_address: Address::new_id(0),
..Default::default()
});
let cs = ChainStore::new(
db.clone(),
db.clone(),
db,
Arc::new(ChainConfig::default()),
gen_block,
)
.unwrap();
(header.roots, cs)
}
#[tokio::test]
async fn compact_messages_test() {
let (cids, cs) = populate_chain_store().await;
let response = make_chain_exchange_response(
&cs,
&ChainExchangeRequest {
start: cids,
request_len: 2,
options: HEADERS | MESSAGES,
},
);
assert_eq!(response.chain.len(), 2);
let tipset_39_idx = 0;
let tipset_38_idx = 1;
let ts_39_msgs = response.chain[tipset_39_idx].messages.as_ref().unwrap();
let ts_38_msgs = response.chain[tipset_38_idx].messages.as_ref().unwrap();
assert_eq!(ts_39_msgs.secp_msgs.len(), 22);
assert_eq!(ts_39_msgs.bls_msgs.len(), 12);
assert_eq!(ts_39_msgs.secp_msg_includes.len(), 1);
assert_eq!(ts_39_msgs.bls_msg_includes.len(), 1);
assert_eq!(ts_39_msgs.secp_msg_includes[0].len(), 22);
assert_eq!(ts_39_msgs.bls_msg_includes[0].len(), 12);
assert_eq!(response.chain[tipset_38_idx].blocks.len(), 2);
assert_eq!(ts_38_msgs.secp_msgs.len(), 1);
assert_eq!(ts_38_msgs.bls_msgs.len(), 11);
assert_eq!(ts_38_msgs.secp_msg_includes.len(), 2);
assert_eq!(ts_38_msgs.bls_msg_includes.len(), 2);
assert_eq!(ts_38_msgs.secp_msg_includes[0].len(), 1);
assert_eq!(ts_38_msgs.bls_msg_includes[0].len(), 11);
assert_eq!(ts_38_msgs.secp_msg_includes[1].len(), 1);
assert_eq!(ts_38_msgs.bls_msg_includes[1].len(), 11);
}
#[tokio::test]
async fn response_byte_cap_truncates_to_partial() {
let (cids, cs) = populate_chain_store().await;
let response = make_chain_exchange_response_with_cap(
&cs,
&ChainExchangeRequest {
start: cids,
request_len: 5,
options: HEADERS | MESSAGES,
},
1,
);
assert_eq!(response.chain.len(), 1);
assert_eq!(
response.status,
ChainExchangeResponseStatus::PartialResponse
);
}
#[tokio::test]
async fn counting_sink_matches_to_vec() {
let (cids, cs) = populate_chain_store().await;
let response = make_chain_exchange_response(
&cs,
&ChainExchangeRequest {
start: cids,
request_len: 2,
options: HEADERS | MESSAGES,
},
);
let mut sink = CountingSink(0);
fvm_ipld_encoding::to_writer(&mut sink, &response).unwrap();
let to_vec_len = fvm_ipld_encoding::to_vec(&response).unwrap().len();
assert!(sink.0 > 0, "expected a non-empty encoded response");
assert_eq!(sink.0, to_vec_len);
}
}