use crate::{http_retry::Client, ApiError, NodeStore};
use bytes::Buf;
use futures::{
future::{err, join_all, Either},
Future,
};
use interledger_http::{HttpAccount, HttpStore};
use interledger_router::RouterStore;
use interledger_service::Account;
use interledger_service_util::{BalanceStore, ExchangeRateStore};
use interledger_settlement::SettlementAccount;
use log::{error, trace};
use serde::Serialize;
use serde_json::json;
use std::{
collections::HashMap,
iter::FromIterator,
str::{self, FromStr},
};
use url::Url;
use warp::{self, Filter};
pub fn node_settings_api<S, A>(
admin_api_token: String,
store: S,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)>
where
S: NodeStore<Account = A>
+ HttpStore<Account = A>
+ BalanceStore<Account = A>
+ ExchangeRateStore
+ RouterStore,
A: Account + HttpAccount + SettlementAccount + Serialize + 'static,
{
let admin_auth_header = format!("Bearer {}", admin_api_token);
let admin_only = warp::header::<String>("authorization")
.and_then(move |authorization| -> Result<(), warp::Rejection> {
if authorization == admin_auth_header {
Ok(())
} else {
Err(warp::reject::custom(ApiError::Unauthorized))
}
})
.untuple_one()
.boxed();
let with_store = warp::any().map(move || store.clone()).boxed();
let get_root = warp::get2()
.and(warp::path::end())
.and(with_store.clone())
.map(|store: S| {
warp::reply::json(&json!({
"status": "Ready".to_string(),
"ilp_address": store.get_ilp_address(),
}))
})
.boxed();
let put_rates = warp::put2()
.and(admin_only.clone())
.and(warp::path("rates"))
.and(warp::path::end())
.and(warp::body::json())
.and(with_store.clone())
.and_then(|rates: HashMap<String, f64>, store: S| {
if store.set_exchange_rates(rates.clone()).is_ok() {
Ok(warp::reply::json(&rates))
} else {
error!("Error setting exchange rates");
Err(warp::reject::custom(ApiError::InternalServerError))
}
})
.boxed();
let get_rates = warp::get2()
.and(warp::path("rates"))
.and(warp::path::end())
.and(with_store.clone())
.and_then(|store: S| {
if let Ok(rates) = store.get_all_exchange_rates() {
Ok(warp::reply::json(&rates))
} else {
error!("Error getting exchange rates");
Err(warp::reject::custom(ApiError::InternalServerError))
}
})
.boxed();
let get_routes = warp::get2()
.and(warp::path("routes"))
.and(warp::path::end())
.and(with_store.clone())
.map(|store: S| {
let routes: HashMap<String, String> =
HashMap::from_iter(store.routing_table().into_iter().filter_map(
|(address, account)| {
if let Ok(address) = str::from_utf8(address.as_ref()) {
Some((address.to_string(), account.to_string()))
} else {
None
}
},
));
warp::reply::json(&routes)
})
.boxed();
let put_static_routes = warp::put2()
.and(admin_only.clone())
.and(warp::path("routes"))
.and(warp::path("static"))
.and(warp::path::end())
.and(warp::body::json())
.and(with_store.clone())
.and_then(|routes: HashMap<String, String>, store: S| {
let mut parsed = HashMap::with_capacity(routes.len());
for (prefix, id) in routes.into_iter() {
if let Ok(id) = A::AccountId::from_str(id.as_str()) {
parsed.insert(prefix, id);
} else {
error!("Invalid Account ID: {}", id);
return Either::B(err(warp::reject::custom(ApiError::BadRequest)));
}
}
Either::A(
store
.set_static_routes(parsed.clone())
.map_err(|_| {
error!("Error setting static routes");
warp::reject::custom(ApiError::InternalServerError)
})
.map(move |_| warp::reply::json(&parsed)),
)
})
.boxed();
let put_static_route = warp::put2()
.and(admin_only.clone())
.and(warp::path("routes"))
.and(warp::path("static"))
.and(warp::path::param2::<String>())
.and(warp::path::end())
.and(warp::body::concat())
.and(with_store.clone())
.and_then(|prefix: String, body: warp::body::FullBody, store: S| {
if let Ok(string) = str::from_utf8(body.bytes()) {
if let Ok(id) = A::AccountId::from_str(string) {
return Either::A(
store
.set_static_route(prefix, id)
.map_err(|_| {
error!("Error setting static route");
warp::reject::custom(ApiError::InternalServerError)
})
.map(move |_| id.to_string()),
);
}
}
error!("Body was not a valid Account ID");
Either::B(err(warp::reject::custom(ApiError::BadRequest)))
})
.boxed();
let put_settlement_engines = warp::put2()
.and(admin_only.clone())
.and(warp::path("settlement"))
.and(warp::path("engines"))
.and(warp::path::end())
.and(warp::body::json())
.and(with_store.clone())
.and_then(|asset_to_url_map: HashMap<String, Url>, store: S| {
let asset_to_url_map_clone = asset_to_url_map.clone();
store
.set_settlement_engines(asset_to_url_map.clone())
.map_err(|_| {
error!("Error setting settlement engines");
warp::reject::custom(ApiError::InternalServerError)
})
.and_then(move |_| {
store.get_all_accounts()
.map_err(|_| {
warp::reject::custom(ApiError::InternalServerError)
})
.and_then(move |accounts| {
let client = Client::default();
let create_settlement_accounts =
accounts.into_iter().filter_map(move |account| {
let id = account.id();
if let Some(details) = account.settlement_engine_details() {
if Some(&details.url) == asset_to_url_map.get(account.asset_code()) {
return Some(client.create_engine_account(details.url, account.id())
.map_err(|_| {
warp::reject::custom(ApiError::InternalServerError)
})
.and_then(move |status_code| {
if status_code.is_success() {
trace!("Account {} created on the SE", id);
} else {
error!("Error creating account. Settlement engine responded with HTTP code: {}", status_code);
}
Ok(())
}));
}
}
None
});
join_all(create_settlement_accounts)
})
})
.and_then(move |_| Ok(warp::reply::json(&asset_to_url_map_clone)))
})
.boxed();
get_root
.or(put_rates)
.or(get_rates)
.or(get_routes)
.or(put_static_routes)
.or(put_static_route)
.or(put_settlement_engines)
.boxed()
}