1use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use jiff::Timestamp;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
22use tokio_util::{
23 codec::{FramedRead, LinesCodec, LinesCodecError},
24 io::StreamReader,
25};
26use tower::{BoxError, Service, ServiceExt as _, buffer::Buffer};
27use tower_http::ServiceExt as _;
28
29pub use self::body::Body;
30use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
31
32mod auth;
33mod body;
34mod builder;
35pub use kube_core::discovery::v2::{
36 APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
37 APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
38};
39#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
40#[cfg(feature = "unstable-client")]
41mod client_ext;
42#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
43#[cfg(feature = "unstable-client")]
44pub use client_ext::scope;
45mod config_ext;
46pub use auth::Error as AuthError;
47pub use config_ext::ConfigExt;
48pub mod middleware;
49pub mod retry;
50
51#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
52
53#[cfg(feature = "openssl-tls")]
54pub use tls::openssl_tls::Error as OpensslTlsError;
55#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
56#[cfg(feature = "ws")] mod upgrade;
57
58#[cfg(feature = "oauth")]
59#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
60pub use auth::OAuthError;
61
62#[cfg(feature = "oidc")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
64pub use auth::oidc_errors;
65
66#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
67
68#[cfg(feature = "kubelet-debug")]
69#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
70mod kubelet_debug;
71
72pub use builder::{ClientBuilder, DynBody};
73
74#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
81#[derive(Clone)]
82pub struct Client {
83 inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
86 default_ns: String,
87 valid_until: Option<Timestamp>,
88}
89
90#[cfg(feature = "ws")]
93#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
94pub struct Connection {
95 stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
96 protocol: upgrade::StreamProtocol,
97}
98
99#[cfg(feature = "ws")]
100#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
101impl Connection {
102 pub fn supports_stream_close(&self) -> bool {
104 self.protocol.supports_stream_close()
105 }
106
107 pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
109 self.stream
110 }
111}
112
113impl Client {
119 pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
149 where
150 S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
151 S::Future: Send + 'static,
152 S::Error: Into<BoxError>,
153 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
154 B::Error: Into<BoxError>,
155 T: Into<String>,
156 {
157 let service = service
159 .map_response_body(Body::wrap_body)
160 .map_err(Into::into)
161 .boxed();
162 Self {
163 inner: Buffer::new(service, 1024),
164 default_ns: default_namespace.into(),
165 valid_until: None,
166 }
167 }
168
169 pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
171 Client { valid_until, ..self }
172 }
173
174 pub fn valid_until(&self) -> &Option<Timestamp> {
176 &self.valid_until
177 }
178
179 pub async fn try_default() -> Result<Self> {
197 Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
198 }
199
200 pub fn default_namespace(&self) -> &str {
206 &self.default_ns
207 }
208
209 pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
213 let mut svc = self.inner.clone();
214 let res = svc
215 .ready()
216 .await
217 .map_err(Error::Service)?
218 .call(request)
219 .await
220 .map_err(|err| {
221 err.downcast::<Error>()
223 .map(|e| *e)
224 .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
226 .unwrap_or_else(Error::Service)
228 })?;
229 Ok(res)
230 }
231
232 #[cfg(feature = "ws")]
234 #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
235 pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
236 use http::header::HeaderValue;
237 let (mut parts, body) = request.into_parts();
238 parts
239 .headers
240 .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
241 parts
242 .headers
243 .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
244 parts.headers.insert(
245 http::header::SEC_WEBSOCKET_VERSION,
246 HeaderValue::from_static("13"),
247 );
248 let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
249 parts.headers.insert(
250 http::header::SEC_WEBSOCKET_KEY,
251 key.parse().expect("valid header value"),
252 );
253 upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
254
255 let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
256 let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
257 match hyper::upgrade::on(res).await {
258 Ok(upgraded) => Ok(Connection {
259 stream: WebSocketStream::from_raw_socket(
260 TokioIo::new(upgraded),
261 ws::protocol::Role::Client,
262 None,
263 )
264 .await,
265 protocol,
266 }),
267
268 Err(e) => Err(Error::UpgradeConnection(
269 UpgradeConnectionError::GetPendingUpgrade(e),
270 )),
271 }
272 }
273
274 pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
277 where
278 T: DeserializeOwned,
279 {
280 let text = self.request_text(request).await?;
281
282 serde_json::from_str(&text).map_err(|e| {
283 tracing::warn!("{}, {:?}", text, e);
284 Error::SerdeError(e)
285 })
286 }
287
288 pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
291 let res = self.send(request.map(Body::from)).await?;
292 let res = handle_api_errors(res).await?;
293 let body_bytes = res.into_body().collect().await?.to_bytes();
294 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
295 Ok(text)
296 }
297
298 pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
303 let res = self.send(request.map(Body::from)).await?;
304 let res = handle_api_errors(res).await?;
305 let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
308 Ok(body.into_async_read())
309 }
310
311 pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
314 where
315 T: DeserializeOwned,
316 {
317 let text = self.request_text(request).await?;
318 let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
320 if v["kind"] == "Status" {
321 tracing::trace!("Status from {}", text);
322 Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
323 tracing::warn!("{}, {:?}", text, e);
324 Error::SerdeError(e)
325 })?))
326 } else {
327 Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
328 tracing::warn!("{}, {:?}", text, e);
329 Error::SerdeError(e)
330 })?))
331 }
332 }
333
334 pub async fn request_events<T>(
336 &self,
337 request: Request<Vec<u8>>,
338 ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
339 where
340 T: Clone + DeserializeOwned,
341 {
342 let res = self.send(request.map(Body::from)).await?;
343 tracing::trace!("headers: {:?}", res.headers());
345
346 let frames = FramedRead::new(
347 StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
348 if e.to_string().contains("unexpected EOF during chunk") {
351 return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
352 }
353 std::io::Error::other(e)
354 })),
355 LinesCodec::new(),
356 );
357
358 Ok(frames.filter_map(|res| async {
359 match res {
360 Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
361 Ok(event) => Some(Ok(event)),
362 Err(e) => {
363 if e.is_eof() {
365 return None;
366 }
367
368 if let Ok(status) = serde_json::from_str::<Status>(&line) {
370 return Some(Err(Error::Api(status.boxed())));
371 }
372 Some(Err(Error::SerdeError(e)))
374 }
375 },
376
377 Err(LinesCodecError::Io(e)) => match e.kind() {
378 std::io::ErrorKind::TimedOut => {
380 tracing::warn!("timeout in poll: {}", e); None
382 }
383 std::io::ErrorKind::UnexpectedEof => {
386 tracing::warn!("eof in poll: {}", e);
387 None
388 }
389 _ => Some(Err(Error::ReadEvents(e))),
390 },
391
392 Err(LinesCodecError::MaxLineLengthExceeded) => {
395 Some(Err(Error::LinesCodecMaxLineLengthExceeded))
396 }
397 }
398 }))
399 }
400}
401
402impl Client {
408 pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
410 self.request(
411 Request::builder()
412 .uri("/version")
413 .body(vec![])
414 .map_err(Error::HttpError)?,
415 )
416 .await
417 }
418
419 pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
421 self.request(
422 Request::builder()
423 .uri("/apis")
424 .body(vec![])
425 .map_err(Error::HttpError)?,
426 )
427 .await
428 }
429
430 pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
449 let url = format!("/apis/{apiversion}");
450 self.request(
451 Request::builder()
452 .uri(url)
453 .body(vec![])
454 .map_err(Error::HttpError)?,
455 )
456 .await
457 }
458
459 pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
461 self.request(
462 Request::builder()
463 .uri("/api")
464 .body(vec![])
465 .map_err(Error::HttpError)?,
466 )
467 .await
468 }
469
470 pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
472 let url = format!("/api/{version}");
473 self.request(
474 Request::builder()
475 .uri(url)
476 .body(vec![])
477 .map_err(Error::HttpError)?,
478 )
479 .await
480 }
481}
482
483impl Client {
489 pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
514 self.request(
515 Request::builder()
516 .uri("/apis")
517 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
518 .body(vec![])
519 .map_err(Error::HttpError)?,
520 )
521 .await
522 }
523
524 pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
547 self.request(
548 Request::builder()
549 .uri("/api")
550 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
551 .body(vec![])
552 .map_err(Error::HttpError)?,
553 )
554 .await
555 }
556}
557
558async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
566 let status = res.status();
567 if status.is_client_error() || status.is_server_error() {
568 let body_bytes = res.into_body().collect().await?.to_bytes();
570 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
571 if let Ok(status) = serde_json::from_str::<Status>(&text) {
574 tracing::debug!("Unsuccessful: {status:?}");
575 Err(Error::Api(status.boxed()))
576 } else {
577 tracing::warn!("Unsuccessful data error parse: {text}");
578 let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
579 tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
580 Err(Error::Api(status.boxed()))
581 }
582 } else {
583 Ok(res)
584 }
585}
586
587impl TryFrom<Config> for Client {
588 type Error = Error;
589
590 fn try_from(config: Config) -> Result<Self> {
594 Ok(ClientBuilder::try_from(config)?.build())
595 }
596}
597
598impl TryFrom<Kubeconfig> for Client {
599 type Error = Error;
600
601 fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
602 let config = Config::try_from(kubeconfig)?;
603 Ok(ClientBuilder::try_from(config)?.build())
604 }
605}
606
607#[cfg(test)]
608mod tests {
609 use std::pin::pin;
610
611 use crate::{
612 Api, Client,
613 client::Body,
614 config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
615 };
616
617 use http::{Request, Response};
618 use k8s_openapi::api::core::v1::Pod;
619 use tower_test::mock;
620
621 #[tokio::test]
622 async fn test_default_ns() {
623 let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
624 let client = Client::new(mock_service, "test-namespace");
625 assert_eq!(client.default_namespace(), "test-namespace");
626 }
627
628 #[tokio::test]
629 async fn test_try_from_kubeconfig() {
630 let config = Kubeconfig {
631 current_context: Some("test-context".to_string()),
632 auth_infos: vec![NamedAuthInfo {
633 name: "test-user".to_string(),
634 auth_info: Some(AuthInfo::default()), }],
636 contexts: vec![NamedContext {
637 name: "test-context".to_string(),
638 context: Some(Context {
639 cluster: "test-cluster".to_string(),
640 user: Some("test-user".to_string()),
641 namespace: Some("test-namespace".to_string()),
642 ..Default::default()
643 }),
644 }],
645 clusters: vec![NamedCluster {
646 name: "test-cluster".to_string(),
647 cluster: Some(Cluster {
648 server: Some("http://localhost:8080".to_string()),
649 ..Default::default()
650 }),
651 }],
652 ..Default::default()
653 };
654 let client = Client::try_from(config).expect("Failed to create client from kubeconfig");
655 assert_eq!(client.default_namespace(), "test-namespace");
656 }
657
658 #[tokio::test]
659 async fn test_mock() {
660 let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
661 let spawned = tokio::spawn(async move {
662 let mut handle = pin!(handle);
664 let (request, send) = handle.next_request().await.expect("service not called");
665 assert_eq!(request.method(), http::Method::GET);
666 assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");
667 let pod: Pod = serde_json::from_value(serde_json::json!({
668 "apiVersion": "v1",
669 "kind": "Pod",
670 "metadata": {
671 "name": "test",
672 "annotations": { "kube-rs": "test" },
673 },
674 "spec": {
675 "containers": [{ "name": "test", "image": "test-image" }],
676 }
677 }))
678 .unwrap();
679 send.send_response(
680 Response::builder()
681 .body(Body::from(serde_json::to_vec(&pod).unwrap()))
682 .unwrap(),
683 );
684 });
685
686 let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "default"));
687 let pod = pods.get("test").await.unwrap();
688 assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
689 spawned.await.unwrap();
690 }
691}