use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{error::Error, path::Path};
use crate::public::dataset::DataSet;
/// A stream record as exchanged with the `/v1/streams` endpoints.
///
/// Every field is optional. JSON keys use camelCase (`createdAt`,
/// `updateMethod`, ...), and keys missing from an incoming document fall back
/// to the field's default (`None`) because of the container-level
/// `#[serde(default)]`.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct Stream {
/// Numeric identifier of the stream.
pub id: Option<u32>,
/// Creation timestamp (UTC).
pub created_at: Option<DateTime<Utc>>,
/// Last-modification timestamp (UTC).
pub modified_at: Option<DateTime<Utc>>,
/// Update method; `Stream::template` lists `APPEND | REPLACE | UPSERT`.
pub update_method: Option<String>,
/// Key column used for UPSERT updates (per the text in `Stream::template`).
pub key_column_name: Option<String>,
/// Associated dataset; (de)serialized under the JSON key `dataSet` rather
/// than the camelCase default `dataset`.
#[serde(rename = "dataSet")]
pub dataset: Option<DataSet>,
/// Deletion flag. NOTE(review): presumably marks a stream the server has
/// deleted; confirm against the API documentation.
pub deleted: Option<bool>,
}
impl Stream {
    /// Creates a `Stream` with every field set to `None`.
    pub fn new() -> Self {
        // The derived `Default` yields `None` for each `Option` field.
        Self::default()
    }

    /// Creates a `Stream` with every field populated by placeholder values.
    ///
    /// The timestamps are taken from `Utc::now()`, the string fields carry
    /// descriptive text rather than real values, and the dataset comes from
    /// `DataSet::template()`.
    pub fn template() -> Self {
        Self {
            id: Some(0),
            created_at: Some(Utc::now()),
            modified_at: Some(Utc::now()),
            update_method: Some("APPEND | REPLACE | UPSERT".to_string()),
            key_column_name: Some("Defines the key column used for UPSERT updates".to_string()),
            dataset: Some(DataSet::template()),
            deleted: Some(false),
        }
    }
}
/// A stream execution record as exchanged with the
/// `/v1/streams/{id}/executions` endpoints.
///
/// Every field is optional. JSON keys use camelCase (`startedAt`,
/// `currentState`, ...), and keys missing from an incoming document fall back
/// to `None` because of the container-level `#[serde(default)]`.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct Execution {
/// Numeric identifier of the execution.
pub id: Option<u32>,
/// Timestamp (UTC) at which the execution started.
pub started_at: Option<DateTime<Utc>>,
/// State of the execution as reported by the server.
/// NOTE(review): the set of possible values is not visible here; confirm
/// against the API documentation.
pub current_state: Option<String>,
/// Creation timestamp (UTC).
pub created_at: Option<DateTime<Utc>>,
/// Last-modification timestamp (UTC).
pub modified_at: Option<DateTime<Utc>>,
}
impl super::Client {
/// Lists streams.
///
/// Sends `GET {host}/v1/streams` with `limit` and `offset` serialized as
/// query parameters, and the token from `get_access_token("data")` in the
/// `Authorization` header.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the query cannot be encoded, the
/// request fails, or a body cannot be decoded as JSON. On a non-success
/// status the body is decoded as `PubAPIError` and returned as the error.
pub async fn get_streams(
    &self,
    limit: Option<u32>,
    offset: Option<u32>,
) -> Result<Vec<Stream>, Box<dyn Error + Send + Sync + 'static>> {
    /// Paging parameters, serialized into the query string.
    #[derive(Serialize)]
    struct Paging {
        limit: Option<u32>,
        offset: Option<u32>,
    }

    let token = self.get_access_token("data").await?;
    let paging = Paging { limit, offset };
    let url = format!("{}/v1/streams", self.host);
    let mut resp = surf::get(&url)
        .query(&paging)?
        .header("Authorization", token)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Searches for streams by dataset id.
///
/// Sends `GET {host}/v1/streams/search` with the query parameter
/// `q=dataSource.id:<dsid>`, and the token from `get_access_token("data")`
/// in the `Authorization` header.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the query cannot be encoded, the
/// request fails, or a body cannot be decoded as JSON. On a non-success
/// status the body is decoded as `PubAPIError` and returned as the error.
pub async fn get_stream_search_dataset_id(
    &self,
    dsid: &str,
) -> Result<Vec<Stream>, Box<dyn Error + Send + Sync + 'static>> {
    /// Search expression, serialized as the `q` query parameter.
    #[derive(Serialize)]
    struct Search {
        q: String,
    }

    let token = self.get_access_token("data").await?;
    let search = Search {
        q: format!("dataSource.id:{}", dsid),
    };
    let url = format!("{}/v1/streams/search", self.host);
    let mut resp = surf::get(&url)
        .query(&search)?
        .header("Authorization", token)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Searches for streams by dataset owner id.
///
/// Sends `GET {host}/v1/streams/search` with the query parameter
/// `q=dataSource.owner.id:<dsoid>`, and the token from
/// `get_access_token("data")` in the `Authorization` header.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the query cannot be encoded, the
/// request fails, or a body cannot be decoded as JSON. On a non-success
/// status the body is decoded as `PubAPIError` and returned as the error.
pub async fn get_stream_search_dataset_owner_id(
    &self,
    dsoid: &str,
) -> Result<Vec<Stream>, Box<dyn Error + Send + Sync + 'static>> {
    /// Search expression, serialized as the `q` query parameter.
    #[derive(Serialize)]
    struct Search {
        q: String,
    }

    let token = self.get_access_token("data").await?;
    let search = Search {
        q: format!("dataSource.owner.id:{}", dsoid),
    };
    let url = format!("{}/v1/streams/search", self.host);
    let mut resp = surf::get(&url)
        .query(&search)?
        .header("Authorization", token)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Creates a stream.
///
/// Sends `POST {host}/v1/streams` with `stream` serialized as the JSON
/// body and the token from `get_access_token("data")` in the
/// `Authorization` header. Returns the `Stream` decoded from the response.
///
/// # Errors
///
/// Fails if the token cannot be obtained, `stream` cannot be serialized,
/// the request fails, or a body cannot be decoded as JSON. On a
/// non-success status the body is decoded as `PubAPIError` and returned as
/// the error.
pub async fn post_stream(
    &self,
    stream: Stream,
) -> Result<Stream, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/streams", self.host);
    let mut resp = surf::post(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&stream)?)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Fetches a single stream.
///
/// Sends `GET {host}/v1/streams/{id}` with the token from
/// `get_access_token("data")` in the `Authorization` header. `id` is
/// inserted into the URL path verbatim.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the request fails, or a body
/// cannot be decoded as JSON. On a non-success status the body is decoded
/// as `PubAPIError` and returned as the error.
pub async fn get_stream(
    &self,
    id: &str,
) -> Result<Stream, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/streams/{}", self.host, id);
    let mut resp = surf::get(&url).header("Authorization", token).await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Updates a stream.
///
/// Sends `PATCH {host}/v1/streams/{id}` with `stream` serialized as the
/// JSON body and the token from `get_access_token("data")` in the
/// `Authorization` header. Returns the `Stream` decoded from the response.
///
/// # Errors
///
/// Fails if the token cannot be obtained, `stream` cannot be serialized,
/// the request fails, or a body cannot be decoded as JSON. On a
/// non-success status the body is decoded as `PubAPIError` and returned as
/// the error.
pub async fn patch_stream(
    &self,
    id: &str,
    stream: Stream,
) -> Result<Stream, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/streams/{}", self.host, id);
    let mut resp = surf::patch(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&stream)?)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Deletes a stream.
///
/// Sends `DELETE {host}/v1/streams/{id}` with the token from
/// `get_access_token("data")` in the `Authorization` header. `id` is
/// inserted into the URL path verbatim.
///
/// The body of a successful response is intentionally not read: this
/// function returns `()`, and decoding `()` from JSON only succeeds for a
/// literal `null`, so an empty success body would otherwise be reported as
/// a failure.
///
/// # Errors
///
/// Fails if the token cannot be obtained or the request fails. On a
/// non-success status the body is decoded as `PubAPIError` and returned as
/// the error (or a decode error if the body is not valid JSON for it).
pub async fn delete_stream(
    &self,
    id: &str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let at = self.get_access_token("data").await?;
    let mut response = surf::delete(&format!("{}{}{}", self.host, "/v1/streams/", id))
        .header("Authorization", at)
        .await?;
    if !response.status().is_success() {
        let e: Box<super::PubAPIError> = response.body_json().await?;
        return Err(e);
    }
    // Success carries no payload the caller needs; do not parse the body.
    Ok(())
}
/// Fetches a single execution of a stream.
///
/// Sends `GET {host}/v1/streams/{id}/executions/{execution_id}` with the
/// token from `get_access_token("data")` in the `Authorization` header.
/// Both ids are inserted into the URL path verbatim.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the request fails, or a body
/// cannot be decoded as JSON. On a non-success status the body is decoded
/// as `PubAPIError` and returned as the error.
pub async fn get_stream_execution(
    &self,
    id: &str,
    execution_id: &str,
) -> Result<Execution, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!(
        "{}/v1/streams/{}/executions/{}",
        self.host, id, execution_id
    );
    let mut resp = surf::get(&url).header("Authorization", token).await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Creates a new execution for a stream.
///
/// Sends `POST {host}/v1/streams/{id}/executions` with an empty JSON
/// object (`{}`) as the body and the token from
/// `get_access_token("data")` in the `Authorization` header. Returns the
/// `Execution` decoded from the response.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the request fails, or a body
/// cannot be decoded as JSON. On a non-success status the body is decoded
/// as `PubAPIError` and returned as the error.
pub async fn post_stream_execution(
    &self,
    id: &str,
) -> Result<Execution, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!("{}/v1/streams/{}/executions", self.host, id);
    let mut resp = surf::post(&url)
        .header("Authorization", token)
        .body(surf::Body::from_json(&json!({}))?)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Lists the executions of a stream.
///
/// Sends `GET {host}/v1/streams/{id}/executions` with `limit` and `offset`
/// serialized as query parameters, and the token from
/// `get_access_token("data")` in the `Authorization` header.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the query cannot be encoded, the
/// request fails, or a body cannot be decoded as JSON. On a non-success
/// status the body is decoded as `PubAPIError` and returned as the error.
pub async fn get_stream_executions(
    &self,
    id: &str,
    limit: Option<u32>,
    offset: Option<u32>,
) -> Result<Vec<Execution>, Box<dyn Error + Send + Sync + 'static>> {
    /// Paging parameters, serialized into the query string.
    #[derive(Serialize)]
    struct Paging {
        limit: Option<u32>,
        offset: Option<u32>,
    }

    let token = self.get_access_token("data").await?;
    let paging = Paging { limit, offset };
    let url = format!("{}/v1/streams/{}/executions", self.host, id);
    let mut resp = surf::get(&url)
        .query(&paging)?
        .header("Authorization", token)
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Uploads one data part of a stream execution from a file.
///
/// Sends `PUT {host}/v1/streams/{id}/executions/{execution_id}/part/{part_id}`
/// with the contents of the file at `csv` as the body, the token from
/// `get_access_token("data")` in the `Authorization` header, and
/// `Content-Type: text/csv`. Returns the `Execution` decoded from the
/// response.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the file cannot be opened, the
/// request fails, or a body cannot be decoded as JSON. On a non-success
/// status the body is decoded as `PubAPIError` and returned as the error.
pub async fn put_stream_execution_part(
    &self,
    id: &str,
    execution_id: &str,
    part_id: &str,
    csv: impl AsRef<Path>,
) -> Result<Execution, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!(
        "{}/v1/streams/{}/executions/{}/part/{}",
        self.host, id, execution_id, part_id
    );
    // The Content-Type header is set after the body on purpose, so the
    // explicit `text/csv` is what ends up on the request.
    let mut resp = surf::put(&url)
        .header("Authorization", token)
        .body(surf::Body::from_file(csv).await?)
        .header("Content-Type", "text/csv")
        .await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Commits a stream execution.
///
/// Sends `PUT {host}/v1/streams/{id}/executions/{execution_id}/commit`
/// without a body, with the token from `get_access_token("data")` in the
/// `Authorization` header. Returns the `Execution` decoded from the
/// response.
///
/// # Errors
///
/// Fails if the token cannot be obtained, the request fails, or a body
/// cannot be decoded as JSON. On a non-success status the body is decoded
/// as `PubAPIError` and returned as the error.
pub async fn put_stream_execution_commit(
    &self,
    id: &str,
    execution_id: &str,
) -> Result<Execution, Box<dyn Error + Send + Sync + 'static>> {
    let token = self.get_access_token("data").await?;
    let url = format!(
        "{}/v1/streams/{}/executions/{}/commit",
        self.host, id, execution_id
    );
    let mut resp = surf::put(&url).header("Authorization", token).await?;

    if resp.status().is_success() {
        return Ok(resp.body_json().await?);
    }
    // Any non-2xx response is expected to carry a JSON error payload.
    let api_error: Box<super::PubAPIError> = resp.body_json().await?;
    Err(api_error)
}
/// Aborts a stream execution.
///
/// Sends `PUT {host}/v1/streams/{id}/executions/{execution_id}/abort`
/// without a body, with the token from `get_access_token("data")` in the
/// `Authorization` header. Both ids are inserted into the URL path
/// verbatim.
///
/// The body of a successful response is intentionally not read: this
/// function returns `()`, and decoding `()` from JSON only succeeds for a
/// literal `null`, so an empty (or object-valued) success body would
/// otherwise be reported as a failure.
///
/// # Errors
///
/// Fails if the token cannot be obtained or the request fails. On a
/// non-success status the body is decoded as `PubAPIError` and returned as
/// the error (or a decode error if the body is not valid JSON for it).
pub async fn put_stream_execution_abort(
    &self,
    id: &str,
    execution_id: &str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let at = self.get_access_token("data").await?;
    let mut response = surf::put(&format!(
        "{}{}{}{}{}{}",
        self.host, "/v1/streams/", id, "/executions/", execution_id, "/abort"
    ))
    .header("Authorization", at)
    .await?;
    if !response.status().is_success() {
        let e: Box<super::PubAPIError> = response.body_json().await?;
        return Err(e);
    }
    // Success carries no payload the caller needs; do not parse the body.
    Ok(())
}
}