docker_api/
docker.rs

1//! Main entrypoint for interacting with the Docker API.
2//!
3//! API Reference: <https://docs.docker.com/engine/api/v1.42/>
4use crate::{
5    conn::{get_http_connector, Headers, Payload, Transport},
6    errors::{Error, Result},
7    ApiVersion, Containers, Images, Networks, Volumes,
8};
9use containers_api::conn::RequestClient;
10
11#[cfg(feature = "swarm")]
12use crate::{Configs, Nodes, Plugins, Secrets, Services, Swarm, Tasks};
13
14#[cfg(feature = "tls")]
15use crate::conn::get_https_connector;
16#[cfg(unix)]
17use crate::conn::get_unix_connector;
18
19use futures_util::{
20    io::{AsyncRead, AsyncWrite},
21    stream::Stream,
22};
23use hyper::{body::Bytes, Body, Client, Response};
24use serde::de::DeserializeOwned;
25use std::future::Future;
26use std::path::{Path, PathBuf};
27use std::pin::Pin;
28
29/// Entrypoint interface for communicating with docker daemon
30#[derive(Debug, Clone)]
31pub struct Docker {
32    version: Option<ApiVersion>,
33    client: RequestClient<Error>,
34}
35
36impl Docker {
37    /// Creates a new Docker instance by automatically choosing appropriate connection type based
38    /// on provided `uri`.
39    ///
40    /// Supported schemes are:
41    ///  - `unix://` only works when build target is `unix`, otherwise returns an Error
42    ///  - `tcp://`
43    ///  - `http://`
44    ///
45    ///  To create a Docker instance utilizing TLS use explicit [Docker::tls](Docker::tls)
46    ///  constructor (this requires `tls` feature enabled).
47    ///  
48    ///  This creates an unversioned connector that'll use the latest server version, to use a specific version see
49    ///  [`Docker::unix_versioned`](Docker::unix_versioned).
50    pub fn new(uri: impl AsRef<str>) -> Result<Self> {
51        Self::new_impl(uri.as_ref(), None)
52    }
53
54    /// Same as [`Docker::new`](Docker::new) but the API version can be explicitly specified.
55    pub fn new_versioned(uri: impl AsRef<str>, version: impl Into<ApiVersion>) -> Result<Self> {
56        Self::new_impl(uri.as_ref(), Some(version.into()))
57    }
58
59    fn new_impl(uri: &str, version: Option<ApiVersion>) -> Result<Self> {
60        let mut it = uri.split("://");
61
62        match it.next() {
63            #[cfg(unix)]
64            Some("unix") => {
65                if let Some(path) = it.next() {
66                    Ok(Self::new_unix_impl(path, version))
67                } else {
68                    Err(Error::MissingAuthority)
69                }
70            }
71            #[cfg(not(unix))]
72            Some("unix") => Err(Error::UnsupportedScheme("unix".to_string())),
73            Some("tcp") | Some("http") => {
74                if let Some(host) = it.next() {
75                    Self::new_tcp_impl(host, version)
76                } else {
77                    Err(Error::MissingAuthority)
78                }
79            }
80            Some(scheme) => Err(Error::UnsupportedScheme(scheme.to_string())),
81            None => unreachable!(), // This is never possible because calling split on an empty string
82                                    // always returns at least one element
83        }
84    }
85
86    #[cfg(unix)]
87    #[cfg_attr(docsrs, doc(cfg(unix)))]
88    /// Creates a new docker instance for a docker host listening on a given Unix socket.
89    ///
90    /// `socket_path` is the part of URI that comes after the `unix://`. For example a URI `unix:///run/docker.sock` has a
91    /// `socket_path` == "/run/docker.sock".
92    ///  
93    ///  This creates an unversioned connector that'll use the latest server version, to use a specific version see
94    ///  [`Docker::unix_versioned`](Docker::unix_versioned).
95    pub fn unix(socket_path: impl AsRef<Path>) -> Self {
96        Self::new_unix_impl(socket_path.as_ref(), None)
97    }
98
99    #[cfg(unix)]
100    #[cfg_attr(docsrs, doc(cfg(unix)))]
101    /// Same as [`Docker::unix`](Docker::unix) but the API version can be explicitly specified.
102    pub fn unix_versioned(socket_path: impl AsRef<Path>, version: impl Into<ApiVersion>) -> Self {
103        Self::new_unix_impl(socket_path.as_ref(), Some(version.into()))
104    }
105
106    #[cfg(unix)]
107    fn new_unix_impl(socket_path: impl Into<PathBuf>, version: Option<ApiVersion>) -> Self {
108        Docker {
109            version,
110            client: RequestClient::new(
111                Transport::Unix {
112                    client: Client::builder()
113                        .pool_max_idle_per_host(0)
114                        .build(get_unix_connector()),
115                    path: socket_path.into(),
116                },
117                Box::new(validate_response),
118            ),
119        }
120    }
121
122    #[cfg(feature = "tls")]
123    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
124    /// Creates a new docker instance for a docker host listening on a given TCP socket `host`.
125    /// `host` is the part of URI that comes after `tcp://` or `http://` or `https://` schemes,
126    /// also known as authority part.
127    ///
128    /// `cert_path` specifies the base path in the filesystem containing a certificate (`cert.pem`)
129    /// and a key (`key.pem`) that will be used by the client. If verify is `true` a CA file will be
130    /// added (`ca.pem`) to the connector.
131    ///
132    /// Returns an error if the provided host will fail to parse as URL or reading the certificate
133    /// files will fail.
134    ///  
135    ///  This creates an unversioned connector that'll use the latest server version, to use a specific version see
136    ///  [`Docker::unix_versioned`](Docker::unix_versioned).
137    pub fn tls(host: impl AsRef<str>, cert_path: impl AsRef<Path>, verify: bool) -> Result<Self> {
138        Self::new_tls_impl(host.as_ref(), None, cert_path.as_ref(), verify)
139    }
140
141    #[cfg(feature = "tls")]
142    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
143    /// Same as [`Docker::tls`](Docker::tls) but the API version can be explicitly specified.
144    pub fn tls_versioned(
145        host: impl AsRef<str>,
146        version: impl Into<ApiVersion>,
147        cert_path: impl AsRef<Path>,
148        verify: bool,
149    ) -> Result<Self> {
150        Self::new_tls_impl(
151            host.as_ref(),
152            Some(version.into()),
153            cert_path.as_ref(),
154            verify,
155        )
156    }
157
158    #[cfg(feature = "tls")]
159    fn new_tls_impl(
160        host: &str,
161        version: Option<ApiVersion>,
162        cert_path: &Path,
163        verify: bool,
164    ) -> Result<Self> {
165        Ok(Self {
166            version,
167            client: RequestClient::new(
168                Transport::EncryptedTcp {
169                    client: Client::builder().build(get_https_connector(cert_path, verify)?),
170                    host: url::Url::parse(&format!("https://{host}")).map_err(Error::InvalidUrl)?,
171                },
172                Box::new(validate_response),
173            ),
174        })
175    }
176
177    /// Creates a new docker instance for a docker host listening on a given TCP socket `host`.
178    /// `host` is the part of URI that comes after `tcp://` or `http://` schemes, also known as
179    /// authority part.
180    ///
181    /// TLS is supported with feature `tls` enabled through [Docker::tls](Docker::tls) constructor.
182    ///
183    /// Returns an error if the provided host will fail to parse as URL.
184    ///  
185    ///  This creates an unversioned connector that'll use the latest server version, to use a specific version see
186    ///  [`Docker::unix_versioned`](Docker::unix_versioned).
187    pub fn tcp(host: impl AsRef<str>) -> Result<Self> {
188        Self::new_tcp_impl(host.as_ref(), None)
189    }
190
191    /// Same as [`Docker::tcp`](Docker::tcp) but the API version can be explicitly specified.
192    pub fn tcp_versioned(host: impl AsRef<str>, version: impl Into<ApiVersion>) -> Result<Self> {
193        Self::new_tcp_impl(host.as_ref(), Some(version.into()))
194    }
195
196    fn new_tcp_impl(host: &str, version: Option<ApiVersion>) -> Result<Self> {
197        Ok(Self {
198            version,
199            client: RequestClient::new(
200                Transport::Tcp {
201                    client: Client::builder().build(get_http_connector()),
202                    host: url::Url::parse(&format!("tcp://{host}")).map_err(Error::InvalidUrl)?,
203                },
204                Box::new(validate_response),
205            ),
206        })
207    }
208
209    /// Exports an interface for interacting with Docker images
210    pub fn images(&'_ self) -> Images {
211        Images::new(self.clone())
212    }
213
214    /// Exports an interface for interacting with Docker containers
215    pub fn containers(&'_ self) -> Containers {
216        Containers::new(self.clone())
217    }
218
219    /// Exports an interface for interacting with Docker networks
220    pub fn networks(&'_ self) -> Networks {
221        Networks::new(self.clone())
222    }
223
224    /// Exports an interface for interacting with Docker volumes
225    pub fn volumes(&'_ self) -> Volumes {
226        Volumes::new(self.clone())
227    }
228
229    /// Verifies the API version returned by the server and adjusts the version used by this client
230    /// in future requests.
231    pub async fn adjust_api_version(&mut self) -> Result<()> {
232        let server_version: ApiVersion = self.version().await.and_then(|v| {
233            v.api_version
234                .unwrap_or_default()
235                .parse::<ApiVersion>()
236                .map_err(Error::MalformedVersion)
237        })?;
238
239        self.version = Some(server_version);
240
241        Ok(())
242    }
243
244    //####################################################################################################
245    //
246    // Utility functions to make requests
247    //
248    //####################################################################################################
249
250    fn make_endpoint(&self, endpoint: impl AsRef<str>) -> String {
251        if let Some(version) = self.version {
252            version.make_endpoint(endpoint)
253        } else {
254            endpoint.as_ref().to_owned()
255        }
256    }
257
258    pub(crate) async fn get(&self, endpoint: &str) -> Result<Response<Body>> {
259        self.client.get(self.make_endpoint(endpoint)).await
260    }
261
262    pub(crate) async fn get_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
263        self.client.get_json(self.make_endpoint(endpoint)).await
264    }
265
266    #[allow(dead_code)]
267    pub(crate) async fn post<B>(
268        &self,
269        endpoint: &str,
270        body: Payload<B>,
271        headers: Option<Headers>,
272    ) -> Result<Response<Body>>
273    where
274        B: Into<Body>,
275    {
276        self.client
277            .post(self.make_endpoint(endpoint), body, headers)
278            .await
279    }
280
281    pub(crate) async fn post_string<B>(
282        &self,
283        endpoint: &str,
284        body: Payload<B>,
285        headers: Option<Headers>,
286    ) -> Result<String>
287    where
288        B: Into<Body>,
289    {
290        self.client
291            .post_string(self.make_endpoint(endpoint), body, headers)
292            .await
293    }
294
295    pub(crate) async fn post_json<B, T>(
296        &self,
297        endpoint: impl AsRef<str>,
298        body: Payload<B>,
299        headers: Option<Headers>,
300    ) -> Result<T>
301    where
302        T: DeserializeOwned,
303        B: Into<Body>,
304    {
305        self.client
306            .post_json(self.make_endpoint(endpoint), body, headers)
307            .await
308    }
309
310    pub(crate) async fn put<B>(&self, endpoint: &str, body: Payload<B>) -> Result<String>
311    where
312        B: Into<Body>,
313    {
314        self.client
315            .put_string(self.make_endpoint(endpoint), body)
316            .await
317    }
318
319    pub(crate) async fn delete(&self, endpoint: &str) -> Result<String> {
320        self.client
321            .delete_string(self.make_endpoint(endpoint))
322            .await
323    }
324
325    pub(crate) async fn delete_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
326        self.client.delete_json(self.make_endpoint(endpoint)).await
327    }
328
329    pub(crate) async fn head(&self, endpoint: &str) -> Result<Response<Body>> {
330        self.client.head(self.make_endpoint(endpoint)).await
331    }
332
333    #[allow(dead_code)]
334    /// Send a streaming post request.
335    ///
336    /// Use stream_post_into_values if the endpoint returns JSON values
337    pub(crate) fn post_stream<'a, B>(
338        &'a self,
339        endpoint: impl AsRef<str> + 'a,
340        body: Payload<B>,
341        headers: Option<Headers>,
342    ) -> impl Stream<Item = Result<Bytes>> + 'a
343    where
344        B: Into<Body> + 'a,
345    {
346        self.client
347            .post_stream(self.make_endpoint(endpoint), body, headers)
348    }
349
350    /// Send a streaming post request that returns a stream of JSON values
351    ///
352    /// When a received chunk does not contain a full JSON reads more chunks from the stream
353    pub(crate) fn post_into_stream<'a, B, T>(
354        &'a self,
355        endpoint: impl AsRef<str> + 'a,
356        body: Payload<B>,
357        headers: Option<Headers>,
358    ) -> impl Stream<Item = Result<T>> + 'a
359    where
360        B: Into<Body> + 'a,
361        T: DeserializeOwned + 'a,
362    {
363        self.client
364            .post_into_stream(self.make_endpoint(endpoint), body, headers)
365    }
366
367    pub(crate) fn get_stream<'a>(
368        &'a self,
369        endpoint: impl AsRef<str> + Unpin + 'a,
370    ) -> impl Stream<Item = Result<Bytes>> + 'a {
371        self.client.get_stream(self.make_endpoint(endpoint))
372    }
373
374    pub(crate) async fn post_upgrade_stream<B>(
375        self,
376        endpoint: impl AsRef<str>,
377        body: Payload<B>,
378    ) -> Result<impl AsyncRead + AsyncWrite>
379    where
380        B: Into<Body>,
381    {
382        let ep = self.make_endpoint(endpoint);
383        self.client.post_upgrade_stream(ep, body).await
384    }
385}
386
387fn validate_response(
388    response: Response<Body>,
389) -> Pin<Box<dyn Future<Output = Result<Response<Body>>> + Send + Sync>> {
390    use serde::{Deserialize, Serialize};
391    #[derive(Serialize, Deserialize)]
392    struct ErrorResponse {
393        message: String,
394    }
395
396    Box::pin(async move {
397        log::trace!(
398            "got response {} {:?}",
399            response.status(),
400            response.headers()
401        );
402        let status = response.status();
403
404        use crate::conn::{self, hyper::StatusCode};
405        match status {
406            // Success case: pass on the response
407            StatusCode::OK
408            | StatusCode::CREATED
409            | StatusCode::SWITCHING_PROTOCOLS
410            | StatusCode::NO_CONTENT => Ok(response),
411            // Error case: try to deserialize error message
412            _ => {
413                let body = response.into_body();
414                let bytes = hyper::body::to_bytes(body)
415                    .await
416                    .map_err(conn::Error::from)?;
417                let message_body = String::from_utf8(bytes.to_vec()).map_err(conn::Error::from)?;
418                log::trace!("{message_body:#?}");
419                let message = serde_json::from_str::<ErrorResponse>(&message_body)
420                    .map(|e| e.message)
421                    .unwrap_or_else(|_| {
422                        status
423                            .canonical_reason()
424                            .unwrap_or("unknown error code")
425                            .to_owned()
426                    });
427                Err(Error::Fault {
428                    code: status,
429                    message,
430                })
431            }
432        }
433    })
434}
435
#[cfg(feature = "swarm")]
impl Docker {
    /// Returns the services interface, built from a clone of this client.
    pub fn services(&self) -> Services {
        let docker = self.clone();
        Services::new(docker)
    }

    /// Returns the configs interface, built from a clone of this client.
    pub fn configs(&self) -> Configs {
        let docker = self.clone();
        Configs::new(docker)
    }

    /// Returns the tasks interface, built from a clone of this client.
    pub fn tasks(&self) -> Tasks {
        let docker = self.clone();
        Tasks::new(docker)
    }

    /// Returns the secrets interface, built from a clone of this client.
    pub fn secrets(&self) -> Secrets {
        let docker = self.clone();
        Secrets::new(docker)
    }

    /// Returns the swarm interface, built from a clone of this client.
    pub fn swarm(&self) -> Swarm {
        let docker = self.clone();
        Swarm::new(docker)
    }

    /// Returns the nodes interface, built from a clone of this client.
    pub fn nodes(&self) -> Nodes {
        let docker = self.clone();
        Nodes::new(docker)
    }

    /// Returns the plugins interface, built from a clone of this client.
    pub fn plugins(&self) -> Plugins {
        let docker = self.clone();
        Plugins::new(docker)
    }
}
473
#[cfg(test)]
mod tests {
    use super::{Docker, Error};

    /// Checks which URIs `Docker::new` accepts and which error it reports for the rest.
    #[test]
    fn creates_correct_docker() {
        // Schemes that are accepted on every target.
        for uri in ["tcp://127.0.0.1:80", "http://127.0.0.1:80"] {
            Docker::new(uri).unwrap();
        }

        // The unix scheme is accepted only when building for a unix target.
        #[cfg(unix)]
        {
            Docker::new("unix://127.0.0.1:80").unwrap();
        }
        #[cfg(not(unix))]
        {
            let d = Docker::new("unix://127.0.0.1:80");
            assert!(d.is_err());
            let e = d.unwrap_err();
            if !matches!(&e, Error::UnsupportedScheme(scheme) if scheme == "unix") {
                panic!(r#"Expected Error::UnsupportedScheme("unix"), got {}"#, e);
            }
        }

        // An unknown scheme is reported back by name.
        let e = Docker::new("rand://127.0.0.1:80").unwrap_err();
        if !matches!(&e, Error::UnsupportedScheme(scheme) if scheme == "rand") {
            panic!(r#"Expected Error::UnsupportedScheme("rand"), got {e}"#);
        }

        // Input without a scheme separator is treated as the scheme itself.
        let e = Docker::new("invalid_uri").unwrap_err();
        if !matches!(&e, Error::UnsupportedScheme(scheme) if scheme == "invalid_uri") {
            panic!(r#"Expected Error::UnsupportedScheme("invalid_uri"), got {e}"#);
        }

        // Empty input yields an empty unsupported scheme.
        let e = Docker::new("").unwrap_err();
        if !matches!(&e, Error::UnsupportedScheme(scheme) if scheme.is_empty()) {
            panic!(r#"Expected Error::UnsupportedScheme(""), got {e}"#);
        }
    }
}
516}