use core::fmt::{Display, Error as FmtError, Formatter};
use core::time::Duration;
use std::thread;
use ibc_proto::google::protobuf::Any;
use serde::Serialize;
use tracing::{debug, error, info, warn};
use ibc_relayer_types::core::ics02_client::height::Height;
use ibc_relayer_types::core::ics03_connection::connection::{
ConnectionEnd, Counterparty, IdentifiedConnectionEnd, State,
};
use ibc_relayer_types::core::ics03_connection::msgs::conn_open_ack::MsgConnectionOpenAck;
use ibc_relayer_types::core::ics03_connection::msgs::conn_open_confirm::MsgConnectionOpenConfirm;
use ibc_relayer_types::core::ics03_connection::msgs::conn_open_init::MsgConnectionOpenInit;
use ibc_relayer_types::core::ics03_connection::msgs::conn_open_try::MsgConnectionOpenTry;
use ibc_relayer_types::core::ics24_host::identifier::{ClientId, ConnectionId};
use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::timestamp::ZERO_DURATION;
use ibc_relayer_types::tx_msg::Msg;
use crate::chain::counterparty::connection_state_on_destination;
use crate::chain::handle::ChainHandle;
use crate::chain::requests::{
IncludeProof, PageRequest, QueryConnectionRequest, QueryConnectionsRequest, QueryHeight,
};
use crate::chain::tracking::TrackedMsgs;
use crate::foreign_client::{ForeignClient, HasExpiredOrFrozenError};
use crate::object::Connection as WorkerConnectionObject;
use crate::util::pretty::{PrettyDuration, PrettyOption};
use crate::util::retry::{retry_with_index, RetryResult};
use crate::util::task::Next;
mod error;
pub use error::ConnectionError;
pub const MAX_PACKET_DELAY: Duration = Duration::from_secs(120);
mod handshake_retry {
use crate::connection::ConnectionError;
use crate::util::retry::{clamp_total, ConstantGrowth};
use core::time::Duration;
const PER_BLOCK_RETRIES: u32 = 10;
const DELAY_INCREMENT: u64 = 0;
const BLOCK_NUMBER_DELAY: u32 = 10;
pub fn default_strategy(max_block_times: Duration) -> impl Iterator<Item = Duration> {
let retry_delay = max_block_times / PER_BLOCK_RETRIES;
clamp_total(
ConstantGrowth::new(retry_delay, Duration::from_secs(DELAY_INCREMENT)),
retry_delay,
max_block_times * BLOCK_NUMBER_DELAY,
)
}
pub fn from_retry_error(
e: retry::Error<ConnectionError>,
description: String,
) -> ConnectionError {
ConnectionError::max_retry(description, e.tries, e.total_delay, e.error)
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(bound(serialize = "(): Serialize"))]
pub struct ConnectionSide<Chain: ChainHandle> {
#[serde(skip)]
pub(crate) chain: Chain,
client_id: ClientId,
connection_id: Option<ConnectionId>,
}
impl<Chain: ChainHandle> Display for ConnectionSide<Chain> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
match &self.connection_id {
Some(connection_id) => write!(
f,
"ConnectionSide {{ chain: {}, client_id: {}, connection_id: {} }}",
self.chain, self.client_id, connection_id
),
None => write!(
f,
"ConnectionSide {{ chain: {}, client_id: {}, connection_id: None }}",
self.chain, self.client_id
),
}
}
}
impl<Chain: ChainHandle> ConnectionSide<Chain> {
pub fn new(chain: Chain, client_id: ClientId, connection_id: Option<ConnectionId>) -> Self {
Self {
chain,
client_id,
connection_id,
}
}
pub fn connection_id(&self) -> Option<&ConnectionId> {
self.connection_id.as_ref()
}
pub fn map_chain<ChainB: ChainHandle>(
self,
mapper: impl FnOnce(Chain) -> ChainB,
) -> ConnectionSide<ChainB> {
ConnectionSide {
chain: mapper(self.chain),
client_id: self.client_id,
connection_id: self.connection_id,
}
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(bound(serialize = "(): Serialize"))]
pub struct Connection<ChainA: ChainHandle, ChainB: ChainHandle> {
pub delay_period: Duration,
pub a_side: ConnectionSide<ChainA>,
pub b_side: ConnectionSide<ChainB>,
}
impl<ChainA: ChainHandle, ChainB: ChainHandle> Display for Connection<ChainA, ChainB> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
write!(
f,
"Connection {{ delay_period: {}, a_side: {}, b_side: {} }}",
PrettyDuration(&self.delay_period),
self.a_side,
self.b_side
)
}
}
impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
pub fn new(
b_to_a_client: ForeignClient<ChainA, ChainB>,
a_to_b_client: ForeignClient<ChainB, ChainA>,
delay_period: Duration,
) -> Result<Self, ConnectionError> {
Self::validate_clients(&b_to_a_client, &a_to_b_client)?;
if delay_period > MAX_PACKET_DELAY {
return Err(ConnectionError::max_delay_period(
delay_period,
MAX_PACKET_DELAY,
));
}
let mut c = Self {
delay_period,
a_side: ConnectionSide::new(
b_to_a_client.dst_chain(),
b_to_a_client.id().clone(),
Default::default(),
),
b_side: ConnectionSide::new(
a_to_b_client.dst_chain(),
a_to_b_client.id().clone(),
Default::default(),
),
};
c.handshake()?;
Ok(c)
}
pub fn restore_from_event(
chain: ChainA,
counterparty_chain: ChainB,
connection_open_event: &IbcEvent,
) -> Result<Connection<ChainA, ChainB>, ConnectionError> {
let connection_event_attributes = connection_open_event
.connection_attributes()
.ok_or_else(|| ConnectionError::invalid_event(connection_open_event.clone()))?;
let connection_id = connection_event_attributes.connection_id.clone();
let counterparty_connection_id = connection_event_attributes
.counterparty_connection_id
.clone();
let client_id = connection_event_attributes.client_id.clone();
let counterparty_client_id = connection_event_attributes.counterparty_client_id.clone();
Ok(Connection {
delay_period: Default::default(),
a_side: ConnectionSide::new(chain, client_id, connection_id),
b_side: ConnectionSide::new(
counterparty_chain,
counterparty_client_id,
counterparty_connection_id,
),
})
}
pub fn restore_from_state(
chain: ChainA,
counterparty_chain: ChainB,
connection: WorkerConnectionObject,
height: Height,
) -> Result<(Connection<ChainA, ChainB>, State), ConnectionError> {
let (a_connection, _) = chain
.query_connection(
QueryConnectionRequest {
connection_id: connection.src_connection_id.clone(),
height: QueryHeight::Specific(height),
},
IncludeProof::No,
)
.map_err(ConnectionError::relayer)?;
let client_id = a_connection.client_id();
let delay_period = a_connection.delay_period();
let counterparty_connection_id = a_connection.counterparty().connection_id.clone();
let counterparty_client_id = a_connection.counterparty().client_id();
let mut handshake_connection = Connection {
delay_period,
a_side: ConnectionSide::new(
chain,
client_id.clone(),
Some(connection.src_connection_id.clone()),
),
b_side: ConnectionSide::new(
counterparty_chain.clone(),
counterparty_client_id.clone(),
counterparty_connection_id.clone(),
),
};
if a_connection.state_matches(&State::Init) && counterparty_connection_id.is_none() {
let connections: Vec<IdentifiedConnectionEnd> = counterparty_chain
.query_connections(QueryConnectionsRequest {
pagination: Some(PageRequest::all()),
})
.map_err(ConnectionError::relayer)?;
for conn in connections {
if !conn
.connection_end
.client_id_matches(a_connection.counterparty().client_id())
{
continue;
}
if let Some(remote_connection_id) =
conn.connection_end.counterparty().connection_id()
{
if remote_connection_id == &connection.src_connection_id {
handshake_connection.b_side.connection_id = Some(conn.connection_id);
break;
}
}
}
}
Ok((handshake_connection, *a_connection.state()))
}
pub fn find(
a_client: ForeignClient<ChainA, ChainB>,
b_client: ForeignClient<ChainB, ChainA>,
conn_end_a: &IdentifiedConnectionEnd,
) -> Result<Connection<ChainA, ChainB>, ConnectionError> {
Self::validate_clients(&a_client, &b_client)?;
if conn_end_a.end().client_id().ne(a_client.id()) {
return Err(ConnectionError::connection_client_id_mismatch(
conn_end_a.end().client_id().clone(),
a_client.id().clone(),
));
}
if conn_end_a.end().counterparty().client_id() != b_client.id() {
return Err(ConnectionError::connection_client_id_mismatch(
conn_end_a.end().counterparty().client_id().clone(),
b_client.id().clone(),
));
}
if !conn_end_a.end().state_matches(&State::Open) {
return Err(ConnectionError::connection_not_open(
*conn_end_a.end().state(),
));
}
let b_conn_id = conn_end_a
.end()
.counterparty()
.connection_id()
.cloned()
.ok_or_else(|| {
ConnectionError::missing_counterparty_connection_id_field(
conn_end_a.end().counterparty().clone(),
)
})?;
let c = Connection {
delay_period: conn_end_a.end().delay_period(),
a_side: ConnectionSide {
chain: a_client.dst_chain.clone(),
client_id: a_client.id,
connection_id: Some(conn_end_a.id().clone()),
},
b_side: ConnectionSide {
chain: b_client.dst_chain.clone(),
client_id: b_client.id,
connection_id: Some(b_conn_id),
},
};
Ok(c)
}
fn validate_clients(
a_client: &ForeignClient<ChainA, ChainB>,
b_client: &ForeignClient<ChainB, ChainA>,
) -> Result<(), ConnectionError> {
if a_client.src_chain().id() != b_client.dst_chain().id() {
return Err(ConnectionError::chain_id_mismatch(
a_client.src_chain().id(),
b_client.dst_chain().id(),
));
}
if a_client.dst_chain().id() != b_client.src_chain().id() {
return Err(ConnectionError::chain_id_mismatch(
a_client.dst_chain().id(),
b_client.src_chain().id(),
));
}
Ok(())
}
pub fn src_chain(&self) -> ChainA {
self.a_side.chain.clone()
}
pub fn dst_chain(&self) -> ChainB {
self.b_side.chain.clone()
}
pub fn a_chain(&self) -> ChainA {
self.a_side.chain.clone()
}
pub fn b_chain(&self) -> ChainB {
self.b_side.chain.clone()
}
pub fn src_client_id(&self) -> &ClientId {
&self.a_side.client_id
}
pub fn dst_client_id(&self) -> &ClientId {
&self.b_side.client_id
}
pub fn src_connection_id(&self) -> Option<&ConnectionId> {
self.a_side.connection_id()
}
pub fn dst_connection_id(&self) -> Option<&ConnectionId> {
self.b_side.connection_id()
}
pub fn a_connection_id(&self) -> Option<&ConnectionId> {
self.a_side.connection_id()
}
pub fn b_connection_id(&self) -> Option<&ConnectionId> {
self.b_side.connection_id()
}
fn a_connection(
&self,
connection_id: Option<&ConnectionId>,
) -> Result<ConnectionEnd, ConnectionError> {
if let Some(id) = connection_id {
self.a_chain()
.query_connection(
QueryConnectionRequest {
connection_id: id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map(|(connection_end, _)| connection_end)
.map_err(|e| ConnectionError::chain_query(self.a_chain().id(), e))
} else {
Ok(ConnectionEnd::default())
}
}
fn b_connection(
&self,
connection_id: Option<&ConnectionId>,
) -> Result<ConnectionEnd, ConnectionError> {
if let Some(id) = connection_id {
self.b_chain()
.query_connection(
QueryConnectionRequest {
connection_id: id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map(|(connection_end, _)| connection_end)
.map_err(|e| ConnectionError::chain_query(self.b_chain().id(), e))
} else {
Ok(ConnectionEnd::default())
}
}
fn max_block_times(&self) -> Result<Duration, ConnectionError> {
let a_block_time = self
.a_chain()
.config()
.map_err(ConnectionError::relayer)?
.max_block_time;
let b_block_time = self
.b_chain()
.config()
.map_err(ConnectionError::relayer)?
.max_block_time;
Ok(a_block_time.max(b_block_time))
}
pub fn flipped(&self) -> Connection<ChainB, ChainA> {
Connection {
a_side: self.b_side.clone(),
b_side: self.a_side.clone(),
delay_period: self.delay_period,
}
}
fn update_connection_and_query_states(&mut self) -> Result<(State, State), ConnectionError> {
let relayer_a_id = self.a_side.connection_id();
let relayer_b_id = self.b_side.connection_id().cloned();
let a_connection = self.a_connection(relayer_a_id)?;
let a_counterparty_id = a_connection.counterparty().connection_id();
if a_counterparty_id.is_some() && a_counterparty_id != relayer_b_id.as_ref() {
warn!(
"updating the expected {} of side_b({}) since it is different than the \
counterparty of {}: {}, on {}. This is typically caused by crossing handshake \
messages in the presence of multiple relayers.",
PrettyOption(&relayer_b_id),
self.b_chain().id(),
PrettyOption(&relayer_a_id),
PrettyOption(&a_counterparty_id),
self.a_chain().id(),
);
self.b_side.connection_id = a_counterparty_id.cloned();
}
let updated_relayer_b_id = self.b_side.connection_id();
let b_connection = self.b_connection(updated_relayer_b_id)?;
let b_counterparty_id = b_connection.counterparty().connection_id();
if b_counterparty_id.is_some() && b_counterparty_id != relayer_a_id {
if updated_relayer_b_id == relayer_b_id.as_ref() {
warn!(
"updating the expected {} of side_a({}) since it is different than the \
counterparty of {}: {}, on {}. This is typically caused by crossing handshake \
messages in the presence of multiple relayers.",
PrettyOption(&relayer_a_id),
self.a_chain().id(),
PrettyOption(&updated_relayer_b_id),
PrettyOption(&b_counterparty_id),
self.b_chain().id(),
);
self.a_side.connection_id = b_counterparty_id.cloned();
} else {
panic!(
"mismatched connection ids in connection ends: {} - {:?} and {} - {:?}",
self.a_chain().id(),
a_connection,
self.b_chain().id(),
b_connection,
);
}
}
Ok((*a_connection.state(), *b_connection.state()))
}
fn do_conn_open_handshake(&mut self) -> Result<(), ConnectionError> {
let (a_state, b_state) = self.update_connection_and_query_states()?;
debug!(
"do_conn_open_handshake with connection end states: {}, {}",
a_state, b_state
);
match (a_state, b_state) {
(State::Uninitialized, State::Uninitialized) => {
let event = self.flipped().build_conn_init_and_send().map_err(|e| {
error!("failed ConnOpenInit {}: {}", self.a_side, e);
e
})?;
let connection_id = extract_connection_id(&event)?;
self.a_side.connection_id = Some(connection_id.clone());
}
(State::Uninitialized, State::Init) | (State::Init, State::Init) => {
let event = self.flipped().build_conn_try_and_send().map_err(|e| {
error!("failed ConnOpenTry {}: {}", self.a_side, e);
e
})?;
let connection_id = extract_connection_id(&event)?;
self.a_side.connection_id = Some(connection_id.clone());
}
(State::Init, State::Uninitialized) => {
let event = self.build_conn_try_and_send().map_err(|e| {
error!("failed ConnOpenTry {}: {}", self.b_side, e);
e
})?;
let connection_id = extract_connection_id(&event)?;
self.b_side.connection_id = Some(connection_id.clone());
}
(State::Init, State::TryOpen) | (State::TryOpen, State::TryOpen) => {
self.flipped().build_conn_ack_and_send().map_err(|e| {
error!("failed ConnOpenAck {}: {}", self.a_side, e);
e
})?;
}
(State::TryOpen, State::Init) => {
self.build_conn_ack_and_send().map_err(|e| {
error!("failed ConnOpenAck {}: {}", self.b_side, e);
e
})?;
}
(State::Open, State::TryOpen) => {
self.build_conn_confirm_and_send().map_err(|e| {
error!("failed ConnOpenConfirm {}: {}", self.b_side, e);
e
})?;
}
(State::TryOpen, State::Open) => {
self.flipped().build_conn_confirm_and_send().map_err(|e| {
error!("failed ConnOpenConfirm {}: {}", self.a_side, e);
e
})?;
}
(State::Open, State::Open) => {
info!("connection handshake already finished for {}", self);
return Ok(());
}
(a_state, b_state) => {
warn!(
"do_conn_open_handshake does not handle connection end state combination: \
{}-{}, {}-{}. will retry to account for RPC node data availability issues.",
self.a_chain().id(),
a_state,
self.b_chain().id(),
b_state
);
}
}
Err(ConnectionError::handshake_finalize())
}
fn handshake(&mut self) -> Result<(), ConnectionError> {
let max_block_times = self.max_block_times()?;
retry_with_index(handshake_retry::default_strategy(max_block_times), |_| {
if let Err(e) = self.do_conn_open_handshake() {
if e.is_expired_or_frozen_error() {
RetryResult::Err(e)
} else {
RetryResult::Retry(e)
}
} else {
RetryResult::Ok(())
}
})
.map_err(|err| {
error!("failed to open connection after {} retries", err.tries);
handshake_retry::from_retry_error(
err,
format!("failed to finish connection handshake for {:?}", self),
)
})?;
Ok(())
}
pub fn counterparty_state(&self) -> Result<State, ConnectionError> {
let connection_id = self
.src_connection_id()
.ok_or_else(ConnectionError::missing_local_connection_id)?;
let (connection_end, _) = self
.src_chain()
.query_connection(
QueryConnectionRequest {
connection_id: connection_id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| ConnectionError::connection_query(connection_id.clone(), e))?;
let connection = IdentifiedConnectionEnd {
connection_end,
connection_id: connection_id.clone(),
};
connection_state_on_destination(&connection, &self.dst_chain())
.map_err(ConnectionError::supervisor)
}
pub fn handshake_step(
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ConnectionError> {
let event = match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Some(self.build_conn_try_and_send()?),
(State::Init, State::Init) => Some(self.build_conn_try_and_send()?),
(State::TryOpen, State::Init) => Some(self.build_conn_ack_and_send()?),
(State::TryOpen, State::TryOpen) => Some(self.build_conn_ack_and_send()?),
(State::Open, State::TryOpen) => Some(self.build_conn_confirm_and_send()?),
(State::Open, State::Open) => return Ok((None, Next::Abort)),
(State::TryOpen, State::Open) => return Ok((None, Next::Abort)),
_ => None,
};
match event {
Some(IbcEvent::OpenConfirmConnection(_)) | Some(IbcEvent::OpenAckConnection(_)) => {
Ok((event, Next::Abort))
}
_ => Ok((event, Next::Continue)),
}
}
pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<Next, u64> {
match self.handshake_step(state) {
Err(e) => {
if e.is_expired_or_frozen_error() {
error!(
"failed to establish connection handshake on frozen client: {}",
e
);
RetryResult::Err(index)
} else {
error!("failed {} with error {}", state, e);
RetryResult::Retry(index)
}
}
Ok((Some(ev), handshake_completed)) => {
info!("connection handshake step completed with events: {}", ev);
RetryResult::Ok(handshake_completed)
}
Ok((None, handshake_completed)) => RetryResult::Ok(handshake_completed),
}
}
pub fn step_event(&mut self, event: &IbcEvent, index: u64) -> RetryResult<Next, u64> {
let state = match event {
IbcEvent::OpenInitConnection(_) => State::Init,
IbcEvent::OpenTryConnection(_) => State::TryOpen,
IbcEvent::OpenAckConnection(_) => State::Open,
IbcEvent::OpenConfirmConnection(_) => State::Open,
_ => State::Uninitialized,
};
self.step_state(state, index)
}
fn validated_expected_connection(
&self,
msg_type: ConnectionMsgType,
) -> Result<ConnectionEnd, ConnectionError> {
let dst_connection_id = self
.dst_connection_id()
.ok_or_else(ConnectionError::missing_counterparty_connection_id)?;
let prefix = self
.src_chain()
.query_commitment_prefix()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let counterparty = Counterparty::new(
self.src_client_id().clone(),
self.src_connection_id().cloned(),
prefix,
);
let highest_state = match msg_type {
ConnectionMsgType::OpenAck => State::TryOpen,
ConnectionMsgType::OpenConfirm => State::TryOpen,
_ => State::Uninitialized,
};
let versions = self
.src_chain()
.query_compatible_versions()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let dst_expected_connection = ConnectionEnd::new(
highest_state,
self.dst_client_id().clone(),
counterparty,
versions,
ZERO_DURATION,
);
let (dst_connection, _) = self
.dst_chain()
.query_connection(
QueryConnectionRequest {
connection_id: dst_connection_id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
if dst_connection.state_matches(&State::Uninitialized) {
return Err(ConnectionError::missing_connection_id(
self.dst_chain().id(),
));
}
check_destination_connection_state(
dst_connection_id.clone(),
dst_connection,
dst_expected_connection.clone(),
)?;
Ok(dst_expected_connection)
}
pub fn build_update_client_on_src(&self, height: Height) -> Result<Vec<Any>, ConnectionError> {
let client = self.restore_src_client();
client.wait_and_build_update_client(height).map_err(|e| {
ConnectionError::client_operation(
self.src_client_id().clone(),
self.src_chain().id(),
e,
)
})
}
pub fn build_update_client_on_dst(&self, height: Height) -> Result<Vec<Any>, ConnectionError> {
let client = self.restore_dst_client();
client.wait_and_build_update_client(height).map_err(|e| {
ConnectionError::client_operation(
self.dst_client_id().clone(),
self.dst_chain().id(),
e,
)
})
}
pub fn build_conn_init(&self) -> Result<Vec<Any>, ConnectionError> {
let signer = self
.dst_chain()
.get_signer()
.map_err(|e| ConnectionError::signer(self.dst_chain().id(), e))?;
let prefix = self
.src_chain()
.query_commitment_prefix()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let counterparty = Counterparty::new(self.src_client_id().clone(), None, prefix);
let version = self
.dst_chain()
.query_compatible_versions()
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?[0]
.clone();
let new_msg = MsgConnectionOpenInit {
client_id: self.dst_client_id().clone(),
counterparty,
version: Some(version),
delay_period: self.delay_period,
signer,
};
Ok(vec![new_msg.to_any()])
}
pub fn build_conn_init_and_send(&self) -> Result<IbcEvent, ConnectionError> {
let dst_msgs = self.build_conn_init()?;
let tm = TrackedMsgs::new_static(dst_msgs, "ConnectionOpenInit");
let events = self
.dst_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;
let result = events
.into_iter()
.find(|event_with_height| {
matches!(event_with_height.event, IbcEvent::OpenInitConnection(_))
|| matches!(event_with_height.event, IbcEvent::ChainError(_))
})
.ok_or_else(ConnectionError::missing_connection_init_event)?;
match &result.event {
IbcEvent::OpenInitConnection(_) => {
info!("🥂 {} => {}", self.dst_chain().id(), result);
Ok(result.event)
}
IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())),
_ => Err(ConnectionError::invalid_event(result.event)),
}
}
fn wait_for_dest_app_height_higher_than_consensus_proof_height(
&self,
consensus_height: Height,
) -> Result<(), ConnectionError> {
let dst_application_latest_height = || {
self.dst_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))
};
while consensus_height >= dst_application_latest_height()? {
warn!(
"client consensus proof height too high, \
waiting for destination chain to advance beyond {}",
consensus_height
);
thread::sleep(Duration::from_millis(500));
}
Ok(())
}
pub fn build_conn_try(&self) -> Result<(Vec<Any>, Height), ConnectionError> {
let src_connection_id = self
.src_connection_id()
.ok_or_else(ConnectionError::missing_local_connection_id)?;
let (src_connection, _) = self
.src_chain()
.query_connection(
QueryConnectionRequest {
connection_id: src_connection_id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let delay = if src_connection.delay_period() != self.delay_period {
warn!("`delay_period` for ConnectionEnd @{} is {}s; delay period on local Connection object is set to {}s",
self.src_chain().id(), src_connection.delay_period().as_secs_f64(), self.delay_period.as_secs_f64());
warn!(
"Overriding delay period for local connection object to {}s",
src_connection.delay_period().as_secs_f64()
);
src_connection.delay_period()
} else {
self.delay_period
};
let src_client_target_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
let tm =
TrackedMsgs::new_static(client_msgs, "update client on source for ConnectionOpenTry");
self.src_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;
let query_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let (client_state, proofs) = self
.src_chain()
.build_connection_proofs_and_client_state(
ConnectionMsgType::OpenTry,
src_connection_id,
self.src_client_id(),
query_height,
)
.map_err(ConnectionError::connection_proof)?;
let mut msgs = self.build_update_client_on_dst(proofs.height())?;
let counterparty_versions = if src_connection.versions().is_empty() {
self.src_chain()
.query_compatible_versions()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?
} else {
src_connection.versions().to_vec()
};
let signer = self
.dst_chain()
.get_signer()
.map_err(|e| ConnectionError::signer(self.dst_chain().id(), e))?;
let prefix = self
.src_chain()
.query_commitment_prefix()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let counterparty = Counterparty::new(
self.src_client_id().clone(),
self.src_connection_id().cloned(),
prefix,
);
let previous_connection_id = if src_connection.counterparty().connection_id.is_none() {
self.b_side.connection_id.clone()
} else {
src_connection.counterparty().connection_id.clone()
};
let new_msg = MsgConnectionOpenTry {
client_id: self.dst_client_id().clone(),
client_state: client_state.map(Into::into),
previous_connection_id,
counterparty,
counterparty_versions,
proofs,
delay_period: delay,
signer,
};
msgs.push(new_msg.to_any());
Ok((msgs, src_client_target_height))
}
pub fn build_conn_try_and_send(&self) -> Result<IbcEvent, ConnectionError> {
let (dst_msgs, src_client_target_height) = self.build_conn_try()?;
self.wait_for_dest_app_height_higher_than_consensus_proof_height(src_client_target_height)?;
let tm = TrackedMsgs::new_static(dst_msgs, "ConnectionOpenTry");
let events = self
.dst_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;
let result = events
.into_iter()
.find(|event_with_height| {
matches!(event_with_height.event, IbcEvent::OpenTryConnection(_))
|| matches!(event_with_height.event, IbcEvent::ChainError(_))
})
.ok_or_else(ConnectionError::missing_connection_try_event)?;
match &result.event {
IbcEvent::OpenTryConnection(_) => {
info!("🥂 {} => {}", self.dst_chain().id(), result);
Ok(result.event)
}
IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())),
_ => Err(ConnectionError::invalid_event(result.event)),
}
}
pub fn build_conn_ack(&self) -> Result<(Vec<Any>, Height), ConnectionError> {
let src_connection_id = self
.src_connection_id()
.ok_or_else(ConnectionError::missing_local_connection_id)?;
let dst_connection_id = self
.dst_connection_id()
.ok_or_else(ConnectionError::missing_counterparty_connection_id)?;
let _expected_dst_connection =
self.validated_expected_connection(ConnectionMsgType::OpenAck)?;
let (src_connection, _) = self
.src_chain()
.query_connection(
QueryConnectionRequest {
connection_id: src_connection_id.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let src_client_target_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?;
let client_msgs = self.build_update_client_on_src(src_client_target_height)?;
let tm =
TrackedMsgs::new_static(client_msgs, "update client on source for ConnectionOpenAck");
self.src_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?;
let query_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let (client_state, proofs) = self
.src_chain()
.build_connection_proofs_and_client_state(
ConnectionMsgType::OpenAck,
src_connection_id,
self.src_client_id(),
query_height,
)
.map_err(ConnectionError::connection_proof)?;
let mut msgs = self.build_update_client_on_dst(proofs.height())?;
let signer = self
.dst_chain()
.get_signer()
.map_err(|e| ConnectionError::signer(self.dst_chain().id(), e))?;
let new_msg = MsgConnectionOpenAck {
connection_id: dst_connection_id.clone(),
counterparty_connection_id: src_connection_id.clone(),
client_state: client_state.map(Into::into),
proofs,
version: src_connection.versions()[0].clone(),
signer,
};
msgs.push(new_msg.to_any());
Ok((msgs, src_client_target_height))
}
pub fn build_conn_ack_and_send(&self) -> Result<IbcEvent, ConnectionError> {
let (dst_msgs, src_client_target_height) = self.build_conn_ack()?;
self.wait_for_dest_app_height_higher_than_consensus_proof_height(src_client_target_height)?;
let tm = TrackedMsgs::new_static(dst_msgs, "ConnectionOpenAck");
let events = self
.dst_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;
let result = events
.into_iter()
.find(|event_with_height| {
matches!(event_with_height.event, IbcEvent::OpenAckConnection(_))
|| matches!(event_with_height.event, IbcEvent::ChainError(_))
})
.ok_or_else(ConnectionError::missing_connection_ack_event)?;
match &result.event {
IbcEvent::OpenAckConnection(_) => {
info!("🥂 {} => {}", self.dst_chain().id(), result);
Ok(result.event)
}
IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())),
_ => Err(ConnectionError::invalid_event(result.event)),
}
}
pub fn build_conn_confirm(&self) -> Result<Vec<Any>, ConnectionError> {
let src_connection_id = self
.src_connection_id()
.ok_or_else(ConnectionError::missing_local_connection_id)?;
let dst_connection_id = self
.dst_connection_id()
.ok_or_else(ConnectionError::missing_counterparty_connection_id)?;
let _expected_dst_connection =
self.validated_expected_connection(ConnectionMsgType::OpenAck)?;
let query_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ConnectionError::chain_query(self.src_chain().id(), e))?;
let (_src_connection, _) = self
.src_chain()
.query_connection(
QueryConnectionRequest {
connection_id: src_connection_id.clone(),
height: QueryHeight::Specific(query_height),
},
IncludeProof::No,
)
.map_err(|e| ConnectionError::connection_query(src_connection_id.clone(), e))?;
let (_, proofs) = self
.src_chain()
.build_connection_proofs_and_client_state(
ConnectionMsgType::OpenConfirm,
src_connection_id,
self.src_client_id(),
query_height,
)
.map_err(ConnectionError::connection_proof)?;
let mut msgs = self.build_update_client_on_dst(proofs.height())?;
let signer = self
.dst_chain()
.get_signer()
.map_err(|e| ConnectionError::signer(self.dst_chain().id(), e))?;
let new_msg = MsgConnectionOpenConfirm {
connection_id: dst_connection_id.clone(),
proofs,
signer,
};
msgs.push(new_msg.to_any());
Ok(msgs)
}
pub fn build_conn_confirm_and_send(&self) -> Result<IbcEvent, ConnectionError> {
let dst_msgs = self.build_conn_confirm()?;
let tm = TrackedMsgs::new_static(dst_msgs, "ConnectionOpenConfirm");
let events = self
.dst_chain()
.send_messages_and_wait_commit(tm)
.map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?;
let result = events
.into_iter()
.find(|event_with_height| {
matches!(event_with_height.event, IbcEvent::OpenConfirmConnection(_))
|| matches!(event_with_height.event, IbcEvent::ChainError(_))
})
.ok_or_else(ConnectionError::missing_connection_confirm_event)?;
match &result.event {
IbcEvent::OpenConfirmConnection(_) => {
info!("🥂 {} => {}", self.dst_chain().id(), result);
Ok(result.event)
}
IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())),
_ => Err(ConnectionError::invalid_event(result.event)),
}
}
fn restore_src_client(&self) -> ForeignClient<ChainA, ChainB> {
ForeignClient::restore(
self.src_client_id().clone(),
self.src_chain(),
self.dst_chain(),
)
}
fn restore_dst_client(&self) -> ForeignClient<ChainB, ChainA> {
ForeignClient::restore(
self.dst_client_id().clone(),
self.dst_chain(),
self.src_chain(),
)
}
pub fn map_chain<ChainC: ChainHandle, ChainD: ChainHandle>(
self,
mapper_a: impl Fn(ChainA) -> ChainC,
mapper_b: impl Fn(ChainB) -> ChainD,
) -> Connection<ChainC, ChainD> {
Connection {
delay_period: self.delay_period,
a_side: self.a_side.map_chain(mapper_a),
b_side: self.b_side.map_chain(mapper_b),
}
}
}
pub fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> {
match event {
IbcEvent::OpenInitConnection(ev) => ev.connection_id(),
IbcEvent::OpenTryConnection(ev) => ev.connection_id(),
IbcEvent::OpenAckConnection(ev) => ev.connection_id(),
IbcEvent::OpenConfirmConnection(ev) => ev.connection_id(),
_ => None,
}
.ok_or_else(ConnectionError::missing_connection_id_from_event)
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionMsgType {
OpenTry,
OpenAck,
OpenConfirm,
}
fn check_destination_connection_state(
connection_id: ConnectionId,
existing_connection: ConnectionEnd,
expected_connection: ConnectionEnd,
) -> Result<(), ConnectionError> {
let good_client_ids = existing_connection.client_id() == expected_connection.client_id()
&& existing_connection.counterparty().client_id()
== expected_connection.counterparty().client_id();
let good_state = *existing_connection.state() as u32 <= *expected_connection.state() as u32;
let good_connection_ids = existing_connection.counterparty().connection_id().is_none()
|| existing_connection.counterparty().connection_id()
== expected_connection.counterparty().connection_id();
let good_version = existing_connection.versions() == expected_connection.versions();
let good_counterparty_prefix =
existing_connection.counterparty().prefix() == expected_connection.counterparty().prefix();
if good_state
&& good_client_ids
&& good_connection_ids
&& good_version
&& good_counterparty_prefix
{
Ok(())
} else {
Err(ConnectionError::connection_already_exists(connection_id))
}
}