use getset::Getters;
use reqwest::Response;
use thiserror::Error;
use tokio_stream::StreamExt;
use tokio_tungstenite::tungstenite;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
use std::fs::File;
use std::path::{Path, PathBuf};
use crate::server::event::{SchedulerEvent, SchedulerEventResult};
use crate::server::model::Flow;
use crate::server::record::{FlowListRecord, FlowRecord};
#[derive(Error, Debug)]
pub enum ClientError {
#[error("invalid url error: {0}")]
InvalidUrl(
#[source]
#[from]
url::ParseError,
),
#[error("request error: {0}")]
Request(
#[source]
#[from]
reqwest::Error,
),
#[error("response {0} error: {1}")]
ResponseNotOk(u16, String),
#[error("io error: {0}")]
Io(
#[source]
#[from]
std::io::Error,
),
#[error("url scheme conversion error")]
UrlSchemeConversion,
#[error("websocket error: {0}")]
Websocket(
#[source]
#[from]
tokio_tungstenite::tungstenite::Error,
),
}
#[derive(Error, Debug)]
pub enum ClientWebsocketError {
#[error("websocket error: {0}")]
Websocket(
#[source]
#[from]
tokio_tungstenite::tungstenite::Error,
),
#[error("malformed event error: {0}")]
MalformedEvent(serde_json::Error),
#[error("lag error: {0}")]
Lag(u64),
}
#[derive(Getters, Debug)]
pub struct FlowList {
#[getset(get = "pub")]
list: Vec<FlowListRecord>,
}
impl IntoIterator for FlowList {
type Item = FlowListRecord;
type IntoIter = std::vec::IntoIter<FlowListRecord>;
fn into_iter(self) -> Self::IntoIter {
self.list.into_iter()
}
}
impl<'a> IntoIterator for &'a FlowList {
type Item = &'a FlowListRecord;
type IntoIter = std::slice::Iter<'a, FlowListRecord>;
fn into_iter(self) -> Self::IntoIter {
self.list.iter()
}
}
#[derive(Getters, Debug)]
pub struct BytesDownloaded {
#[getset(get = "pub")]
num_bytes: u64,
}
pub struct Okay();
fn get_abs_url(url: &str, path: &str) -> Result<Url, ClientError> {
let base = Url::parse(url)?;
let joined = base.join(path)?;
Ok(joined)
}
pub async fn list_workflows(url: &str) -> Result<FlowList, ClientError> {
let abs_url = get_abs_url(url, "/api/v1/job")?;
Ok(FlowList {
list: reqwest::get(abs_url)
.await?
.json::<Vec<FlowListRecord>>()
.await?,
})
}
pub async fn get_status(url: &str, id: &str) -> Result<FlowRecord, ClientError> {
let abs_url = get_abs_url(url, &format!("/api/v1/job/{}", id))?;
Ok(reqwest::get(abs_url).await?.json::<FlowRecord>().await?)
}
async fn check_status(response: reqwest::Response) -> Result<reqwest::Response, ClientError> {
let response_status = response.status();
if response_status != 200 {
return Err(ClientError::ResponseNotOk(
response_status.as_u16(),
response.text().await?,
));
}
Ok(response)
}
async fn check_status_take(response: reqwest::Response) -> Result<Okay, ClientError> {
check_status(response).await?;
Ok(Okay())
}
pub async fn create_secret(url: &str, key: &str, value: &str) -> Result<Okay, ClientError> {
let abs_url = get_abs_url(url, &format!("api/v1/secret/{}", key))?;
let client = reqwest::Client::new();
check_status_take(client.post(abs_url).json::<str>(value).send().await?).await
}
pub async fn update_secret(url: &str, key: &str, value: &str) -> Result<Okay, ClientError> {
let abs_url = get_abs_url(url, &format!("api/v1/secret/{}", key))?;
let client = reqwest::Client::new();
check_status_take(client.put(abs_url).json::<str>(value).send().await?).await
}
pub async fn delete_secret(url: &str, key: &str) -> Result<Okay, ClientError> {
let abs_url = get_abs_url(url, &format!("api/v1/secret/{}", key))?;
let client = reqwest::Client::new();
check_status_take(client.delete(abs_url).send().await?).await
}
fn get_path_from_response_url(
response: &reqwest::Response,
dir_path: &str,
default_name: &str,
) -> PathBuf {
let file_name = response
.url()
.path_segments()
.and_then(|segments| segments.last())
.and_then(|name| if name.is_empty() { None } else { Some(name) })
.unwrap_or(default_name);
Path::new(dir_path).join(file_name)
}
pub async fn download_artefact(url: &str, id: &str, name: &str) -> Result<Response, ClientError> {
let abs_url = get_abs_url(url, &format!("/api/v1/artefact/{}/{}", id, name))?;
let response = reqwest::get(abs_url).await?;
check_status(response).await
}
pub async fn download_artefact_to_path(
url: &str,
id: &str,
name: &str,
dest: &str,
) -> Result<BytesDownloaded, ClientError> {
let response = download_artefact(url, id, name).await?;
let file_path = get_path_from_response_url(&response, dest, &format!("flow-{}-output", id));
let content = response.text().await?;
let mut file = File::create(file_path)?;
let num_bytes = std::io::copy(&mut content.as_bytes(), &mut file)?;
Ok(BytesDownloaded { num_bytes })
}
fn get_ws_scheme(secure: bool) -> &'static str {
if secure {
return "wss";
}
"ws"
}
pub async fn subscribe(
url: &str,
secure: bool,
) -> Result<impl StreamExt<Item = Result<SchedulerEvent, ClientWebsocketError>>, ClientError> {
let mut abs_url = get_abs_url(url, "/api/v1/scheduler/ws")?;
if abs_url.set_scheme(get_ws_scheme(secure)).is_err() {
return Err(ClientError::UrlSchemeConversion);
};
let (ws_stream, _) = tokio_tungstenite::connect_async(abs_url).await?;
fn text_only(msg: &Result<Message, tungstenite::Error>) -> bool {
match msg {
Err(_) => true,
Ok(msg) => msg.is_text(),
}
}
fn deserialize_msg(
msg: Result<Message, tungstenite::Error>,
) -> Result<SchedulerEvent, ClientWebsocketError> {
match msg {
Err(error) => Err(ClientWebsocketError::Websocket(error)),
Ok(msg) => match serde_json::from_str::<SchedulerEventResult>(&msg.to_string()) {
Err(error) => Err(ClientWebsocketError::MalformedEvent(error)),
Ok(event_result) => match event_result {
SchedulerEventResult::Lag(lag) => Err(ClientWebsocketError::Lag(lag)),
SchedulerEventResult::Event(event) => Ok(event),
},
},
}
}
let output_stream = ws_stream.filter(text_only).map(deserialize_msg);
Ok(output_stream)
}
pub async fn submit(url: &str, flow: &Flow) -> Result<Okay, ClientError> {
let abs_url = get_abs_url(url, "/api/v1/job")?;
let client = reqwest::Client::new();
check_status_take(client.post(abs_url).json(flow).send().await?).await
}