use super::types::{Addresses, EthereumAccount, EthereumLedgerTxSigner, EthereumStore};
use super::utils::{filter_transfer_logs, make_tx, sent_to_us, ERC20Transfer};
use super::EthAddress;
use crate::create_settlement_engine_filter;
use bytes::Bytes;
use clarity::Signature;
use log::{debug, error, trace};
use parking_lot::RwLock;
use sha3::{Digest, Keccak256 as Sha3};
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::Arc;
use hyper::StatusCode;
use log::info;
use num_bigint::BigUint;
use redis::ConnectionInfo;
use redis::IntoConnectionInfo;
use reqwest::r#async::{Client, Response as HttpResponse};
use serde::{de::Error as DeserializeError, Deserialize, Deserializer, Serialize};
use serde_json::json;
use std::net::SocketAddr;
use std::{
marker::PhantomData,
str::FromStr,
time::{Duration, Instant},
};
use std::{str, u64};
use tokio::timer::Interval;
use tokio_retry::{strategy::ExponentialBackoff, Retry};
use url::Url;
use uuid::Uuid;
use web3::{
api::Web3,
futures::future::{err, join_all, ok, Either, Future},
futures::stream::Stream,
transports::Http,
types::{Address, BlockNumber, CallRequest, TransactionId, H256, U256},
};
use crate::stores::redis_ethereum_ledger::*;
use crate::{ApiResponse, SettlementEngine};
use interledger_http::error::*;
use interledger_settlement::{
scale_with_precision_loss, LeftoversStore, Quantity, CONVERSION_ERROR_TYPE,
};
use secrecy::Secret;
const MAX_RETRIES: usize = 10;
const ETH_CREATE_ACCOUNT_PREFIX: &[u8] = b"ilp-ethl-create-account-message";
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PaymentDetailsRequest {
challenge: Vec<u8>,
}
impl PaymentDetailsRequest {
fn new(challenge: Vec<u8>) -> Self {
PaymentDetailsRequest { challenge }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PaymentDetailsResponse {
to: Addresses,
signature: Signature,
challenge: Option<Vec<u8>>,
}
impl PaymentDetailsResponse {
fn new(to: Addresses, signature: Signature, challenge: Option<Vec<u8>>) -> Self {
PaymentDetailsResponse {
to,
signature,
challenge,
}
}
}
#[derive(Debug, Clone)]
pub struct EthereumLedgerSettlementEngine<S, Si, A> {
store: S,
signer: Si,
account_type: PhantomData<A>,
web3: Web3<Http>,
address: Addresses,
chain_id: u8,
confirmations: u8,
poll_frequency: Duration,
connector_url: Url,
asset_scale: u8,
net_version: String,
challenges: Arc<RwLock<HashMap<String, Vec<u8>>>>,
}
pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> {
store: S,
signer: Si,
ethereum_endpoint: Option<&'a str>,
chain_id: Option<u8>,
confirmations: Option<u8>,
poll_frequency: Option<Duration>,
connector_url: Option<Url>,
token_address: Option<Address>,
asset_scale: Option<u8>,
watch_incoming: bool,
account_type: PhantomData<A>,
}
impl<'a, S, Si, A> EthereumLedgerSettlementEngineBuilder<'a, S, Si, A>
where
S: EthereumStore<Account = A>
+ LeftoversStore<AccountId = String, AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount<AccountId = String> + Clone + Send + Sync + 'static,
{
pub fn new(store: S, signer: Si) -> Self {
Self {
store,
signer,
ethereum_endpoint: None,
chain_id: None,
confirmations: None,
poll_frequency: None,
connector_url: None,
token_address: None,
asset_scale: None,
watch_incoming: false,
account_type: PhantomData,
}
}
pub fn token_address(&mut self, token_address: Option<Address>) -> &mut Self {
self.token_address = token_address;
self
}
pub fn ethereum_endpoint(&mut self, endpoint: &'a str) -> &mut Self {
self.ethereum_endpoint = Some(endpoint);
self
}
pub fn asset_scale(&mut self, asset_scale: u8) -> &mut Self {
self.asset_scale = Some(asset_scale);
self
}
pub fn chain_id(&mut self, chain_id: u8) -> &mut Self {
self.chain_id = Some(chain_id);
self
}
pub fn confirmations(&mut self, confirmations: u8) -> &mut Self {
self.confirmations = Some(confirmations);
self
}
pub fn poll_frequency(&mut self, poll_frequency: u64) -> &mut Self {
self.poll_frequency = Some(Duration::from_millis(poll_frequency));
self
}
pub fn watch_incoming(&mut self, watch_incoming: bool) -> &mut Self {
self.watch_incoming = watch_incoming;
self
}
pub fn connector_url(&mut self, connector_url: &'a str) -> &mut Self {
self.connector_url = Some(connector_url.parse().unwrap());
self
}
pub fn connect(
&self,
) -> impl Future<Item = EthereumLedgerSettlementEngine<S, Si, A>, Error = ()> {
let ethereum_endpoint = if let Some(ref ethereum_endpoint) = self.ethereum_endpoint {
ðereum_endpoint
} else {
"http://localhost:8545"
};
let chain_id = if let Some(chain_id) = self.chain_id {
chain_id
} else {
1
};
let connector_url = if let Some(connector_url) = self.connector_url.clone() {
connector_url
} else {
"http://localhost:7771".parse().unwrap()
};
let confirmations = if let Some(confirmations) = self.confirmations {
confirmations
} else {
6
};
let poll_frequency = if let Some(poll_frequency) = self.poll_frequency {
poll_frequency
} else {
Duration::from_secs(5)
};
let asset_scale = if let Some(asset_scale) = self.asset_scale {
asset_scale
} else {
18
};
let (eloop, transport) = Http::new(ethereum_endpoint).unwrap();
eloop.into_remote();
let web3 = Web3::new(transport);
let address = Addresses {
own_address: self.signer.address(),
token_address: self.token_address,
};
let store = self.store.clone();
let signer = self.signer.clone();
let watch_incoming = self.watch_incoming;
web3.net().version().then(move |result| {
let net_version = result.unwrap_or_else(|_| chain_id.to_string());
let engine = EthereumLedgerSettlementEngine {
web3,
store,
signer,
address,
chain_id,
confirmations,
poll_frequency,
connector_url,
asset_scale,
net_version,
account_type: PhantomData,
challenges: Arc::new(RwLock::new(HashMap::new())),
};
if watch_incoming {
engine.notify_connector_on_incoming_settlement();
}
Ok(engine)
})
}
}
impl<S, Si, A> EthereumLedgerSettlementEngine<S, Si, A>
where
S: EthereumStore<Account = A>
+ LeftoversStore<AccountId = String, AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount<AccountId = String> + Clone + Send + Sync + 'static,
{
pub fn notify_connector_on_incoming_settlement(&self) {
let _self = self.clone();
let interval = self.poll_frequency;
let address = self.address;
debug!(
"[{:?}] settlement engine service for listening to incoming settlements. Interval: {:?}",
address, interval,
);
std::thread::spawn(move || {
tokio::run(
Interval::new(Instant::now(), interval)
.map_err(|e| panic!("interval errored; err={:?}", e))
.for_each(move |_| _self.handle_received_transactions())
.then(|_| {
Ok(())
}),
);
});
}
pub fn handle_received_transactions(&self) -> impl Future<Item = (), Error = ()> + Send {
let confirmations = self.confirmations;
let web3 = self.web3.clone();
let store = self.store.clone();
let store_clone = self.store.clone();
let self_clone = self.clone();
let self_clone2 = self.clone();
let our_address = self.address.own_address;
let token_address = self.address.token_address;
let net_version = self.net_version.clone();
let net_version_clone = net_version.clone();
web3.eth()
.block_number()
.map_err(move |err| error!("Could not fetch current block number {:?}", err))
.and_then(move |current_block| {
let fetch_until = current_block - confirmations;
Ok((
Ok(fetch_until),
store.load_recently_observed_block(net_version),
))
})
.flatten()
.and_then(move |(to_block, last_observed_block)| {
let from_block = if let Some(last_observed_block) = last_observed_block {
if to_block == last_observed_block {
return Either::A(ok(()));
} else {
last_observed_block + 1
}
} else {
to_block
};
trace!("Fetching txs from block {} until {}", from_block, to_block);
let notify_all_txs_fut = if let Some(token_address) = token_address {
let notify_all_erc20_txs_fut = filter_transfer_logs(
web3.clone(),
token_address,
None,
Some(our_address),
BlockNumber::Number(from_block.low_u64()),
BlockNumber::Number(to_block.low_u64()),
)
.and_then(move |transfers: Vec<ERC20Transfer>| {
join_all(transfers.into_iter().map(move |transfer| {
self_clone2.notify_erc20_transfer(transfer, token_address)
}))
});
Either::A(notify_all_erc20_txs_fut)
} else {
let checked_blocks = from_block.low_u64()..=to_block.low_u64();
let notify_eth_txs_fut = checked_blocks
.map(move |block_num| self_clone.notify_eth_txs_in_block(block_num));
Either::B(join_all(notify_eth_txs_fut))
};
Either::B(notify_all_txs_fut.and_then(move |_| {
trace!("Processed all transctions up to block {}", to_block);
store_clone.save_recently_observed_block(net_version_clone, to_block)
}))
})
}
fn notify_erc20_transfer(
&self,
transfer: ERC20Transfer,
token_address: Address,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let store = self.store.clone();
let tx_hash = transfer.tx_hash;
let self_clone = self.clone();
let addr = Addresses {
own_address: transfer.from,
token_address: Some(token_address),
};
let amount = transfer.amount;
Box::new(store
.check_if_tx_processed(tx_hash)
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
.and_then(move |processed| {
if !processed {
Either::A(
store
.load_account_id_from_address(addr)
.and_then(move |id| {
debug!("Notifying connector about incoming ERC20 transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash);
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
})
.and_then(move |_| {
store.mark_tx_processed(tx_hash)
}),
)
} else {
Either::B(ok(())) }
}))
}
fn notify_eth_txs_in_block(&self, block_number: u64) -> impl Future<Item = (), Error = ()> {
trace!("Getting txs for block {}", block_number);
let self_clone = self.clone();
self.web3
.eth()
.block(BlockNumber::Number(block_number).into())
.map_err(move |err| error!("Got error while getting block {}: {:?}", block_number, err))
.and_then(move |maybe_block| {
if let Some(block) = maybe_block {
ok(block)
} else {
err(())
}
})
.and_then(move |block| {
let submit_txs_to_connector_future = block
.transactions
.into_iter()
.map(move |tx_hash| self_clone.notify_eth_transfer(tx_hash));
join_all(submit_txs_to_connector_future)
})
.and_then(|_| Ok(()))
}
fn notify_eth_transfer(&self, tx_hash: H256) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let our_address = self.address.own_address;
let web3 = self.web3.clone();
let store = self.store.clone();
let self_clone = self.clone();
Box::new(store.check_if_tx_processed(tx_hash)
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
.and_then(move |processed| {
if !processed {
Either::A(web3.eth().transaction(TransactionId::Hash(tx_hash))
.map_err(move |err| error!("Could not fetch transaction data from transaction hash: {:?}. Got error: {:?}", tx_hash, err))
.and_then(move |maybe_tx| {
if let Some(tx) = maybe_tx { ok(tx) } else { err(())}
})
.and_then(move |tx| {
if let Some((from, amount)) = sent_to_us(tx, our_address) {
trace!("Got transaction for our account from {} for amount {}", from, amount);
if amount > U256::from(0) {
let addr = Addresses {
own_address: from,
token_address: None,
};
return Either::A(store.load_account_id_from_address(addr)
.and_then(move |id| {
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
})
.and_then(move |_| {
store.mark_tx_processed(tx_hash)
}));
}
}
Either::B(ok(()))
}))
} else {
Either::B(ok(())) }
}))
}
fn notify_connector(
&self,
account_id: String,
amount: String,
tx_hash: H256,
) -> impl Future<Item = (), Error = ()> {
let engine_scale = self.asset_scale;
let mut url = self.connector_url.clone();
url.path_segments_mut()
.expect("Invalid connector URL")
.push("accounts")
.push(&account_id.clone())
.push("settlements");
debug!(
"Making POST to {:?} {:?} about {:?}",
url,
amount,
format!("{:?}", tx_hash)
);
let account_id_clone = account_id.clone();
let amount_clone = amount.clone();
let action = move || {
let client = Client::new();
let account_id = account_id.clone();
let amount = amount.clone();
client
.post(url.as_ref())
.header("Idempotency-Key", format!("{:?}", tx_hash))
.json(&json!(Quantity::new(amount.clone(), engine_scale)))
.send()
.map_err(move |err| {
error!(
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
account_id, amount, err
);
})
};
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
)
.map_err(move |_| {
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount_clone, tx_hash)
})
.and_then(move |ret| {
trace!("Accounting system responded with {:?}", ret);
Ok(())
})
}
fn settle_to(
&self,
to: Address,
amount: U256,
token_address: Option<Address>,
) -> Box<dyn Future<Item = Option<H256>, Error = ()> + Send> {
if amount == U256::from(0) {
return Box::new(ok(None));
}
let web3 = self.web3.clone();
let own_address = self.address.own_address;
let chain_id = self.chain_id;
let signer = self.signer.clone();
let mut tx = make_tx(to, amount, token_address);
let value = U256::from_str(&tx.value.to_string()).unwrap();
let estimate_gas_destination = if let Some(token_address) = token_address {
token_address
} else {
to
};
let gas_amount_fut = Either::A(Either::A(
web3.eth()
.estimate_gas(
CallRequest {
to: estimate_gas_destination,
from: None,
gas: None,
gas_price: None,
value: Some(value),
data: Some(tx.data.clone().into()),
},
None,
)
.then(move |res| {
Ok(match res {
Ok(amount) => amount,
Err(_) => U256::from(100_000),
})
}),
));
let gas_price_fut = Either::A(Either::B(web3.eth().gas_price()));
let nonce_fut = Either::B(
web3.eth()
.transaction_count(own_address, Some(BlockNumber::Pending)),
);
Box::new(
join_all(vec![gas_price_fut, gas_amount_fut, nonce_fut])
.map_err(|err| error!("Error when querying gas price / nonce: {:?}", err))
.and_then(move |data| {
tx.gas_price = data[0];
tx.gas = data[1];
tx.nonce = data[2];
trace!(
"Gas required for transaction: {}, gas price: {}",
data[1],
data[0]
);
let signed_tx = signer.sign_raw_tx(tx.clone(), chain_id); let action = move || {
trace!("Sending tx to Ethereum: {}", hex::encode(&signed_tx));
web3.eth() .send_raw_transaction(signed_tx.clone().into())
.map_err(|err| {
error!("Error sending transaction to Ethereum ledger: {:?}", err);
err
})
};
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
)
.map_err(move |_err| {
error!("Unable to submit tx to Ethereum ledger");
})
.and_then(move |tx_hash| {
debug!("Transaction submitted. Hash: {:?}", tx_hash);
Ok(Some(tx_hash))
})
}),
)
}
fn load_account(
&self,
account_id: String,
) -> impl Future<Item = (String, Addresses), Error = String> {
let store = self.store.clone();
let addr = self.address;
let account_id_clone = account_id.clone();
store
.load_account_addresses(vec![account_id.clone()])
.map_err(move |_err| {
let error_msg = format!("[{:?}] Error getting account: {}", addr, account_id_clone);
error!("{}", error_msg);
error_msg
})
.and_then(move |addresses| ok((account_id, addresses[0])))
}
}
impl<S, Si, A> SettlementEngine for EthereumLedgerSettlementEngine<S, Si, A>
where
S: EthereumStore<Account = A>
+ LeftoversStore<AccountId = String, AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount<AccountId = String> + Clone + Send + Sync + 'static,
{
fn create_account(
&self,
account_id: String,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
let self_clone = self.clone();
let store: S = self.store.clone();
let signer = self.signer.clone();
let address = self.address;
let idempotency_uuid = Uuid::new_v4().to_hyphenated().to_string();
let challenge = Uuid::new_v4().to_hyphenated().to_string();
let challenge = challenge.into_bytes();
let challenge_clone = challenge.clone();
let client = Client::new();
let mut url = self_clone.connector_url.clone();
url.path_segments_mut()
.expect("Invalid connector URL")
.push("accounts")
.push(&account_id.to_string())
.push("messages");
let body =
serde_json::to_string(&PaymentDetailsRequest::new(challenge_clone.clone())).unwrap();
let url_clone = url.clone();
let action = move || {
client
.post(url.as_ref())
.header("Content-Type", "application/octet-stream")
.header("Idempotency-Key", idempotency_uuid.clone())
.body(body.clone())
.send()
};
Box::new(
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
)
.map_err(move |err| {
let err = format!("Couldn't notify connector {:?}", err);
error!("{}", err);
ApiError::internal_server_error().detail(err)
})
.and_then(move |resp| {
parse_body_into_payment_details(resp).and_then(move |payment_details| {
let data = prefixed_message(challenge_clone);
let challenge_hash = Sha3::digest(&data);
let recovered_address = payment_details.signature.recover(&challenge_hash);
trace!("Received payment details {:?}", payment_details);
match recovered_address {
Ok(recovered_address) => {
if recovered_address.as_bytes()
!= &payment_details.to.own_address.as_bytes()[..]
{
let error_msg = format!(
"Recovered address did not match: {:?}. Expected {:?}",
recovered_address.to_string(),
payment_details.to
);
error!("{}", error_msg);
return Either::A(err(
ApiError::internal_server_error().detail(error_msg)
));
}
}
Err(error_msg) => {
let error_msg = format!("Could not recover address {:?}", error_msg);
error!("{}", error_msg);
return Either::A(err(
ApiError::internal_server_error().detail(error_msg)
));
}
};
if let Some(challenge) = payment_details.challenge {
let data = prefixed_message(challenge);
let signature = signer.sign_message(&data);
let resp = {
let ret = PaymentDetailsResponse::new(address, signature, None);
serde_json::to_string(&ret).unwrap()
};
let idempotency_uuid = Uuid::new_v4().to_hyphenated().to_string();
let client = Client::new();
let action = move || {
client
.post(url_clone.as_ref())
.header("Content-Type", "application/octet-stream")
.header("Idempotency-Key", idempotency_uuid.clone())
.body(resp.clone())
.send()
.map_err(|err| error!("{}", err))
.and_then(move |_| Ok(()))
};
tokio::executor::spawn(
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
)
.map_err(|err| error!("{:?}", err)),
);
}
let data = HashMap::from_iter(vec![(account_id, payment_details.to)]);
Either::B(
store
.save_account_addresses(data)
.map_err(move |err| {
let err_type = ApiErrorType {
r#type: &ProblemType::Default,
title: "Store connection error",
status: StatusCode::BAD_REQUEST,
};
let err = format!("Couldn't connect to store {:?}", err);
error!("{}", err);
ApiError::from_api_error_type(&err_type).detail(err)
})
.and_then(move |_| {
Ok((StatusCode::from_u16(201).unwrap(), Bytes::from("CREATED")))
}),
)
})
}),
)
}
fn receive_message(
&self,
account_id: String,
body: Vec<u8>,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
let address = self.address;
let store = self.store.clone();
if let Ok(req) = serde_json::from_slice::<PaymentDetailsRequest>(&body) {
debug!(
"Received account creation request. Responding with our account's details {} {:?}",
account_id, address
);
let data = prefixed_message(req.challenge);
let signature = self.signer.sign_message(&data);
let resp = {
let challenge = Uuid::new_v4().to_hyphenated().to_string();
let challenge = challenge.into_bytes();
let mut guard = self.challenges.write();
(*guard).insert(account_id, challenge.clone());
let ret = PaymentDetailsResponse::new(address, signature, Some(challenge));
serde_json::to_vec(&ret).unwrap()
};
Box::new(ok((StatusCode::from_u16(200).unwrap(), resp.into())))
} else if let Ok(resp) = serde_json::from_slice::<PaymentDetailsResponse>(&body) {
debug!("Received payment details: {:?}", resp);
let guard = self.challenges.read();
let fut = if let Some(challenge) = (*guard).get(&account_id) {
let data = prefixed_message(challenge.to_vec());
let challenge_hash = Sha3::digest(&data);
let recovered_address = resp.signature.recover(&challenge_hash);
match recovered_address {
Ok(recovered_address) => {
Either::A(
if recovered_address.as_bytes() != &resp.to.own_address.as_bytes()[..] {
Either::A(ok(()))
} else {
let data = HashMap::from_iter(vec![(account_id, resp.to)]);
Either::B(store.save_account_addresses(data).map_err(move |err| {
let error_msg = format!("Couldn't connect to store {:?}", err);
error!("{}", error_msg);
ApiError::internal_server_error().detail(error_msg)
}))
},
)
}
Err(error_msg) => {
let error_msg = format!("Could not recover address {:?}", error_msg);
let err_type = ApiErrorType {
r#type: &ProblemType::Default,
title: "Signature verification failure",
status: StatusCode::BAD_REQUEST,
};
error!("{}", error_msg);
Either::B(err(
ApiError::from_api_error_type(&err_type).detail(error_msg)
))
}
}
} else {
Either::B(ok(()))
};
Box::new(
fut.and_then(move |_| Ok((StatusCode::from_u16(200).unwrap(), Bytes::from("OK")))),
)
} else {
let error_msg = "Ignoring message that was neither a PaymentDetailsRequest nor a PaymentDetailsResponse";
error!("{}", error_msg);
let err_type = ApiErrorType {
r#type: &ProblemType::Default,
title: "Invalid message type",
status: StatusCode::BAD_REQUEST,
};
Box::new(err(
ApiError::from_api_error_type(&err_type).detail(error_msg)
))
}
}
fn send_money(
&self,
account_id: String,
body: Quantity,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
let self_clone = self.clone();
let store = self.store.clone();
let engine_scale = self.asset_scale;
let connector_scale = body.scale;
let amount_from_connector = match BigUint::from_str(&body.amount) {
Ok(a) => a,
Err(_err) => {
let error_msg = format!("Error converting to BigUint {:?}", _err);
error!("{:?}", error_msg);
return Box::new(err(
ApiError::from_api_error_type(&CONVERSION_ERROR_TYPE).detail(error_msg)
));
}
};
let (amount, precision_loss) =
scale_with_precision_loss(amount_from_connector, engine_scale, connector_scale);
Box::new(
self.store
.load_uncredited_settlement_amount(account_id.clone(), engine_scale)
.map_err(move |err| {
let error_msg = format!("Error loading leftovers {:?}", err);
error!("{}", error_msg);
ApiError::internal_server_error().detail(error_msg)
})
.join(self_clone.load_account(account_id).map_err(move |err| {
let error_msg = format!("Error loading account {:?}", err);
error!("{}", error_msg);
ApiError::internal_server_error().detail(error_msg)
}))
.and_then(
move |(uncredited_settlement_amount, (account_id, addresses))| {
debug!(
"Sending settlement to account {} (Ethereum address: {}) for amount: {}{}",
account_id,
addresses.own_address,
amount,
if let Some(token_address) = addresses.token_address {
format!(" (token address: {}", token_address)
} else {
"".to_string()
}
);
let total_amount = amount + uncredited_settlement_amount;
let total_amount = match U256::from_dec_str(&total_amount.to_string()) {
Ok(a) => a,
Err(_err) => {
let error_msg = format!("Error converting to U256 {:?}", _err);
error!("{:?}", error_msg);
return Either::A(err(ApiError::from_api_error_type(&CONVERSION_ERROR_TYPE).detail(error_msg)))
}
};
Either::B(join_all(vec![
Either::A(self_clone.settle_to(addresses.own_address, total_amount, addresses.token_address).and_then(move |_| Ok(()))),
Either::B(store.save_uncredited_settlement_amount(account_id, (precision_loss, connector_scale)))
])
.map_err(move |_| {
let error_msg = "Error connecting to the blockchain.".to_string();
error!("{}", error_msg);
let err_type = ApiErrorType {
r#type: &ProblemType::Default,
title: "Blockchain connection error",
status: StatusCode::BAD_GATEWAY,
};
ApiError::from_api_error_type(&err_type).detail(error_msg)
}))
},
)
.and_then(move |_| Ok((StatusCode::OK, Bytes::from("OK")))),
)
}
}
fn parse_body_into_payment_details(
resp: HttpResponse,
) -> impl Future<Item = PaymentDetailsResponse, Error = ApiError> {
resp.into_body()
.concat2()
.map_err(|err| {
let err = format!("Couldn't retrieve body {:?}", err);
error!("{}", err);
ApiError::internal_server_error().detail(err)
})
.and_then(move |body| {
serde_json::from_slice::<PaymentDetailsResponse>(&body).map_err(|err| {
let err = format!(
"Couldn't parse body {:?} into payment details {:?}",
body, err
);
error!("{}", err);
ApiError::internal_server_error().detail(err)
})
})
}
fn prefixed_message(challenge: Vec<u8>) -> Vec<u8> {
let mut ret = ETH_CREATE_ACCOUNT_PREFIX.to_vec();
ret.extend(challenge);
ret
}
#[doc(hidden)]
#[allow(clippy::all)]
pub fn run_ethereum_engine(opt: EthereumLedgerOpt) -> impl Future<Item = (), Error = ()> {
EthereumLedgerRedisStoreBuilder::new(opt.redis_connection.clone())
.connect()
.and_then(move |ethereum_store| {
let engine_fut = EthereumLedgerSettlementEngineBuilder::new(
ethereum_store.clone(),
opt.private_key.clone(),
)
.ethereum_endpoint(&opt.ethereum_url)
.chain_id(opt.chain_id)
.connector_url(&opt.connector_url)
.confirmations(opt.confirmations)
.asset_scale(opt.asset_scale)
.poll_frequency(opt.poll_frequency)
.watch_incoming(opt.watch_incoming)
.token_address(opt.token_address)
.connect();
engine_fut.and_then(move |engine| {
let api = create_settlement_engine_filter(engine, ethereum_store);
tokio::spawn(warp::serve(api).bind(opt.settlement_api_bind_address));
info!(
"Ethereum Settlement Engine listening on: {}",
&opt.settlement_api_bind_address
);
Ok(())
})
})
}
#[derive(Deserialize, Clone)]
pub struct EthereumLedgerOpt {
pub private_key: Secret<String>,
pub settlement_api_bind_address: SocketAddr,
pub ethereum_url: String,
pub token_address: Option<EthAddress>,
pub connector_url: String,
#[serde(deserialize_with = "deserialize_redis_connection", alias = "redis_url")]
pub redis_connection: ConnectionInfo,
pub chain_id: u8,
pub confirmations: u8,
pub asset_scale: u8,
pub poll_frequency: u64,
pub watch_incoming: bool,
}
fn deserialize_redis_connection<'de, D>(deserializer: D) -> Result<ConnectionInfo, D::Error>
where
D: Deserializer<'de>,
{
Url::parse(&String::deserialize(deserializer)?)
.map_err(|err| DeserializeError::custom(format!("Invalid URL: {:?}", err)))?
.into_connection_info()
.map_err(|err| {
DeserializeError::custom(format!(
"Error converting into Redis connection info: {:?}",
err
))
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engines::ethereum_ledger::test_helpers::{
block_on,
fixtures::{ALICE, BOB, MESSAGES_API},
test_engine, test_store, TestAccount,
};
use lazy_static::lazy_static;
use mockito;
use secrecy::Secret;
lazy_static! {
pub static ref ALICE_PK: Secret<String> = Secret::new(String::from(
"380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"
));
pub static ref BOB_PK: Secret<String> = Secret::new(String::from(
"cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e"
));
}
#[test]
fn test_create_get_account() {
let bob: TestAccount = BOB.clone();
let challenge = Uuid::new_v4().to_hyphenated().to_string();
let signature = BOB_PK.clone().sign_message(&challenge.clone().into_bytes());
let body_se_data = serde_json::to_string(&PaymentDetailsResponse {
to: Addresses {
own_address: bob.address,
token_address: None,
},
signature,
challenge: None,
})
.unwrap();
let m = mockito::mock("POST", MESSAGES_API.clone())
.with_status(200)
.with_body(body_se_data)
.expect(1) .create();
let connector_url = mockito::server_url();
let store = test_store(bob.clone(), false, false, false);
let engine = test_engine(
store.clone(),
ALICE_PK.clone(),
0,
&connector_url,
None,
false,
);
let ret: ApiError = block_on(engine.create_account(bob.id)).unwrap_err();
assert_eq!(ret.status.as_u16(), 500);
let error_msg = ret.detail.unwrap();
assert!(error_msg.starts_with("Recovered address did not match:"));
assert!(error_msg.ends_with("Expected Addresses { own_address: 0x9b925641c5ef3fd86f63bff2da55a0deeafd1263, token_address: None }"));
m.assert();
}
#[test]
fn test_receive_message() {
let bob: TestAccount = BOB.clone();
let challenge = Uuid::new_v4().to_hyphenated().to_string().into_bytes();
let signed_challenge = prefixed_message(challenge.clone());
let signature = ALICE_PK.clone().sign_message(&signed_challenge);
let store = test_store(ALICE.clone(), false, false, false);
let engine = test_engine(
store.clone(),
ALICE_PK.clone(),
0,
"http://127.0.0.1:8770",
None,
false,
);
let c = serde_json::to_vec(&PaymentDetailsRequest::new(challenge)).unwrap();
let ret = block_on(engine.receive_message(bob.id.to_string(), c)).unwrap();
assert_eq!(ret.0.as_u16(), 200);
let alice_addrs = Addresses {
own_address: ALICE.address,
token_address: None,
};
let data: PaymentDetailsResponse = serde_json::from_slice(&ret.1).unwrap();
assert_eq!(data.to, alice_addrs);
assert_eq!(data.signature, signature);
assert!(data.challenge.is_some());
let challenge = data.challenge.unwrap();
let signed_challenge = prefixed_message(challenge.clone());
let signature = BOB_PK.clone().sign_message(&signed_challenge);
let bob_addrs = Addresses {
own_address: BOB.address,
token_address: None,
};
let c =
serde_json::to_vec(&PaymentDetailsResponse::new(bob_addrs, signature, None)).unwrap();
let ret = block_on(engine.receive_message(bob.id.to_string(), c)).unwrap();
assert_eq!(ret.0.as_u16(), 200);
assert_eq!(ret.1, Bytes::from("OK".to_owned()));
let addrs = store
.load_account_addresses(vec![bob.id.to_string()])
.wait()
.unwrap();
assert_eq!(addrs[0], bob_addrs);
}
}