use crate::{
api::{Meta, WatchEvent},
config::Config,
error::ErrorResponse,
Error, Result,
};
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, TryStream, TryStreamExt};
use http::{self, Request, StatusCode};
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
use serde::{de::DeserializeOwned, Deserialize};
use serde_json::{self, Value};
use std::convert::TryFrom;
#[derive(Clone)]
pub struct Client {
cluster_url: reqwest::Url,
default_ns: String,
inner: reqwest::Client,
config: Config,
}
impl Client {
pub fn new(config: Config) -> Self {
Self::try_from(config).expect("Could not create a client from the supplied config")
}
pub async fn try_default() -> Result<Self> {
let client_config = Config::infer().await?;
Self::try_from(client_config)
}
async fn send(&self, request: http::Request<Vec<u8>>) -> Result<reqwest::Response> {
let (parts, body) = request.into_parts();
let pandq = parts.uri.path_and_query().expect("valid path+query from kube");
let uri_str = finalize_url(&self.cluster_url, &pandq);
let mut headers = parts.headers;
if let Some(auth_header) = self.config.get_auth_header().await? {
headers.insert(reqwest::header::AUTHORIZATION, auth_header);
}
let request = match parts.method {
http::Method::GET
| http::Method::POST
| http::Method::DELETE
| http::Method::PUT
| http::Method::PATCH => self.inner.request(parts.method, &uri_str),
other => return Err(Error::InvalidMethod(other.to_string())),
};
let req = request.headers(headers).body(body).build()?;
let res = self.inner.execute(req).await?;
Ok(res)
}
pub async fn request<T>(&self, request: http::Request<Vec<u8>>) -> Result<T>
where
T: DeserializeOwned,
{
let text = self.request_text(request).await?;
serde_json::from_str(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})
}
pub async fn request_text(&self, request: http::Request<Vec<u8>>) -> Result<String> {
let res: reqwest::Response = self.send(request).await?;
trace!("Status = {:?} for {}", res.status(), res.url());
let s = res.status();
let text = res.text().await?;
handle_api_errors(&text, s)?;
Ok(text)
}
pub async fn request_text_stream(
&self,
request: http::Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let res: reqwest::Response = self.send(request).await?;
trace!("Status = {:?} for {}", res.status(), res.url());
Ok(res.bytes_stream().map_err(Error::ReqwestError))
}
pub async fn request_status<T>(&self, request: http::Request<Vec<u8>>) -> Result<Either<T, Status>>
where
T: DeserializeOwned,
{
let res: reqwest::Response = self.send(request).await?;
trace!("Status = {:?} for {}", res.status(), res.url());
let s = res.status();
let text = res.text().await?;
handle_api_errors(&text, s)?;
let v: Value = serde_json::from_str(&text)?;
if v["kind"] == "Status" {
trace!("Status from {}", text);
Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
} else {
Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
}
}
pub async fn request_events<T: Clone + Meta>(
&self,
request: http::Request<Vec<u8>>,
) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
where
T: DeserializeOwned,
{
let res: reqwest::Response = self.send(request).await?;
trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
trace!("headers: {:?}", res.headers());
let stream = futures::stream::try_unfold((res, Vec::new()), |(mut resp, _buff)| {
async {
let mut buff = _buff;
loop {
trace!("Await chunk");
match resp.chunk().await {
Ok(Some(chunk)) => {
trace!("Some chunk of len {}", chunk.len());
buff.extend_from_slice(&chunk);
if chunk.contains(&b'\n') {
let mut new_buff = Vec::new();
let mut items = Vec::new();
for line in buff.split(|x| x == &b'\n') {
new_buff.extend_from_slice(&line);
match serde_json::from_slice(&new_buff) {
Ok(val) => {
new_buff.clear();
items.push(Ok(val));
}
Err(e) => {
if !e.is_eof() {
let e = match serde_json::from_slice(&new_buff) {
Ok(e) => Error::Api(e),
_ => {
let line = String::from_utf8_lossy(line);
warn!("Failed to parse: {}", line);
match resp.error_for_status_ref() {
Ok(_) => Error::SerdeError(e),
Err(e) => Error::ReqwestError(e),
}
}
};
new_buff.clear();
items.push(Err(e));
}
}
}
}
return Ok(Some((items, (resp, new_buff))));
}
}
Ok(None) => {
trace!("None chunk");
return Ok(None);
}
Err(e) => {
if e.is_timeout() {
warn!("timeout in poll: {}", e);
return Ok(None);
}
let inner = e.to_string();
if inner.contains("unexpected EOF during chunk") {
warn!("eof in poll: {}", e);
return Ok(None);
} else {
error!("err poll: {:?} - {}", e, inner);
return Err(Error::ReqwestError(e));
}
}
}
}
}
});
Ok(stream.map_ok(futures::stream::iter).try_flatten())
}
pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
self.request(Request::builder().uri("/version").body(vec![])?)
.await
}
pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
self.request(Request::builder().uri("/apis").body(vec![])?).await
}
pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/apis/{}", apiversion);
self.request(Request::builder().uri(url).body(vec![])?).await
}
pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
self.request(Request::builder().uri("/api").body(vec![])?).await
}
pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/api/{}", version);
self.request(Request::builder().uri(url).body(vec![])?).await
}
}
fn handle_api_errors(text: &str, s: StatusCode) -> Result<()> {
if s.is_client_error() || s.is_server_error() {
if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(text) {
debug!("Unsuccessful: {:?}", errdata);
Err(Error::Api(errdata))
} else {
warn!("Unsuccessful data error parse: {}", text);
let ae = ErrorResponse {
status: s.to_string(),
code: s.as_u16(),
message: format!("{:?}", text),
reason: "Failed to parse error data".into(),
};
debug!("Unsuccessful: {:?} (reconstruct)", ae);
Err(Error::Api(ae))
}
} else {
Ok(())
}
}
impl TryFrom<Config> for Client {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
let cluster_url = config.cluster_url.clone();
let default_ns = config.default_ns.clone();
let config_clone = config.clone();
let builder: reqwest::ClientBuilder = config.into();
Ok(Self {
cluster_url,
default_ns,
inner: builder.build()?,
config: config_clone,
})
}
}
impl From<Config> for reqwest::ClientBuilder {
fn from(config: Config) -> Self {
let mut builder = Self::new();
if let Some(i) = &config.proxy {
builder = builder.proxy(i.clone())
}
if let Some(i) = config.identity() {
builder = builder.identity(i)
}
if let Some(c) = config.root_cert {
for cert in c {
builder = builder.add_root_certificate(cert);
}
}
if let Some(t) = config.timeout {
builder = builder.timeout(t);
}
builder = builder.default_headers(config.headers);
builder = builder.danger_accept_invalid_certs(config.accept_invalid_certs);
builder
}
}
#[allow(missing_docs)]
#[derive(Deserialize, Debug)]
pub struct Status {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub status: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub details: Option<StatusDetails>,
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub code: u16,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
#[allow(missing_docs)]
pub struct StatusDetails {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub name: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub group: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub kind: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub uid: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub causes: Vec<StatusCause>,
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub retry_after_seconds: u32,
}
#[derive(Deserialize, Debug)]
#[allow(missing_docs)]
pub struct StatusCause {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub field: String,
}
fn finalize_url(cluster_url: &reqwest::Url, request_pandq: &http::uri::PathAndQuery) -> String {
let base = cluster_url.as_str().trim_end_matches('/');
format!("{}{}", base, request_pandq)
}
#[cfg(test)]
mod test {
use super::Status;
#[test]
fn delete_deserialize_test() {
let statusresp = r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"name":"some-app","group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#;
let s: Status = serde_json::from_str::<Status>(statusresp).unwrap();
assert_eq!(s.details.unwrap().name, "some-app");
let statusnoname = r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#;
let s2: Status = serde_json::from_str::<Status>(statusnoname).unwrap();
assert_eq!(s2.details.unwrap().name, "");
}
#[test]
fn normal_host() {
let minikube_host = "https://192.168.1.65:8443";
let cluster_url = reqwest::Url::parse(minikube_host).unwrap();
let apipath: http::Uri = "/api/v1/nodes?hi=yes".parse().unwrap();
let pandq = apipath.path_and_query().expect("could pandq apipath");
let final_url = super::finalize_url(&cluster_url, &pandq);
assert_eq!(
final_url.as_str(),
"https://192.168.1.65:8443/api/v1/nodes?hi=yes"
);
}
#[test]
fn rancher_host() {
let rancher_host = "https://hostname/foo/bar";
let cluster_url = reqwest::Url::parse(rancher_host).unwrap();
assert_eq!(cluster_url.host_str().unwrap(), "hostname");
assert_eq!(cluster_url.path(), "/foo/bar");
assert_eq!(cluster_url.join("/api/v1/nodes").unwrap().path(), "/api/v1/nodes");
let apipath: http::Uri = "/api/v1/nodes?hi=yes".parse().unwrap();
let pandq = apipath.path_and_query().expect("could pandq apipath");
let final_url = super::finalize_url(&cluster_url, &pandq);
assert_eq!(final_url.as_str(), "https://hostname/foo/bar/api/v1/nodes?hi=yes");
}
}