use crate::{http_retry::Client, ExchangeRates, NodeStore};
use bytes::Buf;
use futures::{
future::{err, join_all, Either},
Future,
};
use interledger_http::{deserialize_json, error::*, HttpAccount, HttpStore};
use interledger_packet::Address;
use interledger_router::RouterStore;
use interledger_service::{Account, Username};
use interledger_service_util::{BalanceStore, ExchangeRateStore};
use interledger_settlement::core::types::SettlementAccount;
use log::{error, trace};
use serde::Serialize;
use std::{
collections::HashMap,
iter::FromIterator,
str::{self, FromStr},
};
use url::Url;
use warp::{self, Filter, Rejection};
#[derive(Clone, Serialize)]
struct StatusResponse {
status: String,
ilp_address: Address,
#[serde(skip_serializing_if = "Option::is_none")]
version: Option<String>,
}
pub fn node_settings_api<S, A>(
admin_api_token: String,
node_version: Option<String>,
store: S,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)>
where
S: NodeStore<Account = A>
+ HttpStore<Account = A>
+ BalanceStore<Account = A>
+ ExchangeRateStore
+ RouterStore,
A: Account + HttpAccount + SettlementAccount + Serialize + 'static,
{
let admin_auth_header = format!("Bearer {}", admin_api_token);
let admin_only = warp::header::<String>("authorization")
.and_then(move |authorization| -> Result<(), Rejection> {
if authorization == admin_auth_header {
Ok(())
} else {
Err(ApiError::unauthorized().into())
}
})
.untuple_one()
.boxed();
let with_store = warp::any().map(move || store.clone()).boxed();
let get_root = warp::get2()
.and(warp::path::end())
.and(with_store.clone())
.map(move |store: S| {
warp::reply::json(&StatusResponse {
status: "Ready".to_string(),
ilp_address: store.get_ilp_address(),
version: node_version.clone(),
})
})
.boxed();
let put_rates = warp::put2()
.and(warp::path("rates"))
.and(warp::path::end())
.and(admin_only.clone())
.and(deserialize_json())
.and(with_store.clone())
.and_then(|rates: ExchangeRates, store: S| -> Result<_, Rejection> {
if store.set_exchange_rates(rates.0.clone()).is_ok() {
Ok(warp::reply::json(&rates))
} else {
error!("Error setting exchange rates");
Err(ApiError::internal_server_error().into())
}
})
.boxed();
let get_rates = warp::get2()
.and(warp::path("rates"))
.and(warp::path::end())
.and(with_store.clone())
.and_then(|store: S| -> Result<_, Rejection> {
if let Ok(rates) = store.get_all_exchange_rates() {
Ok(warp::reply::json(&rates))
} else {
error!("Error getting exchange rates");
Err(ApiError::internal_server_error().into())
}
})
.boxed();
let get_routes = warp::get2()
.and(warp::path("routes"))
.and(warp::path::end())
.and(with_store.clone())
.and_then(|store: S| {
let routes = store.routing_table().clone();
store
.get_accounts(routes.values().cloned().collect())
.map_err::<_, Rejection>(|_| {
error!("Error getting accounts from store");
ApiError::internal_server_error().into()
})
.and_then(move |accounts| {
let routes: HashMap<String, String> = HashMap::from_iter(
routes
.iter()
.map(|(prefix, _)| prefix.to_string())
.zip(accounts.into_iter().map(|a| a.username().to_string())),
);
Ok(warp::reply::json(&routes))
})
})
.boxed();
let put_static_routes = warp::put2()
.and(warp::path("routes"))
.and(warp::path("static"))
.and(warp::path::end())
.and(admin_only.clone())
.and(deserialize_json())
.and(with_store.clone())
.and_then(|routes: HashMap<String, Username>, store: S| {
let store_clone = store.clone();
let usernames: Vec<Username> = routes.values().cloned().collect();
join_all(usernames.into_iter().map(move |username| {
store_clone
.get_account_id_from_username(&username)
.map_err(move |_| {
error!("No account exists with username: {}", username);
ApiError::account_not_found().into()
})
}))
.and_then(move |account_ids| {
let prefixes = routes.keys().map(|s| s.to_string());
store
.set_static_routes(prefixes.zip(account_ids.into_iter()))
.map_err::<_, Rejection>(|_| {
error!("Error setting static routes");
ApiError::internal_server_error().into()
})
.map(move |_| warp::reply::json(&routes))
})
})
.boxed();
let put_static_route = warp::put2()
.and(warp::path("routes"))
.and(warp::path("static"))
.and(warp::path::param2::<String>())
.and(warp::path::end())
.and(admin_only.clone())
.and(warp::body::concat())
.and(with_store.clone())
.and_then(|prefix: String, body: warp::body::FullBody, store: S| {
if let Ok(username) = str::from_utf8(body.bytes())
.map_err(|_| ())
.and_then(|string| Username::from_str(string).map_err(|_| ()))
{
let username_clone = username.clone();
Either::A(
store
.clone()
.get_account_id_from_username(&username)
.map_err(move |_| {
error!("No account exists with username: {}", username_clone);
ApiError::account_not_found().into()
})
.and_then(move |account_id| {
store
.set_static_route(prefix, account_id)
.map_err::<_, Rejection>(|_| {
error!("Error setting static route");
ApiError::internal_server_error().into()
})
})
.map(move |_| username.to_string()),
)
} else {
Either::B(err(ApiError::bad_request().into()))
}
})
.boxed();
let put_settlement_engines = warp::put2()
.and(warp::path("settlement"))
.and(warp::path("engines"))
.and(warp::path::end())
.and(admin_only.clone())
.and(deserialize_json())
.and(with_store.clone())
.and_then(|asset_to_url_map: HashMap<String, Url>, store: S| {
let asset_to_url_map_clone = asset_to_url_map.clone();
store
.set_settlement_engines(asset_to_url_map.clone())
.map_err::<_, Rejection>(|_| {
error!("Error setting settlement engines");
ApiError::internal_server_error().into()
})
.and_then(move |_| {
store.get_all_accounts()
.map_err(|_| ApiError::internal_server_error().into())
.and_then(move |accounts| {
let client = Client::default();
let create_settlement_accounts =
accounts.into_iter().filter_map(move |account| {
let id = account.id();
if let Some(details) = account.settlement_engine_details() {
if Some(&details.url) == asset_to_url_map.get(account.asset_code()) {
return Some(client.create_engine_account(details.url, account.id())
.map_err(|_| ApiError::internal_server_error().into())
.and_then(move |status_code| {
if status_code.is_success() {
trace!("Account {} created on the SE", id);
} else {
error!("Error creating account. Settlement engine responded with HTTP code: {}", status_code);
}
Ok(())
}));
}
}
None
});
join_all(create_settlement_accounts)
})
})
.and_then(move |_| Ok(warp::reply::json(&asset_to_url_map_clone)))
})
.boxed();
get_root
.or(put_rates)
.or(get_rates)
.or(get_routes)
.or(put_static_routes)
.or(put_static_route)
.or(put_settlement_engines)
.boxed()
}