use ibc_relayer_types::core::ics02_client::height::Height;
use ibc_relayer_types::core::ics04_channel::packet::{Packet, Sequence};
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::Height as ICSHeight;
use tendermint::abci::Event;
use tendermint::Hash as TxHash;
use tendermint_rpc::endpoint::tx::Response as TxResponse;
use tendermint_rpc::{Client, HttpClient, Order, Url};
use tracing::warn;
use crate::chain::cosmos::query::{header_query, packet_query, tx_hash_query};
use crate::chain::cosmos::types::events;
use crate::chain::requests::{
QueryClientEventRequest, QueryHeight, QueryPacketEventDataRequest, QueryTxHash, QueryTxRequest,
};
use crate::error::Error;
use crate::event::{ibc_event_try_from_abci_event, IbcEventWithHeight};
/// Runs a `tx_search` RPC query for `request` and returns the IBC events found.
///
/// * `QueryTxRequest::Client`: searches with `header_query(&request)` and returns the
///   matching client update event extracted from the first transaction, if any.
/// * `QueryTxRequest::Transaction`: searches with `tx_hash_query(&tx)` and returns every
///   IBC event extracted from that transaction.
///
/// Both searches are issued with `prove = false`, `page = 1`, `per_page = 1` and
/// `Order::Ascending`. An empty search result yields an empty vector.
///
/// # Errors
///
/// Returns `Error::rpc` (carrying `rpc_address`) when the RPC call fails, and propagates
/// any error from `update_client_from_tx_search_response`.
///
/// # Panics
///
/// Panics if the client search returns more than one transaction, and (via
/// `all_ibc_events_from_tx_search_response`) if the transaction height cannot be
/// converted into an IBC height.
pub async fn query_txs(
    chain_id: &ChainId,
    rpc_client: &HttpClient,
    rpc_address: &Url,
    request: QueryTxRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
    crate::time!("query_txs",
    {
        "src_chain": chain_id,
    });
    crate::telemetry!(query, chain_id, "query_txs");

    match request {
        QueryTxRequest::Client(request) => {
            crate::time!(
                "query_txs: single client update event",
                {
                    "src_chain": chain_id,
                }
            );

            let mut response = rpc_client
                .tx_search(header_query(&request), false, 1, 1, Order::Ascending)
                .await
                .map_err(|e| Error::rpc(rpc_address.clone(), e))?;

            if response.txs.is_empty() {
                return Ok(vec![]);
            }

            // The search asks for a single result per page, so anything more is treated
            // as an invariant violation.
            assert!(
                response.txs.len() <= 1,
                "query_txs: unexpected number of txs"
            );

            let tx = response.txs.remove(0);
            let event = update_client_from_tx_search_response(chain_id, &request, tx)?;

            // `Option` -> `Vec` with zero or one element.
            Ok(event.into_iter().collect())
        }
        QueryTxRequest::Transaction(tx) => {
            crate::time!(
                "query_txs: transaction hash",
                {
                    "src_chain": chain_id,
                }
            );

            let mut response = rpc_client
                .tx_search(tx_hash_query(&tx), false, 1, 1, Order::Ascending)
                .await
                .map_err(|e| Error::rpc(rpc_address.clone(), e))?;

            if response.txs.is_empty() {
                Ok(vec![])
            } else {
                let tx = response.txs.remove(0);
                Ok(all_ibc_events_from_tx_search_response(chain_id, tx))
            }
        }
    }
}
/// Looks up, via `tx_search`, the packet event for every sequence in `request.sequences`.
///
/// For each sequence one search is issued (`prove = false`, `page = 1`, `per_page = 10`,
/// `Order::Descending`). Every returned transaction is passed through
/// `packet_from_tx_search_response`; the first event found for the sequence is added to
/// the result. Sequences with no matching event are skipped. When several transactions
/// yield an event for the same sequence, all of them are logged as warnings.
///
/// # Errors
///
/// Returns `Error::rpc` when an RPC call fails and propagates any error from
/// `packet_from_tx_search_response`.
pub async fn query_packets_from_txs(
    chain_id: &ChainId,
    rpc_client: &HttpClient,
    rpc_address: &Url,
    request: &QueryPacketEventDataRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
    crate::time!(
        "query_packets_from_txs",
        {
            "src_chain": chain_id,
        }
    );
    crate::telemetry!(query, chain_id, "query_packets_from_txs");

    let mut collected: Vec<IbcEventWithHeight> = Vec::new();

    for seq in request.sequences.iter() {
        let search = rpc_client
            .tx_search(packet_query(request, *seq), false, 1, 10, Order::Descending)
            .await
            .map_err(|e| Error::rpc(rpc_address.clone(), e))?;

        // Gather every candidate event together with the tx it came from; an empty
        // search result simply leaves this list empty.
        let mut candidates = Vec::new();
        for tx in search.txs {
            match packet_from_tx_search_response(chain_id, request, *seq, &tx)? {
                Some(event) => candidates.push((event, tx.hash, tx.height)),
                None => {}
            }
        }

        if candidates.len() > 1 {
            warn!("more than one packet event found for sequence {seq}, this should not happen");
            for (event, hash, height) in &candidates {
                warn!("seq: {seq}, tx hash: {hash}, tx height: {height}, event: {event}");
            }
        }

        // Keep only the first candidate, if there is one.
        if let Some((event, _, _)) = candidates.into_iter().next() {
            collected.push(event);
        }
    }

    Ok(collected)
}
pub async fn query_packets_from_block(
chain_id: &ChainId,
rpc_client: &HttpClient,
rpc_address: &Url,
request: &QueryPacketEventDataRequest,
) -> Result<Vec<IbcEventWithHeight>, Error> {
crate::time!(
"query_packets_from_block",
{
"src_chain": chain_id,
}
);
crate::telemetry!(query, chain_id, "query_packets_from_block");
let tm_height = match request.height.get() {
QueryHeight::Latest => tendermint::block::Height::default(),
QueryHeight::Specific(h) => {
tendermint::block::Height::try_from(h.revision_height()).unwrap()
}
};
let height = Height::new(chain_id.version(), u64::from(tm_height))
.map_err(|_| Error::invalid_height_no_source())?;
let block_results = rpc_client
.block_results(tm_height)
.await
.map_err(|e| Error::rpc(rpc_address.clone(), e))?;
let mut events: Vec<_> = block_results
.begin_block_events
.unwrap_or_default()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height))
.collect();
if let Some(txs) = block_results.txs_results {
for tx in txs {
events.extend(
tx.events
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height)),
)
}
}
events.extend(
block_results
.end_block_events
.unwrap_or_default()
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height)),
);
events.extend(
block_results
.finalize_block_events
.iter()
.filter_map(|ev| filter_matching_event(ev, request, &request.sequences))
.map(|ev| IbcEventWithHeight::new(ev, height)),
);
Ok(events)
}
/// Extracts the client update event requested by `request` from a `tx_search` result.
///
/// Returns `Ok(None)` when the transaction height is greater than a specific
/// `request.query_height`, or when no event in the transaction is an `UpdateClient`
/// event of kind `request.event_id` whose client id and consensus height both equal the
/// ones in `request`. Events that fail to convert into an IBC event are ignored.
///
/// # Errors
///
/// Returns `Error::invalid_height_no_source` when the transaction height cannot be
/// converted into an IBC height.
fn update_client_from_tx_search_response(
    chain_id: &ChainId,
    request: &QueryClientEventRequest,
    response: TxResponse,
) -> Result<Option<IbcEventWithHeight>, Error> {
    let tx_height = ICSHeight::new(chain_id.version(), u64::from(response.height))
        .map_err(|_| Error::invalid_height_no_source())?;

    // A tx committed after the requested height is out of scope for this query.
    match request.query_height {
        QueryHeight::Specific(limit) if tx_height > limit => return Ok(None),
        _ => {}
    }

    for abci_event in response.tx_result.events {
        if abci_event.kind != request.event_id.as_str() {
            continue;
        }

        if let Ok(IbcEvent::UpdateClient(update)) = ibc_event_try_from_abci_event(&abci_event) {
            let same_client = update.common.client_id == request.client_id;
            if same_client && update.common.consensus_height == request.consensus_height {
                return Ok(Some(IbcEventWithHeight::new(
                    IbcEvent::UpdateClient(update),
                    tx_height,
                )));
            }
        }
    }

    Ok(None)
}
/// Extracts from a `tx_search` result the first event matching `request` for the single
/// sequence `seq`.
///
/// Returns `Ok(None)` when the transaction height is greater than a specific
/// `request.height`, or when no event in the transaction passes
/// `filter_matching_event`.
///
/// # Errors
///
/// Returns `Error::invalid_height_no_source` when the transaction height cannot be
/// converted into an IBC height.
fn packet_from_tx_search_response(
    chain_id: &ChainId,
    request: &QueryPacketEventDataRequest,
    seq: Sequence,
    response: &TxResponse,
) -> Result<Option<IbcEventWithHeight>, Error> {
    let tx_height = ICSHeight::new(chain_id.version(), u64::from(response.height))
        .map_err(|_| Error::invalid_height_no_source())?;

    // Ignore transactions committed after the requested height.
    match request.height.get() {
        QueryHeight::Specific(limit) if tx_height > limit => return Ok(None),
        _ => {}
    }

    for abci_event in &response.tx_result.events {
        if let Some(ibc_event) = filter_matching_event(abci_event, request, &[seq]) {
            return Ok(Some(IbcEventWithHeight::new(ibc_event, tx_height)));
        }
    }

    Ok(None)
}
/// Converts `event` into an IBC event and returns it only if it matches `request`.
///
/// An event matches when:
/// * its kind equals `request.event_id`,
/// * it converts successfully into an `IbcEvent`,
/// * it is a `SendPacket` or `WriteAcknowledgement` event, and
/// * its packet has the source/destination port and channel given in `request` and a
///   sequence contained in `seqs`.
///
/// Returns `None` in every other case, including conversion failures.
pub fn filter_matching_event(
    event: &Event,
    request: &QueryPacketEventDataRequest,
    seqs: &[Sequence],
) -> Option<IbcEvent> {
    /// Checks the packet's endpoints against `request` and its sequence against `seqs`.
    /// Borrows the sequence slice so no per-event copy of the list is needed.
    fn matches_packet(
        request: &QueryPacketEventDataRequest,
        seqs: &[Sequence],
        packet: &Packet,
    ) -> bool {
        packet.source_port == request.source_port_id
            && packet.source_channel == request.source_channel_id
            && packet.destination_port == request.destination_port_id
            && packet.destination_channel == request.destination_channel_id
            && seqs.contains(&packet.sequence)
    }

    // Cheap rejection before attempting the conversion.
    if event.kind != request.event_id.as_str() {
        return None;
    }

    let ibc_event = ibc_event_try_from_abci_event(event).ok()?;

    let is_match = match &ibc_event {
        IbcEvent::SendPacket(send_ev) => matches_packet(request, seqs, &send_ev.packet),
        IbcEvent::WriteAcknowledgement(ack_ev) => matches_packet(request, seqs, &ack_ev.packet),
        _ => false,
    };

    if is_match {
        Some(ibc_event)
    } else {
        None
    }
}
pub async fn query_tx_response(
rpc_client: &HttpClient,
rpc_address: &Url,
tx_hash: &TxHash,
) -> Result<Option<TxResponse>, Error> {
let response = rpc_client
.tx_search(
tx_hash_query(&QueryTxHash(*tx_hash)),
false,
1,
1, Order::Ascending,
)
.await
.map_err(|e| Error::rpc(rpc_address.clone(), e))?;
Ok(response.txs.into_iter().next())
}
/// Converts a `tx_search` entry into the list of IBC events it carries.
///
/// When the transaction result code reports an error, a single `IbcEvent::ChainError`
/// describing the tx hash, code and log is returned. Otherwise every event of the
/// transaction is passed to `events::from_tx_response_event` and the results are
/// concatenated in order. All returned events are tagged with the transaction height.
///
/// # Panics
///
/// Panics if the transaction height cannot be converted into an IBC height.
pub fn all_ibc_events_from_tx_search_response(
    chain_id: &ChainId,
    response: TxResponse,
) -> Vec<IbcEventWithHeight> {
    let height = ICSHeight::new(chain_id.version(), u64::from(response.height)).unwrap();
    let tx_result = response.tx_result;

    // A failed tx is reported as one synthetic error event.
    if tx_result.code.is_err() {
        let message = format!(
            "deliver_tx for {} reports error: code={:?}, log={:?}",
            response.hash, tx_result.code, tx_result.log
        );
        return vec![IbcEventWithHeight::new(
            IbcEvent::ChainError(message),
            height,
        )];
    }

    let mut ibc_events = Vec::new();
    for abci_event in &tx_result.events {
        ibc_events.extend(events::from_tx_response_event(height, abci_event));
    }
    ibc_events
}