use super::crypto::*;
use super::packet::*;
use base64;
use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use futures::{future::result, sync::mpsc::UnboundedSender};
use hex;
use interledger_packet::{
Address, ErrorCode, Fulfill, FulfillBuilder, PacketType as IlpPacketType, Prepare, Reject,
RejectBuilder,
};
use interledger_service::{Account, BoxedIlpFuture, OutgoingRequest, OutgoingService, Username};
use log::debug;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::time::SystemTime;
const STREAM_SERVER_SECRET_GENERATOR: &[u8] = b"ilp_stream_shared_secret";
#[derive(Clone)]
pub struct ConnectionGenerator {
secret_generator: Bytes,
}
impl ConnectionGenerator {
pub fn new(server_secret: Bytes) -> Self {
assert_eq!(server_secret.len(), 32, "Server secret must be 32 bytes");
ConnectionGenerator {
secret_generator: Bytes::from(
&hmac_sha256(&server_secret[..], STREAM_SERVER_SECRET_GENERATOR)[..],
),
}
}
pub fn generate_address_and_secret(&self, base_address: &Address) -> (Address, [u8; 32]) {
let token = base64::encode_config(&generate_token(), base64::URL_SAFE_NO_PAD);
let shared_secret = hmac_sha256(&self.secret_generator[..], &token.as_bytes()[..]);
let destination_account = base_address.with_suffix(&token.as_ref()).unwrap();
debug!("Generated address: {}", destination_account,);
(destination_account, shared_secret)
}
pub fn rederive_secret(&self, destination_account: &Address) -> Result<[u8; 32], ()> {
let local_part = destination_account.segments().rev().next().unwrap();
let shared_secret = hmac_sha256(&self.secret_generator[..], &local_part.as_bytes()[..]);
Ok(shared_secret)
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PaymentNotification {
pub to_username: Username,
pub from_username: Username,
pub destination: Address,
pub amount: u64,
pub timestamp: String,
}
pub trait StreamNotificationsStore {
type Account: Account;
fn add_payment_notification_subscription(
&self,
account_id: <Self::Account as Account>::AccountId,
sender: UnboundedSender<PaymentNotification>,
);
fn publish_payment_notification(&self, _payment: PaymentNotification);
}
#[derive(Clone)]
pub struct StreamReceiverService<S, O: OutgoingService<A>, A: Account> {
connection_generator: ConnectionGenerator,
next: O,
account_type: PhantomData<A>,
store: S,
}
impl<S, O, A> StreamReceiverService<S, O, A>
where
S: StreamNotificationsStore<Account = A>,
O: OutgoingService<A>,
A: Account,
{
pub fn new(server_secret: Bytes, store: S, next: O) -> Self {
let connection_generator = ConnectionGenerator::new(server_secret);
StreamReceiverService {
connection_generator,
next,
account_type: PhantomData,
store,
}
}
}
impl<S, O, A> OutgoingService<A> for StreamReceiverService<S, O, A>
where
S: StreamNotificationsStore + Send + Sync + 'static + Clone,
O: OutgoingService<A>,
A: Account,
{
type Future = BoxedIlpFuture;
fn send_request(&mut self, request: OutgoingRequest<A>) -> Self::Future {
let to_username = request.to.username().clone();
let from_username = request.from.username().clone();
let amount = request.prepare.amount();
let store = self.store.clone();
let destination = request.prepare.destination();
let to_address = request.to.ilp_address();
let dest: &[u8] = destination.as_ref();
if dest.starts_with(to_address.as_ref()) {
if let Ok(shared_secret) = self.connection_generator.rederive_secret(&destination) {
let response = receive_money(
&shared_secret,
&to_address,
request.to.asset_code(),
request.to.asset_scale(),
&request.prepare,
);
match response {
Ok(ref _fulfill) => store.publish_payment_notification(PaymentNotification {
to_username,
from_username,
amount,
destination: destination.clone(),
timestamp: DateTime::<Utc>::from(SystemTime::now()).to_rfc3339(),
}),
Err(ref reject) => {
if reject.code() == ErrorCode::F06_UNEXPECTED_PAYMENT {
return Box::new(self.next.send_request(request));
}
}
};
return Box::new(result(response));
}
}
Box::new(self.next.send_request(request))
}
}
fn receive_money(
shared_secret: &[u8; 32],
ilp_address: &Address,
asset_code: &str,
asset_scale: u8,
prepare: &Prepare,
) -> Result<Fulfill, Reject> {
let fulfillment = generate_fulfillment(&shared_secret[..], prepare.data());
let condition = hash_sha256(&fulfillment);
let is_fulfillable = condition == prepare.execution_condition();
let prepare_amount = prepare.amount();
let copied_data = BytesMut::from(prepare.data());
let stream_packet = StreamPacket::from_encrypted(shared_secret, copied_data).map_err(|_| {
debug!("Unable to parse data, rejecting Prepare packet");
RejectBuilder {
code: ErrorCode::F06_UNEXPECTED_PAYMENT,
message: b"Could not decrypt data",
triggered_by: Some(ilp_address),
data: &[],
}
.build()
})?;
let mut response_frames: Vec<Frame> = Vec::new();
for frame in stream_packet.frames() {
if let Frame::StreamMoney(ref frame) = frame {
response_frames.push(Frame::StreamMaxMoney(StreamMaxMoneyFrame {
stream_id: frame.stream_id,
total_received: 0,
receive_max: u64::max_value(),
}));
}
if let Frame::ConnectionNewAddress(_) = frame {
response_frames.push(Frame::ConnectionAssetDetails(ConnectionAssetDetailsFrame {
source_asset_code: asset_code,
source_asset_scale: asset_scale,
}));
}
}
if is_fulfillable && prepare_amount >= stream_packet.prepare_amount() {
let response_packet = StreamPacketBuilder {
sequence: stream_packet.sequence(),
ilp_packet_type: IlpPacketType::Fulfill,
prepare_amount,
frames: &response_frames,
}
.build();
debug!(
"Fulfilling prepare for amount {} with fulfillment: {} and encrypted stream packet: {:?}",
prepare_amount,
hex::encode(&fulfillment[..]),
response_packet
);
let encrypted_response = response_packet.into_encrypted(shared_secret);
let fulfill = FulfillBuilder {
fulfillment: &fulfillment,
data: &encrypted_response[..],
}
.build();
Ok(fulfill)
} else {
let response_packet = StreamPacketBuilder {
sequence: stream_packet.sequence(),
ilp_packet_type: IlpPacketType::Reject,
prepare_amount,
frames: &response_frames,
}
.build();
if !is_fulfillable {
debug!("Packet is unfulfillable");
} else if prepare_amount < stream_packet.prepare_amount() {
debug!(
"Received only: {} when we should have received at least: {}",
prepare_amount,
stream_packet.prepare_amount()
);
}
debug!(
"Rejecting Prepare and including encrypted stream packet {:?}",
response_packet
);
let encrypted_response = response_packet.into_encrypted(shared_secret);
let reject = RejectBuilder {
code: ErrorCode::F99_APPLICATION_ERROR,
message: &[],
triggered_by: Some(&ilp_address),
data: &encrypted_response[..],
}
.build();
Err(reject)
}
}
#[cfg(test)]
mod connection_generator {
use super::*;
use std::str::FromStr;
#[test]
fn generates_valid_ilp_address() {
let server_secret = [9; 32];
let receiver_address = Address::from_str("example.receiver").unwrap();
let connection_generator = ConnectionGenerator::new(Bytes::from(&server_secret[..]));
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&receiver_address);
assert!(destination_account
.to_bytes()
.starts_with(receiver_address.as_ref()));
assert_eq!(
connection_generator
.rederive_secret(&destination_account)
.unwrap(),
shared_secret
);
}
}
#[cfg(test)]
fn test_stream_packet() -> StreamPacket {
StreamPacketBuilder {
ilp_packet_type: IlpPacketType::Prepare,
prepare_amount: 0,
sequence: 1,
frames: &[Frame::StreamMoney(StreamMoneyFrame {
stream_id: 1,
shares: 1,
})],
}
.build()
}
#[cfg(test)]
mod receiving_money {
use super::*;
use interledger_packet::PrepareBuilder;
use std::convert::TryFrom;
use std::str::FromStr;
use std::time::UNIX_EPOCH;
#[test]
fn fulfills_valid_packet() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let shared_secret = connection_generator
.rederive_secret(&prepare.destination())
.unwrap();
let result = receive_money(&shared_secret, &ilp_address, "ABC", 9, &prepare);
assert!(result.is_ok());
}
#[test]
fn fulfills_valid_packet_without_connection_tag() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let shared_secret = connection_generator
.rederive_secret(&prepare.destination())
.unwrap();
let result = receive_money(&shared_secret, &ilp_address, "ABC", 9, &prepare);
assert!(result.is_ok());
}
#[test]
fn rejects_modified_data() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let mut data = stream_packet.into_encrypted(&shared_secret[..]);
data.extend_from_slice(b"x");
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let shared_secret = connection_generator
.rederive_secret(&prepare.destination())
.unwrap();
let result = receive_money(&shared_secret, &ilp_address, "ABC", 9, &prepare);
assert!(result.is_err());
}
#[test]
fn rejects_too_little_money() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = StreamPacketBuilder {
ilp_packet_type: IlpPacketType::Prepare,
prepare_amount: 101,
sequence: 1,
frames: &[Frame::StreamMoney(StreamMoneyFrame {
stream_id: 1,
shares: 1,
})],
}
.build();
let data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let shared_secret = connection_generator
.rederive_secret(&prepare.destination())
.unwrap();
let result = receive_money(&shared_secret, &ilp_address, "ABC", 9, &prepare);
assert!(result.is_err());
}
#[test]
fn fulfills_packets_sent_to_javascript_receiver() {
let ilp_address = Address::from_str("test.peerB").unwrap();
let prepare = Prepare::try_from(bytes::BytesMut::from(hex::decode("0c819900000000000001f43230313931303238323134313533383338f31a96346c613011947f39a0f1f4e573c2fc3e7e53797672b01d2898e90c9a0723746573742e70656572422e4e6a584430754a504275477a353653426d4933755836682d3b6cc484c0d4e9282275d4b37c6ae18f35b497ddbfcbce6d9305b9451b4395c3158aa75e05bf27582a237109ec6ca0129d840da7abd96826c8147d0d").unwrap())).unwrap();
let condition = prepare.execution_condition().to_vec();
let server_secret = Bytes::from(vec![0u8; 32]);
let connection_generator = ConnectionGenerator::new(server_secret);
let shared_secret = connection_generator
.rederive_secret(&prepare.destination())
.expect("Receiver should be able to rederive the shared secret");
assert_eq!(
&shared_secret[..],
hex::decode("b7d09d2e16e6f83c55b60e42fcd7c2b8ed49624a1df73c59b383dbe2e8690309")
.unwrap()
.as_ref() as &[u8],
"did not regenerate the same shared secret",
);
let fulfill = receive_money(&shared_secret, &ilp_address, "ABC", 9, &prepare)
.expect("Receiver should be able to generate the fulfillment");
assert_eq!(
&hash_sha256(fulfill.fulfillment())[..],
&condition[..],
"fulfillment generated does not hash to the expected condition"
);
}
}
#[cfg(test)]
mod stream_receiver_service {
use super::*;
use crate::test_helpers::*;
use futures::Future;
use interledger_packet::PrepareBuilder;
use interledger_service::outgoing_service_fn;
use std::convert::TryFrom;
use std::str::FromStr;
use std::time::UNIX_EPOCH;
#[test]
fn fulfills_correct_packets() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let mut service = StreamReceiverService::new(
server_secret.clone(),
DummyStore,
outgoing_service_fn(|_: OutgoingRequest<TestAccount>| -> BoxedIlpFuture {
panic!("shouldn't get here")
}),
);
let result = service
.send_request(OutgoingRequest {
from: TestAccount {
id: 0,
ilp_address: Address::from_str("example.sender").unwrap(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
to: TestAccount {
id: 1,
ilp_address: ilp_address.clone(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
original_amount: prepare.amount(),
prepare,
})
.wait();
assert!(result.is_ok());
}
#[test]
fn rejects_invalid_packets() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let mut data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
data.extend_from_slice(b"extra");
let dest = Address::try_from(destination_account).unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let mut service = StreamReceiverService::new(
server_secret.clone(),
DummyStore,
outgoing_service_fn(
|_: OutgoingRequest<TestAccount>| -> Result<Fulfill, Reject> {
Err(RejectBuilder {
code: ErrorCode::F02_UNREACHABLE,
message: &[],
data: &[],
triggered_by: None,
}
.build())
},
),
);
let result = service
.send_request(OutgoingRequest {
from: TestAccount {
id: 0,
ilp_address: Address::from_str("example.sender").unwrap(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
to: TestAccount {
id: 1,
ilp_address: ilp_address.clone(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
original_amount: prepare.amount(),
prepare,
})
.wait();
assert!(result.is_err());
}
#[test]
fn passes_on_packets_not_for_it() {
let ilp_address = Address::from_str("example.destination").unwrap();
let server_secret = Bytes::from(&[1; 32][..]);
let connection_generator = ConnectionGenerator::new(server_secret.clone());
let (destination_account, shared_secret) =
connection_generator.generate_address_and_secret(&ilp_address);
let stream_packet = test_stream_packet();
let data = stream_packet.into_encrypted(&shared_secret[..]);
let execution_condition = generate_condition(&shared_secret[..], &data);
let dest = Address::try_from(destination_account).unwrap();
let dest = dest.with_suffix(b"extra").unwrap();
let prepare = PrepareBuilder {
destination: dest,
amount: 100,
expires_at: UNIX_EPOCH,
data: &data[..],
execution_condition: &execution_condition,
}
.build();
let mut service = StreamReceiverService::new(
server_secret.clone(),
DummyStore,
outgoing_service_fn(|_| {
Err(RejectBuilder {
code: ErrorCode::F02_UNREACHABLE,
message: &[],
data: &[],
triggered_by: Address::from_str("example.other-receiver").ok().as_ref(),
}
.build())
}),
);
let result = service
.send_request(OutgoingRequest {
from: TestAccount {
id: 0,
ilp_address: Address::from_str("example.sender").unwrap(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
original_amount: prepare.amount(),
to: TestAccount {
id: 1,
ilp_address: ilp_address.clone(),
asset_code: "XYZ".to_string(),
asset_scale: 9,
},
prepare,
})
.wait();
assert!(result.is_err());
assert_eq!(
result.unwrap_err().triggered_by().unwrap(),
Address::from_str("example.other-receiver").unwrap(),
);
}
}