use core::fmt::{Display, Error as FmtError, Formatter};
use std::ops::Add;
use std::time::{Duration, Instant};
use tracing::{debug, info};
use ibc_proto::google::protobuf::Any;
use ibc_relayer_types::core::ics02_client::client_state::ClientState;
use ibc_relayer_types::Height;
use crate::chain::handle::ChainHandle;
use crate::chain::requests::IncludeProof;
use crate::chain::requests::QueryClientStateRequest;
use crate::chain::requests::QueryHeight;
use crate::chain::tracking::TrackedMsgs;
use crate::chain::tracking::TrackingId;
use crate::event::IbcEventWithHeight;
use crate::link::error::LinkError;
use crate::link::RelayPath;
/// Selects which side of a relay path a batch of operational data is aimed at.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum OperationalDataTarget {
    /// The batch targets the source side of the relay path.
    Source,
    /// The batch targets the destination side of the relay path.
    Destination,
}

impl Display for OperationalDataTarget {
    /// Writes the variant name (`Source` or `Destination`) verbatim.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let label = match self {
            OperationalDataTarget::Source => "Source",
            OperationalDataTarget::Destination => "Destination",
        };
        f.write_str(label)
    }
}
/// A list of events bundled with the tracking id associated with them.
pub struct TrackedEvents {
/// The tracked events, exposed read-only through `events()`.
events: Vec<IbcEventWithHeight>,
/// Identifier returned by `tracking_id()`.
tracking_id: TrackingId,
}
impl TrackedEvents {
pub fn new(events: Vec<IbcEventWithHeight>, tracking_id: TrackingId) -> Self {
Self {
events,
tracking_id,
}
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn events(&self) -> &[IbcEventWithHeight] {
&self.events
}
pub fn tracking_id(&self) -> TrackingId {
self.tracking_id
}
pub fn len(&self) -> usize {
self.events.len()
}
}
/// Pairs an event (with its height) with a protobuf `Any` message.
///
/// Within this file, `OperationalData::into_events` keeps only the event and
/// `OperationalData::assemble_msgs` forwards only the message.
#[derive(Clone)]
pub struct TransitMessage {
/// The event half of the pair.
pub event_with_height: IbcEventWithHeight,
/// The message half of the pair.
pub msg: Any,
}
/// A batch of transit messages aimed at one side of a relay path, plus the
/// bookkeeping needed to assemble and (optionally) delay its submission.
#[derive(Clone)]
pub struct OperationalData {
/// Height recorded for this batch. When no connection delay applies,
/// `assemble_msgs` requests a client update at this height incremented by one.
pub proofs_height: Height,
/// The queued messages; starts empty and grows through `push`.
pub batch: Vec<TransitMessage>,
/// Which side of the relay path this batch is aimed at.
pub target: OperationalDataTarget,
/// Identifier attached to the `TrackedMsgs` / `TrackedEvents` produced from this batch.
pub tracking_id: TrackingId,
/// Delay bookkeeping; `None` when the batch was created with a zero connection delay.
connection_delay: Option<ConnectionDelay>,
}
impl OperationalData {
pub fn new(
proofs_height: Height,
target: OperationalDataTarget,
tracking_id: TrackingId,
connection_delay: Duration,
) -> Self {
let connection_delay = if !connection_delay.is_zero() {
Some(ConnectionDelay::new(connection_delay))
} else {
None
};
OperationalData {
proofs_height,
batch: vec![],
target,
connection_delay,
tracking_id,
}
}
pub fn push(&mut self, msg: TransitMessage) {
self.batch.push(msg)
}
pub fn info(&self) -> OperationalInfo {
OperationalInfo {
tracking_id: self.tracking_id,
target: self.target,
proofs_height: self.proofs_height,
batch_len: self.batch.len(),
}
}
pub fn into_events(self) -> TrackedEvents {
let events = self
.batch
.into_iter()
.map(|gm| gm.event_with_height)
.collect();
TrackedEvents {
events,
tracking_id: self.tracking_id,
}
}
pub fn assemble_msgs<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
relay_path: &RelayPath<ChainA, ChainB>,
) -> Result<TrackedMsgs, LinkError> {
let client_update_msgs = if !self.conn_delay_needed() {
let update_height = self.proofs_height.increment();
debug!(
"prepending {} client update at height {}",
self.target, update_height
);
match self.target {
OperationalDataTarget::Source => {
relay_path.build_update_client_on_src(update_height)?
}
OperationalDataTarget::Destination => {
relay_path.build_update_client_on_dst(update_height)?
}
}
} else {
let (client_state, _) = match self.target {
OperationalDataTarget::Source => relay_path
.src_chain()
.query_client_state(
QueryClientStateRequest {
client_id: relay_path.src_client_id().clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| LinkError::query(relay_path.src_chain().id(), e))?,
OperationalDataTarget::Destination => relay_path
.dst_chain()
.query_client_state(
QueryClientStateRequest {
client_id: relay_path.dst_client_id().clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map_err(|e| LinkError::query(relay_path.dst_chain().id(), e))?,
};
if client_state.is_frozen() {
return Ok(TrackedMsgs::new(vec![], self.tracking_id));
}
vec![]
};
let msgs = client_update_msgs
.into_iter()
.chain(self.batch.iter().map(|gm| gm.msg.clone()))
.collect();
let tm = TrackedMsgs::new(msgs, self.tracking_id);
info!("assembled batch of {} message(s)", tm.messages().len());
Ok(tm)
}
fn has_packet_msgs(&self) -> bool {
self.batch
.iter()
.any(|msg| msg.event_with_height.event.packet().is_some())
}
fn get_delay_if_needed(&self) -> Option<&ConnectionDelay> {
self.connection_delay
.as_ref()
.filter(|_| self.has_packet_msgs())
}
pub fn conn_delay_needed(&self) -> bool {
self.get_delay_if_needed().is_some()
}
pub fn set_scheduled_time(&mut self, scheduled_time: Instant) {
if let Some(delay) = self.connection_delay.as_mut() {
delay.scheduled_time = scheduled_time;
}
}
pub fn set_update_height(&mut self, update_height: Height) {
if let Some(delay) = self.connection_delay.as_mut() {
delay.update_height = Some(update_height);
}
}
fn conn_time_delay_remaining<ChainTime>(
&self,
chain_time: &ChainTime,
) -> Result<Duration, LinkError>
where
ChainTime: Fn() -> Result<Instant, LinkError>,
{
if let Some(delay) = self.get_delay_if_needed() {
Ok(delay.conn_time_delay_remaining(chain_time()?))
} else {
Ok(Duration::ZERO)
}
}
fn conn_block_delay_remaining<MaxBlockTime, LatestHeight>(
&self,
max_expected_time_per_block: &MaxBlockTime,
latest_height: &LatestHeight,
) -> Result<u64, LinkError>
where
MaxBlockTime: Fn() -> Result<Duration, LinkError>,
LatestHeight: Fn() -> Result<Height, LinkError>,
{
if let Some(delay) = self.get_delay_if_needed() {
let block_delay = delay.conn_block_delay(max_expected_time_per_block()?);
Ok(delay.conn_block_delay_remaining(block_delay, latest_height()?))
} else {
Ok(0)
}
}
pub fn has_conn_delay_elapsed<ChainTime, MaxBlockTime, LatestHeight>(
&self,
chain_time: &ChainTime,
max_expected_time_per_block: &MaxBlockTime,
latest_height: &LatestHeight,
) -> Result<bool, LinkError>
where
ChainTime: Fn() -> Result<Instant, LinkError>,
MaxBlockTime: Fn() -> Result<Duration, LinkError>,
LatestHeight: Fn() -> Result<Height, LinkError>,
{
Ok(self.conn_time_delay_remaining(chain_time)?.is_zero()
&& self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)? == 0)
}
pub fn conn_delay_remaining<ChainTime, MaxBlockTime, LatestHeight>(
&self,
chain_time: &ChainTime,
max_expected_time_per_block: &MaxBlockTime,
latest_height: &LatestHeight,
) -> Result<(Duration, u64), LinkError>
where
ChainTime: Fn() -> Result<Instant, LinkError>,
MaxBlockTime: Fn() -> Result<Duration, LinkError>,
LatestHeight: Fn() -> Result<Height, LinkError>,
{
Ok((
self.conn_time_delay_remaining(chain_time)?,
self.conn_block_delay_remaining(max_expected_time_per_block, latest_height)?,
))
}
}
/// Bookkeeping for a non-zero connection delay attached to an `OperationalData`.
#[derive(Clone)]
struct ConnectionDelay {
/// Length of the delay period.
delay: Duration,
/// Instant from which elapsed time is measured; initialised to `Instant::now()` by `new`.
scheduled_time: Instant,
/// Height to which the block delay is added; `None` until it has been set.
update_height: Option<Height>,
}
impl ConnectionDelay {
fn new(delay: Duration) -> Self {
Self {
delay,
scheduled_time: Instant::now(),
update_height: None,
}
}
fn conn_time_delay_remaining(&self, chain_time: Instant) -> Duration {
let elapsed = chain_time.saturating_duration_since(self.scheduled_time);
if elapsed >= self.delay {
Duration::ZERO
} else {
self.delay - elapsed
}
}
fn conn_block_delay_remaining(&self, block_delay: u64, latest_height: Height) -> u64 {
let acceptable_height = self
.update_height
.expect("processed height not set")
.add(block_delay);
if latest_height >= acceptable_height {
0
} else {
debug_assert!(acceptable_height.revision_number() == latest_height.revision_number());
acceptable_height.revision_height() - latest_height.revision_height()
}
}
fn conn_block_delay(&self, max_expected_time_per_block: Duration) -> u64 {
calculate_block_delay(self.delay, max_expected_time_per_block)
}
}
/// Converts a time-based delay period into a number of blocks.
///
/// Returns the smallest whole number of blocks whose combined duration, at
/// `max_expected_time_per_block` per block, is at least `delay_period_time`
/// (i.e. the ceiling of the ratio). A zero `max_expected_time_per_block`
/// yields `0`. Results that do not fit in a `u64` saturate at `u64::MAX`.
///
/// The division is carried out on integer nanoseconds. Dividing `f64`
/// seconds is inexact: 1100 ms / 100 ms evaluates to `11.000000000000002`,
/// which would round up to 12 blocks instead of 11.
fn calculate_block_delay(
    delay_period_time: Duration,
    max_expected_time_per_block: Duration,
) -> u64 {
    let block_nanos = max_expected_time_per_block.as_nanos();
    if block_nanos == 0 {
        return 0;
    }

    let delay_nanos = delay_period_time.as_nanos();

    // Exact ceiling division without risking overflow from `a + b - 1`.
    let has_remainder = delay_nanos % block_nanos != 0;
    let blocks = delay_nanos / block_nanos + u128::from(has_remainder);

    // Saturate rather than truncate when the block count exceeds `u64`.
    blocks.min(u128::from(u64::MAX)) as u64
}
/// A lightweight summary of an `OperationalData`, produced by `OperationalData::info`.
pub struct OperationalInfo {
/// Tracking id copied from the operational data.
tracking_id: TrackingId,
/// Target copied from the operational data.
target: OperationalDataTarget,
/// Proofs height copied from the operational data.
proofs_height: Height,
/// Length of the batch at the time the summary was taken.
batch_len: usize,
}
impl OperationalInfo {
/// Returns the target recorded in this summary.
pub fn target(&self) -> OperationalDataTarget {
self.target
}
/// Returns the batch length recorded in this summary.
pub fn batch_len(&self) -> usize {
self.batch_len
}
}
impl Display for OperationalInfo {
    /// Formats the summary as `<tracking_id> -><target> @<proofs_height>; len=<batch_len>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let Self {
            tracking_id,
            target,
            proofs_height,
            batch_len,
        } = self;

        write!(
            f,
            "{} ->{} @{}; len={}",
            tracking_id, target, proofs_height, batch_len,
        )
    }
}