use std::ops::RangeInclusive;
use std::thread;
use std::time::{Duration, Instant};
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use itertools::Itertools;
use tracing::{error_span, info};
use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::Height;
use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets};
use crate::chain::handle::ChainHandle;
use crate::chain::requests::{Paginate, Qualified};
use crate::chain::tracking::TrackingId;
use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::link::error::LinkError;
use crate::link::operational_data::{OperationalData, TrackedEvents};
use crate::link::packet_events::{
query_packet_events_with, query_send_packet_events, query_write_ack_events,
};
use crate::link::relay_path::RelayPath;
use crate::link::relay_sender::SyncSender;
use crate::link::Link;
use crate::path::PathIdentifiers;
use crate::util::collate::CollatedIterExt;
use crate::util::pretty::{PrettyDuration, PrettySlice};
/// Synchronous helpers on [`RelayPath`] for draining scheduled operational data.
impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
    /// Takes the next scheduled operational data and waits until it may be relayed.
    ///
    /// The source queue is consulted first; the destination queue is only
    /// popped when the source queue yields nothing. The popped item is handed
    /// to `wait_for_conn_delay` together with accessors for the matching
    /// side's latest time, maximum block time and latest height.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `wait_for_conn_delay`.
    ///
    /// Returns `Ok(None)` when both queues are empty.
    pub(crate) fn fetch_scheduled_operational_data(
        &self,
    ) -> Result<Option<OperationalData>, LinkError> {
        if let Some(odata) = self.src_operational_data.pop_front() {
            let chain_time = || self.src_time_latest();
            let max_block_time = || self.src_max_block_time();
            let latest_height = || self.src_latest_height();

            return wait_for_conn_delay(odata, &chain_time, &max_block_time, &latest_height)
                .map(Some);
        }

        // Nothing pending on the source side: fall back to the destination queue.
        let Some(odata) = self.dst_operational_data.pop_front() else {
            return Ok(None);
        };

        let chain_time = || self.dst_time_latest();
        let max_block_time = || self.dst_max_block_time();
        let latest_height = || self.dst_latest_height();

        wait_for_conn_delay(odata, &chain_time, &max_block_time, &latest_height).map(Some)
    }

    /// Relays every operational data in `from`, in order, using the `SyncSender`,
    /// and appends the events of each relay outcome to `results`.
    ///
    /// # Errors
    ///
    /// Stops at the first failing relay and returns its error. Events appended
    /// for the items relayed before the failure stay in `results`.
    pub fn relay_and_accumulate_results(
        &self,
        from: Vec<OperationalData>,
        results: &mut Vec<IbcEvent>,
    ) -> Result<(), LinkError> {
        from.into_iter().try_for_each(|od| -> Result<(), LinkError> {
            let mut summary = self.relay_from_operational_data::<SyncSender>(od)?;
            results.append(&mut summary.events);
            Ok(())
        })
    }
}
impl<ChainA: ChainHandle, ChainB: ChainHandle> Link<ChainA, ChainB> {
pub fn relay_recv_packet_and_timeout_messages(
&self,
sequences: Vec<RangeInclusive<Sequence>>,
) -> Result<Vec<IbcEvent>, LinkError> {
self.relay_recv_packet_and_timeout_messages_with_packet_data_query_height(sequences, None)
}
pub fn relay_recv_packet_and_timeout_messages_with_packet_data_query_height(
&self,
sequence_filter: Vec<RangeInclusive<Sequence>>,
packet_data_query_height: Option<Height>,
) -> Result<Vec<IbcEvent>, LinkError> {
let _span = error_span!(
"relay_recv_packet_and_timeout_messages",
src_chain = %self.a_to_b.src_chain().id(),
src_port = %self.a_to_b.src_port_id(),
src_channel = %self.a_to_b.src_channel_id(),
dst_chain = %self.a_to_b.dst_chain().id(),
)
.entered();
let (mut sequences, src_response_height) = unreceived_packets(
self.a_to_b.dst_chain(),
self.a_to_b.src_chain(),
&self.a_to_b.path_id,
Paginate::All,
)
.map_err(LinkError::supervisor)?;
if sequences.is_empty() {
return Ok(vec![]);
}
if !sequence_filter.is_empty() {
info!("filtering unreceived packets by given sequence ranges");
sequences.retain(|seq| sequence_filter.iter().any(|range| range.contains(seq)));
}
let raw_sequences: Vec<Sequence> = sequences
.into_iter()
.filter(|sequence| !self.a_to_b.exclude_src_sequences.contains(sequence))
.collect();
info!(
"{} unreceived packets found: {} ",
raw_sequences.len(),
PrettySlice(&raw_sequences)
);
let query_height = match packet_data_query_height {
Some(height) => Qualified::Equal(height),
None => Qualified::SmallerEqual(src_response_height),
};
let chunk_size = self
.a_to_b
.src_chain()
.config()
.map_or(50, |cfg| cfg.query_packets_chunk_size());
self.relay_packet_messages(
raw_sequences,
query_height,
chunk_size,
query_send_packet_events,
TrackingId::new_static("packet-recv"),
)
}
pub fn relay_ack_packet_messages(
&self,
sequences: Vec<RangeInclusive<Sequence>>,
) -> Result<Vec<IbcEvent>, LinkError> {
self.relay_ack_packet_messages_with_packet_data_query_height(sequences, None)
}
pub fn relay_ack_packet_messages_with_packet_data_query_height(
&self,
sequence_filter: Vec<RangeInclusive<Sequence>>,
packet_data_query_height: Option<Height>,
) -> Result<Vec<IbcEvent>, LinkError> {
let _span = error_span!(
"relay_ack_packet_messages",
src_chain = %self.a_to_b.src_chain().id(),
src_port = %self.a_to_b.src_port_id(),
src_channel = %self.a_to_b.src_channel_id(),
dst_chain = %self.a_to_b.dst_chain().id(),
)
.entered();
let Some((mut sequences, src_response_height)) = unreceived_acknowledgements(
self.a_to_b.dst_chain(),
self.a_to_b.src_chain(),
&self.a_to_b.path_id,
Paginate::All,
)
.map_err(LinkError::supervisor)?
else {
return Ok(vec![]);
};
if sequences.is_empty() {
return Ok(vec![]);
}
if !sequence_filter.is_empty() {
info!("filtering unreceived acknowledgements by given sequence ranges");
sequences.retain(|seq| sequence_filter.iter().any(|range| range.contains(seq)));
}
let raw_sequences: Vec<Sequence> = sequences
.into_iter()
.filter(|sequence| !self.a_to_b.exclude_src_sequences.contains(sequence))
.collect();
info!(
"{} unreceived acknowledgements found: {} ",
raw_sequences.len(),
raw_sequences.iter().copied().collated().format(", "),
);
let query_height = match packet_data_query_height {
Some(height) => Qualified::Equal(height),
None => Qualified::SmallerEqual(src_response_height),
};
let chunk_size = self
.a_to_b
.src_chain()
.config()
.map_or(50, |cfg| cfg.query_packets_chunk_size());
self.relay_packet_messages(
raw_sequences,
query_height,
chunk_size,
query_write_ack_events,
TrackingId::new_static("packet-ack"),
)
}
fn relay_packet_messages<QueryFn>(
&self,
sequences: Vec<Sequence>,
query_height: Qualified<Height>,
chunk_size: usize,
query_fn: QueryFn,
tracking_id: TrackingId,
) -> Result<Vec<IbcEvent>, LinkError>
where
QueryFn: Fn(
&ChainA,
&PathIdentifiers,
&[Sequence],
Qualified<Height>,
) -> Result<Vec<IbcEventWithHeight>, Error>,
{
let event_chunks = query_packet_events_with(
&sequences,
query_height,
self.a_to_b.src_chain(),
&self.a_to_b.path_id,
chunk_size,
query_fn,
);
let mut results = vec![];
for event_chunk in event_chunks {
let tracked_events = TrackedEvents::new(event_chunk, tracking_id);
self.a_to_b.events_to_operational_data(tracked_events)?;
let (src_ods, dst_ods) = self.a_to_b.try_fetch_scheduled_operational_data()?;
self.a_to_b
.relay_and_accumulate_results(Vec::from(src_ods), &mut results)?;
self.a_to_b
.relay_and_accumulate_results(Vec::from(dst_ods), &mut results)?;
}
while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? {
self.a_to_b
.relay_and_accumulate_results(vec![odata], &mut results)?;
}
Ok(results)
}
}
/// Blocks the current thread until the connection delay of `odata` has
/// elapsed, then hands `odata` back.
///
/// The remaining delay is obtained from `odata.conn_delay_remaining` as a
/// `(time_left, blocks_left)` pair:
///
/// * both zero: `odata` is returned immediately;
/// * no time left but some blocks left: sleeps for
///   `blocks_left * max_expected_time_per_block()` and returns `odata`
///   without querying the remaining delay again;
/// * some time left: sleeps for `time_left` and evaluates the remaining delay
///   again.
///
/// # Errors
///
/// Propagates errors from `conn_delay_remaining` and from
/// `max_expected_time_per_block`.
///
/// # Panics
///
/// Panics if the number of remaining blocks does not fit in a `u32`.
fn wait_for_conn_delay<ChainTime, MaxBlockTime, LatestHeight>(
    odata: OperationalData,
    chain_time: &ChainTime,
    max_expected_time_per_block: &MaxBlockTime,
    latest_height: &LatestHeight,
) -> Result<OperationalData, LinkError>
where
    ChainTime: Fn() -> Result<Instant, LinkError>,
    MaxBlockTime: Fn() -> Result<Duration, LinkError>,
    LatestHeight: Fn() -> Result<Height, LinkError>,
{
    loop {
        match odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)? {
            (Duration::ZERO, 0) => {
                info!(
                    "ready to fetch a scheduled op. data with batch of size {} targeting {}",
                    odata.batch.len(),
                    odata.target,
                );

                return Ok(odata);
            }
            (Duration::ZERO, blocks_left) => {
                info!(
                    "waiting ({} blocks left) for a scheduled op. data with batch of size {} targeting {}",
                    blocks_left,
                    odata.batch.len(),
                    odata.target,
                );

                let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX");
                let block_time = max_expected_time_per_block()?;

                // The time delay is already over; once the block delay has
                // been slept through, the data is treated as ready.
                thread::sleep(block_time * blocks_left);

                return Ok(odata);
            }
            (time_left, _) => {
                info!(
                    "waiting ({} left) for a scheduled op. data with batch of size {} targeting {}",
                    PrettyDuration(&time_left),
                    odata.batch.len(),
                    odata.target,
                );

                // Sleep through the time delay, then check again on the next
                // iteration.
                thread::sleep(time_left);
            }
        }
    }
}