#[cfg(feature = "async")]
use crate::types::response::QueryAsyncResponse;
#[cfg(not(feature = "async"))]
use crate::types::response::QueryResponse;
use crate::types::{
error::CruxError,
http::{Actions, Order},
query::Query,
response::{EntityHistoryResponse, EntityTxResponse, TxLogResponse, TxLogsResponse},
CruxId,
};
use chrono::prelude::*;
use edn_rs::Edn;
#[cfg(not(feature = "async"))]
use reqwest::blocking;
use reqwest::header::HeaderMap;
use std::collections::BTreeSet;
use std::str::FromStr;
/// `chrono` format string used by `build_timed_url` to render timestamps for
/// query-string parameters: date and time down to whole seconds, followed by
/// the `%Z` time-zone specifier. Sub-second precision is not part of the
/// pattern.
static DATE_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%Z";
/// HTTP client whose methods (defined in the `impl` blocks of this file) send
/// requests to endpoints below `uri`.
///
/// The concrete type of `client` is selected at compile time by the `async`
/// cargo feature.
pub struct HttpClient {
/// Blocking `reqwest` client; present when the `async` feature is disabled.
#[cfg(not(feature = "async"))]
pub(crate) client: blocking::Client,
/// Non-blocking `reqwest` client; present when the `async` feature is enabled.
#[cfg(feature = "async")]
pub(crate) client: reqwest::Client,
/// Base URI; request URLs are built as `{uri}/<endpoint>`.
pub(crate) uri: String,
/// Headers that are cloned onto every outgoing request.
pub(crate) headers: HeaderMap,
}
/// Blocking API of [`HttpClient`], compiled when the `async` feature is disabled.
#[cfg(not(feature = "async"))]
impl HttpClient {
    /// POSTs the EDN produced by `actions.build()` to `{uri}/tx-log` and
    /// deserializes the reply into a [`TxLogResponse`].
    ///
    /// # Errors
    ///
    /// * `CruxError::TxLogActionError` when `actions` is empty (no request is sent).
    /// * A converted transport error when sending or reading the response fails.
    /// * A converted EDN error when the reply cannot be deserialized.
    pub fn tx_log(&self, actions: Actions) -> Result<TxLogResponse, CruxError> {
        if actions.is_empty() {
            return Err(CruxError::TxLogActionError(
                "Actions cannot be empty.".to_string(),
            ));
        }

        let body = actions.build();
        let resp = self
            .client
            .post(&format!("{}/tx-log", self.uri))
            .headers(self.headers.clone())
            .body(body)
            .send()?
            .text()?;

        // The `#inst` reader tag is dropped before parsing, leaving only the
        // value that followed it.
        edn_rs::from_str(&resp.replace("#inst", "")).map_err(CruxError::from)
    }

    /// GETs `{uri}/tx-log` and parses the reply with `TxLogsResponse::from_str`.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or whatever error
    /// `TxLogsResponse::from_str` reports for the response text.
    pub fn tx_logs(&self) -> Result<TxLogsResponse, CruxError> {
        let resp = self
            .client
            .get(&format!("{}/tx-log", self.uri))
            .headers(self.headers.clone())
            .send()?
            .text()?;

        TxLogsResponse::from_str(&resp)
    }

    /// POSTs `{:eid <id>}` to `{uri}/entity` and returns the reply parsed as [`Edn`].
    ///
    /// # Errors
    ///
    /// * `CruxError::BadResponse` (carrying the real status code) when the
    ///   status is not a 2xx success.
    /// * `CruxError::ResponseFailed` when the body is not valid EDN.
    /// * A converted transport error when sending or reading the response fails.
    pub fn entity(&self, id: CruxId) -> Result<Edn, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = format!("{}/entity", self.uri);
        let body = self.post_eid(&url, "entity", &crux_id)?;

        // NOTE: the `500` is a fixed placeholder kept for message
        // compatibility; on this path the server answered with a success status.
        Edn::from_str(&body).map_err(|_| {
            CruxError::ResponseFailed(format!(
                "entity responded with {} for id \"{}\" ",
                500, crux_id
            ))
        })
    }

    /// Same request as [`HttpClient::entity`], but the URL is produced by
    /// `build_timed_url`, which appends `transaction-time` and/or `valid-time`
    /// query parameters for the values that are `Some`.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity`], with messages prefixed by
    /// `entity-timed`.
    pub fn entity_timed(
        &self,
        id: CruxId,
        transaction_time: Option<DateTime<FixedOffset>>,
        valid_time: Option<DateTime<FixedOffset>>,
    ) -> Result<Edn, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = build_timed_url(self.uri.clone(), "entity", transaction_time, valid_time);
        let body = self.post_eid(&url, "entity-timed", &crux_id)?;

        // NOTE: the `500` is a fixed placeholder, not the status that was received.
        Edn::from_str(&body).map_err(|_| {
            CruxError::ResponseFailed(format!(
                "entity-timed responded with {} for id \"{}\" ",
                500, crux_id
            ))
        })
    }

    /// POSTs `{:eid <id>}` to `{uri}/entity-tx` and parses the reply with
    /// `EntityTxResponse::from_str`.
    ///
    /// # Errors
    ///
    /// * `CruxError::BadResponse` when the status is not a 2xx success.
    /// * Whatever error `EntityTxResponse::from_str` reports for the body.
    /// * A converted transport error when sending or reading the response fails.
    pub fn entity_tx(&self, id: CruxId) -> Result<EntityTxResponse, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = format!("{}/entity-tx", self.uri);
        let body = self.post_eid(&url, "entity-tx", &crux_id)?;

        EntityTxResponse::from_str(&body)
    }

    /// Same request as [`HttpClient::entity_tx`], but the URL is produced by
    /// `build_timed_url` with the optional `transaction_time` / `valid_time`.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity_tx`].
    pub fn entity_tx_timed(
        &self,
        id: CruxId,
        transaction_time: Option<DateTime<FixedOffset>>,
        valid_time: Option<DateTime<FixedOffset>>,
    ) -> Result<EntityTxResponse, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = build_timed_url(self.uri.clone(), "entity-tx", transaction_time, valid_time);
        let body = self.post_eid(&url, "entity-tx", &crux_id)?;

        EntityTxResponse::from_str(&body)
    }

    /// GETs `{uri}/entity-history/{hash}` with `sort-order` (the EDN
    /// serialization of `order`) and `with-docs` query parameters, and parses
    /// the reply with `EntityHistoryResponse::from_str`.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or whatever error
    /// `EntityHistoryResponse::from_str` reports. The HTTP status is not inspected.
    pub fn entity_history(
        &self,
        hash: String,
        order: Order,
        with_docs: bool,
    ) -> Result<EntityHistoryResponse, CruxError> {
        let url = format!(
            "{}/entity-history/{}?sort-order={}&with-docs={}",
            self.uri,
            hash,
            edn_rs::to_string(order),
            with_docs
        );

        self.fetch_history(&url)
    }

    /// Like [`HttpClient::entity_history`], with extra query text derived from `time`.
    ///
    /// `time` is serialized with `edn_rs::to_string`, every `[` and `]` is
    /// removed from the result, and the remainder is appended directly after
    /// the `with-docs` value.
    /// NOTE(review): presumably each `TimeHistory` serializes to one or more
    /// `&key=value` fragments — confirm against its `Serialize` impl.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity_history`].
    pub fn entity_history_timed(
        &self,
        hash: String,
        order: Order,
        with_docs: bool,
        time: Vec<crate::types::http::TimeHistory>,
    ) -> Result<EntityHistoryResponse, CruxError> {
        let url = format!(
            "{}/entity-history/{}?sort-order={}&with-docs={}{}",
            self.uri,
            hash,
            edn_rs::to_string(order),
            with_docs,
            edn_rs::to_string(time).replace("[", "").replace("]", ""),
        );

        self.fetch_history(&url)
    }

    /// POSTs the EDN serialization of `query` to `{uri}/query` and returns the
    /// result set: one `Vec<String>` per row.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or a converted EDN error when the
    /// reply cannot be deserialized. The HTTP status is not inspected.
    pub fn query(&self, query: Query) -> Result<BTreeSet<Vec<String>>, CruxError> {
        let resp = self
            .client
            .post(&format!("{}/query", self.uri))
            .headers(self.headers.clone())
            .body(edn_rs::to_string(query))
            .send()?
            .text()?;

        let query_response: QueryResponse = edn_rs::from_str(&resp)?;
        Ok(query_response.0)
    }

    /// POSTs `{:eid <crux_id>}` to `url` and returns the response text with
    /// every `#inst` occurrence removed.
    ///
    /// `label` is only used as the prefix of the `BadResponse` message that is
    /// returned when the status is not a 2xx success.
    fn post_eid(&self, url: &str, label: &str, crux_id: &str) -> Result<String, CruxError> {
        let resp = self
            .client
            .post(url)
            .headers(self.headers.clone())
            .body(format!("{{:eid {}}}", crux_id))
            .send()?;

        let status = resp.status();
        if status.is_success() {
            Ok(resp.text()?.replace("#inst", ""))
        } else {
            Err(CruxError::BadResponse(format!(
                "{} responded with {} for id \"{}\" ",
                label,
                status.as_u16(),
                crux_id
            )))
        }
    }

    /// GETs `url` and parses the body, with `#inst` removed, as an
    /// [`EntityHistoryResponse`].
    fn fetch_history(&self, url: &str) -> Result<EntityHistoryResponse, CruxError> {
        let resp = self
            .client
            .get(url)
            .headers(self.headers.clone())
            .send()?
            .text()?;

        EntityHistoryResponse::from_str(&resp.replace("#inst", ""))
    }
}
/// Non-blocking API of [`HttpClient`], compiled when the `async` feature is enabled.
#[cfg(feature = "async")]
impl HttpClient {
    /// POSTs the EDN produced by `actions.build()` to `{uri}/tx-log` and
    /// deserializes the reply into a [`TxLogResponse`].
    ///
    /// # Errors
    ///
    /// * `CruxError::TxLogActionError` when `actions` is empty (no request is sent).
    /// * A converted transport error when sending or reading the response fails.
    /// * A converted EDN error when the reply cannot be deserialized.
    pub async fn tx_log(&self, actions: Actions) -> Result<TxLogResponse, CruxError> {
        if actions.is_empty() {
            return Err(CruxError::TxLogActionError(
                "Actions cannot be empty.".to_string(),
            ));
        }

        let body = actions.build();
        let resp = self
            .client
            .post(&format!("{}/tx-log", self.uri))
            .headers(self.headers.clone())
            .body(body)
            .send()
            .await?
            .text()
            .await?;

        // The `#inst` reader tag is dropped before parsing, exactly as the
        // blocking `tx_log` does, so both variants deserialize the same text.
        edn_rs::from_str(&resp.replace("#inst", "")).map_err(CruxError::from)
    }

    /// GETs `{uri}/tx-log` and parses the reply with `TxLogsResponse::from_str`.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or whatever error
    /// `TxLogsResponse::from_str` reports for the response text.
    pub async fn tx_logs(&self) -> Result<TxLogsResponse, CruxError> {
        let resp = self
            .client
            .get(&format!("{}/tx-log", self.uri))
            .headers(self.headers.clone())
            .send()
            .await?
            .text()
            .await?;

        TxLogsResponse::from_str(&resp)
    }

    /// POSTs `{:eid <id>}` to `{uri}/entity` and returns the reply parsed as [`Edn`].
    ///
    /// # Errors
    ///
    /// * `CruxError::BadResponse` (carrying the real status code) when the
    ///   status is not a 2xx success.
    /// * `CruxError::ResponseFailed` when the body is not valid EDN.
    /// * A converted transport error when sending or reading the response fails.
    pub async fn entity(&self, id: CruxId) -> Result<Edn, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = format!("{}/entity", self.uri);
        let body = self.post_eid(&url, "entity", &crux_id).await?;

        // NOTE: the `500` is a fixed placeholder kept for message
        // compatibility; on this path the server answered with a success status.
        Edn::from_str(&body).map_err(|_| {
            CruxError::ResponseFailed(format!(
                "entity responded with {} for id \"{}\" ",
                500, crux_id
            ))
        })
    }

    /// Same request as [`HttpClient::entity`], but the URL is produced by
    /// `build_timed_url`, which appends `transaction-time` and/or `valid-time`
    /// query parameters for the values that are `Some`.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity`], with messages prefixed by
    /// `entity-timed` (matching the blocking variant of this method).
    pub async fn entity_timed(
        &self,
        id: CruxId,
        transaction_time: Option<DateTime<FixedOffset>>,
        valid_time: Option<DateTime<FixedOffset>>,
    ) -> Result<Edn, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = build_timed_url(self.uri.clone(), "entity", transaction_time, valid_time);
        let body = self.post_eid(&url, "entity-timed", &crux_id).await?;

        // NOTE: the `500` is a fixed placeholder, not the status that was received.
        Edn::from_str(&body).map_err(|_| {
            CruxError::ResponseFailed(format!(
                "entity-timed responded with {} for id \"{}\" ",
                500, crux_id
            ))
        })
    }

    /// POSTs `{:eid <id>}` to `{uri}/entity-tx` and parses the reply with
    /// `EntityTxResponse::from_str`.
    ///
    /// # Errors
    ///
    /// * `CruxError::BadResponse` when the status is not a 2xx success.
    /// * Whatever error `EntityTxResponse::from_str` reports for the body.
    /// * A converted transport error when sending or reading the response fails.
    pub async fn entity_tx(&self, id: CruxId) -> Result<EntityTxResponse, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = format!("{}/entity-tx", self.uri);
        let body = self.post_eid(&url, "entity-tx", &crux_id).await?;

        EntityTxResponse::from_str(&body)
    }

    /// Same request as [`HttpClient::entity_tx`], but the URL is produced by
    /// `build_timed_url` with the optional `transaction_time` / `valid_time`.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity_tx`].
    pub async fn entity_tx_timed(
        &self,
        id: CruxId,
        transaction_time: Option<DateTime<FixedOffset>>,
        valid_time: Option<DateTime<FixedOffset>>,
    ) -> Result<EntityTxResponse, CruxError> {
        let crux_id = edn_rs::to_string(id);
        let url = build_timed_url(self.uri.clone(), "entity-tx", transaction_time, valid_time);
        let body = self.post_eid(&url, "entity-tx", &crux_id).await?;

        EntityTxResponse::from_str(&body)
    }

    /// GETs `{uri}/entity-history/{hash}` with `sort-order` (the EDN
    /// serialization of `order`) and `with-docs` query parameters, and parses
    /// the reply with `EntityHistoryResponse::from_str`.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or whatever error
    /// `EntityHistoryResponse::from_str` reports. The HTTP status is not inspected.
    pub async fn entity_history(
        &self,
        hash: String,
        order: Order,
        with_docs: bool,
    ) -> Result<EntityHistoryResponse, CruxError> {
        let url = format!(
            "{}/entity-history/{}?sort-order={}&with-docs={}",
            self.uri,
            hash,
            edn_rs::to_string(order),
            with_docs
        );

        self.fetch_history(&url).await
    }

    /// Like [`HttpClient::entity_history`], with extra query text derived from `time`.
    ///
    /// `time` is serialized with `edn_rs::to_string`, every `[` and `]` is
    /// removed from the result, and the remainder is appended directly after
    /// the `with-docs` value.
    /// NOTE(review): presumably each `TimeHistory` serializes to one or more
    /// `&key=value` fragments — confirm against its `Serialize` impl.
    ///
    /// # Errors
    ///
    /// Same error cases as [`HttpClient::entity_history`].
    pub async fn entity_history_timed(
        &self,
        hash: String,
        order: Order,
        with_docs: bool,
        time: Vec<crate::types::http::TimeHistory>,
    ) -> Result<EntityHistoryResponse, CruxError> {
        let url = format!(
            "{}/entity-history/{}?sort-order={}&with-docs={}{}",
            self.uri,
            hash,
            edn_rs::to_string(order),
            with_docs,
            edn_rs::to_string(time).replace("[", "").replace("]", ""),
        );

        self.fetch_history(&url).await
    }

    /// POSTs the EDN serialization of `query` to `{uri}/query` and returns the
    /// inner value of the deserialized `QueryAsyncResponse`.
    ///
    /// # Errors
    ///
    /// Returns a converted transport error, or a converted EDN error when the
    /// reply cannot be deserialized. The HTTP status is not inspected.
    pub async fn query(&self, query: Query) -> Result<BTreeSet<Vec<String>>, CruxError> {
        let resp = self
            .client
            .post(&format!("{}/query", self.uri))
            .headers(self.headers.clone())
            .body(edn_rs::to_string(query))
            .send()
            .await?
            .text()
            .await?;

        let query_response: QueryAsyncResponse = edn_rs::from_str(&resp)?;
        Ok(query_response.0)
    }

    /// POSTs `{:eid <crux_id>}` to `url` and returns the response text with
    /// every `#inst` occurrence removed.
    ///
    /// `label` is only used as the prefix of the `BadResponse` message that is
    /// returned when the status is not a 2xx success.
    async fn post_eid(&self, url: &str, label: &str, crux_id: &str) -> Result<String, CruxError> {
        let resp = self
            .client
            .post(url)
            .headers(self.headers.clone())
            .body(format!("{{:eid {}}}", crux_id))
            .send()
            .await?;

        let status = resp.status();
        if status.is_success() {
            Ok(resp.text().await?.replace("#inst", ""))
        } else {
            Err(CruxError::BadResponse(format!(
                "{} responded with {} for id \"{}\" ",
                label,
                status.as_u16(),
                crux_id
            )))
        }
    }

    /// GETs `url` and parses the body, with `#inst` removed, as an
    /// [`EntityHistoryResponse`].
    async fn fetch_history(&self, url: &str) -> Result<EntityHistoryResponse, CruxError> {
        let resp = self
            .client
            .get(url)
            .headers(self.headers.clone())
            .send()
            .await?
            .text()
            .await?;

        EntityHistoryResponse::from_str(&resp.replace("#inst", ""))
    }
}
/// Builds `{url}/{endpoint}`, followed by a query string holding
/// `transaction-time` and/or `valid-time` for whichever arguments are `Some`.
///
/// Each timestamp is rendered with `DATE_FORMAT`. A `+` in a rendered
/// timestamp (i.e. a positive UTC offset) is percent-encoded as `%2B`;
/// `url` and `endpoint` are inserted verbatim, so a base URI is rendered the
/// same way here as in the endpoints that build their URL with `format!`.
///
/// With both times `None` the result is just `{url}/{endpoint}`.
fn build_timed_url(
    url: String,
    endpoint: &str,
    transaction_time: Option<DateTime<FixedOffset>>,
    valid_time: Option<DateTime<FixedOffset>>,
) -> String {
    // Renders one timestamp as a query-string value.
    // NOTE(review): `+` is escaped presumably because servers decode a raw `+`
    // in a query string as a space — confirm against the server's behaviour.
    fn encode_time(time: DateTime<FixedOffset>) -> String {
        time.format(DATE_FORMAT).to_string().replace('+', "%2B")
    }

    match (
        transaction_time.map(encode_time),
        valid_time.map(encode_time),
    ) {
        (None, None) => format!("{}/{}", url, endpoint),
        (Some(tx), None) => format!("{}/{}?transaction-time={}", url, endpoint, tx),
        (None, Some(valid)) => format!("{}/{}?valid-time={}", url, endpoint, valid),
        (Some(tx), Some(valid)) => format!(
            "{}/{}?transaction-time={}&valid-time={}",
            url, endpoint, tx, valid
        ),
    }
}
// These tests call the blocking `HttpClient` methods and unwrap their results
// directly, so they can only compile when the `async` feature is disabled.
#[cfg(all(test, not(feature = "async")))]
mod http {
    use crate::client::Crux;
    use crate::types::http::Actions;
    use crate::types::http::Order;
    use crate::types::{
        query::Query,
        response::{EntityHistoryElement, EntityHistoryResponse, EntityTxResponse, TxLogResponse},
        CruxId,
    };
    use edn_derive::Serialize;
    use mockito::mock;

    /// Document type used as the payload of the `put` actions in `tx_log`.
    // Field names are dictated by the EDN keys they serialize to, hence the allow.
    #[derive(Debug, Clone, Serialize)]
    #[allow(non_snake_case)]
    pub struct Person {
        crux__db___id: CruxId,
        first_name: String,
        last_name: String,
    }

    /// Two `put` actions are serialized into the expected request body and the
    /// mocked reply deserializes to `TxLogResponse::default()`.
    #[test]
    fn tx_log() {
        let _m = mock("POST", "/tx-log")
            .with_status(200)
            .match_body("[[:crux.tx/put { :crux.db/id :jorge-3, :first-name \"Michael\", :last-name \"Jorge\", }], [:crux.tx/put { :crux.db/id :manuel-1, :first-name \"Diego\", :last-name \"Manuel\", }]]")
            .with_header("content-type", "text/plain")
            .with_body("{:crux.tx/tx-id 8, :crux.tx/tx-time #inst \"2020-07-16T21:53:14.628-00:00\"}")
            .create();

        let person1 = Person {
            crux__db___id: CruxId::new("jorge-3"),
            first_name: "Michael".to_string(),
            last_name: "Jorge".to_string(),
        };
        let person2 = Person {
            crux__db___id: CruxId::new("manuel-1"),
            first_name: "Diego".to_string(),
            last_name: "Manuel".to_string(),
        };
        let actions = Actions::new().append_put(person1).append_put(person2);

        let response = Crux::new("localhost", "4000").http_client().tx_log(actions);

        assert_eq!(response.unwrap(), TxLogResponse::default());
    }

    /// An empty `Actions` is rejected with `TxLogActionError`.
    #[test]
    #[should_panic(expected = "TxLogActionError(\"Actions cannot be empty.\")")]
    fn empty_actions_on_tx_log() {
        let actions = Actions::new();
        let err = Crux::new("localhost", "4000").http_client().tx_log(actions);
        err.unwrap();
    }

    /// A reply listing two transactions yields two `tx_events`.
    #[test]
    fn tx_logs() {
        let _m = mock("GET", "/tx-log")
            .with_status(200)
            .with_header("content-type", "application/edn")
            .with_body("({:crux.tx/tx-id 0, :crux.tx/tx-time #inst \"2020-07-09T23:38:06.465-00:00\", :crux.tx.event/tx-events [[:crux.tx/put \"a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e\" \"125d29eb3bed1bf51d64194601ad4ff93defe0e2\"]]}{:crux.tx/tx-id 1, :crux.tx/tx-time #inst \"2020-07-09T23:39:33.815-00:00\", :crux.tx.event/tx-events [[:crux.tx/put \"a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e\" \"1b42e0d5137e3833423f7bb958622bee29f91eee\"]]})")
            .create();

        let response = Crux::new("localhost", "4000").http_client().tx_logs();

        assert_eq!(response.unwrap().tx_events.len(), 2);
    }

    /// A body that is not a transaction log surfaces as a `DeserializeError`.
    #[test]
    #[should_panic(
        expected = "DeserializeError(\"The following Edn cannot be deserialized to TxLogs: Symbol(\\\"Holy\\\")\")"
    )]
    fn tx_log_error() {
        let _m = mock("GET", "/tx-log")
            .with_status(200)
            .with_header("content-type", "application/edn")
            .with_body("Holy errors!")
            .create();

        let _error = Crux::new("localhost", "4000")
            .http_client()
            .tx_logs()
            .unwrap();
    }

    /// The entity reply is returned as an `Edn` map; checked via its `Debug` output.
    #[test]
    fn entity() {
        let expected_body = "Map(Map({\":crux.db/id\": Key(\":hello-entity\"), \":first-name\": Str(\"Hello\"), \":last-name\": Str(\"World\")}))";
        let _m = mock("POST", "/entity")
            .with_status(200)
            .match_body("{:eid :ivan}")
            .with_header("content-type", "application/edn")
            .with_body("{:crux.db/id :hello-entity :first-name \"Hello\", :last-name \"World\"}")
            .create();

        let id = CruxId::new(":ivan");
        let edn_body = Crux::new("localhost", "3000")
            .http_client()
            .entity(id)
            .unwrap();

        let resp = format!("{:?}", edn_body);
        assert_eq!(resp, expected_body);
    }

    /// The mocked `entity-tx` reply deserializes to `EntityTxResponse::default()`.
    #[test]
    fn entity_tx() {
        let expected_body = "{:crux.db/id \"d72ccae848ce3a371bd313865cedc3d20b1478ca\", :crux.db/content-hash \"1828ebf4466f98ea3f5252a58734208cd0414376\", :crux.db/valid-time #inst \"2020-07-19T04:12:13.788-00:00\", :crux.tx/tx-time #inst \"2020-07-19T04:12:13.788-00:00\", :crux.tx/tx-id 28}";
        let _m = mock("POST", "/entity-tx")
            .with_status(200)
            .match_body("{:eid :ivan}")
            .with_header("content-type", "application/edn")
            .with_body(expected_body)
            .create();

        let id = CruxId::new(":ivan");
        let body = Crux::new("localhost", "3000")
            .http_client()
            .entity_tx(id)
            .unwrap();

        assert_eq!(body, EntityTxResponse::default());
    }

    /// A set of result tuples comes back as an ordered set of string rows.
    #[test]
    fn simple_query() {
        let expected_body = "#{[:postgres \"Postgres\" true] [:mysql \"MySQL\" true]}";
        let _m = mock("POST", "/query")
            .with_status(200)
            .with_header("content-type", "application/edn")
            .with_body(expected_body)
            .create();

        let query = Query::find(vec!["?p1", "?n", "?s"])
            .unwrap()
            .where_clause(vec!["?p1 :name ?n", "?p1 :is-sql ?s", "?p1 :is-sql true"])
            .unwrap()
            .build();
        let body = Crux::new("localhost", "3000")
            .http_client()
            .query(query.unwrap())
            .unwrap();

        let response = format!("{:?}", body);
        assert_eq!(
            response,
            "{[\":mysql\", \"MySQL\", \"true\"], [\":postgres\", \"Postgres\", \"true\"]}"
        );
    }

    /// History without documents: one element equal to `EntityHistoryElement::default()`.
    #[test]
    fn entity_history() {
        let expected_body = "({:crux.tx/tx-time \"2020-07-19T04:12:13.788-00:00\", :crux.tx/tx-id 28, :crux.db/valid-time \"2020-07-19T04:12:13.788-00:00\", :crux.db/content-hash \"1828ebf4466f98ea3f5252a58734208cd0414376\"})";
        let _m = mock("GET", "/entity-history/ecc6475b7ef9acf689f98e479d539e869432cb5e?sort-order=asc&with-docs=false")
            .with_status(200)
            .with_header("content-type", "application/edn")
            .with_body(expected_body)
            .create();

        let edn_body = Crux::new("localhost", "3000")
            .http_client()
            .entity_history(
                "ecc6475b7ef9acf689f98e479d539e869432cb5e".to_string(),
                Order::Asc,
                false,
            )
            .unwrap();

        let expected = EntityHistoryResponse {
            history: vec![EntityHistoryElement::default()],
        };
        assert_eq!(edn_body, expected);
    }

    /// History with documents: one element equal to `EntityHistoryElement::default_docs()`.
    #[test]
    fn entity_history_docs() {
        let expected_body = "({:crux.tx/tx-time \"2020-07-19T04:12:13.788-00:00\", :crux.tx/tx-id 28, :crux.db/valid-time \"2020-07-19T04:12:13.788-00:00\", :crux.db/content-hash \"1828ebf4466f98ea3f5252a58734208cd0414376\", :crux.db/doc :docs})";
        let _m = mock("GET", "/entity-history/ecc6475b7ef9acf689f98e479d539e869432cb5e?sort-order=asc&with-docs=true")
            .with_status(200)
            .with_header("content-type", "application/edn")
            .with_body(expected_body)
            .create();

        let edn_body = Crux::new("localhost", "3000")
            .http_client()
            .entity_history(
                "ecc6475b7ef9acf689f98e479d539e869432cb5e".to_string(),
                Order::Asc,
                true,
            )
            .unwrap();

        let expected = EntityHistoryResponse {
            history: vec![EntityHistoryElement::default_docs()],
        };
        assert_eq!(edn_body, expected);
    }
}
#[cfg(test)]
mod build_url {
    use super::build_timed_url;
    use chrono::prelude::*;

    /// Base URI shared by every case in this module.
    const BASE: &str = "localhost:3000";

    /// Parses an RFC 3339 literal into the timestamp type `build_timed_url` takes.
    fn timestamp(raw: &str) -> DateTime<FixedOffset> {
        raw.parse::<DateTime<FixedOffset>>().unwrap()
    }

    /// No timestamps: the URL carries no query string.
    #[test]
    fn both_times_are_none() {
        let built = build_timed_url(BASE.to_string(), "entity", None, None);

        assert_eq!(built, "localhost:3000/entity");
    }

    /// Both timestamps: `transaction-time` first, then `valid-time`; milliseconds are dropped.
    #[test]
    fn both_times_are_some() {
        let tx_time = timestamp("2020-08-09T18:05:29.301-03:00");
        let valid_time = timestamp("2020-11-09T18:05:29.301-03:00");

        let built = build_timed_url(BASE.to_string(), "entity", Some(tx_time), Some(valid_time));

        assert_eq!(built, "localhost:3000/entity?transaction-time=2020-08-09T18:05:29-03:00&valid-time=2020-11-09T18:05:29-03:00");
    }

    /// Only the transaction time is emitted when the valid time is absent.
    #[test]
    fn only_tx_time_is_some() {
        let tx_time = timestamp("2020-08-09T18:05:29.301-03:00");

        let built = build_timed_url(BASE.to_string(), "entity", Some(tx_time), None);

        assert_eq!(
            built,
            "localhost:3000/entity?transaction-time=2020-08-09T18:05:29-03:00"
        );
    }

    /// Only the valid time is emitted, and the `+` of its offset becomes `%2B`.
    #[test]
    fn only_valid_time_is_some() {
        let valid_time = timestamp("2020-08-09T18:05:29.301+03:00");

        let built = build_timed_url(BASE.to_string(), "entity", None, Some(valid_time));

        assert_eq!(
            built,
            "localhost:3000/entity?valid-time=2020-08-09T18:05:29%2B03:00"
        );
    }
}