docker_client_async/
lib.rs1use crate::error::*;
38use crate::opts::DockerEngineClientOption;
39use bytes::Bytes;
40use futures::ready;
41use futures::task::{Context, Poll};
42use hyper::body::HttpBody;
43use hyper::client::connect::Connect;
44use hyper::{Body, Client, Response, Uri};
45use hyperlocal::UnixConnector;
46use lazy_static::lazy_static;
47use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
48use regex::Regex;
49use snafu::ResultExt;
50use std::collections::HashMap;
51use std::io;
52use std::sync::Mutex;
53use std::time::Duration;
54use tokio::io::AsyncRead;
55use tokio::macros::support::Pin;
56use tokio::time::timeout;
57
58pub mod container;
59pub mod error;
60pub mod network;
61pub mod opts;
62pub mod types;
63pub mod version;
64pub mod volume;
65
66lazy_static! {
67 static ref HEADER_REGEXP: Regex = Regex::new(r"\ADocker/.+\s\((.+)\)\z").unwrap();
68}
69
70pub type LocalDockerEngineClient = DockerEngineClient<UnixConnector>;
72
73#[derive(Clone, Debug)]
76pub struct DockerEngineClient<C: Connect + Clone + Send + Sync + 'static> {
77 pub(crate) scheme: Option<String>,
79 pub(crate) host: Option<String>,
81 pub(crate) proto: Option<String>,
83 pub(crate) addr: Option<String>,
85 pub(crate) base_path: Option<String>,
87 pub(crate) timeout: Duration,
89 pub(crate) client: Option<Client<C, Body>>,
91 pub(crate) version: String,
93 pub(crate) custom_http_headers: Option<HashMap<String, String>>,
95 pub(crate) manual_override: bool,
97 pub(crate) negotiate_version: bool,
102 pub(crate) negotiated: bool,
104}
105
106impl<C: Connect + Clone + Send + Sync + 'static> Default for DockerEngineClient<C> {
107 fn default() -> Self {
108 Self {
109 scheme: Some("http".into()),
110 host: Some("unix:///var/run/docker.sock".into()),
111 proto: Some("unix".into()),
112 addr: Some("/var/run/docker.sock".into()),
113 base_path: None,
114 timeout: Duration::from_millis(5_000),
115 client: None,
116 version: "1.40".into(),
117 custom_http_headers: None,
118 manual_override: false,
119 negotiate_version: false,
120 negotiated: false,
121 }
122 }
123}
124
125impl<C: Connect + Clone + Send + Sync + 'static> DockerEngineClient<C> {
126 pub fn new_client_with_opts(
127 options: Option<Vec<DockerEngineClientOption<C>>>,
128 ) -> Result<DockerEngineClient<C>, Error> {
129 let mut client: DockerEngineClient<C> = Default::default();
130 if let Some(client_options) = options {
131 for client_option in &client_options {
132 client_option(&mut client)?;
133 }
134 }
135 Ok(client)
136 }
137
138 fn request_uri(
140 &self,
141 path: &str,
142 query_params: Option<HashMap<String, String>>,
143 ) -> Result<Uri, Error> {
144 let encoded_query_params = query_params
145 .map(|query_params| {
146 let mut encoded_query_params = String::from("?");
147 for (key, value) in query_params {
148 encoded_query_params.push_str(&key);
149 encoded_query_params.push('=');
150 encoded_query_params
151 .push_str(&utf8_percent_encode(&value, NON_ALPHANUMERIC).to_string());
152 encoded_query_params.push('&');
153 }
154 encoded_query_params.trim_end_matches('&').to_string()
155 })
156 .unwrap_or_else(String::new);
157
158 let path_and_query_params = format!(
159 "{}{}{}",
160 self.base_path
161 .as_ref()
162 .map(String::from)
163 .unwrap_or_else(|| format!("/v{}", self.version)),
164 path,
165 encoded_query_params
166 );
167
168 Ok(match self.proto.as_ref().unwrap().as_str() {
169 "tcp" => {
170 let scheme = self
171 .scheme
172 .as_ref()
173 .map(String::from)
174 .unwrap_or_else(|| "http".to_string());
175 let address = self
176 .addr
177 .as_ref()
178 .map(String::from)
179 .unwrap_or_else(|| "localhost".to_string());
180 format!("{}://{}{}", scheme, address, path_and_query_params)
181 .parse()
182 .unwrap()
183 }
184 "unix" => {
185 hyperlocal::Uri::new(self.addr.as_ref().unwrap(), &path_and_query_params).into()
186 }
187 _ => unimplemented!(),
188 })
189 }
190}
191
192pub(crate) async fn read_response_body(
194 mut response: Response<Body>,
195 read_timeout: Duration,
196) -> Result<String, Error> {
197 let mut response_body = String::new();
198 while let Some(chunk) = timeout(read_timeout, response.body_mut().data())
199 .await
200 .context(HttpClientTimeoutError {})?
201 {
202 response_body.push_str(&String::from_utf8_lossy(
203 &chunk.context(HttpClientError {})?,
204 ));
205 }
206 Ok(response_body)
207}
208
209pub(crate) async fn read_response_body_raw(
211 mut response: Response<Body>,
212 read_timeout: Duration,
213) -> Result<Vec<u8>, Error> {
214 let mut response_body = Vec::new();
215 while let Some(chunk) = timeout(read_timeout, response.body_mut().data())
216 .await
217 .context(HttpClientTimeoutError {})?
218 {
219 response_body.append(&mut chunk.context(HttpClientError {})?.to_vec());
220 }
221 Ok(response_body)
222}
223
224pub(crate) fn parse_host_url(host: &str) -> Result<Uri, Error> {
227 host.parse::<Uri>().context(HttpUriError {})
228}
229
230pub(crate) fn get_docker_os(server_header: &str) -> String {
232 let captures = HEADER_REGEXP.captures(server_header).unwrap();
233 captures
234 .get(1)
235 .map(|m| String::from(m.as_str()))
236 .unwrap_or_else(String::new)
237}
238
239pub(crate) fn get_filters_query(
242 filters: types::filters::Args,
243) -> Result<HashMap<String, String>, Error> {
244 let mut query_params: HashMap<String, String> = HashMap::new();
245 if !filters.fields.is_empty() {
246 query_params.insert(
247 "filters".into(),
248 serde_json::to_string(&filters.fields).context(JsonSerializationError {})?,
249 );
250 }
251 Ok(query_params)
252}
253
254pub(crate) struct AsyncHttpBodyReader {
255 body: Mutex<Pin<Box<dyn HttpBody<Data = Bytes, Error = hyper::Error>>>>,
256}
257
258impl AsyncHttpBodyReader {
259 pub(crate) fn new(response: Response<Body>) -> Self {
260 Self {
261 body: Mutex::new(Box::pin(response.into_body())),
262 }
263 }
264}
265
266impl AsyncRead for AsyncHttpBodyReader {
267 fn poll_read(
268 self: Pin<&mut Self>,
269 cx: &mut Context<'_>,
270 buf: &mut [u8],
271 ) -> Poll<Result<usize, std::io::Error>> {
272 let mut body_reader = self.body.lock().unwrap();
273 if let Some(data) = ready!(body_reader.as_mut().poll_data(cx)) {
274 match data {
275 Ok(data) => Poll::Ready(io::Read::read(
276 &mut Vec::from(data.as_ref()).as_mut_slice().as_ref(),
277 buf,
278 )),
279 Err(err) => Poll::Ready(Err(io::Error::new(
280 io::ErrorKind::Other,
281 format!("{}", err),
282 ))),
283 }
284 } else {
285 Poll::Ready(Ok(0))
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::get_docker_os;

    /// A well-formed Docker server header yields the text inside the parentheses.
    #[test]
    pub fn test_get_docker_os() {
        let header = "Docker/19.03.5 (linux)";
        assert_eq!(get_docker_os(header), "linux");
    }
}
299}