use itertools::Itertools;
use tracing::{info, span, warn, Level};
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use ibc_relayer_types::events::WithBlockDataType;
use ibc_relayer_types::Height;
use crate::chain::handle::ChainHandle;
use crate::chain::requests::{Qualified, QueryHeight, QueryPacketEventDataRequest};
use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::path::PathIdentifiers;
use crate::util::collate::CollatedIterExt;
pub fn query_packet_events_with<'a, ChainA, QueryFn>(
sequences: &'a [Sequence],
query_height: Qualified<Height>,
src_chain: &'a ChainA,
path: &'a PathIdentifiers,
chunk_size: usize,
query_fn: QueryFn,
) -> impl Iterator<Item = Vec<IbcEventWithHeight>> + 'a
where
ChainA: ChainHandle,
QueryFn: Fn(
&ChainA,
&PathIdentifiers,
&[Sequence],
Qualified<Height>,
) -> Result<Vec<IbcEventWithHeight>, Error>
+ 'a,
{
let events_total = sequences.len();
let mut events_left = events_total;
sequences.chunks(chunk_size).map_while(move |chunk| {
match query_fn(src_chain, path, chunk, query_height) {
Ok(events) => {
events_left -= chunk.len();
if events.is_empty() && !chunk.is_empty() {
warn!("no packet data was pulled at height {query_height} for sequences {}, this might be due to the data not being available on the configured endpoint. \
Please verify that the RPC endpoint has the required packet data, for more details see https://hermes.informal.systems/advanced/troubleshooting/cross-comp-config.html#uncleared-pending-packets",
chunk.iter().copied().collated().format(", "));
} else {
info!(
events.total = %events_total,
events.left = %events_left,
"pulled packet data for {} out of {} events: {}",
events.len(),
chunk.len(),
chunk.iter().copied().collated().format(", "),
);
}
let events = events
.into_iter()
.map(|ev| ev.with_height(query_height.get()))
.collect();
Some(events)
}
Err(e) => {
warn!("encountered query failure while pulling packet data: {}", e);
None
}
}
})
}
fn query_packet_events<ChainA: ChainHandle>(
src_chain: &ChainA,
query: QueryPacketEventDataRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
src_chain.query_packet_events(query)
}
pub fn query_send_packet_events<ChainA: ChainHandle>(
src_chain: &ChainA,
path: &PathIdentifiers,
sequences: &[Sequence],
src_query_height: Qualified<Height>,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let _span = span!(
Level::DEBUG,
"query_send_packet_events",
chain = %src_chain.id(),
height = %src_query_height,
?sequences,
)
.entered();
let query = QueryPacketEventDataRequest {
event_id: WithBlockDataType::SendPacket,
source_port_id: path.counterparty_port_id.clone(),
source_channel_id: path.counterparty_channel_id.clone(),
destination_port_id: path.port_id.clone(),
destination_channel_id: path.channel_id.clone(),
sequences: sequences.to_vec(),
height: src_query_height.map(QueryHeight::Specific),
};
query_packet_events(src_chain, query)
}
pub fn query_write_ack_events<ChainA: ChainHandle>(
src_chain: &ChainA,
path: &PathIdentifiers,
sequences: &[Sequence],
src_query_height: Qualified<Height>,
) -> Result<Vec<IbcEventWithHeight>, Error> {
let _span = span!(
Level::DEBUG,
"query_write_ack_packet_events",
chain = %src_chain.id(),
height = %src_query_height,
?sequences,
)
.entered();
let query = QueryPacketEventDataRequest {
event_id: WithBlockDataType::WriteAck,
source_port_id: path.port_id.clone(),
source_channel_id: path.channel_id.clone(),
destination_port_id: path.counterparty_port_id.clone(),
destination_channel_id: path.counterparty_channel_id.clone(),
sequences: sequences.to_vec(),
height: src_query_height.map(QueryHeight::Specific),
};
query_packet_events(src_chain, query)
}