1use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")]
15use hyper_util::rt::TokioIo;
16use jiff::Timestamp;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
18use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
19use serde::de::DeserializeOwned;
20use serde_json::{self, Value};
21#[cfg(feature = "ws")]
22use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
23use tokio_util::{
24 codec::{FramedRead, LinesCodec, LinesCodecError},
25 io::StreamReader,
26};
27use tower::{BoxError, Service, ServiceExt as _, buffer::Buffer};
28use tower_http::ServiceExt as _;
29
30pub use self::body::Body;
31use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
32
33mod auth;
34mod body;
35mod builder;
36pub use kube_core::discovery::v2::{
37 APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
38 APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
39};
40#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
41#[cfg(feature = "unstable-client")]
42mod client_ext;
43#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
44#[cfg(feature = "unstable-client")]
45pub use client_ext::scope;
46mod config_ext;
47pub use auth::Error as AuthError;
48pub use config_ext::ConfigExt;
49pub mod middleware;
50pub mod retry;
51
52#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))]
53mod tls;
54
55#[cfg(feature = "openssl-tls")]
56pub use tls::openssl_tls::Error as OpensslTlsError;
57#[cfg(feature = "rustls-tls")]
58pub use tls::rustls_tls::Error as RustlsTlsError;
59#[cfg(feature = "ws")]
60mod upgrade;
61
62#[cfg(feature = "oauth")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
64pub use auth::OAuthError;
65
66#[cfg(feature = "oidc")]
67#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
68pub use auth::oidc_errors;
69
70#[cfg(feature = "ws")]
71pub use upgrade::UpgradeConnectionError;
72
73#[cfg(feature = "kubelet-debug")]
74#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
75mod kubelet_debug;
76
77pub use builder::{ClientBuilder, DynBody};
78
/// Handle used to send HTTP requests to a Kubernetes API server.
///
/// Bundles a buffered `tower` service with a default namespace and an
/// optional validity timestamp. Instances are created with [`Client::new`],
/// [`Client::try_default`], or the `TryFrom` conversions in this module.
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[derive(Clone)]
pub struct Client {
    // Buffered service that every request issued by `send` goes through.
    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
    // Namespace handed out by `default_namespace`.
    default_ns: String,
    // Optional timestamp exposed via `valid_until`; `None` unless set through
    // `with_valid_until`.
    // NOTE(review): presumably the expiry of the client's credentials — confirm with the builder.
    valid_until: Option<Timestamp>,
}
94
/// Upgraded WebSocket connection produced by `Client::connect`.
///
/// Holds the WebSocket stream together with the stream protocol that was
/// returned when the upgrade response was verified.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
    // WebSocket stream running over the upgraded HTTP connection.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    // Protocol reported by `upgrade::verify_response` for this connection.
    protocol: upgrade::StreamProtocol,
}
103
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Reports whether the negotiated stream protocol supports closing streams,
    /// as answered by the protocol itself.
    pub fn supports_stream_close(&self) -> bool {
        let Self { protocol, .. } = self;
        protocol.supports_stream_close()
    }

    /// Consumes the connection and hands back the underlying WebSocket stream.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
117
118impl Client {
124 pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
154 where
155 S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
156 S::Future: Send + 'static,
157 S::Error: Into<BoxError>,
158 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
159 B::Error: Into<BoxError>,
160 T: Into<String>,
161 {
162 let service = service
164 .map_response_body(Body::wrap_body)
165 .map_err(Into::into)
166 .boxed();
167 Self {
168 inner: Buffer::new(service, 1024),
169 default_ns: default_namespace.into(),
170 valid_until: None,
171 }
172 }
173
174 pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
176 Client { valid_until, ..self }
177 }
178
179 pub fn valid_until(&self) -> &Option<Timestamp> {
181 &self.valid_until
182 }
183
184 pub async fn try_default() -> Result<Self> {
202 Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
203 }
204
205 pub fn default_namespace(&self) -> &str {
211 &self.default_ns
212 }
213
214 pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
218 let mut svc = self.inner.clone();
219 let res = svc
220 .ready()
221 .await
222 .map_err(Error::Service)?
223 .call(request)
224 .await
225 .map_err(|err| {
226 err.downcast::<Error>()
228 .map(|e| *e)
229 .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
231 .unwrap_or_else(Error::Service)
233 })?;
234 Ok(res)
235 }
236
237 #[cfg(feature = "ws")]
239 #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
240 pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
241 use http::header::HeaderValue;
242 let (mut parts, body) = request.into_parts();
243 parts
244 .headers
245 .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
246 parts
247 .headers
248 .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
249 parts.headers.insert(
250 http::header::SEC_WEBSOCKET_VERSION,
251 HeaderValue::from_static("13"),
252 );
253 let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
254 parts.headers.insert(
255 http::header::SEC_WEBSOCKET_KEY,
256 key.parse().expect("valid header value"),
257 );
258 upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
259
260 let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
261 let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
262 match hyper::upgrade::on(res).await {
263 Ok(upgraded) => Ok(Connection {
264 stream: WebSocketStream::from_raw_socket(
265 TokioIo::new(upgraded),
266 ws::protocol::Role::Client,
267 None,
268 )
269 .await,
270 protocol,
271 }),
272
273 Err(e) => Err(Error::UpgradeConnection(
274 UpgradeConnectionError::GetPendingUpgrade(e),
275 )),
276 }
277 }
278
279 pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
282 where
283 T: DeserializeOwned,
284 {
285 let text = self.request_text(request).await?;
286
287 serde_json::from_str(&text).map_err(|e| {
288 tracing::warn!("{}, {:?}", text, e);
289 Error::SerdeError(e)
290 })
291 }
292
293 pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
296 let res = self.send(request.map(Body::from)).await?;
297 let res = handle_api_errors(res).await?;
298 let body_bytes = res.into_body().collect().await?.to_bytes();
299 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
300 Ok(text)
301 }
302
303 pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
308 let res = self.send(request.map(Body::from)).await?;
309 let res = handle_api_errors(res).await?;
310 let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
313 Ok(body.into_async_read())
314 }
315
316 pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
319 where
320 T: DeserializeOwned,
321 {
322 let text = self.request_text(request).await?;
323 let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
325 if v["kind"] == "Status" {
326 tracing::trace!("Status from {}", text);
327 Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
328 tracing::warn!("{}, {:?}", text, e);
329 Error::SerdeError(e)
330 })?))
331 } else {
332 Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
333 tracing::warn!("{}, {:?}", text, e);
334 Error::SerdeError(e)
335 })?))
336 }
337 }
338
339 pub async fn request_events<T>(
341 &self,
342 request: Request<Vec<u8>>,
343 ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
344 where
345 T: Clone + DeserializeOwned,
346 {
347 let res = self.send(request.map(Body::from)).await?;
348 tracing::trace!("headers: {:?}", res.headers());
350
351 let frames = FramedRead::new(
352 StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
353 if e.to_string().contains("unexpected EOF during chunk") {
356 return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
357 }
358 std::io::Error::other(e)
359 })),
360 LinesCodec::new(),
361 );
362
363 Ok(frames.filter_map(|res| async {
364 match res {
365 Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
366 Ok(event) => Some(Ok(event)),
367 Err(e) => {
368 if e.is_eof() {
370 return None;
371 }
372
373 if let Ok(status) = serde_json::from_str::<Status>(&line) {
375 return Some(Err(Error::Api(status.boxed())));
376 }
377 Some(Err(Error::SerdeError(e)))
379 }
380 },
381
382 Err(LinesCodecError::Io(e)) => match e.kind() {
383 std::io::ErrorKind::TimedOut => {
385 tracing::warn!("timeout in poll: {}", e); None
387 }
388 std::io::ErrorKind::UnexpectedEof => {
391 tracing::warn!("eof in poll: {}", e);
392 None
393 }
394 _ => Some(Err(Error::ReadEvents(e))),
395 },
396
397 Err(LinesCodecError::MaxLineLengthExceeded) => {
400 Some(Err(Error::LinesCodecMaxLineLengthExceeded))
401 }
402 }
403 }))
404 }
405}
406
407impl Client {
413 pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
415 self.request(Request::get("/version").body(vec![]).map_err(Error::HttpError)?)
416 .await
417 }
418
419 pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
421 self.request(Request::get("/apis").body(vec![]).map_err(Error::HttpError)?)
422 .await
423 }
424
425 pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
444 let url = format!("/apis/{apiversion}");
445 self.request(Request::get(url).body(vec![]).map_err(Error::HttpError)?)
446 .await
447 }
448
449 pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
451 self.request(Request::get("/api").body(vec![]).map_err(Error::HttpError)?)
452 .await
453 }
454
455 pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
457 let url = format!("/api/{version}");
458 self.request(Request::get(url).body(vec![]).map_err(Error::HttpError)?)
459 .await
460 }
461}
462
463impl Client {
469 pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
494 self.request(
495 Request::get("/apis")
496 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
497 .body(vec![])
498 .map_err(Error::HttpError)?,
499 )
500 .await
501 }
502
503 pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
526 self.request(
527 Request::get("/api")
528 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
529 .body(vec![])
530 .map_err(Error::HttpError)?,
531 )
532 .await
533 }
534}
535
536async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
544 let status = res.status();
545 if status.is_client_error() || status.is_server_error() {
546 let body_bytes = res.into_body().collect().await?.to_bytes();
548 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
549 if let Ok(status) = serde_json::from_str::<Status>(&text) {
552 tracing::debug!("Unsuccessful: {status:?}");
553 Err(Error::Api(status.boxed()))
554 } else {
555 tracing::warn!("Unsuccessful data error parse: {text}");
556 let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
557 tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
558 Err(Error::Api(status.boxed()))
559 }
560 } else {
561 Ok(res)
562 }
563}
564
565impl TryFrom<Config> for Client {
566 type Error = Error;
567
568 fn try_from(config: Config) -> Result<Self> {
572 Ok(ClientBuilder::try_from(config)?.build())
573 }
574}
575
576impl TryFrom<Kubeconfig> for Client {
577 type Error = Error;
578
579 fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
580 let config = Config::try_from(kubeconfig)?;
581 Ok(ClientBuilder::try_from(config)?.build())
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use std::pin::pin;
588
589 use crate::{
590 Api, Client,
591 client::Body,
592 config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
593 };
594
595 use http::{Request, Response};
596 use k8s_openapi::api::core::v1::Pod;
597 use tower_test::mock;
598
599 #[tokio::test]
600 async fn test_default_ns() {
601 let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
602 let client = Client::new(mock_service, "test-namespace");
603 assert_eq!(client.default_namespace(), "test-namespace");
604 }
605
606 #[tokio::test]
607 async fn test_try_from_kubeconfig() {
608 let config = Kubeconfig {
609 current_context: Some("test-context".to_string()),
610 auth_infos: vec![NamedAuthInfo {
611 name: "test-user".to_string(),
612 auth_info: Some(AuthInfo::default()), }],
614 contexts: vec![NamedContext {
615 name: "test-context".to_string(),
616 context: Some(Context {
617 cluster: "test-cluster".to_string(),
618 user: Some("test-user".to_string()),
619 namespace: Some("test-namespace".to_string()),
620 ..Default::default()
621 }),
622 }],
623 clusters: vec![NamedCluster {
624 name: "test-cluster".to_string(),
625 cluster: Some(Cluster {
626 server: Some("http://localhost:8080".to_string()),
627 ..Default::default()
628 }),
629 }],
630 ..Default::default()
631 };
632 let client = Client::try_from(config).expect("Failed to create client from kubeconfig");
633 assert_eq!(client.default_namespace(), "test-namespace");
634 }
635
636 #[tokio::test]
637 async fn test_mock() {
638 let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
639 let spawned = tokio::spawn(async move {
640 let mut handle = pin!(handle);
642 let (request, send) = handle.next_request().await.expect("service not called");
643 assert_eq!(request.method(), http::Method::GET);
644 assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");
645 let pod: Pod = serde_json::from_value(serde_json::json!({
646 "apiVersion": "v1",
647 "kind": "Pod",
648 "metadata": {
649 "name": "test",
650 "annotations": { "kube-rs": "test" },
651 },
652 "spec": {
653 "containers": [{ "name": "test", "image": "test-image" }],
654 }
655 }))
656 .unwrap();
657 send.send_response(Response::new(Body::from(serde_json::to_vec(&pod).unwrap())));
658 });
659
660 let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "default"));
661 let pod = pods.get("test").await.unwrap();
662 assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
663 spawned.await.unwrap();
664 }
665}