#[cfg(test)]
use crate::packet::PEER_PROTOCOL_CONDITION;
use crate::{
packet::{
Mode, Route, RouteControlRequest, RouteUpdateRequest, CCP_CONTROL_DESTINATION,
CCP_RESPONSE, CCP_UPDATE_DESTINATION,
},
routing_table::RoutingTable,
CcpRoutingAccount, RouteManagerStore,
};
use bytes::Bytes;
use futures::{
future::{err, join_all, ok, Either},
Future, Stream,
};
#[cfg(test)]
use interledger_packet::PrepareBuilder;
use interledger_packet::{Address, ErrorCode, Fulfill, Reject, RejectBuilder};
use interledger_service::{
Account, BoxedIlpFuture, IncomingRequest, IncomingService, OutgoingRequest, OutgoingService,
};
#[cfg(test)]
use lazy_static::lazy_static;
use log::{debug, error, trace, warn};
use parking_lot::{Mutex, RwLock};
use ring::digest::{digest, SHA256};
use std::collections::HashMap;
use std::{
cmp::min,
convert::TryFrom,
str,
sync::Arc,
time::{Duration, Instant},
};
use tokio_timer::Interval;
#[cfg(not(test))]
use tokio_executor::spawn;
/// Value sent as `hold_down_time` in every route update request this node creates.
/// NOTE(review): presumably milliseconds — confirm against the CCP packet definition.
const DEFAULT_ROUTE_EXPIRY_TIME: u32 = 45000;
/// Default interval between route broadcasts, in milliseconds (it is passed to `Duration::from_millis`).
const DEFAULT_BROADCAST_INTERVAL: u64 = 30000;
/// Routing table id sent, together with epoch 0, in the initial route control requests.
const DUMMY_ROUTING_TABLE_ID: [u8; 16] = [0; 16];
/// Returns the SHA-256 digest of the 32-byte `preimage` as a fixed-size array.
fn hash(preimage: &[u8; 32]) -> [u8; 32] {
    let sha256 = digest(&SHA256, preimage);
    let mut hashed = [0u8; 32];
    hashed.copy_from_slice(sha256.as_ref());
    hashed
}
/// The routes added and the prefixes withdrawn in one entry of the forwarding-table update log.
type NewAndWithdrawnRoutes = (Vec<Route>, Vec<Bytes>);
/// Builder for a [`CcpRouteManager`]; `to_service` creates the manager from the values set here.
pub struct CcpRouteManagerBuilder<I, O, S> {
/// Service that receives every incoming request that is not addressed to a CCP destination.
next_incoming: I,
/// Service used to send CCP requests to other accounts.
outgoing: O,
/// Store the manager reads accounts and routes from and writes its routing table to.
store: S,
/// ILP address of this node.
ilp_address: Address,
/// Prefix that accepted routes must start with; `ilp_address()` sets it to the
/// address's first segment including the trailing dot.
global_prefix: Bytes,
/// Interval between route broadcasts, in milliseconds.
broadcast_interval: u64,
}
impl<I, O, S, A> CcpRouteManagerBuilder<I, O, S>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount + Send + Sync + 'static,
{
pub fn new(ilp_address: Address, store: S, outgoing: O, next_incoming: I) -> Self {
CcpRouteManagerBuilder {
ilp_address,
global_prefix: Bytes::from_static(b"g."),
next_incoming,
outgoing,
store,
broadcast_interval: DEFAULT_BROADCAST_INTERVAL,
}
}
pub fn ilp_address(&mut self, ilp_address: Address) -> &mut Self {
self.global_prefix = ilp_address
.to_bytes()
.iter()
.position(|c| c == &b'.')
.map(|index| ilp_address.to_bytes().slice_to(index + 1))
.unwrap_or_else(|| ilp_address.to_bytes().clone());
self.ilp_address = ilp_address;
self
}
pub fn broadcast_interval(&mut self, ms: u64) -> &mut Self {
self.broadcast_interval = ms;
self
}
pub fn to_service(&self) -> CcpRouteManager<I, O, S, A> {
#[allow(clippy::let_and_return)]
let service = CcpRouteManager {
ilp_address: self.ilp_address.clone(),
global_prefix: self.global_prefix.clone(),
next_incoming: self.next_incoming.clone(),
outgoing: self.outgoing.clone(),
store: self.store.clone(),
forwarding_table: Arc::new(RwLock::new(RoutingTable::default())),
forwarding_table_updates: Arc::new(RwLock::new(Vec::new())),
last_epoch_updates_sent_for: Arc::new(Mutex::new(0)),
local_table: Arc::new(RwLock::new(RoutingTable::default())),
incoming_tables: Arc::new(RwLock::new(HashMap::new())),
};
#[cfg(not(test))]
{
spawn(service.start_broadcast_interval(self.broadcast_interval));
}
service
}
}
/// Service that handles incoming CCP route control and route update requests,
/// keeps the routing tables up to date and sends route updates to other accounts.
/// Requests for any other destination are passed to `next_incoming`.
///
/// The tables are behind `Arc`s, so clones of the manager share the same state.
#[derive(Clone)]
pub struct CcpRouteManager<I, O, S, A: Account> {
/// ILP address of this node; used as `triggered_by` in rejects, as `speaker` in
/// route updates, and prepended to the path of routes put in the forwarding table.
ilp_address: Address,
/// Prefix every accepted or forwarded route must start with.
global_prefix: Bytes,
/// Service that handles requests not addressed to a CCP destination.
next_incoming: I,
/// Service used to send route control and route update requests.
outgoing: O,
/// Routes sent to other accounts; its id and epoch are included in route updates.
forwarding_table: Arc<RwLock<RoutingTable<A>>>,
/// Forwarding-table epoch up to which broadcasts have already been created.
last_epoch_updates_sent_for: Arc<Mutex<u32>>,
/// Log of forwarding-table changes; one entry is pushed per epoch increment.
forwarding_table_updates: Arc<RwLock<Vec<NewAndWithdrawnRoutes>>>,
/// Best known route per prefix; its simplified form is written to the store.
local_table: Arc<RwLock<RoutingTable<A>>>,
/// Routing tables built from the route updates received from each account.
incoming_tables: Arc<RwLock<HashMap<A::AccountId, RoutingTable<A>>>>,
/// Store providing accounts and local/configured routes and receiving the routing table.
store: S,
}
impl<I, O, S, A> CcpRouteManager<I, O, S, A>
where
I: IncomingService<A> + Clone + Send + Sync + 'static,
O: OutgoingService<A> + Clone + Send + Sync + 'static,
S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
A: CcpRoutingAccount + Send + Sync + 'static,
{
pub fn start_broadcast_interval(&self, interval: u64) -> impl Future<Item = (), Error = ()> {
let clone = self.clone();
self.request_all_routes().and_then(move |_| {
Interval::new(Instant::now(), Duration::from_millis(interval))
.map_err(|err| error!("Interval error, no longer sending route updates: {:?}", err))
.for_each(move |_| {
clone.broadcast_routes().then(|_| {
Ok(())
})
})
})
}
/// Recomputes the best routes for the local and configured prefixes and then
/// sends the resulting route updates to the accounts we send routes to.
pub fn broadcast_routes(&self) -> impl Future<Item = (), Error = ()> {
    let manager = self.clone();
    let refreshed = self.update_best_routes(None);
    refreshed.and_then(move |_| manager.send_route_updates())
}
/// Sends a route control request (dummy table id, epoch 0) to every account the
/// store says we should receive routes from.
///
/// Best effort: a failed account lookup is treated as "no accounts" and the
/// returned future always resolves successfully.
fn request_all_routes(&self) -> impl Future<Item = (), Error = ()> {
    let manager = self.clone();
    let accounts_lookup = self.store.get_accounts_to_receive_routes_from();
    accounts_lookup
        .then(move |lookup| {
            let accounts = match lookup {
                Ok(accounts) => accounts,
                Err(_) => Vec::new(),
            };
            let requests = accounts.into_iter().map(move |account| {
                manager.send_route_control_request(account, DUMMY_ROUTING_TABLE_ID, 0)
            });
            join_all(requests)
        })
        .then(|_| Ok(()))
}
/// Handles an incoming route control request.
///
/// Rejects with `F00_BAD_REQUEST` when the sender is not an account we send routes
/// to or when the packet cannot be parsed. For a `Mode::Sync` request a route
/// update is sent back to the sender, starting at epoch 0 if the sender's known
/// table id differs from ours and otherwise at its last known epoch (capped at our
/// current epoch).
///
/// In test builds the update is sent as part of the returned future, so tests can
/// observe it; otherwise it is spawned and the request is fulfilled immediately.
fn handle_route_control_request(
    &self,
    request: IncomingRequest<A>,
) -> impl Future<Item = Fulfill, Error = Reject> {
    if !request.from.should_send_routes() {
        let reject = RejectBuilder {
            code: ErrorCode::F00_BAD_REQUEST,
            message: b"We are not configured to send routes to you, sorry",
            triggered_by: Some(&self.ilp_address),
            data: &[],
        }
        .build();
        return Either::A(err(reject));
    }

    let control = match RouteControlRequest::try_from(&request.prepare) {
        Ok(control) => control,
        Err(_) => {
            let reject = RejectBuilder {
                code: ErrorCode::F00_BAD_REQUEST,
                message: b"Invalid route control request",
                triggered_by: Some(&self.ilp_address),
                data: &[],
            }
            .build();
            return Either::A(err(reject));
        }
    };
    debug!(
        "Got route control request from account {}: {:?}",
        request.from.id(),
        control
    );

    if control.mode == Mode::Sync {
        // Work out which epochs to send while holding the read lock only briefly.
        let (from_epoch_index, to_epoch_index) = {
            let table = self.forwarding_table.read();
            let latest_epoch = table.epoch();
            let first_epoch = if control.last_known_routing_table_id == table.id() {
                min(control.last_known_epoch, latest_epoch)
            } else {
                // The sender knows a different table, so send everything.
                0
            };
            (first_epoch, latest_epoch)
        };

        #[cfg(test)]
        {
            let ilp_address = self.ilp_address.clone();
            return Either::B(Either::A(
                self.send_route_update(request.from.clone(), from_epoch_index, to_epoch_index)
                    .map_err(move |_| {
                        RejectBuilder {
                            code: ErrorCode::T01_PEER_UNREACHABLE,
                            message: b"Error sending route update request",
                            data: &[],
                            triggered_by: Some(&ilp_address),
                        }
                        .build()
                    })
                    .and_then(|_| Ok(CCP_RESPONSE.clone())),
            ));
        }

        #[cfg(not(test))]
        {
            spawn(self.send_route_update(
                request.from.clone(),
                from_epoch_index,
                to_epoch_index,
            ));
        }
    }

    #[cfg(not(test))]
    {
        Either::B(ok(CCP_RESPONSE.clone()))
    }
    #[cfg(test)]
    {
        Either::B(Either::B(ok(CCP_RESPONSE.clone())))
    }
}
/// Drops the new routes of `update` that we must not accept and returns the update.
///
/// A route is dropped when its prefix is outside our global prefix, is not longer
/// than the global prefix, starts with our own address, or when our address
/// already appears in its path (a routing loop).
fn filter_routes(&self, mut update: RouteUpdateRequest) -> RouteUpdateRequest {
    update.new_routes.retain(|route| {
        if !route.prefix.starts_with(&self.global_prefix) {
            warn!("Got route for a different global prefix: {:?}", route);
            return false;
        }
        if route.prefix.len() <= self.global_prefix.len() {
            warn!("Got route broadcast for the global prefix: {:?}", route);
            return false;
        }
        if route.prefix.starts_with(self.ilp_address.as_ref()) {
            trace!("Ignoring route broadcast for a prefix that starts with our own address: {:?}", route);
            return false;
        }
        if route.path.contains(self.ilp_address.as_ref()) {
            trace!(
                "Ignoring route broadcast for a route that includes us: {:?}",
                route
            );
            return false;
        }
        true
    });
    update
}
fn handle_route_update_request(&self, request: IncomingRequest<A>) -> BoxedIlpFuture {
if !request.from.should_receive_routes() {
return Box::new(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"Your route broadcasts are not accepted here",
triggered_by: Some(&self.ilp_address),
data: &[],
}
.build()));
}
let update = RouteUpdateRequest::try_from(&request.prepare);
if update.is_err() {
return Box::new(err(RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: b"Invalid route update request",
triggered_by: Some(&self.ilp_address),
data: &[],
}
.build()));
}
let update = update.unwrap();
debug!(
"Got route update request from account {}: {:?}",
request.from.id(),
update
);
let update = self.filter_routes(update);
let mut incoming_tables = self.incoming_tables.write();
if !&incoming_tables.contains_key(&request.from.id()) {
incoming_tables.insert(
request.from.id(),
RoutingTable::new(update.routing_table_id),
);
}
match (*incoming_tables)
.get_mut(&request.from.id())
.expect("Should have inserted a routing table for this account")
.handle_update_request(request.from.clone(), update)
{
Ok(prefixes_updated) => {
if prefixes_updated.is_empty() {
trace!("Route update request did not contain any prefixes we need to update our routes for");
return Box::new(ok(CCP_RESPONSE.clone()));
}
debug!("Recalculating best routes for prefixes: {}", {
let updated: Vec<&str> = prefixes_updated
.iter()
.map(|prefix| str::from_utf8(&prefix).unwrap_or("<not utf8>"))
.collect();
updated.join(", ")
});
let future = self.update_best_routes(Some(prefixes_updated));
#[cfg(not(test))]
{
spawn(future);
Box::new(ok(CCP_RESPONSE.clone()))
}
#[cfg(test)]
{
let ilp_address = self.ilp_address.clone();
Box::new(
future
.map_err(move |_| {
RejectBuilder {
code: ErrorCode::T00_INTERNAL_ERROR,
message: b"Error processing route update",
data: &[],
triggered_by: Some(&ilp_address),
}
.build()
})
.and_then(|_| Ok(CCP_RESPONSE.clone())),
)
}
}
Err(message) => {
warn!("Error handling incoming Route Update request, sending a Route Control request to get updated routing table info from peer. Error was: {}", &message);
let reject = RejectBuilder {
code: ErrorCode::F00_BAD_REQUEST,
message: &message.as_bytes(),
data: &[],
triggered_by: Some(&self.ilp_address),
}
.build();
let table = &incoming_tables[&request.from.id()];
let future = self.send_route_control_request(
request.from.clone(),
table.id(),
table.epoch(),
);
#[cfg(not(test))]
{
spawn(future);
Box::new(err(reject))
}
#[cfg(test)]
Box::new(future.then(move |_| Err(reject)))
}
}
}
/// Sends a `Mode::Sync` route control request to `account`, telling it the
/// routing table id and epoch we last saw from it.
///
/// The outcome is only logged; the returned future always resolves to `Ok(())`.
fn send_route_control_request(
    &self,
    account: A,
    last_known_routing_table_id: [u8; 16],
    last_known_epoch: u32,
) -> impl Future<Item = (), Error = ()> {
    let account_id = account.id();
    let control = RouteControlRequest {
        mode: Mode::Sync,
        last_known_routing_table_id,
        last_known_epoch,
        features: Vec::new(),
    };
    debug!("Sending Route Control Request to account: {}, last known table id: {}, last known epoch: {}", account_id, hex::encode(&last_known_routing_table_id[..]), last_known_epoch);
    let prepare = control.to_prepare();
    // Only the outgoing service needs to be cloned to get a mutable handle;
    // this matches how `send_route_update` sends its request.
    self.outgoing
        .clone()
        .send_request(OutgoingRequest {
            from: account.clone(),
            to: account,
            original_amount: prepare.amount(),
            prepare,
        })
        .then(move |result| {
            match result {
                Ok(_) => trace!("Sent Route Control Request to account: {}", account_id),
                Err(err) => warn!(
                    "Error sending Route Control Request to account {}: {:?}",
                    account_id, err
                ),
            }
            Ok(())
        })
}
/// Recomputes the best route for each of `prefixes` and applies the changes.
///
/// When `prefixes` is `None`, the prefixes of the store's configured and local
/// routes are checked instead. For every prefix whose best next account differs
/// from the one in the local table (or that has no local route yet) the best
/// route is installed; prefixes with no route left are withdrawn. If anything
/// changed, the local and forwarding tables are updated, the forwarding-table
/// epoch is incremented with a matching entry in the update log, and the
/// simplified local table is written to the store.
fn update_best_routes(
    &self,
    prefixes: Option<Vec<Bytes>>,
) -> impl Future<Item = (), Error = ()> + 'static {
    let local_table = self.local_table.clone();
    let forwarding_table = self.forwarding_table.clone();
    let forwarding_table_updates = self.forwarding_table_updates.clone();
    let incoming_tables = self.incoming_tables.clone();
    let ilp_address = self.ilp_address.clone();
    let global_prefix = self.global_prefix.clone();
    let mut store = self.store.clone();

    self.store.get_local_and_configured_routes().and_then(
        move |(ref local_routes, ref configured_routes)| {
            // Phase 1: under read locks, work out which routes changed.
            let (better_routes, withdrawn_routes) = {
                let local_table = local_table.read();
                let incoming_tables = incoming_tables.read();

                let prefixes_to_check: Box<dyn Iterator<Item = Bytes>> =
                    if let Some(prefixes) = prefixes {
                        Box::new(prefixes.into_iter())
                    } else {
                        let routes = configured_routes.iter().chain(local_routes.iter());
                        Box::new(routes.map(|(prefix, _account)| prefix.clone()))
                    };

                let mut better_routes: Vec<(Bytes, A, Route)> =
                    Vec::with_capacity(prefixes_to_check.size_hint().0);
                let mut withdrawn_routes: Vec<Bytes> = Vec::new();

                for prefix in prefixes_to_check {
                    if let Some((best_next_account, best_route)) = get_best_route_for_prefix(
                        local_routes,
                        configured_routes,
                        &incoming_tables,
                        prefix.as_ref(),
                    ) {
                        // Nothing to do when the current route already goes via the best account.
                        if let Some((ref next_account, _)) = local_table.get_route(&prefix) {
                            if next_account.id() == best_next_account.id() {
                                continue;
                            }
                        }
                        // Install the *best* account and route. Re-pushing the existing
                        // local entry here would leave the table unchanged and the
                        // better route would never take effect.
                        better_routes.push((prefix.clone(), best_next_account, best_route));
                    } else {
                        // No route to this prefix is left.
                        withdrawn_routes.push(prefix);
                    }
                }
                (better_routes, withdrawn_routes)
            };

            if better_routes.is_empty() && withdrawn_routes.is_empty() {
                return Either::B(ok(()));
            }

            // Phase 2: apply the changes under write locks.
            let mut local_table = local_table.write();
            let mut forwarding_table = forwarding_table.write();
            let mut forwarding_table_updates = forwarding_table_updates.write();

            let mut new_routes: Vec<Route> = Vec::with_capacity(better_routes.len());
            for (prefix, account, mut route) in better_routes {
                debug!(
                    "Setting new route for prefix: {} -> Account {}",
                    str::from_utf8(prefix.as_ref()).unwrap_or("<not utf8>"),
                    account.id(),
                );
                local_table.set_route(prefix.clone(), account.clone(), route.clone());

                // Only forward routes inside our global prefix, and never the global
                // prefix itself, our own direct (empty-path) sub-prefixes, or
                // prefixes withdrawn in this same pass.
                let should_forward = route.prefix.starts_with(&global_prefix[..])
                    && route.prefix != global_prefix
                    && !(route.prefix.starts_with(ilp_address.as_ref()) && route.path.is_empty())
                    && !withdrawn_routes.contains(&prefix);
                if should_forward {
                    let next_hop_changed = match forwarding_table.get_route(&prefix) {
                        Some((old_account, _)) => old_account.id() != account.id(),
                        None => true,
                    };
                    if next_hop_changed {
                        // Put ourselves on the path and hash the auth value before forwarding.
                        route.path.insert(0, ilp_address.to_bytes());
                        route.auth = hash(&route.auth);
                        forwarding_table.set_route(prefix.clone(), account.clone(), route.clone());
                        new_routes.push(route);
                    }
                }
            }

            for prefix in withdrawn_routes.iter() {
                debug!("Removed route for prefix: {}", str::from_utf8(&prefix[..]).unwrap_or("<not utf8>"));
                local_table.delete_route(prefix);
                forwarding_table.delete_route(prefix);
            }

            // Every epoch increment is paired with exactly one update-log entry.
            let epoch = forwarding_table.increment_epoch();
            forwarding_table_updates.push((new_routes, withdrawn_routes));
            debug_assert_eq!(epoch as usize + 1, forwarding_table_updates.len());

            Either::A(store.set_routes(local_table.get_simplified_table()))
        },
    )
}
/// Creates one route update covering the forwarding-table epochs since the last
/// broadcast and sends it to every account the store says we send routes to.
///
/// Accounts are de-duplicated by id. Send errors for individual accounts are
/// logged and ignored.
///
/// NOTE(review): the "last epoch sent" marker is advanced before the update is
/// actually delivered, so a failed send is not retried by the next broadcast.
fn send_route_updates(&self) -> impl Future<Item = (), Error = ()> {
    let mut outgoing = self.outgoing.clone();
    let to_epoch_index = self.forwarding_table.read().epoch();
    // Swap in the new marker and keep the old one as the start of this update.
    let from_epoch_index: u32 = {
        let mut lock = self.last_epoch_updates_sent_for.lock();
        let epoch = *lock;
        *lock = to_epoch_index;
        epoch
    };
    let route_update_request = self.create_route_update(from_epoch_index, to_epoch_index);
    debug!(
        "Sending route updates for epochs {} - {}: {:?}",
        from_epoch_index, to_epoch_index, route_update_request,
    );
    let prepare = route_update_request.to_prepare();

    self.store
        .get_accounts_to_send_routes_to()
        .and_then(move |mut accounts| {
            // Sort so that equal ids are adjacent, then drop the duplicates.
            accounts.sort_unstable_by_key(|a| a.id().to_string());
            accounts.dedup_by_key(|a| a.id());

            if accounts.is_empty() {
                trace!("No accounts to broadcast routes to");
                return Either::B(ok(()));
            }

            debug!("Sending route updates to accounts: {}", {
                let account_list: Vec<String> = accounts
                    .iter()
                    .map(|a| format!("{} ({})", a.id(), a.client_address()))
                    .collect();
                account_list.join(", ")
            });
            Either::A(
                join_all(accounts.into_iter().map(move |account| {
                    let account_id = account.id();
                    outgoing
                        .send_request(OutgoingRequest {
                            from: account.clone(),
                            to: account,
                            original_amount: prepare.amount(),
                            prepare: prepare.clone(),
                        })
                        .map_err(move |err| {
                            warn!(
                                "Error sending route update to account {}: {:?}",
                                account_id, err
                            )
                        })
                        // One failing account must not abort the other sends.
                        .then(|_| Ok(()))
                }))
                .and_then(|_| {
                    trace!("Finished sending route updates");
                    Ok(())
                }),
            )
        })
}
/// Builds the route update request covering the update-log entries from
/// `from_epoch_index` (inclusive) to `to_epoch_index` (exclusive).
///
/// The entries are merged in order: a prefix withdrawn after being added is
/// removed from the new routes, and a prefix added again after being withdrawn is
/// removed from the withdrawn list. If `to_epoch_index` is not greater than
/// `from_epoch_index`, the update contains no routes.
fn create_route_update(
    &self,
    from_epoch_index: u32,
    to_epoch_index: u32,
) -> RouteUpdateRequest {
    let (start, end) = (from_epoch_index as usize, to_epoch_index as usize);
    let (routing_table_id, current_epoch_index) = {
        let table = self.forwarding_table.read();
        (table.id(), table.epoch())
    };
    let forwarding_table_updates = self.forwarding_table_updates.read();
    let epochs_to_take = end.saturating_sub(start);

    let mut new_routes: Vec<Route> = Vec::with_capacity(epochs_to_take);
    let mut withdrawn_routes: Vec<Bytes> = Vec::new();
    for (new, withdrawn) in forwarding_table_updates
        .iter()
        .skip(start)
        .take(epochs_to_take)
    {
        for new_route in new {
            new_routes.push(new_route.clone());
            // A later announcement cancels an earlier withdrawal of the prefix.
            withdrawn_routes.retain(|prefix| prefix != &new_route.prefix);
        }
        for withdrawn_route in withdrawn {
            withdrawn_routes.push(withdrawn_route.clone());
            // A later withdrawal cancels earlier announcements of the prefix.
            new_routes.retain(|route| route.prefix != withdrawn_route);
        }
    }

    RouteUpdateRequest {
        routing_table_id,
        from_epoch_index,
        to_epoch_index,
        current_epoch_index,
        new_routes,
        withdrawn_routes,
        speaker: self.ilp_address.clone(),
        hold_down_time: DEFAULT_ROUTE_EXPIRY_TIME,
    }
}
/// Sends `account` a single route update covering the given epoch range.
///
/// A failed send is logged; the returned future always resolves to `Ok(())`.
fn send_route_update(
    &self,
    account: A,
    from_epoch_index: u32,
    to_epoch_index: u32,
) -> impl Future<Item = (), Error = ()> {
    let update = self.create_route_update(from_epoch_index, to_epoch_index);
    let prepare = update.to_prepare();
    let account_id = account.id();
    debug!(
        "Sending individual route update to account: {} for epochs from: {} to: {}",
        account_id, from_epoch_index, to_epoch_index
    );
    let request = OutgoingRequest {
        from: account.clone(),
        to: account,
        original_amount: prepare.amount(),
        prepare,
    };
    let mut outgoing = self.outgoing.clone();
    outgoing.send_request(request).then(move |outcome| {
        if let Err(err) = outcome {
            error!(
                "Error sending route update to account {}: {:?}",
                account_id, err
            );
        }
        Ok(())
    })
}
}
/// Picks the best next account and route for `prefix`, or `None` if there is none.
///
/// Order of preference:
/// 1. a configured route for `prefix` or for the longest dot-separated ancestor of it;
/// 2. a local route for exactly `prefix`;
/// 3. among routes received from other accounts: the higher routing relation, then
///    the shorter path, then the account id that sorts first as a string.
///
/// Configured and local matches get a fresh route with an empty path and zeroed
/// auth whose prefix is the account's client address.
fn get_best_route_for_prefix<A: CcpRoutingAccount>(
    local_routes: &HashMap<Bytes, A>,
    configured_routes: &HashMap<Bytes, A>,
    incoming_tables: &HashMap<A::AccountId, RoutingTable<A>>,
    prefix: &[u8],
) -> Option<(A, Route)> {
    // Route entry used for accounts we know statically (configured or local).
    let direct_route = |account: &A| {
        (
            account.clone(),
            Route {
                prefix: account.client_address().to_bytes(),
                auth: [0; 32],
                path: Vec::new(),
                props: Vec::new(),
            },
        )
    };

    // Try the full prefix first, then drop one trailing segment at a time.
    let segments: Vec<&[u8]> = prefix.split(|c| c == &b'.').collect();
    for segment_count in (1..=segments.len()).rev() {
        let candidate = segments[..segment_count].join(&b'.');
        if let Some(account) = configured_routes.get(&candidate[..]) {
            return Some(direct_route(account));
        }
    }

    if let Some(account) = local_routes.get(prefix) {
        return Some(direct_route(account));
    }

    let mut candidates = incoming_tables
        .values()
        .filter_map(|table| table.get_route(prefix));
    let first = candidates.next()?;
    let (best_account, best_route) = candidates.fold(
        first,
        |(current_account, current_route), (account, route)| {
            let keep_current =
                if current_account.routing_relation() > account.routing_relation() {
                    true
                } else if current_account.routing_relation() < account.routing_relation() {
                    false
                } else if current_route.path.len() != route.path.len() {
                    // Same relation: the shorter path wins.
                    current_route.path.len() < route.path.len()
                } else {
                    // Full tie: break it deterministically by account id.
                    current_account.id().to_string() < account.id().to_string()
                };
            if keep_current {
                (current_account, current_route)
            } else {
                (account, route)
            }
        },
    );
    Some((best_account.clone(), best_route.clone()))
}
impl<I, O, S, A> IncomingService<A> for CcpRouteManager<I, O, S, A>
where
    I: IncomingService<A> + Clone + Send + Sync + 'static,
    O: OutgoingService<A> + Clone + Send + Sync + 'static,
    S: RouteManagerStore<Account = A> + Clone + Send + Sync + 'static,
    A: CcpRoutingAccount + Send + Sync + 'static,
{
    type Future = BoxedIlpFuture;

    /// Dispatches CCP route control and route update requests to their handlers
    /// and forwards every other request to the next incoming service.
    fn handle_request(&mut self, request: IncomingRequest<A>) -> Self::Future {
        let destination = request.prepare.destination();
        if destination == *CCP_CONTROL_DESTINATION {
            Box::new(self.handle_route_control_request(request))
        } else if destination == *CCP_UPDATE_DESTINATION {
            // Already a `BoxedIlpFuture`; boxing it again would only add an
            // extra allocation and indirection.
            self.handle_route_update_request(request)
        } else {
            Box::new(self.next_incoming.handle_request(request))
        }
    }
}
// Tests for the route ranking done by `get_best_route_for_prefix`.
#[cfg(test)]
mod ranking_routes {
use super::*;
use crate::test_helpers::*;
use crate::RoutingRelation;
use std::iter::FromIterator;
lazy_static! {
// Local routes: example.a, example.b and example.c go to accounts 1, 2 and 3.
static ref LOCAL: HashMap<Bytes, TestAccount> = HashMap::from_iter(vec![
(
Bytes::from("example.a"),
TestAccount::new(1, "example.local.one")
),
(
Bytes::from("example.b"),
TestAccount::new(2, "example.local.two")
),
(
Bytes::from("example.c"),
TestAccount::new(3, "example.local.three")
),
]);
// Configured routes: example.a and example.b go to accounts 4 and 5.
static ref CONFIGURED: HashMap<Bytes, TestAccount> = HashMap::from_iter(vec![
(
Bytes::from("example.a"),
TestAccount::new(4, "example.local.four")
),
(
Bytes::from("example.b"),
TestAccount::new(5, "example.local.five")
),
]);
// Routes received from other accounts, keyed by account id.
static ref INCOMING: HashMap<u64, RoutingTable<TestAccount>> = {
// Account 6 has the Child relation and offers example.d with a one-entry path.
let mut child_table = RoutingTable::default();
let mut child = TestAccount::new(6, "example.child");
child.relation = RoutingRelation::Child;
child_table.add_route(
child.clone(),
Route {
prefix: Bytes::from("example.d"),
path: vec![Bytes::from("example.one")],
auth: [0; 32],
props: Vec::new(),
},
);
// Account 7 offers example.d (empty path), example.e (one-entry path)
// and example.a.sub-prefix.
let mut peer_table_1 = RoutingTable::default();
let peer_1 = TestAccount::new(7, "example.peer1");
peer_table_1.add_route(
peer_1.clone(),
Route {
prefix: Bytes::from("example.d"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
},
);
peer_table_1.add_route(
peer_1.clone(),
Route {
prefix: Bytes::from("example.e"),
path: vec![Bytes::from("example.one")],
auth: [0; 32],
props: Vec::new(),
},
);
peer_table_1.add_route(
peer_1.clone(),
Route {
prefix: Bytes::from("example.a.sub-prefix"),
path: vec![Bytes::from("example.one")],
auth: [0; 32],
props: Vec::new(),
},
);
// Account 8 offers example.e with a two-entry path.
let mut peer_table_2 = RoutingTable::default();
let peer_2 = TestAccount::new(8, "example.peer2");
peer_table_2.add_route(
peer_2.clone(),
Route {
prefix: Bytes::from("example.e"),
path: vec![Bytes::from("example.one"), Bytes::from("example.two")],
auth: [0; 32],
props: Vec::new(),
},
);
HashMap::from_iter(vec![(6, child_table), (7, peer_table_1), (8, peer_table_2)])
};
}
// example.a is both local (account 1) and configured (account 4); configured wins.
#[test]
fn prioritizes_configured_routes() {
let best_route = get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.a");
assert_eq!(best_route.unwrap().0.id(), 4);
}
// The configured route for the ancestor example.a beats account 7's received
// route for the exact prefix example.a.sub-prefix.
#[test]
fn prioritizes_shorter_configured_routes() {
let best_route =
get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.a.sub-prefix");
assert_eq!(best_route.unwrap().0.id(), 4);
}
// example.c only has a local route (account 3), which must be selected.
#[test]
fn prioritizes_local_routes_over_broadcasted_ones() {
let best_route = get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.c");
assert_eq!(best_route.unwrap().0.id(), 3);
}
// For example.d the child (account 6) wins although account 7 has a shorter path.
#[test]
fn prioritizes_children_over_peers() {
let best_route = get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.d");
assert_eq!(best_route.unwrap().0.id(), 6);
}
// For example.e account 7 (one-entry path) beats account 8 (two-entry path).
#[test]
fn prioritizes_shorter_paths() {
let best_route = get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.e");
assert_eq!(best_route.unwrap().0.id(), 7);
}
// No table contains example.z, so no route is returned.
#[test]
fn returns_none_for_no_route() {
let best_route = get_best_route_for_prefix(&LOCAL, &CONFIGURED, &INCOMING, b"example.z");
assert!(best_route.is_none());
}
}
// Tests for how the route manager answers incoming route control requests.
#[cfg(test)]
mod handle_route_control_request {
use super::*;
use crate::fixtures::*;
use crate::test_helpers::*;
use std::time::{Duration, SystemTime};
// A control request from ROUTING_ACCOUNT is fulfilled.
#[test]
fn handles_valid_request() {
test_service_with_routes()
.0
.handle_request(IncomingRequest {
prepare: CONTROL_REQUEST.to_prepare(),
from: ROUTING_ACCOUNT.clone(),
})
.wait()
.unwrap();
}
// A control request from NON_ROUTING_ACCOUNT is rejected with the
// "not configured to send routes" message.
#[test]
fn rejects_from_non_sending_account() {
let result = test_service()
.handle_request(IncomingRequest {
prepare: CONTROL_REQUEST.to_prepare(),
from: NON_ROUTING_ACCOUNT.clone(),
})
.wait();
assert!(result.is_err());
assert_eq!(
str::from_utf8(result.unwrap_err().message()).unwrap(),
"We are not configured to send routes to you, sorry"
);
}
// A prepare with the CCP control destination but empty data is rejected as invalid.
#[test]
fn rejects_invalid_packet() {
let result = test_service()
.handle_request(IncomingRequest {
prepare: PrepareBuilder {
destination: CCP_CONTROL_DESTINATION.clone(),
amount: 0,
expires_at: SystemTime::now() + Duration::from_secs(30),
data: &[],
execution_condition: &PEER_PROTOCOL_CONDITION,
}
.build(),
from: ROUTING_ACCOUNT.clone(),
})
.wait();
assert!(result.is_err());
assert_eq!(
str::from_utf8(result.unwrap_err().message()).unwrap(),
"Invalid route control request"
);
}
// The sender's known table id matches ours (forced to all zeros), so the update
// sent back starts at its last known epoch, 0, and runs to our current epoch, 1.
#[test]
fn sends_update_in_response() {
let (mut service, outgoing_requests) = test_service_with_routes();
(*service.forwarding_table.write()).set_id([0; 16]);
service.update_best_routes(None).wait().unwrap();
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: RouteControlRequest {
last_known_routing_table_id: [0; 16],
mode: Mode::Sync,
last_known_epoch: 0,
features: Vec::new(),
}
.to_prepare(),
})
.wait()
.unwrap();
let request: &OutgoingRequest<TestAccount> = &outgoing_requests.lock()[0];
assert_eq!(request.to.id(), ROUTING_ACCOUNT.id());
let update = RouteUpdateRequest::try_from(&request.prepare).unwrap();
assert_eq!(update.routing_table_id, [0; 16]);
assert_eq!(update.from_epoch_index, 0);
assert_eq!(update.to_epoch_index, 1);
assert_eq!(update.current_epoch_index, 1);
assert_eq!(update.new_routes.len(), 2);
}
// The sender reports table id [0; 16] and epoch 32; the assertions expect the
// update to carry our table id and to start from epoch 0 regardless.
#[test]
fn sends_whole_table_if_id_is_different() {
let (mut service, outgoing_requests) = test_service_with_routes();
service.update_best_routes(None).wait().unwrap();
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: RouteControlRequest {
last_known_routing_table_id: [0; 16],
mode: Mode::Sync,
last_known_epoch: 32,
features: Vec::new(),
}
.to_prepare(),
})
.wait()
.unwrap();
let routing_table_id = service.forwarding_table.read().id();
let request: &OutgoingRequest<TestAccount> = &outgoing_requests.lock()[0];
assert_eq!(request.to.id(), ROUTING_ACCOUNT.id());
let update = RouteUpdateRequest::try_from(&request.prepare).unwrap();
assert_eq!(update.routing_table_id, routing_table_id);
assert_eq!(update.from_epoch_index, 0);
assert_eq!(update.to_epoch_index, 1);
assert_eq!(update.current_epoch_index, 1);
assert_eq!(update.new_routes.len(), 2);
}
}
#[cfg(test)]
mod handle_route_update_request {
use super::*;
use crate::fixtures::*;
use crate::test_helpers::*;
use std::{
iter::FromIterator,
time::{Duration, SystemTime},
};
// An update covering epochs 0 to 1 from ROUTING_ACCOUNT is fulfilled.
#[test]
fn handles_valid_request() {
let mut service = test_service();
let mut update = UPDATE_REQUEST_SIMPLE.clone();
update.to_epoch_index = 1;
update.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
prepare: update.to_prepare(),
from: ROUTING_ACCOUNT.clone(),
})
.wait()
.unwrap();
}
// A route update sent by CHILD_ACCOUNT is rejected with the
// "broadcasts are not accepted" message.
#[test]
fn rejects_from_child_account() {
let result = test_service()
.handle_request(IncomingRequest {
prepare: UPDATE_REQUEST_SIMPLE.to_prepare(),
from: CHILD_ACCOUNT.clone(),
})
.wait();
assert!(result.is_err());
assert_eq!(
str::from_utf8(result.unwrap_err().message()).unwrap(),
"Your route broadcasts are not accepted here",
);
}
// A route update sent by NON_ROUTING_ACCOUNT is rejected with the
// "broadcasts are not accepted" message.
#[test]
fn rejects_from_non_routing_account() {
let result = test_service()
.handle_request(IncomingRequest {
prepare: UPDATE_REQUEST_SIMPLE.to_prepare(),
from: NON_ROUTING_ACCOUNT.clone(),
})
.wait();
assert!(result.is_err());
assert_eq!(
str::from_utf8(result.unwrap_err().message()).unwrap(),
"Your route broadcasts are not accepted here",
);
}
// A prepare with the CCP update destination but empty data is rejected as invalid.
#[test]
fn rejects_invalid_packet() {
let result = test_service()
.handle_request(IncomingRequest {
prepare: PrepareBuilder {
destination: CCP_UPDATE_DESTINATION.clone(),
amount: 0,
expires_at: SystemTime::now() + Duration::from_secs(30),
data: &[],
execution_condition: &PEER_PROTOCOL_CONDITION,
}
.build(),
from: ROUTING_ACCOUNT.clone(),
})
.wait();
assert!(result.is_err());
assert_eq!(
str::from_utf8(result.unwrap_err().message()).unwrap(),
"Invalid route update request"
);
}
// After the first update from an account, exactly one incoming table exists.
#[test]
fn adds_table_on_first_request() {
let mut service = test_service();
let mut update = UPDATE_REQUEST_SIMPLE.clone();
update.to_epoch_index = 1;
update.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
prepare: update.to_prepare(),
from: ROUTING_ACCOUNT.clone(),
})
.wait()
.unwrap();
assert_eq!(service.incoming_tables.read().len(), 1);
}
// `filter_routes` keeps example.valid and drops other.prefix.
// Assumes the test service's global prefix is "example." — confirm in test_helpers.
#[test]
fn filters_routes_with_other_global_prefix() {
let service = test_service();
let mut request = UPDATE_REQUEST_SIMPLE.clone();
request.new_routes.push(Route {
prefix: Bytes::from("example.valid"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
request.new_routes.push(Route {
prefix: Bytes::from("other.prefix"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
let request = service.filter_routes(request);
assert_eq!(request.new_routes.len(), 1);
assert_eq!(request.new_routes[0].prefix, Bytes::from("example.valid"));
}
// `filter_routes` keeps example.valid and drops the route for "example." itself.
// Assumes the test service's global prefix is "example." — confirm in test_helpers.
#[test]
fn filters_routes_for_global_prefix() {
let service = test_service();
let mut request = UPDATE_REQUEST_SIMPLE.clone();
request.new_routes.push(Route {
prefix: Bytes::from("example.valid"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
request.new_routes.push(Route {
prefix: Bytes::from("example."),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
let request = service.filter_routes(request);
assert_eq!(request.new_routes.len(), 1);
assert_eq!(request.new_routes[0].prefix, Bytes::from("example.valid"));
}
// Of two routes for the same prefix, the one whose path contains the service's
// own address is dropped and the one with an empty path is kept.
#[test]
fn filters_routing_loops() {
let service = test_service();
let mut request = UPDATE_REQUEST_SIMPLE.clone();
request.new_routes.push(Route {
prefix: Bytes::from("example.valid"),
path: vec![
Bytes::from("example.a"),
service.ilp_address.to_bytes(),
Bytes::from("example.b"),
],
auth: [0; 32],
props: Vec::new(),
});
request.new_routes.push(Route {
prefix: Bytes::from("example.valid"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
let request = service.filter_routes(request);
assert_eq!(request.new_routes.len(), 1);
assert_eq!(request.new_routes[0].prefix, Bytes::from("example.valid"));
}
// `filter_routes` drops example.connector.invalid-route and keeps example.valid.
// Assumes the test service's address is "example.connector" — confirm in test_helpers.
#[test]
fn filters_own_prefix_routes() {
let service = test_service();
let mut request = UPDATE_REQUEST_SIMPLE.clone();
request.new_routes.push(Route {
prefix: Bytes::from("example.connector.invalid-route"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
request.new_routes.push(Route {
prefix: Bytes::from("example.valid"),
path: Vec::new(),
auth: [0; 32],
props: Vec::new(),
});
let request = service.filter_routes(request);
assert_eq!(request.new_routes.len(), 1);
assert_eq!(request.new_routes[0].prefix, Bytes::from("example.valid"));
}
// After handling UPDATE_REQUEST_COMPLEX, the local table routes example.prefix1
// and example.prefix2 to the account that sent the update.
#[test]
fn updates_local_routing_table() {
let mut service = test_service();
let mut request = UPDATE_REQUEST_COMPLEX.clone();
request.to_epoch_index = 1;
request.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: request.to_prepare(),
})
.wait()
.unwrap();
assert_eq!(
(*service.local_table.read())
.get_route(b"example.prefix1")
.unwrap()
.0
.id(),
ROUTING_ACCOUNT.id()
);
assert_eq!(
(*service.local_table.read())
.get_route(b"example.prefix2")
.unwrap()
.0
.id(),
ROUTING_ACCOUNT.id()
);
}
// After handling UPDATE_REQUEST_COMPLEX, the store's routes map example.prefix1
// and example.prefix2 to the account that sent the update.
#[test]
fn writes_local_routing_table_to_store() {
let mut service = test_service();
let mut request = UPDATE_REQUEST_COMPLEX.clone();
request.to_epoch_index = 1;
request.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: request.to_prepare(),
})
.wait()
.unwrap();
assert_eq!(
service
.store
.routes
.lock()
.get(&b"example.prefix1"[..])
.unwrap()
.id(),
ROUTING_ACCOUNT.id()
);
assert_eq!(
service
.store
.routes
.lock()
.get(&b"example.prefix2"[..])
.unwrap()
.id(),
ROUTING_ACCOUNT.id()
);
}
// The store already has routes for example.prefix1 (account 9) and
// example.prefix2 (account 10); an update from ROUTING_ACCOUNT must not
// replace them in the local table.
// NOTE(review): which argument of `TestStore::with_routes` is local and which
// is configured is not visible here — confirm in test_helpers.
#[test]
fn doesnt_overwrite_configured_or_local_routes() {
let mut service = test_service();
let store = TestStore::with_routes(
HashMap::from_iter(vec![(
Bytes::from("example.prefix1"),
TestAccount::new(9, "example.account9"),
)]),
HashMap::from_iter(vec![(
Bytes::from("example.prefix2"),
TestAccount::new(10, "example.account10"),
)]),
);
service.store = store;
let mut request = UPDATE_REQUEST_COMPLEX.clone();
request.to_epoch_index = 1;
request.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: request.to_prepare(),
})
.wait()
.unwrap();
assert_eq!(
(*service.local_table.read())
.get_route(b"example.prefix1")
.unwrap()
.0
.id(),
9
);
assert_eq!(
(*service.local_table.read())
.get_route(b"example.prefix2")
.unwrap()
.0
.id(),
10
);
}
// A second update that withdraws example.prefix2 removes it from the local
// table while example.prefix1 stays routed to the sender.
#[test]
fn removes_withdrawn_routes() {
let mut service = test_service();
let mut request = UPDATE_REQUEST_COMPLEX.clone();
request.to_epoch_index = 1;
request.from_epoch_index = 0;
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: request.to_prepare(),
})
.wait()
.unwrap();
service
.handle_request(IncomingRequest {
from: ROUTING_ACCOUNT.clone(),
prepare: RouteUpdateRequest {
routing_table_id: UPDATE_REQUEST_COMPLEX.routing_table_id,
from_epoch_index: 1,
to_epoch_index: 3,
current_epoch_index: 3,
hold_down_time: 45000,
speaker: UPDATE_REQUEST_COMPLEX.speaker.clone(),
new_routes: Vec::new(),
withdrawn_routes: vec![Bytes::from("example.prefix2")],
}
.to_prepare(),
})
.wait()
.unwrap();
assert_eq!(
(*service.local_table.read())
.get_route(b"example.prefix1")
.unwrap()
.0
.id(),
ROUTING_ACCOUNT.id()
);
assert!((*service.local_table.read())
.get_route(b"example.prefix2")
.is_none());
}
/// When a peer's update arrives with a different routing table id, the update is
/// rejected with `F00_BAD_REQUEST` and the first outgoing request is a route control
/// request carrying epoch 0 and the new table id.
#[test]
fn sends_control_request_if_routing_table_id_changed() {
    let (mut manager, outgoing_requests) = test_service_with_routes();

    // Accepted baseline: fixture table id, epochs 0 -> 3.
    let mut first_update = UPDATE_REQUEST_COMPLEX.clone();
    first_update.from_epoch_index = 0;
    first_update.to_epoch_index = 3;
    manager
        .handle_request(IncomingRequest {
            from: ROUTING_ACCOUNT.clone(),
            prepare: first_update.to_prepare(),
        })
        .wait()
        .unwrap();

    // Same peer, but now speaking about a different routing table.
    let mut second_update = UPDATE_REQUEST_COMPLEX.clone();
    second_update.routing_table_id = [9; 16];
    second_update.from_epoch_index = 7;
    second_update.to_epoch_index = 8;
    let rejection = manager
        .handle_request(IncomingRequest {
            from: ROUTING_ACCOUNT.clone(),
            prepare: second_update.to_prepare(),
        })
        .wait()
        .unwrap_err();
    assert_eq!(rejection.code(), ErrorCode::F00_BAD_REQUEST);

    // The first recorded outgoing request must be the control request.
    let sent = outgoing_requests.lock();
    let control = RouteControlRequest::try_from(&sent[0].prepare).unwrap();
    assert_eq!(control.last_known_epoch, 0);
    assert_eq!(
        control.last_known_routing_table_id,
        second_update.routing_table_id
    );
}
/// When a peer's update skips epochs (we hold epoch 1, the update starts at 7), the
/// update is rejected with `F00_BAD_REQUEST` and the first outgoing request is a
/// route control request reporting epoch 1 as the last known one.
#[test]
fn sends_control_request_if_missing_epochs() {
    let (mut manager, outgoing_requests) = test_service_with_routes();

    // Accepted baseline: epochs 0 -> 1.
    let mut in_order_update = UPDATE_REQUEST_COMPLEX.clone();
    in_order_update.from_epoch_index = 0;
    in_order_update.to_epoch_index = 1;
    manager
        .handle_request(IncomingRequest {
            from: ROUTING_ACCOUNT.clone(),
            prepare: in_order_update.to_prepare(),
        })
        .wait()
        .unwrap();

    // Gap: the next update claims to start at epoch 7.
    let mut gapped_update = UPDATE_REQUEST_COMPLEX.clone();
    gapped_update.from_epoch_index = 7;
    gapped_update.to_epoch_index = 8;
    let rejection = manager
        .handle_request(IncomingRequest {
            from: ROUTING_ACCOUNT.clone(),
            prepare: gapped_update.to_prepare(),
        })
        .wait()
        .unwrap_err();
    assert_eq!(rejection.code(), ErrorCode::F00_BAD_REQUEST);

    // The first recorded outgoing request must be the control request.
    let sent = outgoing_requests.lock();
    let control = RouteControlRequest::try_from(&sent[0].prepare).unwrap();
    assert_eq!(control.last_known_epoch, 1);
}
}
#[cfg(test)]
mod create_route_update {
    use super::*;
    use crate::test_helpers::*;

    /// On a freshly built test service, an update for the range 0..0 carries no
    /// routes at all and reports epoch 0 in every epoch field.
    #[test]
    fn heartbeat_message_for_empty_table() {
        let manager = test_service();
        let heartbeat = manager.create_route_update(0, 0);

        assert_eq!(
            (
                heartbeat.from_epoch_index,
                heartbeat.to_epoch_index,
                heartbeat.current_epoch_index,
            ),
            (0, 0, 0)
        );
        assert!(heartbeat.new_routes.is_empty());
        assert!(heartbeat.withdrawn_routes.is_empty());
    }

    /// An update for the range 1..3 must contain exactly what was recorded at
    /// indices 1 and 2 of `forwarding_table_updates`, and report the forwarding
    /// table's current epoch (4).
    #[test]
    fn includes_the_given_range_of_epochs() {
        let manager = test_service();
        manager.forwarding_table.write().set_epoch(4);

        // Small local constructor so each recorded entry below fits on one line.
        let make_route = |prefix: &'static str, path: Vec<Bytes>, auth| Route {
            prefix: Bytes::from(prefix),
            path,
            auth,
            props: Vec::new(),
        };
        let via_x = vec![Bytes::from("example.x")];
        let via_x_and_y = vec![Bytes::from("example.x"), Bytes::from("example.y")];

        // Four recorded entries: (new routes, withdrawn prefixes).
        let recorded = vec![
            (
                vec![make_route("example.a", via_x.clone(), [1; 32])],
                Vec::new(),
            ),
            (vec![make_route("example.b", via_x, [2; 32])], Vec::new()),
            (
                vec![make_route("example.c", via_x_and_y.clone(), [3; 32])],
                vec![Bytes::from("example.m")],
            ),
            (
                vec![make_route("example.d", via_x_and_y, [4; 32])],
                vec![Bytes::from("example.n")],
            ),
        ];
        *manager.forwarding_table_updates.write() = recorded;

        let update = manager.create_route_update(1, 3);

        assert_eq!(update.from_epoch_index, 1);
        assert_eq!(update.to_epoch_index, 3);
        assert_eq!(update.current_epoch_index, 4);
        assert_eq!(update.new_routes.len(), 2);
        assert_eq!(update.withdrawn_routes.len(), 1);

        let mut advertised: Vec<&str> = Vec::with_capacity(update.new_routes.len());
        for entry in &update.new_routes {
            advertised.push(str::from_utf8(entry.prefix.as_ref()).unwrap());
        }
        for expected in ["example.b", "example.c"].iter() {
            assert!(advertised.contains(expected));
        }
        // A withdrawn prefix must never show up among the new routes.
        assert!(!advertised.contains(&"example.m"));
        assert_eq!(update.withdrawn_routes[0], &Bytes::from("example.m"));
    }
}
#[cfg(test)]
mod send_route_updates {
    use super::*;
    use crate::test_helpers::*;
    use std::str::FromStr;

    /// The update the remote peer sends to announce `example.remote` (epochs 0 -> 1).
    fn remote_route_announcement() -> RouteUpdateRequest {
        RouteUpdateRequest {
            routing_table_id: [0; 16],
            current_epoch_index: 1,
            from_epoch_index: 0,
            to_epoch_index: 1,
            hold_down_time: 30000,
            speaker: Address::from_str("example.remote").unwrap(),
            new_routes: vec![Route {
                prefix: Bytes::from("example.remote"),
                path: vec![Bytes::from("example.peer")],
                auth: [0; 32],
                props: Vec::new(),
            }],
            withdrawn_routes: Vec::new(),
        }
    }

    /// Prefixes of every new route in `update`, decoded as UTF-8.
    fn route_prefixes(update: &RouteUpdateRequest) -> Vec<&str> {
        let mut prefixes = Vec::with_capacity(update.new_routes.len());
        for route in &update.new_routes {
            prefixes.push(str::from_utf8(route.prefix.as_ref()).unwrap());
        }
        prefixes
    }

    /// A broadcast must reach exactly accounts 1 and 2 of the test fixture.
    #[test]
    fn broadcasts_to_all_accounts_we_send_updates_to() {
        let (manager, outgoing_requests) = test_service_with_routes();
        manager.send_route_updates().wait().unwrap();

        let mut recipients: Vec<u64> = Vec::new();
        recipients.extend(outgoing_requests.lock().iter().map(|sent| sent.to.id()));
        // Delivery order is not part of the contract, so compare sorted ids.
        recipients.sort_unstable();
        assert_eq!(recipients, vec![1, 2]);
    }

    /// After recomputing best routes, the broadcast advertises `example.local.1`
    /// and `example.configured.1` and nothing else.
    /// NOTE(review): both routes presumably come from `test_service_with_routes`.
    #[test]
    fn broadcasts_configured_and_local_routes() {
        let (manager, outgoing_requests) = test_service_with_routes();
        manager.update_best_routes(None).wait().unwrap();
        manager.send_route_updates().wait().unwrap();

        let broadcast =
            RouteUpdateRequest::try_from(&outgoing_requests.lock()[0].prepare).unwrap();
        assert_eq!(broadcast.new_routes.len(), 2);

        let prefixes = route_prefixes(&broadcast);
        for expected in ["example.local.1", "example.configured.1"].iter() {
            assert!(prefixes.contains(expected));
        }
    }

    /// A route learned from a peer is re-broadcast alongside the two routes the
    /// service already had.
    #[test]
    fn broadcasts_received_routes() {
        let (manager, outgoing_requests) = test_service_with_routes();
        manager.update_best_routes(None).wait().unwrap();

        // Peer 10 announces `example.remote`.
        manager
            .handle_route_update_request(IncomingRequest {
                from: TestAccount::new(10, "example.peer"),
                prepare: remote_route_announcement().to_prepare(),
            })
            .wait()
            .unwrap();

        manager.send_route_updates().wait().unwrap();

        let broadcast =
            RouteUpdateRequest::try_from(&outgoing_requests.lock()[0].prepare).unwrap();
        assert_eq!(broadcast.new_routes.len(), 3);

        let prefixes = route_prefixes(&broadcast);
        for expected in ["example.local.1", "example.configured.1", "example.remote"].iter() {
            assert!(prefixes.contains(expected));
        }
    }

    /// A route that a peer announced and then withdrew before the broadcast must
    /// appear in `withdrawn_routes` and not in `new_routes`.
    #[test]
    fn broadcasts_withdrawn_routes() {
        let (manager, outgoing_requests) = test_service_with_routes();
        manager.update_best_routes(None).wait().unwrap();

        // Peer 10 first announces `example.remote` ...
        manager
            .handle_route_update_request(IncomingRequest {
                from: TestAccount::new(10, "example.peer"),
                prepare: remote_route_announcement().to_prepare(),
            })
            .wait()
            .unwrap();

        // ... and then withdraws it in a follow-up covering epochs 1 -> 4.
        let withdrawal = RouteUpdateRequest {
            routing_table_id: [0; 16],
            current_epoch_index: 4,
            from_epoch_index: 1,
            to_epoch_index: 4,
            hold_down_time: 30000,
            speaker: Address::from_str("example.remote").unwrap(),
            new_routes: Vec::new(),
            withdrawn_routes: vec![Bytes::from("example.remote")],
        };
        manager
            .handle_route_update_request(IncomingRequest {
                from: TestAccount::new(10, "example.peer"),
                prepare: withdrawal.to_prepare(),
            })
            .wait()
            .unwrap();

        manager.send_route_updates().wait().unwrap();

        let broadcast =
            RouteUpdateRequest::try_from(&outgoing_requests.lock()[0].prepare).unwrap();
        assert_eq!(broadcast.new_routes.len(), 2);

        let prefixes = route_prefixes(&broadcast);
        for expected in ["example.local.1", "example.configured.1"].iter() {
            assert!(prefixes.contains(expected));
        }
        assert!(!prefixes.contains(&"example.remote"));

        assert_eq!(broadcast.withdrawn_routes.len(), 1);
        assert_eq!(
            str::from_utf8(&broadcast.withdrawn_routes[0]).unwrap(),
            "example.remote"
        );
    }
}