ibc-relayer 0.21.0

Implementation of an IBC Relayer in Rust, as a library
Documentation
use alloc::sync::Arc;
use core::fmt::{Display, Error as FmtError, Formatter};
use ibc_relayer_types::core::ics04_channel::channel::Order;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use tracing::error;

use crate::foreign_client::ForeignClient;
use crate::link::{Link, LinkParameters, Resubmit};
use crate::{
    chain::handle::{ChainHandle, ChainHandlePair},
    config::Config,
    object::Object,
};

pub mod retry_strategy;

mod error;
pub use error::RunError;

mod handle;
pub use handle::{WorkerData, WorkerHandle};

mod cmd;
pub use cmd::WorkerCmd;

mod map;
pub use map::WorkerMap;

pub mod channel;
pub mod client;
pub mod connection;
pub mod packet;
pub mod wallet;

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct WorkerId(u64);

impl WorkerId {
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    pub fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

impl Display for WorkerId {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        write!(f, "{}", self.0)
    }
}

pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(
    chains: ChainHandlePair<ChainA, ChainB>,
    id: WorkerId,
    object: Object,
    config: &Config,
) -> WorkerHandle {
    let mut task_handles = Vec::new();

    let (cmd_tx, data) = match &object {
        Object::Client(client) => {
            let client = ForeignClient::restore(client.dst_client_id.clone(), chains.b, chains.a);

            let (mut refresh, mut misbehaviour) = (false, false);

            let refresh_task = client::spawn_refresh_client(client.clone());
            if let Some(refresh_task) = refresh_task {
                task_handles.push(refresh_task);
                refresh = true;
            }

            let cmd_tx = if config.mode.clients.misbehaviour {
                let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
                let misbehavior_task = client::detect_misbehavior_task(cmd_rx, client);
                if let Some(task) = misbehavior_task {
                    task_handles.push(task);
                    misbehaviour = true;
                }

                Some(cmd_tx)
            } else {
                None
            };

            let data = WorkerData::Client {
                misbehaviour,
                refresh,
            };

            (cmd_tx, Some(data))
        }
        Object::Connection(connection) => {
            let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
            let connection_task =
                connection::spawn_connection_worker(connection.clone(), chains, cmd_rx);
            task_handles.push(connection_task);

            (Some(cmd_tx), None)
        }
        Object::Channel(channel) => {
            let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
            let channel_task = channel::spawn_channel_worker(channel.clone(), chains, cmd_rx);
            task_handles.push(channel_task);

            (Some(cmd_tx), None)
        }
        Object::Packet(path) => {
            let packets_config = config.mode.packets;
            let link_res = Link::new_from_opts(
                chains.a.clone(),
                chains.b,
                LinkParameters {
                    src_port_id: path.src_port_id.clone(),
                    src_channel_id: path.src_channel_id.clone(),
                },
                packets_config.tx_confirmation,
                packets_config.auto_register_counterparty_payee,
            );

            match link_res {
                Ok(link) => {
                    let channel_ordering = link.a_to_b.channel().ordering;
                    let should_clear_on_start =
                        packets_config.clear_on_start || channel_ordering == Order::Ordered;

                    let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
                    let link = Arc::new(Mutex::new(link));
                    let resubmit = Resubmit::from_clear_interval(packets_config.clear_interval);

                    let packet_task = packet::spawn_packet_cmd_worker(
                        cmd_rx,
                        link.clone(),
                        should_clear_on_start,
                        packets_config.clear_interval,
                        path.clone(),
                    );
                    task_handles.push(packet_task);

                    let link_task = packet::spawn_packet_worker(path.clone(), link, resubmit);
                    task_handles.push(link_task);

                    (Some(cmd_tx), None)
                }
                Err(e) => {
                    error!("error initializing link object for packet worker: {}", e);
                    (None, None)
                }
            }
        }

        Object::Wallet(wallet) => {
            assert_eq!(wallet.chain_id, chains.a.id());

            let wallet_task = wallet::spawn_wallet_worker(chains.a);
            task_handles.push(wallet_task);

            (None, None)
        }
    };

    WorkerHandle::new(id, object, data, cmd_tx, task_handles)
}