use tracing::{error, info};
use ibc_relayer_types::core::{
ics03_connection::connection::IdentifiedConnectionEnd,
ics04_channel::channel::State as ChannelState,
};
use crate::{
chain::{counterparty::connection_state_on_destination, handle::ChainHandle},
client_state::IdentifiedAnyClientState,
config::Config,
object::{Channel, Client, Connection, Object, Packet, Wallet},
registry::Registry,
supervisor::error::Error as SupervisorError,
telemetry,
worker::WorkerMap,
};
use super::{
scan::{ChainScan, ChainsScan, ChannelScan, ClientScan, ConnectionScan},
Error,
};
/// Bundles the borrowed state that the worker-spawning routines operate on.
///
/// Every field is borrowed for the lifetime `'a`; the registry and the worker
/// map are borrowed mutably.
pub struct SpawnContext<'a, Chain>
where
    Chain: ChainHandle,
{
    // Configuration: read for its `mode` flags and passed to each `workers.spawn` call.
    config: &'a Config,
    // Registry queried with `get_or_spawn` to obtain chain handles by chain id.
    registry: &'a mut Registry<Chain>,
    // Worker map on which `spawn` is called for every worker object.
    workers: &'a mut WorkerMap,
}
impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> {
pub fn new(
config: &'a Config,
registry: &'a mut Registry<Chain>,
workers: &'a mut WorkerMap,
) -> Self {
Self {
config,
registry,
workers,
}
}
pub fn spawn_workers(&mut self, scan: ChainsScan) {
let _span = tracing::error_span!("spawn").entered();
for chain_scan in scan.chains {
match chain_scan {
Ok(chain_scan) => self.spawn_workers_for_chain(chain_scan),
Err(e) => error!("failed to spawn worker for a chain, reason: {}", e), }
}
}
/// Spawns workers for all clients found in the scan of a single chain.
///
/// The chain handle is obtained from the registry via `get_or_spawn`; if that
/// fails, the error is logged and nothing is spawned for this chain. After the
/// clients have been processed, the wallet worker spawn is issued through the
/// `telemetry!` macro.
pub fn spawn_workers_for_chain(&mut self, scan: ChainScan) {
    let _span = tracing::error_span!("chain", chain = %scan.chain_id).entered();

    let lookup = self.registry.get_or_spawn(&scan.chain_id);
    let chain = match lookup {
        Err(e) => {
            error!(
                "skipping workers, reason: failed to spawn chain runtime with error: {}",
                e
            );
            return;
        }
        Ok(chain_handle) => chain_handle,
    };

    // The map key is not needed here, only the per-client scan.
    for (_, client_scan) in scan.clients {
        let chain_for_client = chain.clone();
        self.spawn_workers_for_client(chain_for_client, client_scan);
    }

    // NOTE(review): presumably `telemetry!` only evaluates its argument when
    // telemetry support is compiled in — confirm against the macro definition.
    telemetry!(self.spawn_wallet_worker(chain));
}
/// Spawns the wallet worker for `chain`.
///
/// The same chain handle is passed as both the first and the second chain
/// argument of `workers.spawn`. An info log is emitted only when `spawn`
/// returns `true`.
pub fn spawn_wallet_worker(&mut self, chain: Chain) {
    let chain_id = chain.id();
    let wallet_object = Object::Wallet(Wallet { chain_id });

    let spawned = self
        .workers
        .spawn(chain.clone(), chain, &wallet_object, self.config);

    if spawned {
        info!("spawning Wallet worker: {}", wallet_object.short_name());
    }
}
/// Spawns workers for every connection recorded in the scan of one client.
///
/// Each connection is handled by `spawn_workers_for_connection`, which receives
/// a clone of the chain handle and a reference to the scanned client state.
pub fn spawn_workers_for_client(&mut self, chain: Chain, client_scan: ClientScan) {
    let _span = tracing::error_span!("client", client = %client_scan.id()).entered();

    // Borrow the client state once; `connections` is a separate field and can
    // be consumed while this borrow is alive.
    let client = &client_scan.client;

    client_scan
        .connections
        .into_iter()
        .for_each(|(_, connection_scan)| {
            self.spawn_workers_for_connection(chain.clone(), client, connection_scan);
        });
}
/// Spawns the connection worker (when needed) for a scanned connection and
/// then the workers for each of the channels scanned on that connection.
///
/// Outcomes are only logged, never propagated: an error while handling the
/// connection does not prevent its channels from being processed, and an error
/// on one channel does not prevent the remaining channels from being processed.
///
/// * `chain` - handle of the chain the connection was scanned on.
/// * `client` - client state the connection belongs to.
/// * `connection_scan` - the scanned connection together with its channels.
pub fn spawn_workers_for_connection(
    &mut self,
    chain: Chain,
    client: &IdentifiedAnyClientState,
    connection_scan: ConnectionScan,
) {
    let _span =
        tracing::error_span!("connection", connection = %connection_scan.id()).entered();

    // Clone the id up front: `connection_scan.connection` is moved into the
    // call below, but the id is still needed for the log statements.
    let connection_id = connection_scan.id().clone();

    match self.spawn_connection_workers(
        chain.clone(),
        client.clone(),
        connection_scan.connection,
    ) {
        Ok(true) => info!(
            chain = %chain.id(),
            connection = %connection_id,
            "done spawning connection workers",
        ),
        Ok(false) => info!(
            chain = %chain.id(),
            connection = %connection_id,
            "no connection workers were spawned",
        ),
        Err(e) => error!(
            chain = %chain.id(),
            connection = %connection_id,
            "skipped connection workers, reason: {}",
            e
        ),
    }

    // Channels are processed regardless of the connection outcome above.
    for (channel_id, channel_scan) in connection_scan.channels {
        match self.spawn_workers_for_channel(chain.clone(), client, channel_scan) {
            Ok(true) => info!(
                chain = %chain.id(),
                channel = %channel_id,
                "done spawning channel workers",
            ),
            Ok(false) => info!(
                chain = %chain.id(),
                channel = %channel_id,
                "no channel workers were spawned",
            ),
            Err(e) => error!(
                chain = %chain.id(),
                channel = %channel_id,
                "skipped channel workers, reason: {}",
                e
            ),
        }
    }
}
/// Decides whether a connection worker is needed for `connection` and, if so,
/// spawns it.
///
/// The counterparty chain is the chain referenced by the client state. The
/// connection state on `chain` is compared with the state reported by
/// `connection_state_on_destination` for the counterparty chain.
///
/// # Returns
///
/// * `Ok(true)` when connection workers are enabled in the configuration, the
///   destination state is not open, and the destination state is at most as
///   advanced as the source state; `workers.spawn` is invoked in this case.
/// * `Ok(false)` otherwise, including when both ends are already open.
///
/// # Errors
///
/// Fails when the counterparty chain handle cannot be obtained from the
/// registry, or when the destination connection state cannot be determined.
fn spawn_connection_workers(
    &mut self,
    chain: Chain,
    client: IdentifiedAnyClientState,
    connection: IdentifiedConnectionEnd,
) -> Result<bool, Error> {
    let dst_chain_id = client.client_state.chain_id();

    let counterparty_chain = self
        .registry
        .get_or_spawn(&dst_chain_id)
        .map_err(Error::spawn)?;

    let conn_state_src = connection.connection_end.state;
    let conn_state_dst = connection_state_on_destination(&connection, &counterparty_chain)?;

    info!(
        chain = %chain.id(),
        connection = %connection.connection_id,
        counterparty_chain = %counterparty_chain.id(),
        "connection is {}, state on destination chain is {}",
        conn_state_src,
        conn_state_dst
    );

    // Nothing to do once the handshake has completed on both ends.
    let fully_open = conn_state_src.is_open() && conn_state_dst.is_open();
    if fully_open {
        info!(
            chain = %chain.id(),
            connection = %connection.connection_id,
            "connection is already open, not spawning Connection worker",
        );
        return Ok(false);
    }

    // A worker is only useful when it is enabled and the destination end still
    // has to catch up with the source end.
    let needs_worker = self.config.mode.connections.enabled
        && !conn_state_dst.is_open()
        && conn_state_dst.less_or_equal_progress(conn_state_src);
    if !needs_worker {
        return Ok(false);
    }

    let connection_object = Object::Connection(Connection {
        dst_chain_id,
        src_chain_id: chain.id(),
        src_connection_id: connection.connection_id,
    });

    let spawned = self
        .workers
        .spawn(chain, counterparty_chain, &connection_object, self.config);
    if spawned {
        info!(
            "spawning Connection worker: {}",
            connection_object.short_name()
        );
    }

    Ok(true)
}
/// Spawns the workers relevant to one scanned channel.
///
/// The decision is based on the channel state on `chain` (source), the state of
/// the counterparty channel end (treated as `Uninitialized` when the scan has
/// no counterparty), whether the source channel end reports `is_upgrading()`,
/// and the `mode` section of the configuration:
///
/// 1. Clients or packets mode enabled, source open, destination open or
///    closed, and not upgrading: spawn a client worker (if clients mode is
///    enabled) and a packet worker (if packets mode is enabled and
///    `unreceived_packets_on_counterparty` or
///    `unreceived_acknowledgements_on_counterparty` yields a non-empty result).
/// 2. Otherwise, channels mode enabled and not upgrading: spawn a channel
///    worker when the open handshake or the close handshake still has to
///    progress on the destination.
/// 3. Otherwise, channel upgrading: spawn both a packet worker and a channel
///    worker.
///
/// # Returns
///
/// * Case 1: `Ok(mode.clients.enabled)`; note this does not reflect whether
///   a packet worker was spawned.
/// * Case 2: `Ok(true)` if the channel worker spawn was issued, else `Ok(false)`.
/// * Case 3: `Ok(true)`.
/// * None of the above: `Ok(false)`.
///
/// # Errors
///
/// Fails when the counterparty chain handle cannot be obtained from the
/// registry.
pub fn spawn_workers_for_channel(
    &mut self,
    chain: Chain,
    client: &IdentifiedAnyClientState,
    channel_scan: ChannelScan,
) -> Result<bool, Error> {
    let _span = tracing::error_span!("channel", channel = %channel_scan.id()).entered();

    let mode = &self.config.mode;

    let counterparty_chain = self
        .registry
        .get_or_spawn(&client.client_state.chain_id())
        .map_err(SupervisorError::spawn)?;

    let chan_state_src = channel_scan.channel.channel_end.state;
    // A missing counterparty in the scan is treated as an uninitialized channel end.
    let chan_state_dst = channel_scan
        .counterparty
        .as_ref()
        .map_or(ChannelState::Uninitialized, |c| c.channel_end.state);

    info!(
        chain = %chain.id(),
        counterparty_chain = %counterparty_chain.id(),
        channel = %channel_scan.id(),
        "channel is {}, state on destination chain is {}",
        chan_state_src,
        chan_state_dst
    );

    let is_channel_upgrading = channel_scan.channel.channel_end.is_upgrading();

    if (mode.clients.enabled || mode.packets.enabled)
        && chan_state_src.is_open()
        && (chan_state_dst.is_open() || chan_state_dst.is_closed())
        && !is_channel_upgrading
    {
        if mode.clients.enabled {
            // The client worker runs in the opposite direction: the
            // counterparty is its source and `chain` is its destination.
            let client_object = Object::Client(Client {
                dst_client_id: client.client_id.clone(),
                dst_chain_id: chain.id(),
                src_chain_id: client.client_state.chain_id(),
            });

            self.workers
                .spawn(
                    counterparty_chain.clone(),
                    chain.clone(),
                    &client_object,
                    self.config,
                )
                .then(|| info!("spawned client worker: {}", client_object.short_name()));
        }

        if mode.packets.enabled {
            // Both checks are closures so the second one only runs when the
            // first one found nothing (short-circuit in the `||` below).
            let has_packets = || {
                !channel_scan
                    .unreceived_packets_on_counterparty(&chain, &counterparty_chain)
                    .unwrap_or_default()
                    .is_empty()
            };

            let has_acks = || {
                !channel_scan
                    .unreceived_acknowledgements_on_counterparty(&chain, &counterparty_chain)
                    .unwrap_or_default()
                    .is_empty()
            };

            // Only spawn the packet worker when there is something to relay.
            if has_packets() || has_acks() {
                let path_object = Object::Packet(Packet {
                    dst_chain_id: counterparty_chain.id(),
                    src_chain_id: chain.id(),
                    src_channel_id: channel_scan.channel.channel_id.clone(),
                    src_port_id: channel_scan.channel.port_id.clone(),
                });

                self.workers
                    .spawn(
                        chain.clone(),
                        counterparty_chain.clone(),
                        &path_object,
                        self.config,
                    )
                    .then(|| info!("spawned packet worker: {}", path_object.short_name()));
            }
        }

        Ok(mode.clients.enabled)
    } else if mode.channels.enabled && !is_channel_upgrading {
        // Pass the scanned chain first and its counterparty second, the same
        // argument order as the packet-worker checks earlier in this function.
        // Both handles share one type, so a swapped order would still compile.
        let has_packets = || {
            !channel_scan
                .unreceived_packets_on_counterparty(&chain, &counterparty_chain)
                .unwrap_or_default()
                .is_empty()
        };

        // Open handshake: the destination has not progressed past `TryOpen`
        // and is not ahead of the source.
        let open_handshake = chan_state_dst.less_or_equal_progress(ChannelState::TryOpen)
            && chan_state_dst.less_or_equal_progress(chan_state_src);

        // Close handshake: the source is closed, the destination is not, and
        // no packets are pending. `has_packets` is evaluated last so the query
        // only runs when the state checks already hold.
        let close_handshake =
            chan_state_src.is_closed() && !chan_state_dst.is_closed() && !has_packets();

        if open_handshake || close_handshake {
            let channel_object = Object::Channel(Channel {
                dst_chain_id: counterparty_chain.id(),
                src_chain_id: chain.id(),
                src_channel_id: channel_scan.channel.channel_id,
                src_port_id: channel_scan.channel.port_id,
            });

            self.workers
                .spawn(chain, counterparty_chain, &channel_object, self.config)
                .then(|| info!("spawned channel worker: {}", channel_object.short_name()));

            Ok(true)
        } else {
            Ok(false)
        }
    } else if is_channel_upgrading {
        // Upgrading channel: both a packet worker and a channel worker are
        // spawned; this branch does not consult the `mode` flags.
        let path_object = Object::Packet(Packet {
            dst_chain_id: counterparty_chain.id(),
            src_chain_id: chain.id(),
            src_channel_id: channel_scan.channel.channel_id.clone(),
            src_port_id: channel_scan.channel.port_id.clone(),
        });

        self.workers
            .spawn(
                chain.clone(),
                counterparty_chain.clone(),
                &path_object,
                self.config,
            )
            .then(|| info!("spawned packet worker: {}", path_object.short_name()));

        let channel_object = Object::Channel(Channel {
            dst_chain_id: counterparty_chain.id(),
            src_chain_id: chain.id(),
            src_channel_id: channel_scan.channel.channel_id,
            src_port_id: channel_scan.channel.port_id,
        });

        self.workers
            .spawn(chain, counterparty_chain, &channel_object, self.config)
            .then(|| info!("spawned channel worker: {}", channel_object.short_name()));

        Ok(true)
    } else {
        Ok(false)
    }
}
}