use super::{
get_hash_of,
idempotency::{make_idempotent_call, IdempotentStore},
types::{Quantity, SettlementEngine},
};
use bytes::buf::FromBuf;
use bytes::Bytes;
use futures::Future;
use http::StatusCode;
use hyper::Response;
use interledger_http::error::default_rejection_handler;
use serde::{Deserialize, Serialize};
use warp::{self, reject::Rejection, Filter};
#[derive(Serialize, Deserialize, Debug, Clone, Hash)]
pub struct CreateAccount {
id: String,
}
pub fn create_settlement_engine_filter<E, S>(
engine: E,
store: S,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)>
where
E: SettlementEngine + Clone + Send + Sync + 'static,
S: IdempotentStore + Clone + Send + Sync + 'static,
{
let with_store = warp::any().map(move || store.clone()).boxed();
let with_engine = warp::any().map(move || engine.clone()).boxed();
let idempotency = warp::header::optional::<String>("idempotency-key");
let account_id = warp::path("accounts").and(warp::path::param2::<String>());
let accounts = warp::post2()
.and(warp::path("accounts"))
.and(warp::path::end())
.and(idempotency)
.and(warp::body::json())
.and(with_engine.clone())
.and(with_store.clone())
.and_then(
move |idempotency_key: Option<String>,
account_id: CreateAccount,
engine: E,
store: S| {
let account_id = account_id.id;
let input_hash = get_hash_of(account_id.as_ref());
let create_account_fn = move || engine.create_account(account_id);
make_idempotent_call(
store,
create_account_fn,
input_hash,
idempotency_key,
StatusCode::CREATED,
Bytes::from("CREATED"),
)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
Ok(Response::builder()
.header("Content-Type", "application/json")
.status(status_code)
.body(message)
.unwrap())
})
},
);
let del_account = warp::delete2()
.and(account_id)
.and(warp::path::end())
.and(idempotency)
.and(with_engine.clone())
.and(with_store.clone())
.and_then(
move |id: String, idempotency_key: Option<String>, engine: E, store: S| {
let input_hash = get_hash_of(id.as_ref());
let delete_account_fn = move || engine.delete_account(id);
make_idempotent_call(
store,
delete_account_fn,
input_hash,
idempotency_key,
StatusCode::NO_CONTENT,
Bytes::from("DELETED"),
)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
Ok(Response::builder()
.header("Content-Type", "application/json")
.status(status_code)
.body(message)
.unwrap())
})
},
);
let settlement_endpoint = account_id.and(warp::path("settlements"));
let settlements = warp::post2()
.and(settlement_endpoint)
.and(warp::path::end())
.and(idempotency)
.and(warp::body::json())
.and(with_engine.clone())
.and(with_store.clone())
.and_then(
move |id: String,
idempotency_key: Option<String>,
quantity: Quantity,
engine: E,
store: S| {
let input = format!("{}{:?}", id, quantity);
let input_hash = get_hash_of(input.as_ref());
let send_money_fn = move || engine.send_money(id, quantity);
make_idempotent_call(
store,
send_money_fn,
input_hash,
idempotency_key,
StatusCode::CREATED,
Bytes::from("EXECUTED"),
)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
Ok(Response::builder()
.header("Content-Type", "application/json")
.status(status_code)
.body(message)
.unwrap())
})
},
);
let messages_endpoint = account_id.and(warp::path("messages"));
let messages = warp::post2()
.and(messages_endpoint)
.and(warp::path::end())
.and(idempotency)
.and(warp::body::concat())
.and(with_engine.clone())
.and(with_store.clone())
.and_then(
move |id: String,
idempotency_key: Option<String>,
body: warp::body::FullBody,
engine: E,
store: S| {
let message = Vec::from_buf(body);
let input = format!("{}{:?}", id, message);
let input_hash = get_hash_of(input.as_ref());
let receive_message_fn = move || engine.receive_message(id, message);
make_idempotent_call(
store,
receive_message_fn,
input_hash,
idempotency_key,
StatusCode::CREATED,
Bytes::from("RECEIVED"),
)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
Ok(Response::builder()
.header("Content-Type", "application/json")
.status(status_code)
.body(message)
.unwrap())
})
},
);
accounts
.or(del_account)
.or(settlements)
.or(messages)
.recover(default_rejection_handler)
.boxed()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::idempotency::IdempotentData;
use crate::core::types::ApiResponse;
use bytes::Bytes;
use futures::future::ok;
use http::StatusCode;
use interledger_http::error::ApiError;
use parking_lot::RwLock;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
fn check_error_status_and_message(response: Response<Bytes>, status_code: u16, message: &str) {
let err: Value = serde_json::from_slice(response.body()).unwrap();
assert_eq!(response.status().as_u16(), status_code);
assert_eq!(err.get("status").unwrap(), status_code);
assert_eq!(err.get("detail").unwrap(), message);
}
#[derive(Clone)]
struct TestEngine;
#[derive(Debug, Clone)]
pub struct TestAccount;
#[derive(Clone)]
pub struct TestStore {
#[allow(clippy::all)]
pub cache: Arc<RwLock<HashMap<String, IdempotentData>>>,
pub cache_hits: Arc<RwLock<u64>>,
}
fn test_store() -> TestStore {
TestStore {
cache: Arc::new(RwLock::new(HashMap::new())),
cache_hits: Arc::new(RwLock::new(0)),
}
}
impl IdempotentStore for TestStore {
fn load_idempotent_data(
&self,
idempotency_key: String,
) -> Box<dyn Future<Item = Option<IdempotentData>, Error = ()> + Send> {
let cache = self.cache.read();
if let Some(data) = cache.get(&idempotency_key) {
let mut guard = self.cache_hits.write();
*guard += 1; Box::new(ok(Some(data.clone())))
} else {
Box::new(ok(None))
}
}
fn save_idempotent_data(
&self,
idempotency_key: String,
input_hash: [u8; 32],
status_code: StatusCode,
data: Bytes,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let mut cache = self.cache.write();
cache.insert(
idempotency_key,
IdempotentData::new(status_code, data, input_hash),
);
Box::new(ok(()))
}
}
pub static IDEMPOTENCY: &str = "abcd01234";
impl SettlementEngine for TestEngine {
fn send_money(
&self,
_account_id: String,
_money: Quantity,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
Box::new(ok(ApiResponse::Default))
}
fn receive_message(
&self,
_account_id: String,
_message: Vec<u8>,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
Box::new(ok(ApiResponse::Default))
}
fn create_account(
&self,
_account_id: String,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
Box::new(ok(ApiResponse::Default))
}
fn delete_account(
&self,
_account_id: String,
) -> Box<dyn Future<Item = ApiResponse, Error = ApiError> + Send> {
Box::new(ok(ApiResponse::Default))
}
}
#[test]
fn idempotent_execute_settlement() {
let store = test_store();
let engine = TestEngine;
let api = create_settlement_engine_filter(engine, store.clone());
let settlement_call = move |id, amount, scale| {
warp::test::request()
.method("POST")
.path(&format!("/accounts/{}/settlements", id))
.body(json!(Quantity::new(amount, scale)).to_string())
.header("Idempotency-Key", IDEMPOTENCY)
.reply(&api)
};
let ret = settlement_call("1".to_owned(), 100, 6);
assert_eq!(ret.status(), StatusCode::CREATED);
assert_eq!(ret.body(), "EXECUTED");
let ret = settlement_call("1".to_owned(), 100, 6);
assert_eq!(ret.status(), StatusCode::CREATED);
assert_eq!(ret.body(), "EXECUTED");
let ret = settlement_call("42".to_owned(), 100, 6);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let ret = settlement_call("1".to_owned(), 42, 6);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let ret = settlement_call("42".to_owned(), 42, 6);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let cache = store.cache.read();
let cached_data = cache.get(&IDEMPOTENCY.to_string()).unwrap();
let cache_hits = store.cache_hits.read();
assert_eq!(*cache_hits, 4);
assert_eq!(cached_data.status, 201);
assert_eq!(cached_data.body, "EXECUTED".to_string());
}
#[test]
fn idempotent_receive_message() {
let store = test_store();
let engine = TestEngine;
let api = create_settlement_engine_filter(engine, store.clone());
let messages_call = move |id, msg| {
warp::test::request()
.method("POST")
.path(&format!("/accounts/{}/messages", id))
.body(msg)
.header("Idempotency-Key", IDEMPOTENCY)
.reply(&api)
};
let ret = messages_call("1", vec![0]);
assert_eq!(ret.status().as_u16(), StatusCode::CREATED);
assert_eq!(ret.body(), "RECEIVED");
let ret = messages_call("1", vec![0]);
assert_eq!(ret.status().as_u16(), StatusCode::CREATED);
assert_eq!(ret.body(), "RECEIVED");
let ret = messages_call("42", vec![0]);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let ret = messages_call("1", vec![42]);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let ret = messages_call("42", vec![42]);
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let cache = store.cache.read();
let cached_data = cache.get(&IDEMPOTENCY.to_string()).unwrap();
let cache_hits = store.cache_hits.read();
assert_eq!(*cache_hits, 4);
assert_eq!(cached_data.status, 201);
assert_eq!(cached_data.body, "RECEIVED".to_string());
}
#[test]
fn idempotent_create_account() {
let store = test_store();
let engine = TestEngine;
let api = create_settlement_engine_filter(engine, store.clone());
let create_account_call = move |id: &str| {
warp::test::request()
.method("POST")
.path("/accounts")
.body(json!(CreateAccount { id: id.to_string() }).to_string())
.header("Idempotency-Key", IDEMPOTENCY)
.reply(&api)
};
let ret = create_account_call("1");
assert_eq!(ret.status().as_u16(), StatusCode::CREATED);
assert_eq!(ret.body(), "CREATED");
let ret = create_account_call("1");
assert_eq!(ret.status().as_u16(), StatusCode::CREATED);
assert_eq!(ret.body(), "CREATED");
let ret = create_account_call("42");
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let cache = store.cache.read();
let cached_data = cache.get(&IDEMPOTENCY.to_string()).unwrap();
let cache_hits = store.cache_hits.read();
assert_eq!(*cache_hits, 2);
assert_eq!(cached_data.status, 201);
assert_eq!(cached_data.body, "CREATED".to_string());
}
#[test]
fn idempotent_delete_account() {
let store = test_store();
let engine = TestEngine;
let api = create_settlement_engine_filter(engine, store.clone());
let delete_account_call = move |id: &str| {
warp::test::request()
.method("DELETE")
.path(&format!("/accounts/{}", id))
.header("Idempotency-Key", IDEMPOTENCY)
.reply(&api)
};
let ret = delete_account_call("1");
assert_eq!(ret.status(), StatusCode::NO_CONTENT);
assert_eq!(ret.body(), "DELETED");
let ret = delete_account_call("1");
assert_eq!(ret.status(), StatusCode::NO_CONTENT);
assert_eq!(ret.body(), "DELETED");
let ret = delete_account_call("42");
check_error_status_and_message(ret, 409, "Provided idempotency key is tied to other input");
let cache = store.cache.read();
let cached_data = cache.get(&IDEMPOTENCY.to_string()).unwrap();
let cache_hits = store.cache_hits.read();
assert_eq!(*cache_hits, 2);
assert_eq!(cached_data.status, 204);
assert_eq!(cached_data.body, "DELETED".to_string());
}
}