docker_client_async/
lib.rs

1/*
2 * Copyright 2020 Damian Peckett <damian@pecke.tt>.
3 * Copyright 2013-2018 Docker, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! A modern async/await Docker client written in Rust. Ported directly from the reference
19//! Golang implementation.
20//!
21//! # Examples
22//!
23//! ```
24//! use docker_client_async::{opts, LocalDockerEngineClient};
25//! use docker_client_async::error::Error;
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Error> {
29//!   let cli = LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))?;
30//!   for container in cli.container_list(None).await? {
31//!     println!("{} {}", container.id, container.image);
32//!   }
33//!   Ok(())
34//! }
35//! ```
36
37use crate::error::*;
38use crate::opts::DockerEngineClientOption;
39use bytes::Bytes;
40use futures::ready;
41use futures::task::{Context, Poll};
42use hyper::body::HttpBody;
43use hyper::client::connect::Connect;
44use hyper::{Body, Client, Response, Uri};
45use hyperlocal::UnixConnector;
46use lazy_static::lazy_static;
47use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
48use regex::Regex;
49use snafu::ResultExt;
50use std::collections::HashMap;
51use std::io;
52use std::sync::Mutex;
53use std::time::Duration;
54use tokio::io::AsyncRead;
55use tokio::macros::support::Pin;
56use tokio::time::timeout;
57
58pub mod container;
59pub mod error;
60pub mod network;
61pub mod opts;
62pub mod types;
63pub mod version;
64pub mod volume;
65
66lazy_static! {
67    static ref HEADER_REGEXP: Regex = Regex::new(r"\ADocker/.+\s\((.+)\)\z").unwrap();
68}
69
/// A [`DockerEngineClient`] specialised to hyperlocal's `UnixConnector`, for
/// communicating over a local unix domain socket.
pub type LocalDockerEngineClient = DockerEngineClient<UnixConnector>;
72
73/// DockerEngineClient is the API client that performs all operations
74/// against a docker server.
75#[derive(Clone, Debug)]
76pub struct DockerEngineClient<C: Connect + Clone + Send + Sync + 'static> {
77    /// scheme sets the scheme for the client.
78    pub(crate) scheme: Option<String>,
79    /// host holds the server address to connect to.
80    pub(crate) host: Option<String>,
81    /// proto holds the client protocol i.e. unix.
82    pub(crate) proto: Option<String>,
83    /// addr holds the client address.
84    pub(crate) addr: Option<String>,
85    /// base_path holds the path to prepend to the requests.
86    pub(crate) base_path: Option<String>,
87    /// timeout holds the http client request timeout.
88    pub(crate) timeout: Duration,
89    // client used to send and receive http requests.
90    pub(crate) client: Option<Client<C, Body>>,
91    // version of the server to talk to.
92    pub(crate) version: String,
93    /// custom http headers configured by users.
94    pub(crate) custom_http_headers: Option<HashMap<String, String>>,
95    /// manualOverride is set to true when the version was set by users.
96    pub(crate) manual_override: bool,
97    /// negotiateVersion indicates if the client should automatically negotiate
98    /// the API version to use when making requests. API version negotiation is
99    /// performed on the first request, after which negotiated is set to "true"
100    /// so that subsequent requests do not re-negotiate.
101    pub(crate) negotiate_version: bool,
102    /// negotiated indicates that API version negotiation took place.
103    pub(crate) negotiated: bool,
104}
105
106impl<C: Connect + Clone + Send + Sync + 'static> Default for DockerEngineClient<C> {
107    fn default() -> Self {
108        Self {
109            scheme: Some("http".into()),
110            host: Some("unix:///var/run/docker.sock".into()),
111            proto: Some("unix".into()),
112            addr: Some("/var/run/docker.sock".into()),
113            base_path: None,
114            timeout: Duration::from_millis(5_000),
115            client: None,
116            version: "1.40".into(),
117            custom_http_headers: None,
118            manual_override: false,
119            negotiate_version: false,
120            negotiated: false,
121        }
122    }
123}
124
125impl<C: Connect + Clone + Send + Sync + 'static> DockerEngineClient<C> {
126    pub fn new_client_with_opts(
127        options: Option<Vec<DockerEngineClientOption<C>>>,
128    ) -> Result<DockerEngineClient<C>, Error> {
129        let mut client: DockerEngineClient<C> = Default::default();
130        if let Some(client_options) = options {
131            for client_option in &client_options {
132                client_option(&mut client)?;
133            }
134        }
135        Ok(client)
136    }
137
138    /// Construct a hyper Uri for a given request.
139    fn request_uri(
140        &self,
141        path: &str,
142        query_params: Option<HashMap<String, String>>,
143    ) -> Result<Uri, Error> {
144        let encoded_query_params = query_params
145            .map(|query_params| {
146                let mut encoded_query_params = String::from("?");
147                for (key, value) in query_params {
148                    encoded_query_params.push_str(&key);
149                    encoded_query_params.push('=');
150                    encoded_query_params
151                        .push_str(&utf8_percent_encode(&value, NON_ALPHANUMERIC).to_string());
152                    encoded_query_params.push('&');
153                }
154                encoded_query_params.trim_end_matches('&').to_string()
155            })
156            .unwrap_or_else(String::new);
157
158        let path_and_query_params = format!(
159            "{}{}{}",
160            self.base_path
161                .as_ref()
162                .map(String::from)
163                .unwrap_or_else(|| format!("/v{}", self.version)),
164            path,
165            encoded_query_params
166        );
167
168        Ok(match self.proto.as_ref().unwrap().as_str() {
169            "tcp" => {
170                let scheme = self
171                    .scheme
172                    .as_ref()
173                    .map(String::from)
174                    .unwrap_or_else(|| "http".to_string());
175                let address = self
176                    .addr
177                    .as_ref()
178                    .map(String::from)
179                    .unwrap_or_else(|| "localhost".to_string());
180                format!("{}://{}{}", scheme, address, path_and_query_params)
181                    .parse()
182                    .unwrap()
183            }
184            "unix" => {
185                hyperlocal::Uri::new(self.addr.as_ref().unwrap(), &path_and_query_params).into()
186            }
187            _ => unimplemented!(),
188        })
189    }
190}
191
192/// Read all http response chunks and concatenate into a string.
193pub(crate) async fn read_response_body(
194    mut response: Response<Body>,
195    read_timeout: Duration,
196) -> Result<String, Error> {
197    let mut response_body = String::new();
198    while let Some(chunk) = timeout(read_timeout, response.body_mut().data())
199        .await
200        .context(HttpClientTimeoutError {})?
201    {
202        response_body.push_str(&String::from_utf8_lossy(
203            &chunk.context(HttpClientError {})?,
204        ));
205    }
206    Ok(response_body)
207}
208
209/// Read all http response chunks and concatenate into a raw vector.
210pub(crate) async fn read_response_body_raw(
211    mut response: Response<Body>,
212    read_timeout: Duration,
213) -> Result<Vec<u8>, Error> {
214    let mut response_body = Vec::new();
215    while let Some(chunk) = timeout(read_timeout, response.body_mut().data())
216        .await
217        .context(HttpClientTimeoutError {})?
218    {
219        response_body.append(&mut chunk.context(HttpClientError {})?.to_vec());
220    }
221    Ok(response_body)
222}
223
224/// parse_host_url parses a url string, validates the string is a host url, and
225/// returns the parsed URL.
226pub(crate) fn parse_host_url(host: &str) -> Result<Uri, Error> {
227    host.parse::<Uri>().context(HttpUriError {})
228}
229
230/// get_docker_os returns the operating system based on the server header from the daemon.
231pub(crate) fn get_docker_os(server_header: &str) -> String {
232    let captures = HEADER_REGEXP.captures(server_header).unwrap();
233    captures
234        .get(1)
235        .map(|m| String::from(m.as_str()))
236        .unwrap_or_else(String::new)
237}
238
239/// get_filters_query returns a url query with "filters" query term, based on the
240/// filters provided.
241pub(crate) fn get_filters_query(
242    filters: types::filters::Args,
243) -> Result<HashMap<String, String>, Error> {
244    let mut query_params: HashMap<String, String> = HashMap::new();
245    if !filters.fields.is_empty() {
246        query_params.insert(
247            "filters".into(),
248            serde_json::to_string(&filters.fields).context(JsonSerializationError {})?,
249        );
250    }
251    Ok(query_params)
252}
253
254pub(crate) struct AsyncHttpBodyReader {
255    body: Mutex<Pin<Box<dyn HttpBody<Data = Bytes, Error = hyper::Error>>>>,
256}
257
258impl AsyncHttpBodyReader {
259    pub(crate) fn new(response: Response<Body>) -> Self {
260        Self {
261            body: Mutex::new(Box::pin(response.into_body())),
262        }
263    }
264}
265
266impl AsyncRead for AsyncHttpBodyReader {
267    fn poll_read(
268        self: Pin<&mut Self>,
269        cx: &mut Context<'_>,
270        buf: &mut [u8],
271    ) -> Poll<Result<usize, std::io::Error>> {
272        let mut body_reader = self.body.lock().unwrap();
273        if let Some(data) = ready!(body_reader.as_mut().poll_data(cx)) {
274            match data {
275                Ok(data) => Poll::Ready(io::Read::read(
276                    &mut Vec::from(data.as_ref()).as_mut_slice().as_ref(),
277                    buf,
278                )),
279                Err(err) => Poll::Ready(Err(io::Error::new(
280                    io::ErrorKind::Other,
281                    format!("{}", err),
282                ))),
283            }
284        } else {
285            Poll::Ready(Ok(0))
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// The operating system is read from the parenthesised part of the header.
    #[test]
    pub fn test_get_docker_os() {
        let server_header = "Docker/19.03.5 (linux)";
        assert_eq!(get_docker_os(server_header), "linux");
    }
}
299}