1#![cfg_attr(docsrs, feature(doc_cfg))]
64#![allow(clippy::unnecessary_lazy_evaluations)]
66
67macro_rules! cfg_client {
68 ($($item:item)*) => {
69 $(
70 #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
71 #[cfg(feature = "client")]
72 $item
73 )*
74 }
75}
76macro_rules! cfg_config {
77 ($($item:item)*) => {
78 $(
79 #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
80 #[cfg(feature = "config")]
81 $item
82 )*
83 }
84}
85
86macro_rules! cfg_error {
87 ($($item:item)*) => {
88 $(
89 #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
90 #[cfg(any(feature = "config", feature = "client"))]
91 $item
92 )*
93 }
94}
95
96cfg_client! {
97 pub mod api;
98 pub mod discovery;
99 pub mod client;
100
101 #[doc(inline)]
102 pub use api::Api;
103 #[doc(inline)]
104 pub use client::Client;
105 #[doc(inline)]
106 pub use discovery::Discovery;
107}
108
109cfg_config! {
110 pub mod config;
111 #[doc(inline)]
112 pub use config::Config;
113}
114
115cfg_error! {
116 pub mod error;
117 #[doc(inline)] pub use error::Error;
118 pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123pub use kube_core as core;
125
126#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] mod test {
132 use crate::{
133 Api, Client, Config, ResourceExt,
134 api::{AttachParams, AttachedProcess},
135 client::ConfigExt,
136 };
137 use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138 use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
139 use kube_core::{
140 params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
141 response::StatusSummary,
142 };
143 use serde_json::json;
144 use tower::ServiceBuilder;
145
146 #[allow(dead_code)]
148 #[ignore = "needs cluster (lists pods)"]
150 #[cfg(feature = "rustls-tls")]
151 async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
152 use hyper_util::rt::TokioExecutor;
153
154 let config = Config::infer().await?;
155 let https = config.rustls_https_connector()?;
156 let service = ServiceBuilder::new()
157 .layer(config.base_uri_layer())
158 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
159 let client = Client::new(service, config.default_namespace);
160 let pods: Api<Pod> = Api::default_namespaced(client);
161 pods.list(&Default::default()).await?;
162 Ok(())
163 }
164
165 #[tokio::test]
166 #[ignore = "needs cluster (lists pods)"]
167 #[cfg(feature = "openssl-tls")]
168 async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
169 use hyper_util::rt::TokioExecutor;
170
171 let config = Config::infer().await?;
172 let https = config.openssl_https_connector()?;
173 let service = ServiceBuilder::new()
174 .layer(config.base_uri_layer())
175 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
176 let client = Client::new(service, config.default_namespace);
177 let pods: Api<Pod> = Api::default_namespaced(client);
178 pods.list(&Default::default()).await?;
179 Ok(())
180 }
181
182 #[tokio::test]
183 #[ignore = "needs cluster (lists api resources)"]
184 #[cfg(feature = "client")]
185 async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
186 use crate::{core::DynamicObject, discovery};
187 let client = Client::try_default().await?;
188 let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
189 let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
190 let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
191 api.list(&Default::default()).await?;
192
193 Ok(())
194 }
195
196 #[tokio::test]
197 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
198 #[cfg(feature = "client")]
199 async fn aggregated_discovery_apis() -> Result<(), Box<dyn std::error::Error>> {
200 let client = Client::try_default().await?;
201
202 let apis_discovery = client.list_api_groups_aggregated().await?;
204 assert!(!apis_discovery.items.is_empty(), "should have API groups");
205
206 let apps_group = apis_discovery
208 .items
209 .iter()
210 .find(|g| g.metadata.as_ref().and_then(|m| m.name.as_ref()) == Some(&"apps".to_string()));
211 assert!(apps_group.is_some(), "should have apps group");
212
213 let apps = apps_group.unwrap();
214 assert!(!apps.versions.is_empty(), "apps should have versions");
215
216 let v1 = apps.versions.iter().find(|v| v.version == Some("v1".to_string()));
218 assert!(v1.is_some(), "apps should have v1");
219
220 let deployments = v1
221 .unwrap()
222 .resources
223 .iter()
224 .find(|r| r.resource == Some("deployments".to_string()));
225 assert!(deployments.is_some(), "apps/v1 should have deployments");
226
227 Ok(())
228 }
229
230 #[tokio::test]
231 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
232 #[cfg(feature = "client")]
233 async fn aggregated_discovery_core() -> Result<(), Box<dyn std::error::Error>> {
234 let client = Client::try_default().await?;
235
236 let core_discovery = client.list_core_api_versions_aggregated().await?;
238 assert!(!core_discovery.items.is_empty(), "should have core group");
239
240 let core = &core_discovery.items[0];
241 let v1 = core.versions.iter().find(|v| v.version == Some("v1".to_string()));
242 assert!(v1.is_some(), "core should have v1");
243
244 let pods = v1
246 .unwrap()
247 .resources
248 .iter()
249 .find(|r| r.resource == Some("pods".to_string()));
250 assert!(pods.is_some(), "core/v1 should have pods");
251
252 let pods_resource = pods.unwrap();
253 assert_eq!(pods_resource.scope, Some("Namespaced".to_string()));
254 assert!(pods_resource.verbs.contains(&"list".to_string()));
255
256 Ok(())
257 }
258
259 #[tokio::test]
260 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
261 #[cfg(feature = "client")]
262 async fn discovery_run_aggregated() -> Result<(), Box<dyn std::error::Error>> {
263 use crate::discovery::{Discovery, verbs};
264
265 let client = Client::try_default().await?;
266
267 let discovery = Discovery::new(client.clone()).run_aggregated().await?;
269
270 assert!(discovery.groups().count() > 0, "should have discovered groups");
272
273 assert!(discovery.has_group(""), "should have core group");
275
276 assert!(discovery.has_group("apps"), "should have apps group");
278
279 let apps = discovery.get("apps").expect("apps group");
281 let (ar, caps) = apps.recommended_kind("Deployment").expect("Deployment kind");
282 assert_eq!(ar.kind, "Deployment");
283 assert!(caps.supports_operation(verbs::LIST));
284
285 let core = discovery.get("").expect("core group");
287 let (ar, caps) = core.recommended_kind("Pod").expect("Pod kind");
288 assert_eq!(ar.kind, "Pod");
289 assert!(caps.supports_operation(verbs::GET));
290
291 Ok(())
292 }
293
294 #[tokio::test]
295 #[ignore = "needs cluster (will create and edit a pod)"]
296 async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
297 use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
298
299 let client = Client::try_default().await?;
300 let pods: Api<Pod> = Api::default_namespaced(client);
301
302 let p: Pod = serde_json::from_value(json!({
304 "apiVersion": "v1",
305 "kind": "Pod",
306 "metadata": {
307 "name": "busybox-kube1",
308 "labels": { "app": "kube-rs-test" },
309 },
310 "spec": {
311 "terminationGracePeriodSeconds": 1,
312 "restartPolicy": "Never",
313 "containers": [{
314 "name": "busybox",
315 "image": "busybox:stable",
316 "command": ["sh", "-c", "sleep 30"],
317 }],
318 }
319 }))?;
320
321 let pp = PostParams::default();
322 match pods.create(&pp, &p).await {
323 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
324 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
327
328 let wp = WatchParams::default()
331 .fields(&format!("metadata.name={}", "busybox-kube1"))
332 .timeout(15);
333 let mut stream = pods.watch(&wp, "0").await?.boxed();
334 while let Some(ev) = stream.try_next().await? {
335 let _ = format!("we: {ev:?}");
337 match ev {
338 WatchEvent::Modified(o) => {
339 let s = o.status.as_ref().expect("status exists on pod");
340 let phase = s.phase.clone().unwrap_or_default();
341 if phase == "Running" {
342 break;
343 }
344 }
345 WatchEvent::Error(e) => panic!("watch error: {e}"),
346 _ => {}
347 }
348 }
349
350 let mut pod = pods.get("busybox-kube1").await?;
352 assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
353
354 {
357 assert!(pod.resource_version().is_some());
358 pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
359
360 let pp = PostParams::default();
361 let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
362 assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
363 }
364
365 let dp = DeleteParams::default();
367 pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
368 assert_eq!(pdel.name_unchecked(), "busybox-kube1");
369 });
370
371 Ok(())
372 }
373
374 #[tokio::test]
375 #[ignore = "needs cluster (will create and attach to a pod)"]
376 #[cfg(feature = "ws")]
377 async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
378 use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
379 use tokio::io::AsyncWriteExt;
380
381 let client = Client::try_default().await?;
382 let pods: Api<Pod> = Api::default_namespaced(client);
383
384 let p: Pod = serde_json::from_value(json!({
386 "apiVersion": "v1",
387 "kind": "Pod",
388 "metadata": {
389 "name": "busybox-kube2",
390 "labels": { "app": "kube-rs-test" },
391 },
392 "spec": {
393 "terminationGracePeriodSeconds": 1,
394 "restartPolicy": "Never",
395 "containers": [{
396 "name": "busybox",
397 "image": "busybox:stable",
398 "command": ["sh", "-c", "sleep 30"],
399 }],
400 }
401 }))?;
402
403 match pods.create(&Default::default(), &p).await {
404 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
405 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
408
409 let wp = WatchParams::default()
412 .fields(&format!("metadata.name={}", "busybox-kube2"))
413 .timeout(15);
414 let mut stream = pods.watch(&wp, "0").await?.boxed();
415 while let Some(ev) = stream.try_next().await? {
416 match ev {
417 WatchEvent::Modified(o) => {
418 let s = o.status.as_ref().expect("status exists on pod");
419 let phase = s.phase.clone().unwrap_or_default();
420 if phase == "Running" {
421 break;
422 }
423 }
424 WatchEvent::Error(e) => panic!("watch error: {e}"),
425 _ => {}
426 }
427 }
428
429 {
431 let mut attached = pods
432 .exec(
433 "busybox-kube2",
434 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
435 &AttachParams::default().stderr(false),
436 )
437 .await?;
438 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
439 let out = stdout
440 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
441 .collect::<Vec<_>>()
442 .await
443 .join("");
444 attached.join().await.unwrap();
445 assert_eq!(out.lines().count(), 3);
446 assert_eq!(out, "1\n2\n3\n");
447 }
448
449 {
451 let name = "busybox-kube2";
452 let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
453 let ap = AttachParams::default().stdin(true).stderr(false);
454
455 let mut req = pods.request.exec(name, command.clone(), &ap)?;
457 req.extensions_mut().insert("exec");
458 let stream = pods.client.connect(req).await?;
459
460 if stream.supports_stream_close() {
463 let mut attached = pods.exec(name, command, &ap).await?;
464 let mut stdin_writer = attached.stdin().unwrap();
465 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
466
467 stdin_writer.write_all(b"this will be ignored\n").await?;
468 _ = stdin_writer.shutdown().await;
469
470 let next_stdout = stdout_stream.next();
471 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
472 assert_eq!(stdout, "test string 2\n");
473
474 let status = attached.take_status().unwrap();
476 if let Some(status) = status.await {
477 assert_eq!(status.status, Some("Success".to_owned()));
478 assert_eq!(status.reason, None);
479 }
480 }
481 }
482
483 {
485 let mut attached = pods
486 .exec(
487 "busybox-kube2",
488 vec!["sh"],
489 &AttachParams::default().stdin(true).stderr(false),
490 )
491 .await?;
492 let mut stdin_writer = attached.stdin().unwrap();
493 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
494 let next_stdout = stdout_stream.next();
495 stdin_writer.write_all(b"echo test string 1\n").await?;
496 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
497 println!("{stdout}");
498 assert_eq!(stdout, "test string 1\n");
499
500 stdin_writer.write_all(b"exit 1\n").await?;
503 let status = attached.take_status().unwrap();
504 if let Some(status) = status.await {
505 println!("{status:?}");
506 assert_eq!(status.status, Some("Failure".to_owned()));
507 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
508 }
509 }
510
511 let dp = DeleteParams::default();
513 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
514 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
515 });
516
517 Ok(())
518 }
519
520 #[tokio::test]
521 #[ignore = "needs cluster (will create and tail logs from a pod)"]
522 async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
523 use crate::{
524 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
525 core::subresource::LogParams,
526 };
527
528 let client = Client::try_default().await?;
529 let pods: Api<Pod> = Api::default_namespaced(client);
530
531 let p: Pod = serde_json::from_value(json!({
533 "apiVersion": "v1",
534 "kind": "Pod",
535 "metadata": {
536 "name": "busybox-kube3",
537 "labels": { "app": "kube-rs-test" },
538 },
539 "spec": {
540 "terminationGracePeriodSeconds": 1,
541 "restartPolicy": "Never",
542 "containers": [{
543 "name": "busybox",
544 "image": "busybox:stable",
545 "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
546 }],
547 }
548 }))?;
549
550 match pods.create(&Default::default(), &p).await {
551 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
552 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
555
556 let wp = WatchParams::default()
559 .fields(&format!("metadata.name={}", "busybox-kube3"))
560 .timeout(15);
561 let mut stream = pods.watch(&wp, "0").await?.boxed();
562 while let Some(ev) = stream.try_next().await? {
563 match ev {
564 WatchEvent::Modified(o) => {
565 let s = o.status.as_ref().expect("status exists on pod");
566 let phase = s.phase.clone().unwrap_or_default();
567 if phase == "Running" {
568 break;
569 }
570 }
571 WatchEvent::Error(e) => panic!("watch error: {e}"),
572 _ => {}
573 }
574 }
575
576 let lp = LogParams {
578 follow: true,
579 ..LogParams::default()
580 };
581 let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
582
583 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
585
586 let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
587 assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
588
589 let mut output = vec![];
591 while let Some(line) = logs_stream.try_next().await? {
592 output.push(line);
593 }
594 assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
595
596 let ep = EvictParams::default();
598 let eres = pods.evict("busybox-kube3", &ep).await?;
599 assert_eq!(eres.code, 201); assert!(eres.is_success());
601
602 Ok(())
603 }
604
605 #[tokio::test]
606 #[ignore = "requires a cluster"]
607 async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
608 use crate::{
609 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
610 core::subresource::LogParams,
611 };
612 use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
613
614 let client = Client::try_default().await?;
615 let pods: Api<Pod> = Api::default_namespaced(client);
616
617 let p: Pod = serde_json::from_value(json!({
619 "apiVersion": "v1",
620 "kind": "Pod",
621 "metadata": {
622 "name": "busybox-kube-meta",
623 "labels": { "app": "kube-rs-test" },
624 },
625 "spec": {
626 "terminationGracePeriodSeconds": 1,
627 "restartPolicy": "Never",
628 "containers": [{
629 "name": "busybox",
630 "image": "busybox:stable",
631 "command": ["sh", "-c", "sleep 30s"],
632 }],
633 }
634 }))?;
635
636 match pods.create(&Default::default(), &p).await {
637 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
638 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
641
642 let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
645 assert_eq!("busybox-kube-meta", pod_metadata.name_any());
646 assert_eq!(
647 Some((&"app".to_string(), &"kube-rs-test".to_string())),
648 pod_metadata.labels().get_key_value("app")
649 );
650
651 let p_list = pods.list_metadata(&ListParams::default()).await?;
653
654 let pod_metadata = p_list
657 .items
658 .into_iter()
659 .find(|p| p.name_any() == "busybox-kube-meta")
660 .unwrap();
661 assert_eq!(
662 pod_metadata.labels().get("app"),
663 Some(&"kube-rs-test".to_string())
664 );
665
666 let patch = ObjectMeta {
668 annotations: Some([("test".to_string(), "123".to_string())].into()),
669 ..Default::default()
670 }
671 .into_request_partial::<Pod>();
672
673 let patchparams = PatchParams::default();
674 let p_patched = pods
675 .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
676 .await?;
677 assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
678 assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
679 assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
680
681 let dp = DeleteParams::default();
683 pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
684 assert_eq!(pdel.name_any(), "busybox-kube-meta");
685 });
686
687 Ok(())
688 }
689 #[tokio::test]
690 #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
691 async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
692 use crate::api::PostParams;
693 use k8s_openapi::api::certificates::v1::{
694 CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
695 };
696
697 let csr_name = "fake";
698 let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
699 "apiVersion": "certificates.k8s.io/v1",
700 "kind": "CertificateSigningRequest",
701 "metadata": { "name": csr_name },
702 "spec": {
703 "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
704 "signerName": "kubernetes.io/kube-apiserver-client",
705 "expirationSeconds": 86400,
706 "usages": ["client auth"]
707 }
708 }))?;
709
710 let client = Client::try_default().await?;
711 let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
712 assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
713
714 let approval_type = "ApprovedFake";
716 let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
717 certificate: None,
718 conditions: Some(vec![CertificateSigningRequestCondition {
719 type_: approval_type.to_string(),
720 last_update_time: None,
721 last_transition_time: None,
722 message: Some(format!("{} {}", approval_type, "by kube-rs client")),
723 reason: Some("kube-rsClient".to_string()),
724 status: "True".to_string(),
725 }]),
726 };
727 let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
728 let _ = csr
729 .patch_approval(csr_name, &Default::default(), &csr_status_patch)
730 .await?;
731 let csr_after_approval = csr.get_approval(csr_name).await?;
732
733 assert_eq!(
734 csr_after_approval
735 .status
736 .as_ref()
737 .unwrap()
738 .conditions
739 .as_ref()
740 .unwrap()[0]
741 .type_,
742 approval_type.to_string()
743 );
744 csr.delete(csr_name, &DeleteParams::default()).await?;
745 Ok(())
746 }
747
748 #[tokio::test]
749 #[ignore = "needs cluster for ephemeral containers operations"]
750 async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
751 let client = Client::try_default().await?;
752
753 let api_version = client.apiserver_version().await?;
756 if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
757 return Ok(());
758 }
759
760 let pod: Pod = serde_json::from_value(serde_json::json!({
761 "apiVersion": "v1",
762 "kind": "Pod",
763 "metadata": {
764 "name": "ephemeral-container-test",
765 "labels": { "app": "kube-rs-test" },
766 },
767 "spec": {
768 "restartPolicy": "Never",
769 "containers": [{
770 "name": "busybox",
771 "image": "busybox:stable",
772 "command": ["sh", "-c", "sleep 2"],
773 }],
774 }
775 }))?;
776
777 let pod_name = pod.name_any();
778 let pods = Api::<Pod>::default_namespaced(client);
779
780 let _ = pods
785 .delete(&pod.name_any(), &DeleteParams::default())
786 .await
787 .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
788
789 match pods.create(&Default::default(), &pod).await {
792 Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
793 Err(e) => return Err(e.into()), }
795
796 let current_ephemeral_containers = pods
797 .get_ephemeral_containers(&pod.name_any())
798 .await?
799 .spec
800 .unwrap()
801 .ephemeral_containers;
802
803 assert_eq!(current_ephemeral_containers, None);
806
807 let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
808 {
809 "name": "myephemeralcontainer1",
810 "image": "busybox:stable",
811 "command": ["sh", "-c", "sleep 2"],
812 }
813 ))?;
814
815 let patch: Pod = serde_json::from_value(json!({
818 "metadata": { "name": pod_name },
819 "spec":{ "ephemeralContainers": [ busybox_eph ] }
820 }))?;
821
822 let current_containers = pods
823 .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
824 .await?
825 .spec
826 .unwrap()
827 .ephemeral_containers
828 .expect("could find ephemeral container");
829
830 assert_eq!(current_containers.len(), 1);
833 assert_eq!(current_containers[0].name, busybox_eph.name);
834 assert_eq!(current_containers[0].image, busybox_eph.image);
835 assert_eq!(current_containers[0].command, busybox_eph.command);
836
837 busybox_eph = serde_json::from_value(json!(
842 {
843 "name": "myephemeralcontainer2",
844 "image": "busybox:stable",
845 "command": ["sh", "-c", "sleep 1"],
846 }
847 ))?;
848
849 let patch: Pod =
850 serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
851
852 let current_containers = pods
853 .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
854 .await?
855 .spec
856 .unwrap()
857 .ephemeral_containers
858 .expect("could find ephemeral container");
859
860 assert_eq!(current_containers.len(), 2);
863
864 let new_container = current_containers
865 .iter()
866 .find(|c| c.name == busybox_eph.name)
867 .expect("could find myephemeralcontainer2");
868
869 assert_eq!(new_container.image, busybox_eph.image);
872 assert_eq!(new_container.command, busybox_eph.command);
873
874 let expected_containers = current_containers;
877
878 let current_containers = pods
879 .get_ephemeral_containers(&pod.name_any())
880 .await?
881 .spec
882 .unwrap()
883 .ephemeral_containers
884 .unwrap();
885
886 assert_eq!(current_containers, expected_containers);
887
888 pods.delete(&pod.name_any(), &DeleteParams::default())
889 .await?
890 .map_left(|pdel| {
891 assert_eq!(pdel.name_any(), pod.name_any());
892 });
893
894 Ok(())
895 }
896
897 #[tokio::test]
898 #[ignore = "needs kubelet debug methods"]
899 #[cfg(feature = "kubelet-debug")]
900 async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
901 use crate::{
902 api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
903 core::kubelet_debug::KubeletDebugParams,
904 };
905
906 let client = Client::try_default().await?;
907 let pods: Api<Pod> = Api::default_namespaced(client);
908
909 let p: Pod = serde_json::from_value(json!({
911 "apiVersion": "v1",
912 "kind": "Pod",
913 "metadata": {
914 "name": "busybox-kube2",
915 "labels": { "app": "kube-rs-test" },
916 },
917 "spec": {
918 "terminationGracePeriodSeconds": 1,
919 "restartPolicy": "Never",
920 "containers": [{
921 "name": "busybox",
922 "image": "busybox:stable",
923 "command": ["sh", "-c", "sleep 30"],
924 }],
925 }
926 }))?;
927
928 match pods.create(&Default::default(), &p).await {
929 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
930 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
933
934 let wp = WatchParams::default()
937 .fields(&format!("metadata.name={}", "busybox-kube2"))
938 .timeout(15);
939 let mut stream = pods.watch(&wp, "0").await?.boxed();
940 while let Some(ev) = stream.try_next().await? {
941 match ev {
942 WatchEvent::Modified(o) => {
943 let s = o.status.as_ref().expect("status exists on pod");
944 let phase = s.phase.clone().unwrap_or_default();
945 if phase == "Running" {
946 break;
947 }
948 }
949 WatchEvent::Error(e) => panic!("watch error: {e}"),
950 _ => {}
951 }
952 }
953
954 let mut config = Config::infer().await?;
955 config.accept_invalid_certs = true;
956 config.cluster_url = "https://localhost:10250".parse().unwrap();
957 let kubelet_client: Client = config.try_into()?;
958
959 {
961 let mut attached = kubelet_client
962 .kubelet_node_exec(
963 &KubeletDebugParams {
964 name: "busybox-kube2",
965 namespace: "default",
966 ..Default::default()
967 },
968 "busybox",
969 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
970 &AttachParams::default().stderr(false),
971 )
972 .await?;
973 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
974 let out = stdout
975 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
976 .collect::<Vec<_>>()
977 .await
978 .join("");
979 attached.join().await.unwrap();
980 assert_eq!(out.lines().count(), 3);
981 assert_eq!(out, "1\n2\n3\n");
982 }
983
984 {
986 use tokio::io::AsyncWriteExt;
987 let mut attached = kubelet_client
988 .kubelet_node_exec(
989 &KubeletDebugParams {
990 name: "busybox-kube2",
991 namespace: "default",
992 ..Default::default()
993 },
994 "busybox",
995 vec!["sh"],
996 &AttachParams::default().stdin(true).stderr(false),
997 )
998 .await?;
999 let mut stdin_writer = attached.stdin().unwrap();
1000 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
1001 let next_stdout = stdout_stream.next();
1002 stdin_writer.write_all(b"echo test string 1\n").await?;
1003 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
1004 println!("{stdout}");
1005 assert_eq!(stdout, "test string 1\n");
1006
1007 stdin_writer.write_all(b"exit 1\n").await?;
1010 let status = attached.take_status().unwrap();
1011 if let Some(status) = status.await {
1012 println!("{status:?}");
1013 assert_eq!(status.status, Some("Failure".to_owned()));
1014 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
1015 }
1016 }
1017
1018 let dp = DeleteParams::default();
1020 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
1021 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
1022 });
1023
1024 Ok(())
1025 }
1026}