use crate::{config::Configuration, Error, ErrorResponse, Result};
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, TryStream, TryStreamExt};
use http::{self, StatusCode};
use serde::de::DeserializeOwned;
use serde_json::{self, Value};
/// Extra detail payload carried in the `details` field of a [`Status`].
///
/// Every field is marked `#[serde(default)]`, so a key that is missing from the
/// JSON deserializes to the type's default (empty string, empty vector or `0`).
/// Because of `rename_all = "camelCase"`, `retry_after_seconds` is read from the
/// `retryAfterSeconds` key.
///
/// NOTE(review): the `skip_serializing_if` attributes only influence
/// serialization; only `Deserialize` is derived here, so they look inert — confirm.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StatusDetails {
/// Presumably the name of the resource the status refers to — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub name: String,
/// Presumably the API group of that resource — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub group: String,
/// Presumably the kind of that resource — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub kind: String,
/// Presumably the unique identifier of that resource — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub uid: String,
/// Individual causes attached to the status; empty when the key is absent.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub causes: Vec<StatusCause>,
/// Read from `retryAfterSeconds`; `0` when the key is absent.
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub retry_after_seconds: u32,
}
/// One entry of the `causes` list inside [`StatusDetails`].
///
/// All fields are `#[serde(default)]`, so any of them may be missing from the
/// JSON and will then deserialize to an empty string.
#[derive(Deserialize, Debug)]
pub struct StatusCause {
/// Presumably a machine-readable reason for the cause — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
/// Presumably a human-readable description of the cause — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
/// Presumably the field of the resource that triggered the cause — verify against the API schema.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub field: String,
}
/// Deserialized form of a response body whose `kind` is `"Status"`.
///
/// `Client::request_status` returns this (as the `Right` variant) when the
/// response JSON has `"kind": "Status"`. Every field is `#[serde(default)]`,
/// so absent keys deserialize to an empty string, `None` or `0`.
#[derive(Deserialize, Debug)]
pub struct Status {
/// Value of the `status` key (e.g. `"Success"`); empty when absent.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub status: String,
/// Value of the `message` key; empty when absent.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
/// Value of the `reason` key; empty when absent.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
/// Optional extended details; `None` when the `details` key is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub details: Option<StatusDetails>,
/// Value of the `code` key (presumably an HTTP status code — confirm); `0` when absent.
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub code: u16,
}
/// HTTP client that issues requests using a stored [`Configuration`].
///
/// The configuration supplies both the base path that is prepended to every
/// request URI and the underlying HTTP client used to execute requests.
#[derive(Clone)]
pub struct Client {
// Connection settings (base path + HTTP client) used by every request.
configuration: Configuration,
}
impl From<Configuration> for Client {
fn from(configuration: Configuration) -> Self {
Self { configuration }
}
}
impl Client {
pub async fn infer() -> Result<Self> {
let configuration = crate::config::Configuration::infer().await?;
Ok(Self { configuration })
}
async fn send(&self, request: http::Request<Vec<u8>>) -> Result<reqwest::Response> {
let (parts, body) = request.into_parts();
let uri_str = format!("{}{}", self.configuration.base_path, parts.uri);
trace!("{} {}", parts.method, uri_str);
let req = match parts.method {
http::Method::GET => self.configuration.client.get(&uri_str),
http::Method::POST => self.configuration.client.post(&uri_str),
http::Method::DELETE => self.configuration.client.delete(&uri_str),
http::Method::PUT => self.configuration.client.put(&uri_str),
http::Method::PATCH => self.configuration.client.patch(&uri_str),
other => return Err(Error::InvalidMethod(other.to_string())),
}
.headers(parts.headers)
.body(body)
.build()?;
let res = self.configuration.client.execute(req).await?;
Ok(res)
}
/// Sends `request` and deserializes the JSON response body into `T`.
///
/// Transport, trace logging and the API-error check are delegated to
/// [`Client::request_text`], which performs exactly the sequence this method
/// used to duplicate (send, trace status/URL, read body, `handle_api_errors`).
///
/// # Errors
/// - Any error returned by `request_text` (transport failures, API errors).
/// - `Error::SerdeError` when the body cannot be deserialized into `T`; the
///   body and the parse error are logged at `warn` level first.
pub async fn request<T>(&self, request: http::Request<Vec<u8>>) -> Result<T>
where
    T: DeserializeOwned,
{
    let text = self.request_text(request).await?;
    serde_json::from_str(&text).map_err(|e| {
        warn!("{}, {:?}", text, e);
        Error::SerdeError(e)
    })
}
/// Sends `request` and returns the response body as text.
///
/// The response status and URL are logged at `trace` level. The status is
/// captured before the body is consumed so that it can be handed to
/// `handle_api_errors` together with the body text.
///
/// # Errors
/// - Transport errors from sending the request or reading the body.
/// - Whatever `handle_api_errors` returns for the status/body pair.
pub async fn request_text(&self, request: http::Request<Vec<u8>>) -> Result<String> {
    let response: reqwest::Response = self.send(request).await?;
    trace!("{} {}", response.status().as_str(), response.url());
    let status = response.status();
    let body = response.text().await?;
    handle_api_errors(&body, status).map(|()| body)
}
pub async fn request_text_stream(
&self,
request: http::Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let res: reqwest::Response = self.send(request).await?;
trace!("{} {}", res.status().as_str(), res.url());
Ok(res.bytes_stream().map_err(Error::ReqwestError))
}
pub async fn request_status<T>(&self, request: http::Request<Vec<u8>>) -> Result<Either<T, Status>>
where
T: DeserializeOwned,
{
let res: reqwest::Response = self.send(request).await?;
trace!("{} {}", res.status().as_str(), res.url());
let s = res.status();
let text = res.text().await?;
handle_api_errors(&text, s)?;
let v: Value = serde_json::from_str(&text)?;
if v["kind"] == "Status" {
trace!("Status from {}", text);
Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
} else {
Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
}
}
/// Sends `request` and turns the newline-delimited JSON response body into a
/// stream of deserialized `T` values.
///
/// The body is read chunk by chunk into a byte buffer. Whenever a chunk
/// contains a newline, the whole buffer is split on `\n` and each segment is
/// parsed:
/// - a segment that deserializes into `T` becomes an `Ok` item;
/// - a segment that fails with a non-EOF error is retried as the payload type
///   of `Error::Api`; success yields `Err(Error::Api(..))`, failure yields
///   `Err(Error::SerdeError(..))`;
/// - a segment that fails with an EOF error is treated as incomplete and its
///   bytes are carried over to be joined with the following data.
///
/// The stream ends when the body is exhausted, on a timeout, or when the
/// transport error text contains `"unexpected EOF during chunk"`; any other
/// transport error is yielded as `Error::ReqwestError` and ends the stream.
///
/// NOTE(review): the response status is not passed through
/// `handle_api_errors` here, and bytes still buffered when the body ends are
/// not emitted.
///
/// # Errors
/// Returns an error only if sending the request fails; later failures surface
/// as `Err` items of the stream.
pub async fn request_events<T>(
&self,
request: http::Request<Vec<u8>>,
) -> Result<impl TryStream<Item = Result<T>>>
where
T: DeserializeOwned,
{
let res: reqwest::Response = self.send(request).await?;
trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
trace!("headers: {:?}", res.headers());
// Unfold state: (response being read, bytes received but not yet parsed).
// Each step yields a batch (Vec) of parsed items.
let stream = futures::stream::try_unfold((res, Vec::new()), |(mut resp, _buff)| {
async {
let mut buff = _buff;
// Keep reading chunks until one contains a newline or the body ends.
loop {
trace!("Await chunk");
match resp.chunk().await {
Ok(Some(chunk)) => {
trace!("Some chunk of len {}", chunk.len());
buff.extend_from_slice(&chunk);
// Only attempt parsing once at least one line terminator has arrived.
if chunk.contains(&b'\n') {
// `new_buff` accumulates segments (without the `\n`) until they parse;
// whatever is left in it afterwards becomes the next step's buffer.
let mut new_buff = Vec::new();
let mut items = Vec::new();
for line in buff.split(|x| x == &b'\n') {
new_buff.extend_from_slice(&line);
match serde_json::from_slice(&new_buff) {
Ok(val) => {
new_buff.clear();
items.push(Ok(val));
}
Err(e) => {
// EOF errors mean "incomplete so far": keep the bytes and continue.
if !e.is_eof() {
// Not a `T`: see whether it is an API error payload instead.
let e = match serde_json::from_slice(&new_buff) {
Ok(e) => Error::Api(e),
_ => {
let line = String::from_utf8_lossy(line);
warn!("Failed to parse: {}", line);
Error::SerdeError(e)
}
};
new_buff.clear();
items.push(Err(e));
}
}
}
}
// Emit this batch and carry the unparsed remainder into the next step.
return Ok(Some((items, (resp, new_buff))));
}
}
Ok(None) => {
// Body exhausted: end the stream.
trace!("None chunk");
return Ok(None);
}
Err(e) => {
// Timeouts end the stream without an error item.
if e.is_timeout() {
warn!("timeout in poll: {}", e);
return Ok(None);
}
// Recognized by message text: this specific EOF also ends the stream quietly.
let inner = e.to_string();
if inner.contains("unexpected EOF during chunk") {
warn!("eof in poll: {}", e);
return Ok(None);
} else {
error!("err poll: {:?} - {}", e, inner);
return Err(Error::ReqwestError(e));
}
}
}
}
}
});
// Flatten the stream of batches into a stream of individual items.
Ok(stream.map_ok(futures::stream::iter).try_flatten())
}
}
fn handle_api_errors(text: &str, s: StatusCode) -> Result<()> {
if s.is_client_error() || s.is_server_error() {
if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(text) {
debug!("Unsuccessful: {:?}", errdata);
Err(Error::Api(errdata))
} else {
warn!("Unsuccessful data error parse: {}", text);
let ae = ErrorResponse {
status: s.to_string(),
code: s.as_u16(),
message: format!("{:?}", text),
reason: "Failed to parse error data".into(),
};
debug!("Unsuccessful: {:?} (reconstruct)", ae);
Err(Error::Api(ae))
}
} else {
Ok(())
}
}
#[cfg(test)]
mod test {
    use super::Status;

    /// A `Status` body deserializes both with and without `details.name`;
    /// a missing name falls back to the empty string.
    #[test]
    fn delete_deserialize_test() {
        let parse = |raw: &str| serde_json::from_str::<Status>(raw).unwrap();

        let with_name = parse(
            r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"name":"some-app","group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#,
        );
        assert_eq!(with_name.details.unwrap().name, "some-app");

        let without_name = parse(
            r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#,
        );
        assert_eq!(without_name.details.unwrap().name, "");
    }
}