use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::Value;
use std::{error::Error, path::Path};
/// DataSet metadata as exchanged with the `/v1/datasets` endpoints.
///
/// Every field is optional: keys missing from the input deserialize to `None`
/// (`#[serde(default)]`), and field names are mapped to camelCase on the wire
/// (for example `created_at` <-> `createdAt`).
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
#[serde(rename_all = "camelCase")]
pub struct DataSet {
    /// Identifier of the DataSet.
    pub id: Option<String>,
    /// Display name.
    pub name: Option<String>,
    /// Free-form description.
    pub description: Option<String>,
    /// Owner of the DataSet.
    pub owner: Option<Owner>,
    /// Creation timestamp (UTC); wire key `createdAt`.
    pub created_at: Option<DateTime<Utc>>,
    /// Last-update timestamp (UTC); wire key `updatedAt`.
    pub updated_at: Option<DateTime<Utc>>,
    /// Wire key `dataCurrentAt` (UTC timestamp).
    pub data_current_at: Option<DateTime<Utc>>,
    /// Column layout of the DataSet.
    pub schema: Option<Schema>,
    /// Wire key `pdpEnabled`.
    pub pdp_enabled: Option<bool>,
    /// Policies attached to the DataSet.
    pub policies: Option<Vec<Policy>>,
    /// Row count.
    pub rows: Option<u64>,
    /// Column count.
    pub columns: Option<u32>,
}
impl DataSet {
    /// Creates a `DataSet` in which every field is `None`.
    pub fn new() -> Self {
        // The derived `Default` yields `None` for each `Option` field.
        Self::default()
    }

    /// Builds a fully populated example `DataSet` whose values are placeholder
    /// text describing what belongs in each field.
    ///
    /// The three timestamp fields are each set from their own `Utc::now()` call.
    pub fn template() -> Self {
        let example_owner = Owner {
            id: 1234,
            name: Some("DataSet Owner's Name".to_string()),
        };
        let example_column = Column {
            name: Some("Column Name".to_string()),
            column_type: Some("STRING | DECIMAL | LONG | DOUBLE | DATE | DATETIME".to_string()),
        };
        let example_schema = Schema {
            columns: Some(vec![example_column]),
        };

        Self {
            id: Some("UUID".to_string()),
            name: Some("DataSet Name".to_string()),
            description: Some("DataSet Description".to_string()),
            owner: Some(example_owner),
            created_at: Some(Utc::now()),
            updated_at: Some(Utc::now()),
            data_current_at: Some(Utc::now()),
            schema: Some(example_schema),
            pdp_enabled: Some(false),
            policies: Some(vec![Policy::template()]),
            rows: Some(0),
            columns: Some(0),
        }
    }
}
/// Owner reference embedded in a [`DataSet`].
///
/// Because of `#[serde(default)]`, a missing `id` deserializes to `0` and a
/// missing `name` to `None`.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Owner {
    /// Numeric identifier of the owner.
    pub id: u32,
    /// Name of the owner.
    pub name: Option<String>,
}
/// Schema of a [`DataSet`]: a list of column definitions.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Schema {
    /// Column definitions; `None` when the key is absent from the input.
    pub columns: Option<Vec<Column>>,
}
/// A single column definition inside a [`Schema`].
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Column {
    /// Column name.
    pub name: Option<String>,
    /// Column type; serialized under the key `type` (a reserved word in Rust,
    /// hence the different field name).
    #[serde(rename = "type")]
    pub column_type: Option<String>,
}
/// A policy attached to a DataSet, as exchanged with the
/// `/v1/datasets/{id}/policies` endpoints.
///
/// All fields are optional; keys missing from the input deserialize to `None`.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default)]
pub struct Policy {
    /// Numeric identifier of the policy.
    pub id: Option<u32>,
    /// Policy name.
    pub name: Option<String>,
    /// Policy type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub policy_type: Option<String>,
    /// Filters that make up the policy.
    pub filters: Option<Vec<Filter>>,
    /// Numeric ids of the users listed on the policy.
    pub users: Option<Vec<u64>>,
    /// Virtual users listed on the policy.
    ///
    /// Serialized as `virtualUsers`, the camelCase key used by the API's policy
    /// objects. Without the rename the field was written as `virtual_users`
    /// and never populated from camelCase responses. The snake_case spelling
    /// is still accepted on input so previously written JSON keeps loading.
    #[serde(rename = "virtualUsers", alias = "virtual_users")]
    pub virtual_users: Option<Vec<String>>,
    /// Groups listed on the policy.
    pub groups: Option<Vec<String>>,
}
impl Policy {
    /// Creates a `Policy` in which every field is `None`.
    pub fn new() -> Self {
        // The derived `Default` yields `None` for each `Option` field.
        Self::default()
    }

    /// Builds a fully populated example `Policy` whose values are placeholder
    /// text describing what belongs in each field.
    pub fn template() -> Self {
        let example_filter = Filter {
            column: Some("Column to filter on".to_string()),
            not: Some(false),
            operator: Some("EQUALS".to_string()),
            values: vec!["values in this column that match will apply".to_string()],
        };

        Self {
            id: Some(0),
            name: Some("Policy Name".to_string()),
            policy_type: Some("user | system".to_string()),
            filters: Some(vec![example_filter]),
            users: Some(vec![27]),
            virtual_users: Some(vec!["vu:324ds".to_string()]),
            groups: Some(vec!["15".to_string()]),
        }
    }
}
/// One filter condition of a [`Policy`].
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Filter {
    /// Column the filter applies to.
    pub column: Option<String>,
    /// Wire key `not`.
    pub not: Option<bool>,
    /// Comparison operator (the `Policy::template` example uses `EQUALS`).
    pub operator: Option<String>,
    /// Values to match. Defaults to an empty list when absent from the input
    /// and is left out of the serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub values: Vec<String>,
}
/// Result of running a query against a DataSet (see `post_dataset_query`).
///
/// Field names are mapped to camelCase on the wire and every missing key
/// deserializes to `None`.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
#[serde(rename_all = "camelCase")]
pub struct QueryResult {
    /// Wire key `datasource`.
    pub datasource: Option<String>,
    /// Column names of the result.
    pub columns: Option<Vec<String>>,
    /// Metadata entries of the result.
    pub metadata: Option<Vec<QueryMetadata>>,
    /// Result rows; each cell is kept as an untyped JSON value.
    pub rows: Option<Vec<Vec<Value>>>,
    /// Wire key `numRows`.
    pub num_rows: Option<u64>,
    /// Wire key `numColumns`.
    pub num_columns: Option<u32>,
    /// Wire key `fromCache`.
    // NOTE(review): confirm the API really spells this key `fromCache`; a
    // differently cased key would silently leave this field `None`.
    pub from_cache: Option<bool>,
}
/// One metadata entry of a [`QueryResult`].
///
/// Field names are mapped to camelCase on the wire and every missing key
/// deserializes to `None`.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
#[serde(rename_all = "camelCase")]
pub struct QueryMetadata {
    /// Column type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub column_type: Option<String>,
    /// Wire key `datasourceId`.
    // NOTE(review): confirm the API's casing for this key (e.g. `dataSourceId`);
    // a mismatch would silently leave this field `None`.
    pub datasource_id: Option<String>,
    /// Wire key `maxLength`.
    pub max_length: Option<i32>,
    /// Wire key `minLength`.
    pub min_length: Option<i32>,
    /// Wire key `periodIndex`.
    pub period_index: Option<i32>,
    /// Wire key `aggregated`.
    pub aggregated: Option<bool>,
}
impl super::Client {
/// Lists DataSets, sorted by name.
///
/// Issues `GET {host}/v1/datasets` with `limit`, `offset` and a fixed
/// `sort=name` as query parameters, authorized with the token obtained from
/// `get_access_token("data")`.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and body-decoding failures.
pub async fn get_datasets(
    &self,
    limit: Option<u32>,
    offset: Option<u32>,
) -> Result<Vec<DataSet>, Box<dyn Error + Send + Sync + 'static>> {
    /// Query-string parameters of the list request.
    #[derive(Serialize)]
    struct ListParams {
        pub limit: Option<u32>,
        pub offset: Option<u32>,
        pub sort: String,
    }

    let token = self.get_access_token("data").await?;
    let params = ListParams {
        limit,
        offset,
        sort: String::from("name"),
    };
    let url = format!("{}/v1/datasets", self.host);
    let mut res = surf::get(&url)
        .query(&params)?
        .header("Authorization", token)
        .await?;

    if res.status().is_success() {
        let datasets: Vec<DataSet> = res.body_json().await?;
        return Ok(datasets);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Creates a DataSet.
///
/// Issues `POST {host}/v1/datasets` with `ds` as the JSON body and returns the
/// `DataSet` decoded from the response.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, serialization, transport and body-decoding
/// failures.
pub async fn post_dataset(
    &self,
    ds: DataSet,
) -> Result<DataSet, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets", self.host);
    let mut res = surf::post(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&ds)?)
        .await?;

    if res.status().is_success() {
        let created: DataSet = res.body_json().await?;
        return Ok(created);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Fetches the DataSet identified by `id`.
///
/// Issues `GET {host}/v1/datasets/{id}` and returns the `DataSet` decoded from
/// the response. `id` is inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and body-decoding failures.
pub async fn get_dataset(
    &self,
    id: &str,
) -> Result<DataSet, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}", self.host, id);
    let mut res = surf::get(&url).header("Authorization", token).await?;

    if res.status().is_success() {
        let dataset: DataSet = res.body_json().await?;
        return Ok(dataset);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Updates the DataSet identified by `id`.
///
/// Issues `PUT {host}/v1/datasets/{id}` with `ds` as the JSON body and returns
/// the `DataSet` decoded from the response. `id` is inserted into the URL as
/// given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, serialization, transport and body-decoding
/// failures.
pub async fn put_dataset(
    &self,
    id: &str,
    ds: DataSet,
) -> Result<DataSet, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}", self.host, id);
    let mut res = surf::put(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&ds)?)
        .await?;

    if res.status().is_success() {
        let updated: DataSet = res.body_json().await?;
        return Ok(updated);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Deletes the DataSet identified by `id`.
///
/// Issues `DELETE {host}/v1/datasets/{id}`, authorized with the token obtained
/// from `get_access_token("data")`. `id` is inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and error-body decoding failures.
pub async fn delete_dataset(
    &self,
    id: &str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let at = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}", self.host, id);
    let mut response = surf::delete(&url).header("Authorization", at).await?;
    if !response.status().is_success() {
        let e: Box<super::PubAPIError> = response.body_json().await?;
        return Err(e);
    }
    // The success body is deliberately not parsed: decoding `()` from JSON
    // only accepts a literal `null`, so an empty body (the usual reply to a
    // delete) would turn a successful call into a parse error.
    Ok(())
}
/// Downloads the data of the DataSet identified by `id` as a string.
///
/// Issues `GET {host}/v1/datasets/{id}/data?includeHeader=true` and returns
/// the response body unparsed. `id` is inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and body-reading failures.
pub async fn get_dataset_data(
    &self,
    id: &str,
) -> Result<String, Box<dyn Error + Send + Sync + 'static>> {
    /// Query-string parameters of the export request.
    #[derive(Serialize)]
    struct QueryParams {
        #[serde(rename = "includeHeader")]
        pub include_header: bool,
    }

    let token = self.get_access_token("data").await?;
    let params = QueryParams {
        include_header: true,
    };
    let url = format!("{}/v1/datasets/{}/data", self.host, id);
    let mut res = surf::get(&url)
        .query(&params)?
        .header("Authorization", token)
        .await?;

    if res.status().is_success() {
        let data = res.body_string().await?;
        return Ok(data);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Uploads the file at `csv` as the data of the DataSet identified by `id`.
///
/// Issues `PUT {host}/v1/datasets/{id}/data` with the file contents as the
/// request body and a `Content-Type: text/csv` header. `id` is inserted into
/// the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, file-access, transport and error-body
/// decoding failures.
pub async fn put_dataset_data(
    &self,
    id: &str,
    csv: impl AsRef<Path>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let at = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/data", self.host, id);
    let mut response = surf::put(&url)
        .header("Authorization", at)
        .body(surf::Body::from_file(csv).await?)
        // Set after the body, presumably so the explicit CSV content type wins
        // over whatever type the file body implies -- keep this order.
        .header("Content-Type", "text/csv")
        .await?;
    if !response.status().is_success() {
        let e: Box<super::PubAPIError> = response.body_json().await?;
        return Err(e);
    }
    // The success body is deliberately not parsed: decoding `()` from JSON
    // only accepts a literal `null`, so an empty body would turn a successful
    // upload into a parse error.
    Ok(())
}
/// Runs `query` against the DataSet identified by `id`.
///
/// Issues `POST {host}/v1/datasets/query/execute/{id}` with the JSON body
/// `{"sql": query}` and returns the decoded `QueryResult`. `id` is inserted
/// into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, serialization, transport and body-decoding
/// failures.
pub async fn post_dataset_query(
    &self,
    id: &str,
    query: &str,
) -> Result<QueryResult, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/query/execute/{}", self.host, id);
    let payload = json!({ "sql": query });
    let mut res = surf::post(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&payload)?)
        .await?;

    if res.status().is_success() {
        let result: QueryResult = res.body_json().await?;
        return Ok(result);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Lists the policies of the DataSet identified by `id`.
///
/// Issues `GET {host}/v1/datasets/{id}/policies` and returns the decoded list
/// of `Policy` values. `id` is inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and body-decoding failures.
pub async fn get_dataset_policies(
    &self,
    id: &str,
) -> Result<Vec<Policy>, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/policies", self.host, id);
    let mut res = surf::get(&url).header("Authorization", token).await?;

    if res.status().is_success() {
        let policies: Vec<Policy> = res.body_json().await?;
        return Ok(policies);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Creates a policy on the DataSet identified by `id`.
///
/// Issues `POST {host}/v1/datasets/{id}/policies` with `policy` as the JSON
/// body and returns the `Policy` decoded from the response. `id` is inserted
/// into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, serialization, transport and body-decoding
/// failures.
pub async fn post_dataset_policy(
    &self,
    id: &str,
    policy: Policy,
) -> Result<Policy, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/policies", self.host, id);
    let mut res = surf::post(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&policy)?)
        .await?;

    if res.status().is_success() {
        let created: Policy = res.body_json().await?;
        return Ok(created);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Fetches one policy of the DataSet identified by `id`.
///
/// Issues `GET {host}/v1/datasets/{id}/policies/{policy_id}` and returns the
/// decoded `Policy`. `id` is inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and body-decoding failures.
pub async fn get_dataset_policy(
    &self,
    id: &str,
    policy_id: u32,
) -> Result<Policy, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/policies/{}", self.host, id, policy_id);
    let mut res = surf::get(&url).header("Authorization", token).await?;

    if res.status().is_success() {
        let policy: Policy = res.body_json().await?;
        return Ok(policy);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Updates one policy of the DataSet identified by `id`.
///
/// Issues `PUT {host}/v1/datasets/{id}/policies/{policy_id}` with `policy` as
/// the JSON body and returns the `Policy` decoded from the response. `id` is
/// inserted into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, serialization, transport and body-decoding
/// failures.
pub async fn put_dataset_policy(
    &self,
    id: &str,
    policy_id: u32,
    policy: Policy,
) -> Result<Policy, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/policies/{}", self.host, id, policy_id);
    let mut res = surf::put(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&policy)?)
        .await?;

    if res.status().is_success() {
        let updated: Policy = res.body_json().await?;
        return Ok(updated);
    }
    // Non-success responses carry the API's error description as JSON.
    let api_error: Box<super::PubAPIError> = res.body_json().await?;
    Err(api_error)
}
/// Deletes one policy of the DataSet identified by `id`.
///
/// Issues `DELETE {host}/v1/datasets/{id}/policies/{policy_id}`, authorized
/// with the token obtained from `get_access_token("data")`. `id` is inserted
/// into the URL as given.
///
/// # Errors
///
/// Returns the decoded `PubAPIError` when the response status is not a
/// success, and propagates token, transport and error-body decoding failures.
pub async fn delete_dataset_policy(
    &self,
    id: &str,
    policy_id: u32,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let at = self.get_access_token("data").await?;
    let url = format!("{}/v1/datasets/{}/policies/{}", self.host, id, policy_id);
    let mut response = surf::delete(&url).header("Authorization", at).await?;
    if !response.status().is_success() {
        let e: Box<super::PubAPIError> = response.body_json().await?;
        return Err(e);
    }
    // The success body is deliberately not parsed: decoding `()` from JSON
    // only accepts a literal `null`, so an empty body (the usual reply to a
    // delete) would turn a successful call into a parse error.
    Ok(())
}
}