1use crate::{
4 api::{self, ApiResource},
5 conn::{self, get_http_connector, Headers, Payload, RequestClient, Transport},
6 models,
7 opts::*,
8 ApiVersion, Error, Result, Value, LATEST_API_VERSION,
9};
10
11#[cfg(feature = "tls")]
12use crate::conn::get_https_connector;
13#[cfg(unix)]
14use crate::conn::get_unix_connector;
15
16use crate::conn::hyper::{Body, Client, Response};
17use bytes::Bytes;
18use containers_api::url;
19use futures_util::{stream::Stream, AsyncRead, AsyncWrite, TryStreamExt};
20use serde::de::DeserializeOwned;
21use std::future::Future;
22use std::path::Path;
23use std::pin::Pin;
24
25#[derive(Debug, Clone)]
27pub struct Podman {
28 version: ApiVersion,
29 pub(crate) client: RequestClient<Error>,
30}
31
32impl Podman {
33 pub fn new<U>(uri: U) -> Result<Podman>
47 where
48 U: AsRef<str>,
49 {
50 Self::new_versioned(uri, LATEST_API_VERSION)
51 }
52
53 pub fn new_versioned<U>(uri: U, version: impl Into<ApiVersion>) -> Result<Podman>
55 where
56 U: AsRef<str>,
57 {
58 let version = version.into();
59 let uri = uri.as_ref();
60 let mut it = uri.split("://");
61
62 match it.next() {
63 #[cfg(unix)]
64 Some("unix") => {
65 if let Some(path) = it.next() {
66 Ok(Podman::unix_versioned(path, version))
67 } else {
68 Err(Error::MissingAuthority)
69 }
70 }
71 #[cfg(not(unix))]
72 Some("unix") => Err(Error::UnsupportedScheme("unix".to_string())),
73 Some("tcp") | Some("http") => {
74 if let Some(host) = it.next() {
75 Podman::tcp_versioned(host, version)
76 } else {
77 Err(Error::MissingAuthority)
78 }
79 }
80 Some(scheme) => Err(Error::UnsupportedScheme(scheme.to_string())),
81 None => unreachable!(), }
84 }
85
86 #[cfg(unix)]
87 #[cfg_attr(docsrs, doc(cfg(unix)))]
88 pub fn unix<P>(socket_path: P) -> Podman
96 where
97 P: AsRef<Path>,
98 {
99 Self::unix_versioned(socket_path, LATEST_API_VERSION)
100 }
101
102 #[cfg(unix)]
103 #[cfg_attr(docsrs, doc(cfg(unix)))]
104 pub fn unix_versioned<P>(socket_path: P, version: impl Into<ApiVersion>) -> Podman
106 where
107 P: AsRef<Path>,
108 {
109 Podman {
110 version: version.into(),
111 client: RequestClient::new(
112 Transport::Unix {
113 client: Client::builder()
114 .pool_max_idle_per_host(0)
115 .build(get_unix_connector()),
116 path: socket_path.as_ref().to_path_buf(),
117 },
118 Box::new(validate_response),
119 ),
120 }
121 }
122
123 #[cfg(feature = "tls")]
124 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
125 pub fn tls<H, P>(host: H, cert_path: P, verify: bool) -> Result<Podman>
139 where
140 H: AsRef<str>,
141 P: AsRef<Path>,
142 {
143 Self::tls_versioned(host, LATEST_API_VERSION, cert_path, verify)
144 }
145
146 #[cfg(feature = "tls")]
147 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
148 pub fn tls_versioned<H, P>(
150 host: H,
151 version: impl Into<ApiVersion>,
152 cert_path: P,
153 verify: bool,
154 ) -> Result<Podman>
155 where
156 H: AsRef<str>,
157 P: AsRef<Path>,
158 {
159 Ok(Podman {
160 version: version.into(),
161 client: RequestClient::new(
162 Transport::EncryptedTcp {
163 client: Client::builder()
164 .build(get_https_connector(cert_path.as_ref(), verify)?),
165 host: url::url::Url::parse(&format!("https://{}", host.as_ref()))
166 .map_err(Error::InvalidUrl)?,
167 },
168 Box::new(validate_response),
169 ),
170 })
171 }
172
173 pub fn tcp<H>(host: H) -> Result<Podman>
184 where
185 H: AsRef<str>,
186 {
187 Self::tcp_versioned(host, LATEST_API_VERSION)
188 }
189
190 pub fn tcp_versioned<H>(host: H, version: impl Into<ApiVersion>) -> Result<Podman>
192 where
193 H: AsRef<str>,
194 {
195 Ok(Podman {
196 version: version.into(),
197 client: RequestClient::new(
198 Transport::Tcp {
199 client: Client::builder().build(get_http_connector()),
200 host: url::url::Url::parse(&format!("tcp://{}", host.as_ref()))
201 .map_err(Error::InvalidUrl)?,
202 },
203 Box::new(validate_response),
204 ),
205 })
206 }
207
208 pub async fn adjust_api_version(&mut self) -> Result<()> {
211 let server_version: Option<ApiVersion> = self
212 .version()
213 .await
214 .map(|v| v.api_version.and_then(|v| v.parse().ok()))?;
215
216 if let Some(version) = server_version {
217 if version <= self.version {
218 self.version = version;
219 }
220 }
221
222 Ok(())
223 }
224
225 api_doc! {
232 containers
233 |
235 pub fn containers(&self) -> api::Containers {
236 api::Containers::new(self.clone())
237 }}
238
239 api_doc! {
240 exec
241 |
243 pub fn execs(&self) -> api::Execs {
244 api::Execs::new(self.clone())
245 }}
246
247 api_doc! {
248 images
249 |
251 pub fn images(&self) -> api::Images {
252 api::Images::new(self.clone())
253 }}
254
255 api_doc! {
256 manifests
257 |
259 pub fn manifests(&self) -> api::Manifests {
260 api::Manifests::new(self.clone())
261 }}
262
263 api_doc! {
264 networks
265 |
267 pub fn networks(&self) -> api::Networks {
268 api::Networks::new(self.clone())
269 }}
270
271 api_doc! {
272 pods
273 |
275 pub fn pods(&self) -> api::Pods {
276 api::Pods::new(self.clone())
277 }}
278
279 api_doc! {
280 volumes
281 |
283 pub fn volumes(&self) -> api::Volumes {
284 api::Volumes::new(self.clone())
285 }}
286
287 api_doc! {
288 secrets
289 |
291 pub fn secrets(&self) -> api::Secrets {
292 api::Secrets::new(self.clone())
293 }}
294
295 api_doc! {
302 System => InfoLibpod
303 |
304 pub async fn info(&self) -> Result<models::Info> {
320 self.get_json("/libpod/info").await
321 }}
322
323 api_doc! {
324 System => Ping
325 |
326 pub async fn ping(&self) -> Result<models::LibpodPingInfo> {
342 self.get("/libpod/_ping")
343 .await
344 .and_then(|resp| models::LibpodPingInfo::try_from(resp.headers()))
345 }}
346
347 api_doc! {
348 System => VersionLibpod
349 |
350 pub async fn version(&self) -> Result<models::VersionResponse> {
366 self.get_json("/libpod/version").await
367 }}
368
369 api_doc! {
370 System => DataUsageLibpod
371 |
372 pub async fn data_usage(&self) -> Result<models::SystemDfReport> {
388 self.get_json("/libpod/system/df").await
389 }}
390
391 api_doc! {
392 System => PruneLibpod
393 |
394 pub async fn prune(&self) -> Result<models::SystemPruneReport> {
410 self.post_json("/libpod/system/prune", Payload::empty(), Headers::none())
411 .await
412 }}
413
414 api_doc! {
415 System => EventsLibpod
416 |
417 pub fn events<'libpod>(
438 &'libpod self,
439 opts: &EventsOpts,
440 ) -> impl Stream<Item = Result<models::Event>> + Unpin + 'libpod {
441 let ep = url::construct_ep("/libpod/events", opts.serialize());
442 let reader = Box::pin(
443 self.get_stream(ep)
444 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
445 )
446 .into_async_read();
447
448 Box::pin(
449 futures_codec::FramedRead::new(reader, futures_codec::LinesCodec)
450 .map_err(Error::IO)
451 .and_then(|s: String| async move { serde_json::from_str(&s).map_err(Error::from) }),
452 )
453 }}
454
455 api_doc! {
456 Play => KubeLibpod
457 |
458 pub async fn play_kubernetes_yaml(
494 &self,
495 opts: &PlayKubernetesYamlOpts,
496 yaml: impl Into<String>,
497 ) -> Result<models::PlayKubeReport> {
498 let ep = url::construct_ep("/libpod/play/kube", opts.serialize());
499 let yaml = yaml.into();
500 self.post_json(ep, Payload::Text(yaml), Headers::none())
501 .await
502 }}
503
504 api_doc! {
505 Play => KubeDownLibpod
506 |
507 pub async fn remove_kubernetes_pods(&self) -> Result<models::PlayKubeReport> {
523 self.delete_json("/libpod/play/kube").await
524 }}
525
526 pub(crate) async fn resource_exists(
527 &self,
528 resource: ApiResource,
529 id: &crate::Id,
530 ) -> Result<bool> {
531 use crate::conn::http::StatusCode;
532 let ep = format!("/libpod/{}/{}/exists", resource.as_ref(), id);
533 match self.get(&ep).await {
534 Ok(resp) => match resp.status() {
535 StatusCode::NO_CONTENT => Ok(true),
536 _ => Ok(false),
537 },
538 Err(e) => match e {
539 Error::Fault {
540 code: StatusCode::NOT_FOUND,
541 message: _,
542 } => Ok(false),
543 e => Err(e),
544 },
545 }
546 }
547
548 pub(crate) async fn generate_systemd_units(
549 &self,
550 opts: &SystemdUnitsOpts,
551 id: &crate::Id,
552 ) -> Result<Value> {
553 let ep = url::construct_ep(
554 format!("/libpod/generate/{}/systemd", &id),
555 opts.serialize(),
556 );
557 self.get_json(ep).await
558 }
559
560 pub(crate) async fn generate_kube_yaml(&self, service: bool, id: &crate::Id) -> Result<String> {
561 let opts = [("names", id.to_string()), ("service", service.to_string())];
562 let ep = url::construct_ep("/libpod/generate/kube", Some(url::encoded_pairs(opts)));
563
564 self.get_string(ep).await
565 }
566
567 pub(crate) async fn get(&self, endpoint: impl AsRef<str>) -> Result<Response<Body>> {
572 self.client.get(self.version.make_endpoint(endpoint)).await
573 }
574
575 pub(crate) async fn get_string(&self, endpoint: impl AsRef<str>) -> Result<String> {
576 self.client
577 .get_string(self.version.make_endpoint(endpoint))
578 .await
579 }
580
581 pub(crate) async fn get_json<T: DeserializeOwned>(
582 &self,
583 endpoint: impl AsRef<str>,
584 ) -> Result<T> {
585 self.client
586 .get_json(self.version.make_endpoint(endpoint))
587 .await
588 }
589
590 pub(crate) fn get_stream(
591 &'_ self,
592 endpoint: impl AsRef<str>,
593 ) -> impl Stream<Item = Result<Bytes>> + '_ {
594 self.client.get_stream(self.version.make_endpoint(endpoint))
595 }
596
597 pub(crate) fn get_json_stream<'client, T>(
598 &'client self,
599 endpoint: impl AsRef<str> + 'client,
600 ) -> impl Stream<Item = Result<T>> + 'client
601 where
602 T: DeserializeOwned + 'client,
603 {
604 self.client
605 .get_json_stream(self.version.make_endpoint(endpoint))
606 }
607
608 pub(crate) async fn post<B>(
609 &self,
610 endpoint: impl AsRef<str>,
611 body: Payload<B>,
612 headers: Option<Headers>,
613 ) -> Result<Response<Body>>
614 where
615 B: Into<Body>,
616 {
617 self.client
618 .post(self.version.make_endpoint(endpoint), body, headers)
619 .await
620 }
621
622 pub(crate) async fn post_string<B>(
623 &self,
624 endpoint: impl AsRef<str>,
625 body: Payload<B>,
626 headers: Option<Headers>,
627 ) -> Result<String>
628 where
629 B: Into<Body>,
630 {
631 self.client
632 .post_string(self.version.make_endpoint(endpoint), body, headers)
633 .await
634 }
635
636 pub(crate) async fn post_json<B, T>(
637 &self,
638 endpoint: impl AsRef<str>,
639 body: Payload<B>,
640 headers: Option<Headers>,
641 ) -> Result<T>
642 where
643 T: DeserializeOwned,
644 B: Into<Body>,
645 {
646 self.client
647 .post_json(self.version.make_endpoint(endpoint), body, headers)
648 .await
649 }
650
651 pub(crate) fn post_stream<'client, B>(
652 &'client self,
653 endpoint: impl AsRef<str>,
654 body: Payload<B>,
655 headers: Option<Headers>,
656 ) -> impl Stream<Item = Result<Bytes>> + 'client
657 where
658 B: Into<Body> + 'client,
659 {
660 self.client
661 .post_stream(self.version.make_endpoint(endpoint), body, headers)
662 }
663
664 pub(crate) async fn post_upgrade_stream<'client, B>(
665 &'client self,
666 endpoint: impl AsRef<str> + 'client,
667 body: Payload<B>,
668 ) -> Result<impl AsyncRead + AsyncWrite + 'client>
669 where
670 B: Into<Body> + 'client,
671 {
672 self.client
673 .post_upgrade_stream(self.version.make_endpoint(endpoint), body)
674 .await
675 }
676
677 pub(crate) async fn put<B>(
678 &self,
679 endpoint: impl AsRef<str>,
680 body: Payload<B>,
681 ) -> Result<Response<Body>>
682 where
683 B: Into<Body>,
684 {
685 self.client
686 .put(self.version.make_endpoint(endpoint), body)
687 .await
688 }
689
690 pub(crate) async fn delete(&self, endpoint: impl AsRef<str>) -> Result<Response<Body>> {
691 self.client
692 .delete(self.version.make_endpoint(endpoint))
693 .await
694 }
695
696 pub(crate) async fn delete_json<T: DeserializeOwned>(
697 &self,
698 endpoint: impl AsRef<str>,
699 ) -> Result<T> {
700 self.client
701 .delete_json(self.version.make_endpoint(endpoint))
702 .await
703 }
704}
705
706fn validate_response(
707 response: Response<Body>,
708) -> Pin<Box<dyn Future<Output = Result<Response<Body>>> + Send + Sync>> {
709 Box::pin(async move {
710 log::trace!(
711 "got response {} {:?}",
712 response.status(),
713 response.headers()
714 );
715 let status = response.status();
716
717 use crate::conn::hyper::{self, StatusCode};
718 match status {
719 StatusCode::OK
721 | StatusCode::CREATED
722 | StatusCode::SWITCHING_PROTOCOLS
723 | StatusCode::NO_CONTENT => Ok(response),
724 _ => {
726 let body = response.into_body();
727 let bytes = hyper::body::to_bytes(body)
728 .await
729 .map_err(conn::Error::from)?;
730 let message_body = String::from_utf8(bytes.to_vec()).map_err(conn::Error::from)?;
731 log::trace!("{message_body:#?}");
732 match serde_json::from_str::<models::ErrorModel>(&message_body) {
733 Ok(error) => {
734 let mut message = format!(
735 "{}: {}",
736 error.message.unwrap_or_default(),
737 error.cause.unwrap_or_default(),
738 );
739 if message == ": " {
740 message = status
741 .canonical_reason()
742 .unwrap_or("unknown error code")
743 .to_owned();
744 }
745 Err(Error::Fault {
746 code: status,
747 message,
748 })
749 }
750 Err(_) => Err(Error::Fault {
751 code: status,
752 message: message_body,
753 }),
754 }
755 }
756 }
757 })
758}
759
760#[cfg(test)]
761mod tests {
762 use super::{Error, Podman};
763 #[test]
764 fn creates_correct_podman() {
765 let d = Podman::new("tcp://127.0.0.1:80");
766 d.unwrap();
767 let d = Podman::new("http://127.0.0.1:80");
768 d.unwrap();
769
770 #[cfg(unix)]
771 {
772 let d = Podman::new("unix://127.0.0.1:80");
773 d.unwrap();
774 }
775 #[cfg(not(unix))]
776 {
777 let d = Podman::new("unix://127.0.0.1:80");
778 assert!(d.is_err());
779 match d.unwrap_err() {
780 Error::UnsupportedScheme(scheme) if &scheme == "unix" => {}
781 e => panic!(r#"Expected Error::UnsupportedScheme("unix"), got {}"#, e),
782 }
783 }
784
785 let d = Podman::new("rand://127.0.0.1:80");
786 match d.unwrap_err() {
787 Error::UnsupportedScheme(scheme) if &scheme == "rand" => {}
788 e => panic!(r#"Expected Error::UnsupportedScheme("rand"), got {e}"#),
789 }
790
791 let d = Podman::new("invalid_uri");
792 match d.unwrap_err() {
793 Error::UnsupportedScheme(scheme) if &scheme == "invalid_uri" => {}
794 e => panic!(r#"Expected Error::UnsupportedScheme("invalid_uri"), got {e}"#),
795 }
796 let d = Podman::new("");
797 match d.unwrap_err() {
798 Error::UnsupportedScheme(scheme) if scheme.is_empty() => {}
799 e => panic!(r#"Expected Error::UnsupportedScheme(""), got {e}"#),
800 }
801 }
802}