use crate::{
api::{Meta, WatchEvent},
config::Config,
error::ErrorResponse,
service::Service,
Error, Result,
};
#[cfg(feature = "ws")]
use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
use serde::{de::DeserializeOwned, Deserialize};
use serde_json::{self, Value};
use tokio_util::{
codec::{FramedRead, LinesCodec, LinesCodecError},
io::StreamReader,
};
use tower::{Service as _, ServiceExt};
use std::convert::{TryFrom, TryInto};
#[derive(Clone)]
pub struct Client {
inner: Service,
}
impl Client {
pub fn new(service: Service) -> Self {
Self { inner: service }
}
pub async fn try_default() -> Result<Self> {
let client_config = Config::infer().await?;
Self::try_from(client_config)
}
async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
let mut svc = self.inner.clone();
let res = svc
.ready()
.await
.map_err(Error::Service)?
.call(request)
.await
.map_err(|err| {
if err.is::<Error>() {
*err.downcast::<Error>().expect("kube::Error")
} else if err.is::<hyper::Error>() {
Error::HyperError(*err.downcast::<hyper::Error>().expect("hyper::Error"))
} else {
Error::Service(err)
}
})?;
Ok(res)
}
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(
&self,
request: Request<Vec<u8>>,
) -> Result<WebSocketStream<hyper::upgrade::Upgraded>> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
parts
.headers
.insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
parts
.headers
.insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
parts.headers.insert(
http::header::SEC_WEBSOCKET_VERSION,
HeaderValue::from_static("13"),
);
let key = "kube";
parts.headers.insert(
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
);
parts.headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static("v4.channel.k8s.io"),
);
let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(Error::ProtocolSwitch(res.status()));
}
match hyper::upgrade::on(res).await {
Ok(upgraded) => {
Ok(WebSocketStream::from_raw_socket(upgraded, ws::protocol::Role::Client, None).await)
}
Err(e) => Err(Error::HyperError(e)),
}
}
pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
where
T: DeserializeOwned,
{
let text = self.request_text(request).await?;
serde_json::from_str(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})
}
pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
let res = self.send(request.map(Body::from)).await?;
let status = res.status();
let body_bytes = hyper::body::to_bytes(res.into_body()).await?;
let text = String::from_utf8(body_bytes.to_vec())?;
handle_api_errors(&text, status)?;
Ok(text)
}
pub async fn request_text_stream(
&self,
request: Request<Vec<u8>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let res = self.send(request.map(Body::from)).await?;
Ok(res.into_body().map_err(Error::HyperError))
}
pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
where
T: DeserializeOwned,
{
let text = self.request_text(request).await?;
let v: Value = serde_json::from_str(&text)?;
if v["kind"] == "Status" {
trace!("Status from {}", text);
Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
} else {
Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
}
}
pub async fn request_events<T: Clone + Meta>(
&self,
request: Request<Vec<u8>>,
) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
where
T: DeserializeOwned,
{
let res = self.send(request.map(Body::from)).await?;
trace!("headers: {:?}", res.headers());
let frames = FramedRead::new(
StreamReader::new(res.into_body().map_err(|e| {
if e.is_timeout() {
return std::io::Error::new(std::io::ErrorKind::TimedOut, e);
}
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
}
std::io::Error::new(std::io::ErrorKind::Other, e)
})),
LinesCodec::new(),
);
Ok(frames.filter_map(|res| async {
match res {
Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
Ok(event) => Some(Ok(event)),
Err(e) => {
if e.is_eof() {
return None;
}
if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
return Some(Err(Error::Api(e_resp)));
}
Some(Err(Error::SerdeError(e)))
}
},
Err(LinesCodecError::Io(e)) => match e.kind() {
std::io::ErrorKind::TimedOut => {
warn!("timeout in poll: {}", e);
None
}
std::io::ErrorKind::UnexpectedEof => {
warn!("eof in poll: {}", e);
None
}
_ => Some(Err(Error::ReadEvents(e))),
},
Err(LinesCodecError::MaxLineLengthExceeded) => {
Some(Err(Error::LinesCodecMaxLineLengthExceeded))
}
}
}))
}
pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
self.request(Request::builder().uri("/version").body(vec![])?)
.await
}
pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
self.request(Request::builder().uri("/apis").body(vec![])?).await
}
pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/apis/{}", apiversion);
self.request(Request::builder().uri(url).body(vec![])?).await
}
pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
self.request(Request::builder().uri("/api").body(vec![])?).await
}
pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/api/{}", version);
self.request(Request::builder().uri(url).body(vec![])?).await
}
}
fn handle_api_errors(text: &str, s: StatusCode) -> Result<()> {
if s.is_client_error() || s.is_server_error() {
if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(text) {
debug!("Unsuccessful: {:?}", errdata);
Err(Error::Api(errdata))
} else {
warn!("Unsuccessful data error parse: {}", text);
let ae = ErrorResponse {
status: s.to_string(),
code: s.as_u16(),
message: format!("{:?}", text),
reason: "Failed to parse error data".into(),
};
debug!("Unsuccessful: {:?} (reconstruct)", ae);
Err(Error::Api(ae))
}
} else {
Ok(())
}
}
impl TryFrom<Config> for Client {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
Ok(Self::new(config.try_into()?))
}
}
#[allow(missing_docs)]
#[derive(Deserialize, Debug)]
pub struct Status {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub status: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub details: Option<StatusDetails>,
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub code: u16,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
#[allow(missing_docs)]
pub struct StatusDetails {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub name: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub group: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub kind: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub uid: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub causes: Vec<StatusCause>,
#[serde(default, skip_serializing_if = "num::Zero::is_zero")]
pub retry_after_seconds: u32,
}
#[derive(Deserialize, Debug)]
#[allow(missing_docs)]
pub struct StatusCause {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub reason: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub message: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub field: String,
}
#[cfg(test)]
mod test {
use super::Status;
#[test]
fn delete_deserialize_test() {
let statusresp = r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"name":"some-app","group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#;
let s: Status = serde_json::from_str::<Status>(statusresp).unwrap();
assert_eq!(s.details.unwrap().name, "some-app");
let statusnoname = r#"{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Success","details":{"group":"clux.dev","kind":"foos","uid":"1234-some-uid"}}"#;
let s2: Status = serde_json::from_str::<Status>(statusnoname).unwrap();
assert_eq!(s2.details.unwrap().name, "");
}
}