ibc-relayer 0.32.2

Implementation of an IBC Relayer in Rust, as a library
use std::ops::RangeInclusive;
use std::thread;
use std::time::{Duration, Instant};

use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use itertools::Itertools;
use tracing::{error_span, info};

use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::Height;

use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets};
use crate::chain::handle::ChainHandle;
use crate::chain::requests::{Paginate, Qualified};
use crate::chain::tracking::TrackingId;
use crate::error::Error;
use crate::event::IbcEventWithHeight;
use crate::link::error::LinkError;
use crate::link::operational_data::{OperationalData, TrackedEvents};
use crate::link::packet_events::{
    query_packet_events_with, query_send_packet_events, query_write_ack_events,
};
use crate::link::relay_path::RelayPath;
use crate::link::relay_sender::SyncSender;
use crate::link::Link;
use crate::path::PathIdentifiers;
use crate::util::collate::CollatedIterExt;
use crate::util::pretty::{PrettyDuration, PrettySlice};

impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
    /// Fetches an operational data that has fulfilled its predefined delay period. May _block_
    /// waiting for the delay period to pass.
    /// Returns `Ok(None)` if there is no operational data scheduled.
    pub(crate) fn fetch_scheduled_operational_data(
        &self,
    ) -> Result<Option<OperationalData>, LinkError> {
        if let Some(odata) = self.src_operational_data.pop_front() {
            Ok(Some(wait_for_conn_delay(
                odata,
                &|| self.src_time_latest(),
                &|| self.src_max_block_time(),
                &|| self.src_latest_height(),
            )?))
        } else if let Some(odata) = self.dst_operational_data.pop_front() {
            Ok(Some(wait_for_conn_delay(
                odata,
                &|| self.dst_time_latest(),
                &|| self.dst_max_block_time(),
                &|| self.dst_latest_height(),
            )?))
        } else {
            Ok(None)
        }
    }

    /// Given a vector of [`OperationalData`], this method proceeds to relaying
    /// all the messages therein. It accumulates all events generated in the
    /// mutable vector of [`IbcEvent`]s.
    pub fn relay_and_accumulate_results(
        &self,
        from: Vec<OperationalData>,
        results: &mut Vec<IbcEvent>,
    ) -> Result<(), LinkError> {
        for od in from {
            let mut last_res = self.relay_from_operational_data::<SyncSender>(od)?;
            results.append(&mut last_res.events);
        }

        Ok(())
    }
}

impl<ChainA: ChainHandle, ChainB: ChainHandle> Link<ChainA, ChainB> {
    pub fn relay_recv_packet_and_timeout_messages(
        &self,
        sequences: Vec<RangeInclusive<Sequence>>,
    ) -> Result<Vec<IbcEvent>, LinkError> {
        self.relay_recv_packet_and_timeout_messages_with_packet_data_query_height(sequences, None)
    }
    /// Implements the `packet-recv` CLI
    pub fn relay_recv_packet_and_timeout_messages_with_packet_data_query_height(
        &self,
        sequence_filter: Vec<RangeInclusive<Sequence>>,
        packet_data_query_height: Option<Height>,
    ) -> Result<Vec<IbcEvent>, LinkError> {
        let _span = error_span!(
            "relay_recv_packet_and_timeout_messages",
            src_chain = %self.a_to_b.src_chain().id(),
            src_port = %self.a_to_b.src_port_id(),
            src_channel = %self.a_to_b.src_channel_id(),
            dst_chain = %self.a_to_b.dst_chain().id(),
        )
        .entered();

        // Find the sequence numbers of unreceived packets
        let (mut sequences, src_response_height) = unreceived_packets(
            self.a_to_b.dst_chain(),
            self.a_to_b.src_chain(),
            &self.a_to_b.path_id,
            Paginate::All,
        )
        .map_err(LinkError::supervisor)?;

        if sequences.is_empty() {
            return Ok(vec![]);
        }

        if !sequence_filter.is_empty() {
            info!("filtering unreceived packets by given sequence ranges");
            sequences.retain(|seq| sequence_filter.iter().any(|range| range.contains(seq)));
        }

        // Retain only sequences which should not be filtered out
        let raw_sequences: Vec<Sequence> = sequences
            .into_iter()
            .filter(|sequence| !self.a_to_b.exclude_src_sequences.contains(sequence))
            .collect();

        info!(
            "{} unreceived packets found: {} ",
            raw_sequences.len(),
            PrettySlice(&raw_sequences)
        );

        let query_height = match packet_data_query_height {
            Some(height) => Qualified::Equal(height),
            None => Qualified::SmallerEqual(src_response_height),
        };

        let chunk_size = self
            .a_to_b
            .src_chain()
            .config()
            .map_or(50, |cfg| cfg.query_packets_chunk_size());

        self.relay_packet_messages(
            raw_sequences,
            query_height,
            chunk_size,
            query_send_packet_events,
            TrackingId::new_static("packet-recv"),
        )
    }

    pub fn relay_ack_packet_messages(
        &self,
        sequences: Vec<RangeInclusive<Sequence>>,
    ) -> Result<Vec<IbcEvent>, LinkError> {
        self.relay_ack_packet_messages_with_packet_data_query_height(sequences, None)
    }

    /// Implements the `packet-ack` CLI
    pub fn relay_ack_packet_messages_with_packet_data_query_height(
        &self,
        sequence_filter: Vec<RangeInclusive<Sequence>>,
        packet_data_query_height: Option<Height>,
    ) -> Result<Vec<IbcEvent>, LinkError> {
        let _span = error_span!(
            "relay_ack_packet_messages",
            src_chain = %self.a_to_b.src_chain().id(),
            src_port = %self.a_to_b.src_port_id(),
            src_channel = %self.a_to_b.src_channel_id(),
            dst_chain = %self.a_to_b.dst_chain().id(),
        )
        .entered();

        // Find the sequence numbers of unreceived acknowledgements
        let Some((mut sequences, src_response_height)) = unreceived_acknowledgements(
            self.a_to_b.dst_chain(),
            self.a_to_b.src_chain(),
            &self.a_to_b.path_id,
            Paginate::All,
        )
        .map_err(LinkError::supervisor)?
        else {
            return Ok(vec![]);
        };

        if sequences.is_empty() {
            return Ok(vec![]);
        }

        if !sequence_filter.is_empty() {
            info!("filtering unreceived acknowledgements by given sequence ranges");
            sequences.retain(|seq| sequence_filter.iter().any(|range| range.contains(seq)));
        }

        // Retain only sequences which should not be filtered out
        let raw_sequences: Vec<Sequence> = sequences
            .into_iter()
            .filter(|sequence| !self.a_to_b.exclude_src_sequences.contains(sequence))
            .collect();

        info!(
            "{} unreceived acknowledgements found: {} ",
            raw_sequences.len(),
            raw_sequences.iter().copied().collated().format(", "),
        );

        let query_height = match packet_data_query_height {
            Some(height) => Qualified::Equal(height),
            None => Qualified::SmallerEqual(src_response_height),
        };

        let chunk_size = self
            .a_to_b
            .src_chain()
            .config()
            .map_or(50, |cfg| cfg.query_packets_chunk_size());

        self.relay_packet_messages(
            raw_sequences,
            query_height,
            chunk_size,
            query_write_ack_events,
            TrackingId::new_static("packet-ack"),
        )
    }

    fn relay_packet_messages<QueryFn>(
        &self,
        sequences: Vec<Sequence>,
        query_height: Qualified<Height>,
        chunk_size: usize,
        query_fn: QueryFn,
        tracking_id: TrackingId,
    ) -> Result<Vec<IbcEvent>, LinkError>
    where
        QueryFn: Fn(
            &ChainA,
            &PathIdentifiers,
            &[Sequence],
            Qualified<Height>,
        ) -> Result<Vec<IbcEventWithHeight>, Error>,
    {
        let event_chunks = query_packet_events_with(
            &sequences,
            query_height,
            self.a_to_b.src_chain(),
            &self.a_to_b.path_id,
            chunk_size,
            query_fn,
        );

        let mut results = vec![];

        for event_chunk in event_chunks {
            let tracked_events = TrackedEvents::new(event_chunk, tracking_id);
            self.a_to_b.events_to_operational_data(tracked_events)?;

            // In case of zero connection delay, the op. data will already be ready
            let (src_ods, dst_ods) = self.a_to_b.try_fetch_scheduled_operational_data()?;
            self.a_to_b
                .relay_and_accumulate_results(Vec::from(src_ods), &mut results)?;
            self.a_to_b
                .relay_and_accumulate_results(Vec::from(dst_ods), &mut results)?;
        }

        // In case of non-zero connection delay, we block here waiting for all op.data
        // until the connection delay elapses
        while let Some(odata) = self.a_to_b.fetch_scheduled_operational_data()? {
            self.a_to_b
                .relay_and_accumulate_results(vec![odata], &mut results)?;
        }

        Ok(results)
    }
}

fn wait_for_conn_delay<ChainTime, MaxBlockTime, LatestHeight>(
    odata: OperationalData,
    chain_time: &ChainTime,
    max_expected_time_per_block: &MaxBlockTime,
    latest_height: &LatestHeight,
) -> Result<OperationalData, LinkError>
where
    ChainTime: Fn() -> Result<Instant, LinkError>,
    MaxBlockTime: Fn() -> Result<Duration, LinkError>,
    LatestHeight: Fn() -> Result<Height, LinkError>,
{
    let (time_left, blocks_left) =
        odata.conn_delay_remaining(chain_time, max_expected_time_per_block, latest_height)?;

    match (time_left, blocks_left) {
        (Duration::ZERO, 0) => {
            info!(
                "ready to fetch a scheduled op. data with batch of size {} targeting {}",
                odata.batch.len(),
                odata.target,
            );
            Ok(odata)
        }
        (Duration::ZERO, blocks_left) => {
            info!(
                    "waiting ({} blocks left) for a scheduled op. data with batch of size {} targeting {}",
                    blocks_left,
                    odata.batch.len(),
                    odata.target,
                );

            let blocks_left: u32 = blocks_left.try_into().expect("blocks_left > u32::MAX");

            // Wait until the delay period passes
            thread::sleep(blocks_left * max_expected_time_per_block()?);

            Ok(odata)
        }
        (time_left, _) => {
            info!(
                "waiting ({} left) for a scheduled op. data with batch of size {} targeting {}",
                PrettyDuration(&time_left),
                odata.batch.len(),
                odata.target,
            );

            // Wait until the delay period passes
            thread::sleep(time_left);

            // `blocks_left` maybe non-zero, so recurse to recheck that all delays are handled.
            wait_for_conn_delay(
                odata,
                chain_time,
                max_expected_time_per_block,
                latest_height,
            )
        }
    }
}