use chrono::{DateTime, Utc};
use either::{Either, Left, Right};
use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response};
use http_body_util::BodyExt;
#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
pub use kube_core::response::Status;
use serde::de::DeserializeOwned;
use serde_json::{self, Value};
#[cfg(feature = "ws")]
use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
use tokio_util::{
codec::{FramedRead, LinesCodec, LinesCodecError},
io::StreamReader,
};
use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt};
use tower_http::map_response_body::MapResponseBodyLayer;
pub use self::body::Body;
use crate::{api::WatchEvent, config::Kubeconfig, error::ErrorResponse, Config, Error, Result};
mod auth;
mod body;
mod builder;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
mod client_ext;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
pub use client_ext::scope;
mod config_ext;
pub use auth::Error as AuthError;
pub use config_ext::ConfigExt;
pub mod middleware;
#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
#[cfg(feature = "openssl-tls")]
pub use tls::openssl_tls::Error as OpensslTlsError;
#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
#[cfg(feature = "ws")] mod upgrade;
#[cfg(feature = "oauth")]
#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
pub use auth::OAuthError;
#[cfg(feature = "oidc")]
#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
pub use auth::oidc_errors;
#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
#[cfg(feature = "kubelet-debug")]
#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
mod kubelet_debug;
pub use builder::{ClientBuilder, DynBody};
/// Client for talking to a Kubernetes API server.
///
/// Holds a buffered, type-erased `tower` service (built in [`Client::new`]),
/// the namespace reported by [`Client::default_namespace`], and an optional
/// validity timestamp. The type derives `Clone`.
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[derive(Clone)]
pub struct Client {
// Buffered service handle; `send` clones it for every request it issues.
inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
// Namespace handed back by `default_namespace()`.
default_ns: String,
// Optional timestamp, `None` after `new` and replaced by `with_valid_until`.
// NOTE(review): presumably the expiry of the client's credentials; nothing in
// this file enforces it, so confirm against the builder/auth code.
valid_until: Option<DateTime<Utc>>,
}
/// Result of a successful [`Client::connect`] call: the WebSocket stream over
/// the upgraded HTTP connection together with the stream protocol that was
/// verified during the handshake.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
// WebSocket stream layered on the upgraded hyper connection.
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
// Protocol value returned by `upgrade::verify_response` in `Client::connect`.
protocol: upgrade::StreamProtocol,
}
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Returns what the negotiated stream protocol reports from its own
    /// `supports_stream_close` check.
    pub fn supports_stream_close(&self) -> bool {
        let negotiated = &self.protocol;
        negotiated.supports_stream_close()
    }

    /// Consumes the connection and hands back the underlying WebSocket
    /// stream; the protocol information is discarded.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, protocol: _ } = self;
        stream
    }
}
impl Client {
/// Builds a [`Client`] on top of a caller-supplied `tower` service.
///
/// The service's response body is wrapped into the crate's [`Body`] type and
/// its error is converted into a [`BoxError`]. The result is boxed and placed
/// behind a `Buffer` with a capacity of 1024 requests.
///
/// `default_namespace` is stored and later returned by
/// [`Client::default_namespace`]. The validity timestamp starts out as `None`.
///
/// NOTE(review): `tower::buffer::Buffer::new` spawns a background worker, so
/// this presumably has to run inside a Tokio runtime (confirm against the
/// tower documentation).
pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
where
    S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<BoxError>,
    B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
    B::Error: Into<BoxError>,
    T: Into<String>,
{
    // Normalise the response body type first, then the error type.
    let body_layer = MapResponseBodyLayer::new(Body::wrap_body);
    let normalised = body_layer.layer(service).map_err(|err| err.into());
    let inner = Buffer::new(BoxService::new(normalised), 1024);
    Self {
        inner,
        default_ns: default_namespace.into(),
        valid_until: None,
    }
}
/// Returns the client with its validity timestamp replaced by `valid_until`.
///
/// All other fields are carried over unchanged. Nothing in this file checks
/// the timestamp; callers read it back through [`Client::valid_until`].
pub fn with_valid_until(mut self, valid_until: Option<DateTime<Utc>>) -> Self {
    self.valid_until = valid_until;
    self
}
pub fn valid_until(&self) -> &Option<DateTime<Utc>> {
&self.valid_until
}
pub async fn try_default() -> Result<Self> {
Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
}
/// Returns the namespace this client was constructed with.
pub fn default_namespace(&self) -> &str {
    self.default_ns.as_str()
}
pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
let mut svc = self.inner.clone();
let res = svc
.ready()
.await
.map_err(Error::Service)?
.call(request)
.await
.map_err(|err| {
err.downcast::<Error>()
.map(|e| *e)
.or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
.unwrap_or_else(Error::Service)
})?;
Ok(res)
}
/// Sends `request` as a WebSocket upgrade request and returns the resulting
/// [`Connection`].
///
/// The `Connection`, `Upgrade`, `Sec-WebSocket-Version` and
/// `Sec-WebSocket-Key` headers are set (replacing any existing values), and
/// the stream-protocol headers are added by
/// `upgrade::StreamProtocol::add_to_headers`. The response is then checked
/// with `upgrade::verify_response` before the upgraded connection is wrapped
/// in a client-role WebSocket stream.
///
/// # Errors
///
/// * Any error from adding the protocol headers or from [`Client::send`].
/// * [`Error::UpgradeConnection`] when the response fails verification or
///   the pending upgrade cannot be obtained.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
    use http::header::{self, HeaderValue};

    let (mut parts, body) = request.into_parts();

    // Fixed handshake headers, inserted in the same order as before.
    for (name, value) in [
        (header::CONNECTION, "Upgrade"),
        (header::UPGRADE, "websocket"),
        (header::SEC_WEBSOCKET_VERSION, "13"),
    ] {
        parts.headers.insert(name, HeaderValue::from_static(value));
    }

    // The key is kept so the server's response can be verified against it.
    let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
    let key_value: HeaderValue = key.parse().expect("valid header value");
    parts.headers.insert(header::SEC_WEBSOCKET_KEY, key_value);
    upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;

    let upgrade_request = Request::from_parts(parts, Body::from(body));
    let res = self.send(upgrade_request).await?;
    let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;

    let upgraded = hyper::upgrade::on(res)
        .await
        .map_err(|e| Error::UpgradeConnection(UpgradeConnectionError::GetPendingUpgrade(e)))?;
    let stream =
        WebSocketStream::from_raw_socket(TokioIo::new(upgraded), ws::protocol::Role::Client, None)
            .await;
    Ok(Connection { stream, protocol })
}
pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
where
T: DeserializeOwned,
{
let text = self.request_text(request).await?;
serde_json::from_str(&text).map_err(|e| {
tracing::warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})
}
pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
let res = self.send(request.map(Body::from)).await?;
let res = handle_api_errors(res).await?;
let body_bytes = res.into_body().collect().await?.to_bytes();
let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
Ok(text)
}
/// Performs `request` and exposes the response body as an asynchronous
/// buffered reader instead of collecting it.
///
/// Responses with a 4xx or 5xx status are turned into errors by
/// `handle_api_errors` first. Errors raised while streaming the body surface
/// as `std::io::Error` values from the reader.
pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
    let response = handle_api_errors(self.send(request.map(Body::from)).await?).await?;
    let reader = response
        .into_body()
        .into_data_stream()
        .map_err(std::io::Error::other)
        .into_async_read();
    Ok(reader)
}
pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
where
T: DeserializeOwned,
{
let text = self.request_text(request).await?;
let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
if v["kind"] == "Status" {
tracing::trace!("Status from {}", text);
Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
tracing::warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
} else {
Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
tracing::warn!("{}, {:?}", text, e);
Error::SerdeError(e)
})?))
}
}
/// Performs `request` and returns the response body as a stream of
/// [`WatchEvent`]s, one per line of the body.
///
/// Unlike [`Client::request_text`], the response is not passed through
/// `handle_api_errors`; the response headers are logged at `trace` level and
/// the body is decoded line by line.
///
/// Per-item behaviour of the returned stream:
/// * a line that parses as `WatchEvent<T>` is yielded as `Ok`;
/// * a line whose parse error is an EOF error is skipped;
/// * a line that instead parses as [`ErrorResponse`] is yielded as
///   [`Error::Api`];
/// * any other unparsable line is yielded as [`Error::SerdeError`];
/// * I/O errors of kind `TimedOut` or `UnexpectedEof` are logged at `warn`
///   level and skipped, other I/O errors are yielded as
///   [`Error::ReadEvents`];
/// * exceeding the codec's maximum line length is yielded as
///   [`Error::LinesCodecMaxLineLengthExceeded`].
///
/// # Errors
///
/// The outer `Result` only carries errors from [`Client::send`].
pub async fn request_events<T>(
&self,
request: Request<Vec<u8>>,
) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
where
T: Clone + DeserializeOwned,
{
let res = self.send(request.map(Body::from)).await?;
tracing::trace!("headers: {:?}", res.headers());
// Split the body into lines; each line is decoded as one event below.
let frames = FramedRead::new(
StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
// Body errors are only identifiable by message here: map the chunked-decoder
// EOF to `UnexpectedEof` so the filter below can recognise and drop it.
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
}
std::io::Error::other(e)
})),
LinesCodec::new(),
);
Ok(frames.filter_map(|res| async {
match res {
Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
Ok(event) => Some(Ok(event)),
Err(e) => {
// An EOF parse error means the line was cut short; drop it silently.
if e.is_eof() {
return None;
}
// The line may be a general error response rather than an event.
if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
return Some(Err(Error::Api(e_resp)));
}
// Otherwise report the original parse error.
Some(Err(Error::SerdeError(e)))
}
},
Err(LinesCodecError::Io(e)) => match e.kind() {
// Timeouts are logged and swallowed instead of being surfaced to the consumer.
std::io::ErrorKind::TimedOut => {
tracing::warn!("timeout in poll: {}", e); None
}
// Same for the unexpected EOF produced by the mapping above.
std::io::ErrorKind::UnexpectedEof => {
tracing::warn!("eof in poll: {}", e);
None
}
_ => Some(Err(Error::ReadEvents(e))),
},
// NOTE(review): `LinesCodec::new()` is documented by tokio-util as having no
// line-length limit, so this arm is presumably unreachable in practice (confirm).
Err(LinesCodecError::MaxLineLengthExceeded) => {
Some(Err(Error::LinesCodecMaxLineLengthExceeded))
}
}
}))
}
}
impl Client {
pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
self.request(
Request::builder()
.uri("/version")
.body(vec![])
.map_err(Error::HttpError)?,
)
.await
}
pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
self.request(
Request::builder()
.uri("/apis")
.body(vec![])
.map_err(Error::HttpError)?,
)
.await
}
pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/apis/{apiversion}");
self.request(
Request::builder()
.uri(url)
.body(vec![])
.map_err(Error::HttpError)?,
)
.await
}
pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
self.request(
Request::builder()
.uri("/api")
.body(vec![])
.map_err(Error::HttpError)?,
)
.await
}
pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
let url = format!("/api/{version}");
self.request(
Request::builder()
.uri(url)
.body(vec![])
.map_err(Error::HttpError)?,
)
.await
}
}
async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
let status = res.status();
if status.is_client_error() || status.is_server_error() {
let body_bytes = res.into_body().collect().await?.to_bytes();
let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(&text) {
tracing::debug!("Unsuccessful: {errdata:?}");
Err(Error::Api(errdata))
} else {
tracing::warn!("Unsuccessful data error parse: {}", text);
let error_response = ErrorResponse {
status: status.to_string(),
code: status.as_u16(),
message: format!("{text:?}"),
reason: "Failed to parse error data".into(),
};
tracing::debug!("Unsuccessful: {error_response:?} (reconstruct)");
Err(Error::Api(error_response))
}
} else {
Ok(res)
}
}
/// Builds a [`Client`] from a [`Config`] by way of [`ClientBuilder`].
impl TryFrom<Config> for Client {
    type Error = Error;

    /// Converts `config` into a builder and builds the client from it.
    ///
    /// Fails with whatever error `ClientBuilder::try_from` reports.
    fn try_from(config: Config) -> Result<Self> {
        let builder = ClientBuilder::try_from(config)?;
        Ok(builder.build())
    }
}
/// Builds a [`Client`] from a [`Kubeconfig`] by first converting it into a
/// [`Config`] and then going through [`ClientBuilder`].
impl TryFrom<Kubeconfig> for Client {
    type Error = Error;

    /// Fails if the kubeconfig cannot be turned into a [`Config`], or if the
    /// builder cannot be created from that config.
    fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
        let builder = ClientBuilder::try_from(Config::try_from(kubeconfig)?)?;
        Ok(builder.build())
    }
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use crate::{
client::Body,
config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
Api, Client,
};
use http::{Request, Response};
use k8s_openapi::api::core::v1::Pod;
use tower_test::mock;
/// A client built with `Client::new` reports the namespace it was given.
#[tokio::test]
async fn test_default_ns() {
    // The handle half of the mock is unused: no request is ever sent.
    let (service, _handle) = mock::pair::<Request<Body>, Response<Body>>();
    let namespace = Client::new(service, "test-namespace")
        .default_namespace()
        .to_owned();
    assert_eq!(namespace, "test-namespace");
}
/// A client built from a kubeconfig picks up the namespace of the current
/// context.
#[tokio::test]
async fn test_try_from_kubeconfig() {
    let user = NamedAuthInfo {
        name: String::from("test-user"),
        auth_info: Some(AuthInfo::default()),
    };
    let context = NamedContext {
        name: String::from("test-context"),
        context: Some(Context {
            cluster: String::from("test-cluster"),
            user: Some(String::from("test-user")),
            namespace: Some(String::from("test-namespace")),
            ..Default::default()
        }),
    };
    let cluster = NamedCluster {
        name: String::from("test-cluster"),
        cluster: Some(Cluster {
            server: Some(String::from("http://localhost:8080")),
            ..Default::default()
        }),
    };
    let config = Kubeconfig {
        current_context: Some(String::from("test-context")),
        auth_infos: vec![user],
        contexts: vec![context],
        clusters: vec![cluster],
        ..Default::default()
    };

    let client = Client::try_from(config).expect("Failed to create client from kubeconfig");
    assert_eq!(client.default_namespace(), "test-namespace");
}
/// Drives an `Api<Pod>` against a mocked service: the mock asserts on the
/// outgoing request and answers with a serialized pod, which the client side
/// then inspects.
#[tokio::test]
async fn test_mock() {
    let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();

    // Server side: expect exactly one GET for the pod and reply with it.
    let server = tokio::spawn(async move {
        let mut handle = pin!(handle);
        let (request, responder) = handle.next_request().await.expect("service not called");
        assert_eq!(request.method(), http::Method::GET);
        assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

        let pod_json = serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "test",
                "annotations": { "kube-rs": "test" },
            },
            "spec": {
                "containers": [{ "name": "test", "image": "test-image" }],
            }
        });
        let pod: Pod = serde_json::from_value(pod_json).unwrap();
        let body = Body::from(serde_json::to_vec(&pod).unwrap());
        responder.send_response(Response::builder().body(body).unwrap());
    });

    // Client side: fetch the pod through the mocked service.
    let client = Client::new(mock_service, "default");
    let pods: Api<Pod> = Api::default_namespaced(client);
    let pod = pods.get("test").await.unwrap();
    let annotations = pod.metadata.annotations.unwrap();
    assert_eq!(annotations.get("kube-rs").unwrap(), "test");

    // Surface any assertion failure raised inside the spawned task.
    server.await.unwrap();
}
}