use std::num::NonZeroU8;
use futures::{Stream, StreamExt};
pub use hopr_types::{
internal::prelude::{ChannelId, TicketBuilder, VerifiedTicket},
primitive::balance::HoprBalance,
};
use crate::chain::{
ChainReadChannelOperations, ChainWriteTicketOperations, ChannelEntry, ChannelSelector, WinningProbability,
};
/// Aggregated ticket statistics, as returned by [`TicketManagement::ticket_stats`].
///
/// NOTE(review): the per-field descriptions below are inferred from the field names only;
/// confirm the exact accounting rules against the `TicketManagement` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ChannelStats {
/// Number of winning tickets (presumably counted since the statistics were started).
pub winning_tickets: u128,
/// Total value of tickets that have presumably not been redeemed yet.
pub unredeemed_value: HoprBalance,
/// Total value of tickets that were presumably rejected.
pub rejected_value: HoprBalance,
/// Total value of tickets that were presumably neglected.
pub neglected_value: HoprBalance,
}
/// Outcome of an attempt to redeem a single ticket.
///
/// Every variant carries the affected [`VerifiedTicket`]; it can be borrowed uniformly
/// through the `AsRef<VerifiedTicket>` implementation of this type.
/// The `Display` output of each variant is given by its `strum` format string.
#[derive(Clone, Debug, PartialEq, Eq, strum::Display)]
pub enum RedemptionResult {
/// The ticket was redeemed.
#[strum(to_string = "redeemed {0}")]
Redeemed(VerifiedTicket),
/// The ticket was neglected because its value was too low.
#[strum(to_string = "neglected {0} due to low value")]
ValueTooLow(VerifiedTicket),
/// The ticket was rejected on-chain.
///
/// The accompanying string is displayed after the ticket — presumably the rejection reason.
#[strum(to_string = "rejected {0} on-chain: {1}")]
RejectedOnChain(VerifiedTicket, String),
}
impl AsRef<VerifiedTicket> for RedemptionResult {
fn as_ref(&self) -> &VerifiedTicket {
match self {
RedemptionResult::Redeemed(ticket) => ticket,
RedemptionResult::ValueTooLow(ticket) => ticket,
RedemptionResult::RejectedOnChain(ticket, _) => ticket,
}
}
}
/// Interface for managing tickets: redeeming them, neglecting them, querying
/// statistics and inserting newly received tickets.
///
/// Through `auto_impl`, the trait is also implemented for `&T`, `Box<T>` and `Arc<T>`
/// of any implementor `T`.
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait TicketManagement {
/// Error type returned by the operations of this trait.
type Error: std::error::Error + Send + Sync + 'static;
/// Creates a stream of [`RedemptionResult`]s for tickets in the channel `channel_id`.
///
/// The `client` is handed over by value; given its `ChainWriteTicketOperations` bound it is
/// presumably used to perform the on-chain redemption.
///
/// NOTE(review): `min_amount` presumably is the threshold below which tickets are reported
/// as [`RedemptionResult::ValueTooLow`] instead of being redeemed — confirm with the implementation.
///
/// # Errors
///
/// Returns `Err` if the stream cannot be created. Individual stream items can be errors as well.
fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
&self,
client: C,
channel_id: ChannelId,
min_amount: Option<HoprBalance>,
) -> Result<impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send, Self::Error>;
/// Neglects tickets in the channel `channel_id` and returns the affected tickets.
///
/// NOTE(review): `max_ticket_index` presumably limits the operation to tickets up to that index,
/// with `None` meaning no limit — confirm with the implementation.
fn neglect_tickets(
&self,
channel_id: &ChannelId,
max_ticket_index: Option<u64>,
) -> Result<Vec<VerifiedTicket>, Self::Error>;
/// Returns ticket statistics as [`ChannelStats`].
///
/// NOTE(review): presumably `Some(id)` restricts the statistics to a single channel while `None`
/// aggregates over all channels — confirm with the implementation.
fn ticket_stats(&self, channel_id: Option<&ChannelId>) -> Result<ChannelStats, Self::Error>;
/// Inserts a newly received redeemable `ticket`.
///
/// NOTE(review): the meaning of the returned tickets is not visible from this declaration
/// (possibly tickets that got neglected as a consequence of the insertion) — confirm.
fn insert_incoming_ticket(
&self,
ticket: hopr_types::internal::prelude::RedeemableTicket,
) -> Result<Vec<VerifiedTicket>, Self::Error>;
}
#[async_trait::async_trait]
pub trait TicketManagementExt: TicketManagement {
async fn redeem_in_channels<C>(
&self,
client: C,
selector: Option<ChannelSelector>,
min_amount: Option<HoprBalance>,
min_grace_period: Option<std::time::Duration>,
) -> Result<
impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send,
<C as ChainReadChannelOperations>::Error,
>
where
C: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
{
let mut stream_group = futures_concurrency::stream::StreamGroup::new();
client
.stream_channels(
selector
.unwrap_or_default()
.with_destination(*client.me())
.with_redeemable_channels(min_grace_period),
)?
.filter_map(|channel| {
futures::future::ready(
self.redeem_stream(client.clone(), *channel.get_id(), min_amount)
.inspect_err(
|error| tracing::error!(%error, %channel, "failed to open redeem stream for channel"),
)
.ok(),
)
})
.for_each(|stream| {
stream_group.insert(stream);
futures::future::ready(())
})
.await;
Ok(stream_group)
}
}
/// Blanket implementation: every [`TicketManagement`] implementor (sized or not)
/// gets the default methods of [`TicketManagementExt`].
impl<T: TicketManagement + ?Sized> TicketManagementExt for T {}
/// Interface for creating tickets on channels.
///
/// Through `auto_impl`, the trait is also implemented for `&T`, `Box<T>` and `Arc<T>`
/// of any implementor `T`.
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait TicketFactory {
/// Error type returned by the operations of this trait.
type Error: std::error::Error + Send + Sync + 'static;
/// Returns a [`TicketBuilder`] for a new multi-hop ticket on `channel`.
///
/// NOTE(review): how `path_position`, `winning_probability` and `price_per_hop` determine the
/// ticket (e.g. its amount) is up to the implementor and not visible here — confirm.
fn new_multihop_ticket(
&self,
channel: &ChannelEntry,
path_position: NonZeroU8,
winning_probability: WinningProbability,
price_per_hop: HoprBalance,
) -> Result<TicketBuilder, Self::Error>;
/// Returns the remaining stake of the given incoming `channel`.
///
/// The default implementation simply returns the channel's `balance` and never fails.
/// Implementors may override it (presumably to account for tickets not yet redeemed — confirm).
fn remaining_incoming_channel_stake(&self, channel: &ChannelEntry) -> Result<HoprBalance, Self::Error> {
Ok(channel.balance)
}
}
// Unit tests for `TicketManagementExt::redeem_in_channels`, driven by mockall mocks of the
// ticket manager and of the chain client.
#[cfg(test)]
mod tests {
use futures::{StreamExt, TryStreamExt, stream};
use hopr_types::{crypto::prelude::Keypair, internal::prelude::*, primitive::prelude::Address};
use mockall::{mock, predicate::*};
use super::*;
use crate::{
ChainKeypair,
chain::{ChainReadChannelOperations, ChainWriteTicketOperations},
};
// Mock of `TicketManagement`. Its `redeem_stream` returns a concrete boxed stream instead of
// the trait's `impl Stream`, hence the `refining_impl_trait` allowance.
mock! {
pub TicketManager {}
#[allow(refining_impl_trait)]
impl TicketManagement for TicketManager {
type Error = std::io::Error;
fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
&self,
client: C,
channel_id: ChannelId,
min_amount: Option<HoprBalance>,
) -> Result<stream::BoxStream<'static, Result<RedemptionResult, std::io::Error>>, std::io::Error>;
fn neglect_tickets(
&self,
channel_id: &ChannelId,
max_ticket_index: Option<u64>,
) -> Result<Vec<VerifiedTicket>, std::io::Error>;
fn ticket_stats<'a>(&self, channel_id: Option<&'a ChannelId>) -> Result<ChannelStats, std::io::Error>;
fn insert_incoming_ticket(
&self,
ticket: hopr_types::internal::prelude::RedeemableTicket,
) -> Result<Vec<VerifiedTicket>, std::io::Error>;
}
}
// Mock of a chain client providing everything `redeem_in_channels` requires from `C`:
// channel reads, ticket writes and `Clone`.
mock! {
pub ChainClient {}
impl ChainReadChannelOperations for ChainClient {
type Error = std::io::Error;
fn me(&self) -> &Address;
fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, std::io::Error>;
fn stream_channels<'a>(
&'a self,
selector: ChannelSelector,
) -> Result<stream::BoxStream<'a, ChannelEntry>, std::io::Error>;
}
#[async_trait::async_trait]
impl ChainWriteTicketOperations for ChainClient {
type Error = std::io::Error;
async fn redeem_ticket<'a>(
&'a self,
ticket: RedeemableTicket,
) -> Result<
futures::future::BoxFuture<'a, Result<(VerifiedTicket, hopr_types::crypto::prelude::Hash), crate::chain::TicketRedeemError<std::io::Error>>>,
crate::chain::TicketRedeemError<std::io::Error>,
>;
}
impl Clone for ChainClient {
fn clone(&self) -> Self;
}
}
// When the chain client streams no channels, the combined redemption stream yields nothing.
#[tokio::test]
async fn test_redeem_in_channels_empty() -> anyhow::Result<()> {
// No `redeem_stream` expectation is set: any call to it would fail the test.
let mock_tm = MockTicketManager::new();
let mut mock_client = MockChainClient::new();
let my_address = Address::default();
mock_client.expect_me().return_const(my_address);
mock_client
.expect_stream_channels()
.returning(|_| Ok(stream::empty().boxed()));
let result = mock_tm.redeem_in_channels(mock_client, None, None, None).await?;
let results: Vec<_> = result.collect().await;
assert!(results.is_empty());
Ok(())
}
// Builds `count` signed tickets issued by `issuer` on `channel`, each with amount 1, an
// always-winning probability and consecutive indices starting at 0.
// Panics if `issuer` is not the channel's source or if a ticket cannot be built and signed.
fn generate_tickets_in_channel(issuer: &ChainKeypair, channel: &ChannelEntry, count: usize) -> Vec<VerifiedTicket> {
assert_eq!(issuer.public().to_address(), channel.source);
(0..count)
.map(|index| {
TicketBuilder::default()
.counterparty(channel.destination)
.amount(1)
.win_prob(WinningProbability::ALWAYS)
.index(index as u64)
.channel_epoch(channel.channel_epoch)
.eth_challenge(Default::default())
.build_signed(&issuer, &Default::default())
.unwrap()
})
.collect()
}
// With two incoming channels, one redeem stream is opened per channel and the results of both
// streams end up in the combined stream.
#[tokio::test]
async fn test_redeem_in_channels_multiple_channels() -> anyhow::Result<()> {
let mut mock_tm = MockTicketManager::new();
let mut mock_client = MockChainClient::new();
let my_address = Address::from([0u8; 20]);
mock_client.expect_me().return_const(my_address);
let source_1 = ChainKeypair::random();
let source_2 = ChainKeypair::random();
// Two open channels from different sources, both directed at this node.
let channel_1 = ChannelBuilder::default()
.source(&source_1)
.destination(my_address)
.balance(HoprBalance::default())
.status(ChannelStatus::Open)
.build()?;
let channel_2 = ChannelBuilder::default()
.source(&source_2)
.destination(my_address)
.balance(HoprBalance::default())
.status(ChannelStatus::Open)
.build()?;
let channel_1_clone = channel_1.clone();
let channel_2_clone = channel_2.clone();
let channel_1_id = *channel_1.get_id();
let channel_2_id = *channel_2.get_id();
// The selector passed to the chain client must have been narrowed to this node as destination.
mock_client
.expect_stream_channels()
.with(function(move |selector: &ChannelSelector| {
selector.destination == Some(my_address)
}))
.returning(move |_| Ok(stream::iter(vec![channel_1_clone.clone(), channel_2_clone.clone()]).boxed()));
// Clones handed to `redeem_stream` are fresh mocks without expectations; the `redeem_stream`
// expectations below ignore their client argument.
mock_client.expect_clone().returning(MockChainClient::default);
let min_amount = Some(HoprBalance::from(100));
let tickets_1 = generate_tickets_in_channel(&source_1, &channel_1, 10);
let tickets_2 = generate_tickets_in_channel(&source_2, &channel_2, 10);
let tickets_1_clone = tickets_1.clone();
// Exactly one redeem stream per channel, each reporting all tickets of its channel as redeemed.
mock_tm
.expect_redeem_stream::<MockChainClient>()
.once()
.with(always(), eq(channel_1_id), eq(min_amount))
.return_once(|_, _, _| {
Ok(stream::iter(tickets_1_clone)
.map(|t| Ok(RedemptionResult::Redeemed(t)))
.boxed())
});
let tickets_2_clone = tickets_2.clone();
mock_tm
.expect_redeem_stream::<MockChainClient>()
.once()
.with(always(), eq(channel_2_id), eq(min_amount))
.return_once(|_, _, _| {
Ok(stream::iter(tickets_2_clone)
.map(|t| Ok(RedemptionResult::Redeemed(t)))
.boxed())
});
let result = mock_tm.redeem_in_channels(mock_client, None, min_amount, None).await?;
let results: Vec<_> = result.try_collect().await?;
// Only the total count is checked; the order in which the two streams interleave is not asserted.
assert_eq!(results.len(), tickets_1.len() + tickets_2.len());
Ok(())
}
}