use crate::{
constants::{ERR_INVALID_TIMEOUT, ERR_RECORD_EXISTS},
domain::{Record, RecordAdd, RecordUpdate},
};
use chrono::{DateTime, Duration, Utc};
use serde::Serialize;
use std::collections::HashMap;
use urlencoding::encode;
static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClientError {
RecordExists,
InvalidTimeout,
ReqwestError(reqwest::Error),
}
impl std::fmt::Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ClientError::RecordExists => ERR_RECORD_EXISTS.to_string(),
ClientError::InvalidTimeout => ERR_INVALID_TIMEOUT.to_string(),
ClientError::ReqwestError(e) => format!("Reqwest Error: {e}"),
}
)
}
}
impl From<reqwest::Error> for ClientError {
fn from(error: reqwest::Error) -> Self {
ClientError::ReqwestError(error)
}
}
impl From<chrono::OutOfRangeError> for ClientError {
fn from(_: chrono::OutOfRangeError) -> Self {
ClientError::InvalidTimeout
}
}
#[derive(Clone)]
pub struct AuditorClientBuilder {
address: String,
timeout: Duration,
}
impl AuditorClientBuilder {
pub fn new() -> AuditorClientBuilder {
AuditorClientBuilder {
address: "http://127.0.0.1:8080".into(),
timeout: Duration::try_seconds(30).expect("This should never fail"),
}
}
#[must_use]
pub fn address<T: AsRef<str>>(mut self, address: &T, port: u16) -> Self {
self.address = format!("http://{}:{}", address.as_ref(), port);
self
}
#[must_use]
pub fn connection_string<T: AsRef<str>>(mut self, connection_string: &T) -> Self {
self.address = connection_string.as_ref().into();
self
}
#[must_use]
pub fn timeout(mut self, timeout: i64) -> Self {
self.timeout = Duration::try_seconds(timeout)
.unwrap_or_else(|| panic!("Could not convert {} to duration", timeout));
self
}
pub fn build(self) -> Result<AuditorClient, ClientError> {
Ok(AuditorClient {
address: self.address,
client: reqwest::ClientBuilder::new()
.user_agent(APP_USER_AGENT)
.timeout(self.timeout.to_std()?)
.build()?,
})
}
pub fn build_blocking(self) -> Result<AuditorClientBlocking, ClientError> {
Ok(AuditorClientBlocking {
address: self.address,
client: reqwest::blocking::ClientBuilder::new()
.user_agent(APP_USER_AGENT)
.timeout(self.timeout.to_std()?)
.build()?,
})
}
}
impl Default for AuditorClientBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(serde::Deserialize, Debug, Default, Clone)]
pub struct DateTimeUtcWrapper(pub DateTime<Utc>);
impl Serialize for DateTimeUtcWrapper {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.0.to_rfc3339())
}
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Default, Clone)]
pub struct QueryParameters {
pub record_id: Option<String>,
pub start_time: Option<Operator>,
pub stop_time: Option<Operator>,
pub runtime: Option<Operator>,
pub meta: Option<MetaQuery>,
pub component: Option<ComponentQuery>,
pub sort_by: Option<SortBy>,
pub limit: Option<u64>,
}
impl Default for QueryBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(serde::Deserialize, Debug, Clone)]
pub enum Value {
Datetime(DateTimeUtcWrapper),
Runtime(u64),
Count(u8),
}
impl Serialize for Value {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Value::Datetime(datetime) => datetime.serialize(serializer),
Value::Runtime(runtime) => runtime.serialize(serializer),
Value::Count(count) => count.serialize(serializer),
}
}
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Default, Clone)]
pub struct Operator {
pub gt: Option<Value>,
pub lt: Option<Value>,
pub gte: Option<Value>,
pub lte: Option<Value>,
pub equals: Option<Value>,
}
impl Operator {
pub fn gt(mut self, value: Value) -> Self {
self.gt = Some(value);
self
}
pub fn lt(mut self, value: Value) -> Self {
self.lt = Some(value);
self
}
pub fn gte(mut self, value: Value) -> Self {
self.gte = Some(value);
self
}
pub fn lte(mut self, value: Value) -> Self {
self.lte = Some(value);
self
}
pub fn equals(mut self, value: Value) -> Self {
if !matches!(value, Value::Datetime(_)) {
self.equals = Some(value);
self
} else {
self
}
}
}
impl From<chrono::DateTime<Utc>> for Value {
fn from(item: chrono::DateTime<Utc>) -> Self {
Value::Datetime(DateTimeUtcWrapper(item))
}
}
impl From<u64> for Value {
fn from(item: u64) -> Self {
Value::Runtime(item)
}
}
impl From<u8> for Value {
fn from(item: u8) -> Self {
Value::Count(item)
}
}
#[derive(Debug, Clone)]
pub struct QueryBuilder {
pub query_params: QueryParameters,
}
impl QueryBuilder {
pub fn new() -> Self {
QueryBuilder {
query_params: QueryParameters {
record_id: None,
start_time: None,
stop_time: None,
runtime: None,
meta: None,
component: None,
sort_by: None,
limit: None,
},
}
}
pub fn with_record_id(mut self, record_id: String) -> Self {
self.query_params.record_id = Some(record_id);
self
}
pub fn with_start_time(mut self, time_operator: Operator) -> Self {
self.query_params.start_time = Some(time_operator);
self
}
pub fn with_stop_time(mut self, time_operator: Operator) -> Self {
self.query_params.stop_time = Some(time_operator);
self
}
pub fn with_runtime(mut self, time_operator: Operator) -> Self {
self.query_params.runtime = Some(time_operator);
self
}
pub fn with_meta_query(mut self, meta: MetaQuery) -> Self {
self.query_params.meta = Some(meta);
self
}
pub fn with_component_query(mut self, component: ComponentQuery) -> Self {
self.query_params.component = Some(component);
self
}
pub fn sort_by(mut self, sort: SortBy) -> Self {
self.query_params.sort_by = Some(sort);
self
}
pub fn limit(mut self, number: u64) -> Self {
self.query_params.limit = Some(number);
self
}
pub async fn get(&self, client: AuditorClient) -> Result<Vec<Record>, ClientError> {
let query_string = self.build();
client.advanced_query(query_string).await
}
pub fn build(&self) -> String {
serde_qs::to_string(&self.query_params).expect("Failed to serialize query parameters")
}
}
#[derive(serde::Deserialize, Debug, Default, Clone)]
pub struct MetaQuery {
pub meta_query: HashMap<String, Option<MetaOperator>>,
}
impl MetaQuery {
pub fn new() -> Self {
MetaQuery {
meta_query: HashMap::new(),
}
}
pub fn meta_operator(mut self, query_id: String, operator: MetaOperator) -> Self {
self.meta_query.insert(query_id.to_string(), Some(operator));
self
}
}
impl Serialize for MetaQuery {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.meta_query.serialize(serializer)
}
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Default, Clone)]
pub struct MetaOperator {
pub c: Option<String>,
pub dnc: Option<String>,
}
impl MetaOperator {
pub fn contains(mut self, c: String) -> Self {
self.c = Some(c);
self
}
pub fn does_not_contains(mut self, dnc: String) -> Self {
self.dnc = Some(dnc);
self
}
}
#[derive(serde::Deserialize, Debug, Default, Clone)]
pub struct ComponentQuery {
pub component_query: HashMap<String, Option<Operator>>,
}
impl ComponentQuery {
pub fn new() -> Self {
ComponentQuery {
component_query: HashMap::new(),
}
}
pub fn component_operator(mut self, query_id: String, operator: Operator) -> Self {
self.component_query
.insert(query_id.to_string(), Some(operator));
self
}
}
impl Serialize for ComponentQuery {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.component_query.serialize(serializer)
}
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Default, Clone)]
pub struct SortBy {
pub asc: Option<String>,
pub desc: Option<String>,
}
impl SortBy {
pub fn new() -> Self {
Self {
asc: None,
desc: None,
}
}
pub fn ascending(mut self, column: String) -> Self {
self.asc = Some(column);
self
}
pub fn descending(mut self, column: String) -> Self {
self.desc = Some(column);
self
}
}
#[derive(Clone)]
pub struct AuditorClient {
address: String,
client: reqwest::Client,
}
impl AuditorClient {
#[tracing::instrument(name = "Checking health of AUDITOR server.", skip(self))]
pub async fn health_check(&self) -> bool {
match self
.client
.get(&format!("{}/health_check", &self.address))
.send()
.await
{
Ok(s) => s.error_for_status().is_ok(),
Err(_) => false,
}
}
#[tracing::instrument(
name = "Sending a record to AUDITOR server.",
skip(self, record),
fields(record_id = %record.record_id),
level = "debug"
)]
pub async fn add(&self, record: &RecordAdd) -> Result<(), ClientError> {
let response = self
.client
.post(&format!("{}/record", &self.address))
.header("Content-Type", "application/json")
.json(record)
.send()
.await?;
if response.text().await? == ERR_RECORD_EXISTS {
Err(ClientError::RecordExists)
} else {
Ok(())
}
}
#[tracing::instrument(
name = "Sending multiple records to AUDITOR server.",
skip(self, records)
)]
pub async fn bulk_insert(&self, records: &Vec<RecordAdd>) -> Result<(), ClientError> {
let response = self
.client
.post(&format!("{}/records", &self.address))
.header("Content-Type", "application/json")
.json(records)
.send()
.await?;
if response.text().await? == ERR_RECORD_EXISTS {
Err(ClientError::RecordExists)
} else {
Ok(())
}
}
#[tracing::instrument(
name = "Sending a record update to AUDITOR server.",
skip(self, record),
fields(record_id = %record.record_id)
)]
pub async fn update(&self, record: &RecordUpdate) -> Result<(), ClientError> {
self.client
.put(&format!("{}/record", &self.address))
.header("Content-Type", "application/json")
.json(record)
.send()
.await?
.error_for_status()?;
Ok(())
}
#[tracing::instrument(name = "Getting all records from AUDITOR server.", skip(self))]
pub async fn get(&self) -> Result<Vec<Record>, ClientError> {
Ok(self
.client
.get(&format!("{}/records", &self.address))
.send()
.await?
.error_for_status()?
.json()
.await?)
}
#[tracing::instrument(
name = "Getting all records started since a given date from AUDITOR server.",
skip(self),
fields(started_since = %since)
)]
#[deprecated(since = "0.4.0", note = "please use `advanced_query` instead")]
pub async fn get_started_since(
&self,
since: &DateTime<Utc>,
) -> Result<Vec<Record>, ClientError> {
dbg!(since.to_rfc3339());
let since_str = since.to_rfc3339();
let encoded_since = encode(&since_str);
Ok(self
.client
.get(&format!(
"{}/records?start_time[gte]={}",
&self.address, encoded_since
))
.send()
.await?
.error_for_status()?
.json()
.await?)
}
#[tracing::instrument(
name = "Getting all records stopped since a given date from AUDITOR server.",
skip(self),
fields(started_since = %since)
)]
#[deprecated(since = "0.4.0", note = "please use `advanced_query` instead")]
pub async fn get_stopped_since(
&self,
since: &DateTime<Utc>,
) -> Result<Vec<Record>, ClientError> {
let since_str = since.to_rfc3339();
let encoded_since = encode(&since_str);
Ok(self
.client
.get(&format!(
"{}/records?stop_time[gte]={}",
&self.address, encoded_since
))
.send()
.await?
.error_for_status()?
.json()
.await?)
}
#[tracing::instrument(
name = "Getting records from AUDITOR server using custom query",
skip(self)
)]
pub async fn advanced_query(&self, query_string: String) -> Result<Vec<Record>, ClientError> {
Ok(self
.client
.get(&format!("{}/records?{}", &self.address, query_string))
.send()
.await?
.error_for_status()?
.json()
.await?)
}
#[tracing::instrument(
name = "Getting a single record from AUDITOR server using record_id",
skip(self)
)]
pub async fn get_single_record(&self, record_id: String) -> Result<Record, ClientError> {
Ok(self
.client
.get(&format!("{}/record/{}", &self.address, record_id))
.send()
.await?
.error_for_status()?
.json()
.await?)
}
}
#[derive(Clone)]
pub struct AuditorClientBlocking {
address: String,
client: reqwest::blocking::Client,
}
impl AuditorClientBlocking {
#[tracing::instrument(name = "Checking health of AUDITOR server.", skip(self))]
pub fn health_check(&self) -> bool {
match self
.client
.get(format!("{}/health_check", &self.address))
.send()
{
Ok(s) => s.error_for_status().is_ok(),
Err(_) => false,
}
}
#[tracing::instrument(
name = "Sending a record to AUDITOR server.",
skip(self, record),
fields(record_id = %record.record_id)
)]
pub fn add(&self, record: &RecordAdd) -> Result<(), ClientError> {
let response = self
.client
.post(format!("{}/record", &self.address))
.header("Content-Type", "application/json")
.json(record)
.send()?;
if response.text()? == ERR_RECORD_EXISTS {
Err(ClientError::RecordExists)
} else {
Ok(())
}
}
#[tracing::instrument(
name = "Sending multiple records to AUDITOR server.",
skip(self, records)
)]
pub fn bulk_insert(&self, records: &Vec<RecordAdd>) -> Result<(), ClientError> {
let response = self
.client
.post(format!("{}/records", &self.address))
.header("Content-Type", "application/json")
.json(records)
.send()?;
if response.text()? == ERR_RECORD_EXISTS {
Err(ClientError::RecordExists)
} else {
Ok(())
}
}
#[tracing::instrument(
name = "Sending a record update to AUDITOR server.",
skip(self, record),
fields(record_id = %record.record_id)
)]
pub fn update(&self, record: &RecordUpdate) -> Result<(), ClientError> {
self.client
.put(format!("{}/record", &self.address))
.header("Content-Type", "application/json")
.json(record)
.send()?
.error_for_status()?;
Ok(())
}
#[tracing::instrument(name = "Getting all records from AUDITOR server.", skip(self))]
pub fn get(&self) -> Result<Vec<Record>, ClientError> {
Ok(self
.client
.get(format!("{}/records", &self.address))
.send()?
.error_for_status()?
.json()?)
}
#[tracing::instrument(
name = "Getting all records started since a given date from AUDITOR server.",
skip(self),
fields(started_since = %since)
)]
#[deprecated(since = "0.4.0", note = "please use `advanced_query` instead")]
pub fn get_started_since(&self, since: &DateTime<Utc>) -> Result<Vec<Record>, ClientError> {
dbg!(since.to_rfc3339());
let since_str = since.to_rfc3339();
let encoded_since = encode(&since_str);
Ok(self
.client
.get(format!(
"{}/records?start_time[gte]={}",
&self.address, encoded_since
))
.send()?
.error_for_status()?
.json()?)
}
#[tracing::instrument(
name = "Getting all records stopped since a given date from AUDITOR server.",
skip(self),
fields(started_since = %since)
)]
#[deprecated(since = "0.4.0", note = "please use `advanced_query` instead")]
pub fn get_stopped_since(&self, since: &DateTime<Utc>) -> Result<Vec<Record>, ClientError> {
let since_str = since.to_rfc3339();
let encoded_since = encode(&since_str);
Ok(self
.client
.get(format!(
"{}/records?stop_time[gte]={}",
&self.address, encoded_since
))
.send()?
.error_for_status()?
.json()?)
}
pub fn advanced_query(&self, query_params: String) -> Result<Vec<Record>, ClientError> {
Ok(self
.client
.get(format!("{}/records?{}", &self.address, query_params))
.send()?
.error_for_status()?
.json()?)
}
#[tracing::instrument(
name = "Getting a single record from AUDITOR server using record_id",
skip(self)
)]
pub fn get_single_record(&self, record_id: &str) -> Result<Record, ClientError> {
Ok(self
.client
.get(format!("{}/record/{}", &self.address, record_id))
.send()?
.error_for_status()?
.json()?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::RecordTest;
use chrono::TimeZone;
use claim::assert_err;
use fake::{Fake, Faker};
use wiremock::matchers::{any, body_json, header, method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};
fn record<T: TryFrom<RecordTest>>() -> T
where
<T as TryFrom<RecordTest>>::Error: std::fmt::Debug,
{
T::try_from(Faker.fake::<RecordTest>()).unwrap()
}
#[tokio::test]
async fn get_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = client.get().await.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn blocking_get_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = tokio::task::spawn_blocking(move || client.get().unwrap())
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn health_check_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
assert!(client.health_check().await);
}
#[tokio::test]
async fn blocking_health_check_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let response = tokio::task::spawn_blocking(move || client.health_check())
.await
.unwrap();
assert!(response);
}
#[tokio::test]
async fn health_check_fails_on_timeout() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.timeout(1)
.build()
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(
ResponseTemplate::new(200).set_delay(
Duration::try_seconds(180)
.expect("This should never fail")
.to_std()
.expect("This should never fail"),
),
)
.expect(1)
.mount(&mock_server)
.await;
assert!(!client.health_check().await);
}
#[tokio::test]
async fn blocking_health_check_fails_on_timeout() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.timeout(1)
.build_blocking()
.unwrap()
})
.await
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(
ResponseTemplate::new(200).set_delay(
Duration::try_seconds(180)
.expect("This should never fail")
.to_std()
.expect("This should never fail"),
),
)
.expect(1)
.mount(&mock_server)
.await;
let response = tokio::task::spawn_blocking(move || client.health_check())
.await
.unwrap();
assert!(!response);
}
#[tokio::test]
async fn health_check_fails_on_500() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.timeout(1)
.build()
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
assert!(!client.health_check().await);
}
#[tokio::test]
async fn blocking_health_check_fails_on_500() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.timeout(1)
.build_blocking()
.unwrap()
})
.await
.unwrap();
Mock::given(method("GET"))
.and(path("/health_check"))
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
let response = tokio::task::spawn_blocking(move || client.health_check())
.await
.unwrap();
assert!(!response);
}
#[tokio::test]
async fn add_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record: RecordAdd = record();
Mock::given(method("POST"))
.and(path("/record"))
.and(header("Content-Type", "application/json"))
.and(body_json(&record))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = client.add(&record).await;
}
#[tokio::test]
async fn blocking_add_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record: RecordAdd = record();
Mock::given(method("POST"))
.and(path("/record"))
.and(header("Content-Type", "application/json"))
.and(body_json(&record))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = tokio::task::spawn_blocking(move || client.add(&record))
.await
.unwrap();
}
#[tokio::test]
async fn add_fails_on_existing_record() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record: RecordAdd = record();
Mock::given(any())
.respond_with(ResponseTemplate::new(500).set_body_string(ERR_RECORD_EXISTS))
.expect(1)
.mount(&mock_server)
.await;
assert_err!(client.add(&record).await);
}
#[tokio::test]
async fn blocking_add_fails_on_existing_record() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record: RecordAdd = record();
Mock::given(any())
.respond_with(ResponseTemplate::new(500).set_body_string(ERR_RECORD_EXISTS))
.expect(1)
.mount(&mock_server)
.await;
let res = tokio::task::spawn_blocking(move || client.add(&record))
.await
.unwrap();
assert_err!(res);
}
#[tokio::test]
async fn update_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record: RecordUpdate = record();
Mock::given(method("PUT"))
.and(path("/record"))
.and(header("Content-Type", "application/json"))
.and(body_json(&record))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = client.update(&record).await;
}
#[tokio::test]
async fn blocking_update_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record: RecordUpdate = record();
Mock::given(method("PUT"))
.and(path("/record"))
.and(header("Content-Type", "application/json"))
.and(body_json(&record))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = tokio::task::spawn_blocking(move || client.update(&record))
.await
.unwrap();
}
#[tokio::test]
async fn update_fails_on_500() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record: RecordUpdate = record();
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
assert_err!(client.update(&record).await);
}
#[tokio::test]
async fn blocking_update_fails_on_500() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record: RecordUpdate = record();
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
let res = tokio::task::spawn_blocking(move || client.update(&record))
.await
.unwrap();
assert_err!(res);
}
#[tokio::test]
async fn get_advanced_queries_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("start_time[gte]", "2022-08-03T09:47:00+00:00"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let response = QueryBuilder::new()
.with_start_time(Operator::default().gte(datetime_utc.into()))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_record_query_with_start_time_and_stop_time_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("start_time[gte]", "2022-08-03T09:47:00+00:00"))
.and(query_param("stop_time[gte]", "2022-08-03T09:47:00+00:00"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let response = QueryBuilder::new()
.with_start_time(Operator::default().gte(datetime_utc.into()))
.with_stop_time(Operator::default().gte(datetime_utc.into()))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_record_query_with_start_time_gte_and_start_time_lte_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("start_time[gte]", "2022-08-03T09:47:00+00:00"))
.and(query_param("start_time[lte]", "2022-08-04T09:47:00+00:00"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc_gte = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let datetime_utc_lte = Utc.with_ymd_and_hms(2022, 8, 4, 9, 47, 0).unwrap();
let response = QueryBuilder::new()
.with_start_time(
Operator::default()
.gte(datetime_utc_gte.into())
.lte(datetime_utc_lte.into()),
)
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_record_query_with_start_time_gte_and_start_time_lte_runtime_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("start_time[gte]", "2022-08-03T09:47:00+00:00"))
.and(query_param("start_time[lte]", "2022-08-04T09:47:00+00:00"))
.and(query_param("runtime[gte]", "100000"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc_gte = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let datetime_utc_lte = Utc.with_ymd_and_hms(2022, 8, 4, 9, 47, 0).unwrap();
let runtime: u64 = 100000;
let response = QueryBuilder::new()
.with_start_time(
Operator::default()
.gte(datetime_utc_gte.into())
.lte(datetime_utc_lte.into()),
)
.with_runtime(Operator::default().gte(runtime.into()))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_record_query_with_start_time_stop_time_and_runtime_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("start_time[gte]", "2022-08-03T09:47:00+00:00"))
.and(query_param("start_time[lte]", "2022-08-04T09:47:00+00:00"))
.and(query_param("runtime[gte]", "100000"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc_gte = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let datetime_utc_lte = Utc.with_ymd_and_hms(2022, 8, 4, 9, 47, 0).unwrap();
let runtime_gte: u64 = 100000;
let runtime_lte: u64 = 200000;
let response = QueryBuilder::new()
.with_start_time(
Operator::default()
.gte(datetime_utc_gte.into())
.lte(datetime_utc_lte.into()),
)
.with_stop_time(
Operator::default()
.gte(datetime_utc_gte.into())
.lte(datetime_utc_lte.into()),
)
.with_runtime(
Operator::default()
.gte(runtime_gte.into())
.lte(runtime_lte.into()),
)
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_advanced_queries_fails_on_500() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc_gte = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
assert_err!(
QueryBuilder::new()
.with_stop_time(Operator::default().gte(datetime_utc_gte.into()))
.get(client)
.await
);
}
#[tokio::test]
async fn get_meta_queries_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("meta[site_id][c]", "group_1"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = QueryBuilder::new()
.with_meta_query(MetaQuery::new().meta_operator(
"site_id".to_string(),
MetaOperator::default().contains("group_1".to_string()),
))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_meta_queries_and_start_time_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("meta[site_id][c]", "group_1"))
.and(query_param("start_time[lte]", "2022-08-04T09:47:00+00:00"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc_lte = Utc.with_ymd_and_hms(2022, 8, 4, 9, 47, 0).unwrap();
let response = QueryBuilder::new()
.with_meta_query(MetaQuery::new().meta_operator(
"site_id".to_string(),
MetaOperator::default().contains("group_1".to_string()),
))
.with_start_time(Operator::default().lte(datetime_utc_lte.into()))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_component_queries_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("component[cpu][equals]", "4"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let count: u8 = 4;
let response =
QueryBuilder::new()
.with_component_query(ComponentQuery::new().component_operator(
"cpu".to_string(),
Operator::default().equals(count.into()),
))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn blocking_advanced_queries_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("stop_time[gte]", "2022-08-03T09:47:00+00:00"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let datetime_utc = Utc.with_ymd_and_hms(2022, 8, 3, 9, 47, 0).unwrap();
let query_string = QueryBuilder::new()
.with_stop_time(Operator::default().gte(datetime_utc.into()))
.build();
let response = tokio::task::spawn_blocking(move || client.advanced_query(query_string))
.await
.unwrap()
.unwrap();
println!("{:?}", &response);
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_sort_by_query_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("sort_by[asc]", "start_time"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = QueryBuilder::new()
.sort_by(SortBy::new().ascending("start_time".to_string()))
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn limit_get_query_records_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("limit", "500"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let number: u64 = 500;
let response = QueryBuilder::new()
.sort_by(SortBy::new().ascending("start_time".to_string()))
.limit(number)
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_exact_record_using_record_id_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let body: Vec<Record> = vec![record()];
Mock::given(method("GET"))
.and(path("/records"))
.and(query_param("record_id", "r1"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = QueryBuilder::new()
.with_record_id("r1".to_string())
.get(client)
.await
.unwrap();
response
.into_iter()
.zip(body)
.map(|(rr, br)| assert_eq!(rr, br))
.count();
}
#[tokio::test]
async fn get_single_record_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record_id: &str = "r3";
let body: Record = record();
Mock::given(method("GET"))
.and(path("/record/r3"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response = client
.get_single_record(record_id.to_string())
.await
.unwrap();
assert_eq!(body, response)
}
#[tokio::test]
async fn blocking_get_single_record_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record_id: &str = "r3";
let body: Record = record();
Mock::given(method("GET"))
.and(path("/record/r3"))
.respond_with(ResponseTemplate::new(200).set_body_json(&body))
.expect(1)
.mount(&mock_server)
.await;
let response =
tokio::task::spawn_blocking(move || client.get_single_record(record_id).unwrap())
.await
.unwrap();
assert_eq!(body, response)
}
#[tokio::test]
async fn get_single_record_fails_on_500() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let record_id: &str = "r3";
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
assert_err!(client.get_single_record(record_id.to_string()).await);
}
#[tokio::test]
async fn blocking_get_single_record_fails_on_500() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let record_id: &str = "r3";
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
let res = tokio::task::spawn_blocking(move || client.get_single_record(record_id))
.await
.unwrap();
assert_err!(res);
}
#[tokio::test]
async fn bulk_insert_succeeds() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let records: Vec<RecordAdd> = (0..10).map(|_| record()).collect();
Mock::given(method("POST"))
.and(path("/records"))
.and(header("Content-Type", "application/json"))
.and(body_json(&records))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = client.bulk_insert(&records).await;
}
#[tokio::test]
async fn blocking_bulk_insert_succeeds() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let records: Vec<RecordAdd> = (0..10).map(|_| record()).collect();
Mock::given(method("POST"))
.and(path("/records"))
.and(header("Content-Type", "application/json"))
.and(body_json(&records))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let _res = tokio::task::spawn_blocking(move || client.bulk_insert(&records))
.await
.unwrap();
}
#[tokio::test]
async fn bulk_insert_fails_on_existing_record() {
let mock_server = MockServer::start().await;
let client = AuditorClientBuilder::new()
.connection_string(&mock_server.uri())
.build()
.unwrap();
let records: Vec<RecordAdd> = (0..10).map(|_| record()).collect();
Mock::given(any())
.respond_with(ResponseTemplate::new(500).set_body_string(ERR_RECORD_EXISTS))
.expect(1)
.mount(&mock_server)
.await;
assert_err!(client.bulk_insert(&records).await);
}
#[tokio::test]
async fn blocking_bulk_insert_fails_on_existing_record() {
let mock_server = MockServer::start().await;
let uri = mock_server.uri();
let client = tokio::task::spawn_blocking(move || {
AuditorClientBuilder::new()
.connection_string(&uri)
.build_blocking()
.unwrap()
})
.await
.unwrap();
let records: Vec<RecordAdd> = (0..10).map(|_| record()).collect();
Mock::given(any())
.respond_with(ResponseTemplate::new(500).set_body_string(ERR_RECORD_EXISTS))
.expect(1)
.mount(&mock_server)
.await;
let res = tokio::task::spawn_blocking(move || client.bulk_insert(&records))
.await
.unwrap();
assert_err!(res);
}
}