use std::convert::identity;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use hopr_types::{internal::prelude::*, primitive::prelude::*};
use super::ChannelId;
use crate::{
chain::{
AccountSelector, ChainInfo, ChainReadAccountOperations, ChainReadChannelOperations, ChainReadSafeOperations,
ChainValues, ChainWriteAccountOperations, ChainWriteChannelOperations, ChannelSelector, HoprChainApi,
},
node::{
ChainOutput, CompoundResult, EitherErr,
accessors::{HasChainApi, HasTicketManagement},
},
tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
};
/// Channel, balance and chain-parameter operations offered on top of a chain API.
///
/// Every method has a default implementation that delegates to the object returned by
/// `chain_api()`; the write operations on channels additionally wait for the corresponding
/// on-chain event via `wait_for_on_chain_event`.
#[async_trait::async_trait]
pub trait IncentiveChannelOperations: HasChainApi {
/// Factor by which the chain API's `typical_resolution_time()` is multiplied to obtain the
/// duration handed to `wait_for_on_chain_event` (presumably the event timeout — confirm
/// against that method's definition).
const CHAIN_OPERATION_TIMEOUT_MULTIPLIER: u32 = 2;
/// Opens a channel from this node to `destination` and funds it with `amount`.
///
/// The channel ID is derived from this node's address and `destination`. A waiter for the
/// matching `ChannelOpened` event is registered before the transaction is submitted, and the
/// call returns only once both the transaction confirmation and that event have been received.
///
/// Returns a [`ChainOutput`] built from the confirmation result and the channel ID.
///
/// # Errors
///
/// Returns the "left" error variant if the typical resolution time cannot be obtained, the
/// event waiter cannot be set up, or submitting/confirming the transaction fails. An error
/// produced while awaiting the event is propagated through `?`. The event waiter is aborted
/// whenever submission or confirmation fails.
async fn open_channel<A: Into<Address> + Send>(
    &self,
    destination: A,
    amount: HoprBalance,
) -> CompoundResult<ChainOutput<ChannelId>, <Self::ChainApi as HoprChainApi>::ChainError, Self::ChainError> {
    let destination = destination.into();
    let channel_id = generate_channel_id(&self.identity().node_address, &destination);

    let event_timeout = Self::CHAIN_OPERATION_TIMEOUT_MULTIPLIER
        * self
            .chain_api()
            .typical_resolution_time()
            .await
            .map_err(EitherErr::left)?;

    // The waiter is set up before the transaction is sent (presumably so that the event cannot
    // slip through between submission and subscription).
    let (event_awaiter, event_abort) = self
        .wait_for_on_chain_event(
            move |event| matches!(event, hopr_types::chain::chain_events::ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
            format!("open channel to {destination} ({channel_id})"),
            event_timeout,
        )
        .map_err(EitherErr::left)?;

    let confirm_awaiter = self
        .chain_api()
        .open_channel(&destination, amount)
        .await
        .inspect_err(|_| {
            // Nothing was submitted, so the awaited event will not arrive: stop the waiter
            // right away, exactly as done below when the confirmation fails.
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let tx_hash = confirm_awaiter
        .await
        .inspect_err(|_| {
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let event = event_awaiter.await?;
    tracing::debug!(%event, "open channel event received");

    Ok(ChainOutput::new(tx_hash, channel_id))
}
/// Adds `amount` to the balance of the channel identified by `channel_id`.
///
/// A waiter for a `ChannelBalanceIncreased` event carrying the same channel ID and the same
/// `amount` is registered before the transaction is submitted. The call returns once both the
/// transaction confirmation and that event have been received.
///
/// Returns the confirmation result converted into a [`ChainOutput`].
///
/// # Errors
///
/// Returns the "left" error variant if the typical resolution time cannot be obtained, the
/// event waiter cannot be set up, or submitting/confirming the transaction fails. An error
/// produced while awaiting the event is propagated through `?`. The event waiter is aborted
/// whenever submission or confirmation fails.
async fn fund_channel(
    &self,
    channel_id: &ChannelId,
    amount: HoprBalance,
) -> CompoundResult<ChainOutput<()>, <Self::ChainApi as HoprChainApi>::ChainError, Self::ChainError> {
    let channel_id = *channel_id;

    let event_timeout = Self::CHAIN_OPERATION_TIMEOUT_MULTIPLIER
        * self
            .chain_api()
            .typical_resolution_time()
            .await
            .map_err(EitherErr::left)?;

    let (event_awaiter, event_abort) = self
        .wait_for_on_chain_event(
            move |event| matches!(event, hopr_types::chain::chain_events::ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
            format!("fund channel {channel_id}"),
            event_timeout,
        )
        .map_err(EitherErr::left)?;

    let confirm_awaiter = self
        .chain_api()
        .fund_channel(&channel_id, amount)
        .await
        .inspect_err(|_| {
            // Nothing was submitted, so the awaited event will not arrive: stop the waiter
            // right away, exactly as done below when the confirmation fails.
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let res = confirm_awaiter
        .await
        .inspect_err(|_| {
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let event = event_awaiter.await?;
    tracing::debug!(%event, "fund channel event received");

    Ok(res.into())
}
/// Submits a close transaction for the channel identified by `channel_id`.
///
/// A waiter for either a `ChannelClosed` or a `ChannelClosureInitiated` event with the same
/// channel ID is registered before the transaction is submitted. The call returns once both
/// the transaction confirmation and one of those events have been received.
///
/// Returns a [`ChainOutput`] built from the confirmation result and the `status` field of the
/// channel carried by the received event.
///
/// # Errors
///
/// Returns the "left" error variant if the typical resolution time cannot be obtained, the
/// event waiter cannot be set up, or submitting/confirming the transaction fails. An error
/// produced while awaiting the event is propagated through `?`. The event waiter is aborted
/// whenever submission or confirmation fails.
///
/// # Panics
///
/// Panics if the event waiter yields an event other than the two variants listed above.
async fn close_channel_by_id(
    &self,
    channel_id: &ChannelId,
) -> CompoundResult<ChainOutput<ChannelStatus>, <Self::ChainApi as HoprChainApi>::ChainError, Self::ChainError>
{
    use hopr_types::chain::chain_events::ChainEvent;

    let channel_id = *channel_id;

    let event_timeout = Self::CHAIN_OPERATION_TIMEOUT_MULTIPLIER
        * self
            .chain_api()
            .typical_resolution_time()
            .await
            .map_err(EitherErr::left)?;

    let (event_awaiter, event_abort) = self
        .wait_for_on_chain_event(
            move |event| {
                matches!(
                    event,
                    ChainEvent::ChannelClosed(c) | ChainEvent::ChannelClosureInitiated(c)
                        if c.get_id() == &channel_id
                )
            },
            format!("close channel {channel_id}"),
            event_timeout,
        )
        .map_err(EitherErr::left)?;

    let confirm_awaiter = self
        .chain_api()
        .close_channel(&channel_id)
        .await
        .inspect_err(|_| {
            // Nothing was submitted, so the awaited event will not arrive: stop the waiter
            // right away, exactly as done below when the confirmation fails.
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let tx_hash = confirm_awaiter
        .await
        .inspect_err(|_| {
            event_abort.abort();
        })
        .map_err(EitherErr::left)?;

    let event = event_awaiter.await?;
    tracing::debug!(%event, "close channel event received");

    let status = match event {
        ChainEvent::ChannelClosureInitiated(c) | ChainEvent::ChannelClosed(c) => c.status,
        // Assumes the waiter only ever yields events accepted by the predicate above, which
        // admits just these two variants — TODO confirm against `wait_for_on_chain_event`.
        _ => unreachable!(),
    };

    Ok(ChainOutput::new(tx_hash, status))
}
/// Looks up the channel with the given `channel_id` through the chain API.
///
/// The chain API's result is returned unchanged: `Ok(None)` when it reports no such channel.
fn channel_by_id(
    &self,
    channel_id: &ChannelId,
) -> Result<Option<ChannelEntry>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    api.channel_by_id(channel_id)
}
/// Looks up the channel between `source` and `destination` through the chain API.
///
/// Both arguments are converted into addresses and handed to `channel_by_parties`, whose
/// result is returned unchanged.
fn channel<A: Into<Address> + Send, B: Into<Address> + Send>(
    &self,
    source: A,
    destination: B,
) -> Result<Option<ChannelEntry>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    let from: Address = source.into();
    let to: Address = destination.into();
    api.channel_by_parties(&from, &to)
}
/// Collects the channels that have `destination` as their destination.
///
/// The selector admits channels in the `Closed`, `Open` and `PendingToClose` states.
///
/// # Errors
///
/// Returns an error if the chain API fails to create the channel stream.
async fn channels_to<A: Into<Address> + Send>(
    &self,
    destination: A,
) -> Result<Vec<ChannelEntry>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let target: Address = destination.into();
    let selector = ChannelSelector::default()
        .with_destination(target)
        .with_allowed_states(&[
            ChannelStatusDiscriminants::Closed,
            ChannelStatusDiscriminants::Open,
            ChannelStatusDiscriminants::PendingToClose,
        ]);

    let channels = self.chain_api().stream_channels(selector)?.collect::<Vec<_>>().await;
    Ok(channels)
}
/// Collects the channels that have `source` as their source.
///
/// The selector admits channels in the `Closed`, `Open` and `PendingToClose` states.
///
/// # Errors
///
/// Returns an error if the chain API fails to create the channel stream.
async fn channels_from<A: Into<Address> + Send>(
    &self,
    source: A,
) -> Result<Vec<ChannelEntry>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let origin: Address = source.into();
    let selector = ChannelSelector::default()
        .with_source(origin)
        .with_allowed_states(&[
            ChannelStatusDiscriminants::Closed,
            ChannelStatusDiscriminants::Open,
            ChannelStatusDiscriminants::PendingToClose,
        ]);

    let channels = self.chain_api().stream_channels(selector)?.collect::<Vec<_>>().await;
    Ok(channels)
}
/// Queries the chain API for the balance of this node's `node_address` in currency `C`.
async fn get_balance<C: Currency + Send>(
    &self,
) -> Result<Balance<C>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    let node_address = self.identity().node_address;
    api.balance(node_address).await
}
/// Queries the chain API for the balance of this node's `safe_address` in currency `C`.
async fn get_safe_balance<C: Currency + Send>(
    &self,
) -> Result<Balance<C>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    let safe_address = self.identity().safe_address;
    api.balance(safe_address).await
}
/// Queries the chain API for the allowance associated with this node's `safe_address`.
async fn safe_allowance(&self) -> Result<HoprBalance, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    let safe_address = self.identity().safe_address;
    api.safe_allowance(safe_address).await
}
/// Withdraws `amount` of currency `C` to `recipient` through the chain API.
///
/// The chain API call resolves to a second future, which is awaited as well (presumably the
/// transaction confirmation, by analogy with the channel operations — confirm). The final
/// result is converted into a [`ChainOutput`].
///
/// # Errors
///
/// Returns an error if either of the two awaited stages fails.
async fn withdraw<C: Currency + Send>(
    &self,
    recipient: &Address,
    amount: Balance<C>,
) -> Result<ChainOutput<()>, <Self::ChainApi as HoprChainApi>::ChainError> {
    // `and_then(identity)` flattens the future-of-a-future so a single `.await` drives both.
    let outcome = self
        .chain_api()
        .withdraw(amount, recipient)
        .and_then(identity)
        .await?;

    Ok(outcome.into())
}
/// Returns the chain information reported by the chain API.
async fn chain_info(&self) -> Result<ChainInfo, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    api.chain_info().await
}
/// Returns the minimum ticket price reported by the chain API.
async fn get_ticket_price(&self) -> Result<HoprBalance, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    api.minimum_ticket_price().await
}
/// Returns the minimum winning probability for incoming tickets reported by the chain API.
async fn get_minimum_incoming_ticket_win_probability(
    &self,
) -> Result<WinningProbability, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    api.minimum_incoming_ticket_win_prob().await
}
/// Returns the channel closure notice period reported by the chain API.
async fn get_channel_closure_notice_period(
    &self,
) -> Result<std::time::Duration, <Self::ChainApi as HoprChainApi>::ChainError> {
    let api = self.chain_api();
    api.channel_closure_notice_period().await
}
/// Collects the accounts streamed by the chain API for a selector with `public_only` set.
///
/// All other selector fields keep their default values.
///
/// # Errors
///
/// Returns an error if the chain API fails to create the account stream.
async fn announced_peers(&self) -> Result<Vec<AccountEntry>, <Self::ChainApi as HoprChainApi>::ChainError> {
    let selector = AccountSelector {
        public_only: true,
        ..Default::default()
    };

    let accounts = self.chain_api().stream_accounts(selector)?.collect::<Vec<_>>().await;
    Ok(accounts)
}
}
// Blanket implementation: every `Send + Sync` type exposing a chain API gets the default methods.
impl<T: HasChainApi + Send + Sync> IncentiveChannelOperations for T {}
/// Ticket redemption and ticket statistics operations for types that expose both a chain API
/// and ticket management.
///
/// Every method has a default implementation that delegates to the object returned by
/// `ticket_management()`.
#[async_trait::async_trait]
pub trait IncentiveRedeemOperations: HasChainApi + HasTicketManagement {
/// Duration (30 seconds) passed as the last, optional argument of `redeem_in_channels` by the
/// redemption methods of this trait.
/// NOTE(review): the name suggests a tolerance applied to channels pending closure — confirm
/// against `redeem_in_channels`.
const PENDING_TO_CLOSE_REDEMPTION_TOLERANCE: std::time::Duration = std::time::Duration::from_secs(30);
/// Starts ticket redemption without a channel selector and collects the results.
///
/// `min_value` is converted into a [`HoprBalance`] and forwarded to `redeem_in_channels`
/// (presumably as the minimum ticket value to redeem — confirm), together with a clone of the
/// chain API and [`Self::PENDING_TO_CLOSE_REDEMPTION_TOLERANCE`].
///
/// # Errors
///
/// Returns the "left" error variant if `redeem_in_channels` itself fails, and the "right"
/// variant with the first error yielded by the resulting stream.
async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(
    &self,
    min_value: B,
) -> CompoundResult<
    Vec<RedemptionResult>,
    <Self::ChainApi as HoprChainApi>::ChainError,
    <<Self as HasTicketManagement>::TicketManager as TicketManagement>::Error,
> {
    let threshold: HoprBalance = min_value.into();
    let manager = self.ticket_management();

    let redemptions = manager
        .redeem_in_channels(
            self.chain_api().clone(),
            None,
            threshold.into(),
            Some(Self::PENDING_TO_CLOSE_REDEMPTION_TOLERANCE),
        )
        .await
        .map_err(EitherErr::left)?;

    redemptions.try_collect::<Vec<_>>().await.map_err(EitherErr::right)
}
/// Starts ticket redemption restricted to the channel from `issuer` to this node and collects
/// the results.
///
/// The selector uses `issuer` as source and this node's `node_address` as destination.
/// `min_value` is converted into a [`HoprBalance`] and forwarded to `redeem_in_channels`
/// (presumably as the minimum ticket value to redeem — confirm), together with a clone of the
/// chain API and [`Self::PENDING_TO_CLOSE_REDEMPTION_TOLERANCE`].
///
/// # Errors
///
/// Returns the "left" error variant if `redeem_in_channels` itself fails, and the "right"
/// variant with the first error yielded by the resulting stream.
async fn redeem_tickets_with_counterparty<A: Into<Address> + Send, B: Into<HoprBalance> + Send>(
    &self,
    issuer: A,
    min_value: B,
) -> CompoundResult<
    Vec<RedemptionResult>,
    <Self::ChainApi as HoprChainApi>::ChainError,
    <<Self as HasTicketManagement>::TicketManager as TicketManagement>::Error,
> {
    let threshold: HoprBalance = min_value.into();
    let manager = self.ticket_management();
    let incoming_from_issuer = ChannelSelector::default()
        .with_source(issuer)
        .with_destination(self.identity().node_address);

    let redemptions = manager
        .redeem_in_channels(
            self.chain_api().clone(),
            incoming_from_issuer.into(),
            threshold.into(),
            Some(Self::PENDING_TO_CLOSE_REDEMPTION_TOLERANCE),
        )
        .await
        .map_err(EitherErr::left)?;

    redemptions.try_collect::<Vec<_>>().await.map_err(EitherErr::right)
}
/// Returns the ticket statistics obtained from ticket management with no selector argument
/// (`ticket_stats(None)`).
///
/// # Errors
///
/// A failure of the ticket manager is returned as the "right" error variant.
fn ticket_statistics(
    &self,
) -> CompoundResult<
    ChannelStats,
    <Self::ChainApi as HoprChainApi>::ChainError,
    <<Self as HasTicketManagement>::TicketManager as TicketManagement>::Error,
> {
    let manager = self.ticket_management();
    manager.ticket_stats(None).map_err(EitherErr::right)
}
}
// Blanket implementation: every `Send + Sync` type exposing both a chain API and ticket
// management gets the default methods.
impl<T: HasChainApi + HasTicketManagement + Send + Sync> IncentiveRedeemOperations for T {}