1use crate::{
5 conn::{get_http_connector, Headers, Payload, Transport},
6 errors::{Error, Result},
7 ApiVersion, Containers, Images, Networks, Volumes,
8};
9use containers_api::conn::RequestClient;
10
11#[cfg(feature = "swarm")]
12use crate::{Configs, Nodes, Plugins, Secrets, Services, Swarm, Tasks};
13
14#[cfg(feature = "tls")]
15use crate::conn::get_https_connector;
16#[cfg(unix)]
17use crate::conn::get_unix_connector;
18
19use futures_util::{
20 io::{AsyncRead, AsyncWrite},
21 stream::Stream,
22};
23use hyper::{body::Bytes, Body, Client, Response};
24use serde::de::DeserializeOwned;
25use std::future::Future;
26use std::path::{Path, PathBuf};
27use std::pin::Pin;
28
29#[derive(Debug, Clone)]
31pub struct Docker {
32 version: Option<ApiVersion>,
33 client: RequestClient<Error>,
34}
35
36impl Docker {
37 pub fn new(uri: impl AsRef<str>) -> Result<Self> {
51 Self::new_impl(uri.as_ref(), None)
52 }
53
54 pub fn new_versioned(uri: impl AsRef<str>, version: impl Into<ApiVersion>) -> Result<Self> {
56 Self::new_impl(uri.as_ref(), Some(version.into()))
57 }
58
59 fn new_impl(uri: &str, version: Option<ApiVersion>) -> Result<Self> {
60 let mut it = uri.split("://");
61
62 match it.next() {
63 #[cfg(unix)]
64 Some("unix") => {
65 if let Some(path) = it.next() {
66 Ok(Self::new_unix_impl(path, version))
67 } else {
68 Err(Error::MissingAuthority)
69 }
70 }
71 #[cfg(not(unix))]
72 Some("unix") => Err(Error::UnsupportedScheme("unix".to_string())),
73 Some("tcp") | Some("http") => {
74 if let Some(host) = it.next() {
75 Self::new_tcp_impl(host, version)
76 } else {
77 Err(Error::MissingAuthority)
78 }
79 }
80 Some(scheme) => Err(Error::UnsupportedScheme(scheme.to_string())),
81 None => unreachable!(), }
84 }
85
86 #[cfg(unix)]
87 #[cfg_attr(docsrs, doc(cfg(unix)))]
88 pub fn unix(socket_path: impl AsRef<Path>) -> Self {
96 Self::new_unix_impl(socket_path.as_ref(), None)
97 }
98
99 #[cfg(unix)]
100 #[cfg_attr(docsrs, doc(cfg(unix)))]
101 pub fn unix_versioned(socket_path: impl AsRef<Path>, version: impl Into<ApiVersion>) -> Self {
103 Self::new_unix_impl(socket_path.as_ref(), Some(version.into()))
104 }
105
106 #[cfg(unix)]
107 fn new_unix_impl(socket_path: impl Into<PathBuf>, version: Option<ApiVersion>) -> Self {
108 Docker {
109 version,
110 client: RequestClient::new(
111 Transport::Unix {
112 client: Client::builder()
113 .pool_max_idle_per_host(0)
114 .build(get_unix_connector()),
115 path: socket_path.into(),
116 },
117 Box::new(validate_response),
118 ),
119 }
120 }
121
122 #[cfg(feature = "tls")]
123 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
124 pub fn tls(host: impl AsRef<str>, cert_path: impl AsRef<Path>, verify: bool) -> Result<Self> {
138 Self::new_tls_impl(host.as_ref(), None, cert_path.as_ref(), verify)
139 }
140
141 #[cfg(feature = "tls")]
142 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
143 pub fn tls_versioned(
145 host: impl AsRef<str>,
146 version: impl Into<ApiVersion>,
147 cert_path: impl AsRef<Path>,
148 verify: bool,
149 ) -> Result<Self> {
150 Self::new_tls_impl(
151 host.as_ref(),
152 Some(version.into()),
153 cert_path.as_ref(),
154 verify,
155 )
156 }
157
158 #[cfg(feature = "tls")]
159 fn new_tls_impl(
160 host: &str,
161 version: Option<ApiVersion>,
162 cert_path: &Path,
163 verify: bool,
164 ) -> Result<Self> {
165 Ok(Self {
166 version,
167 client: RequestClient::new(
168 Transport::EncryptedTcp {
169 client: Client::builder().build(get_https_connector(cert_path, verify)?),
170 host: url::Url::parse(&format!("https://{host}")).map_err(Error::InvalidUrl)?,
171 },
172 Box::new(validate_response),
173 ),
174 })
175 }
176
177 pub fn tcp(host: impl AsRef<str>) -> Result<Self> {
188 Self::new_tcp_impl(host.as_ref(), None)
189 }
190
191 pub fn tcp_versioned(host: impl AsRef<str>, version: impl Into<ApiVersion>) -> Result<Self> {
193 Self::new_tcp_impl(host.as_ref(), Some(version.into()))
194 }
195
196 fn new_tcp_impl(host: &str, version: Option<ApiVersion>) -> Result<Self> {
197 Ok(Self {
198 version,
199 client: RequestClient::new(
200 Transport::Tcp {
201 client: Client::builder().build(get_http_connector()),
202 host: url::Url::parse(&format!("tcp://{host}")).map_err(Error::InvalidUrl)?,
203 },
204 Box::new(validate_response),
205 ),
206 })
207 }
208
209 pub fn images(&'_ self) -> Images {
211 Images::new(self.clone())
212 }
213
214 pub fn containers(&'_ self) -> Containers {
216 Containers::new(self.clone())
217 }
218
219 pub fn networks(&'_ self) -> Networks {
221 Networks::new(self.clone())
222 }
223
224 pub fn volumes(&'_ self) -> Volumes {
226 Volumes::new(self.clone())
227 }
228
229 pub async fn adjust_api_version(&mut self) -> Result<()> {
232 let server_version: ApiVersion = self.version().await.and_then(|v| {
233 v.api_version
234 .unwrap_or_default()
235 .parse::<ApiVersion>()
236 .map_err(Error::MalformedVersion)
237 })?;
238
239 self.version = Some(server_version);
240
241 Ok(())
242 }
243
244 fn make_endpoint(&self, endpoint: impl AsRef<str>) -> String {
251 if let Some(version) = self.version {
252 version.make_endpoint(endpoint)
253 } else {
254 endpoint.as_ref().to_owned()
255 }
256 }
257
258 pub(crate) async fn get(&self, endpoint: &str) -> Result<Response<Body>> {
259 self.client.get(self.make_endpoint(endpoint)).await
260 }
261
262 pub(crate) async fn get_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
263 self.client.get_json(self.make_endpoint(endpoint)).await
264 }
265
266 #[allow(dead_code)]
267 pub(crate) async fn post<B>(
268 &self,
269 endpoint: &str,
270 body: Payload<B>,
271 headers: Option<Headers>,
272 ) -> Result<Response<Body>>
273 where
274 B: Into<Body>,
275 {
276 self.client
277 .post(self.make_endpoint(endpoint), body, headers)
278 .await
279 }
280
281 pub(crate) async fn post_string<B>(
282 &self,
283 endpoint: &str,
284 body: Payload<B>,
285 headers: Option<Headers>,
286 ) -> Result<String>
287 where
288 B: Into<Body>,
289 {
290 self.client
291 .post_string(self.make_endpoint(endpoint), body, headers)
292 .await
293 }
294
295 pub(crate) async fn post_json<B, T>(
296 &self,
297 endpoint: impl AsRef<str>,
298 body: Payload<B>,
299 headers: Option<Headers>,
300 ) -> Result<T>
301 where
302 T: DeserializeOwned,
303 B: Into<Body>,
304 {
305 self.client
306 .post_json(self.make_endpoint(endpoint), body, headers)
307 .await
308 }
309
310 pub(crate) async fn put<B>(&self, endpoint: &str, body: Payload<B>) -> Result<String>
311 where
312 B: Into<Body>,
313 {
314 self.client
315 .put_string(self.make_endpoint(endpoint), body)
316 .await
317 }
318
319 pub(crate) async fn delete(&self, endpoint: &str) -> Result<String> {
320 self.client
321 .delete_string(self.make_endpoint(endpoint))
322 .await
323 }
324
325 pub(crate) async fn delete_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
326 self.client.delete_json(self.make_endpoint(endpoint)).await
327 }
328
329 pub(crate) async fn head(&self, endpoint: &str) -> Result<Response<Body>> {
330 self.client.head(self.make_endpoint(endpoint)).await
331 }
332
333 #[allow(dead_code)]
334 pub(crate) fn post_stream<'a, B>(
338 &'a self,
339 endpoint: impl AsRef<str> + 'a,
340 body: Payload<B>,
341 headers: Option<Headers>,
342 ) -> impl Stream<Item = Result<Bytes>> + 'a
343 where
344 B: Into<Body> + 'a,
345 {
346 self.client
347 .post_stream(self.make_endpoint(endpoint), body, headers)
348 }
349
350 pub(crate) fn post_into_stream<'a, B, T>(
354 &'a self,
355 endpoint: impl AsRef<str> + 'a,
356 body: Payload<B>,
357 headers: Option<Headers>,
358 ) -> impl Stream<Item = Result<T>> + 'a
359 where
360 B: Into<Body> + 'a,
361 T: DeserializeOwned + 'a,
362 {
363 self.client
364 .post_into_stream(self.make_endpoint(endpoint), body, headers)
365 }
366
367 pub(crate) fn get_stream<'a>(
368 &'a self,
369 endpoint: impl AsRef<str> + Unpin + 'a,
370 ) -> impl Stream<Item = Result<Bytes>> + 'a {
371 self.client.get_stream(self.make_endpoint(endpoint))
372 }
373
374 pub(crate) async fn post_upgrade_stream<B>(
375 self,
376 endpoint: impl AsRef<str>,
377 body: Payload<B>,
378 ) -> Result<impl AsyncRead + AsyncWrite>
379 where
380 B: Into<Body>,
381 {
382 let ep = self.make_endpoint(endpoint);
383 self.client.post_upgrade_stream(ep, body).await
384 }
385}
386
387fn validate_response(
388 response: Response<Body>,
389) -> Pin<Box<dyn Future<Output = Result<Response<Body>>> + Send + Sync>> {
390 use serde::{Deserialize, Serialize};
391 #[derive(Serialize, Deserialize)]
392 struct ErrorResponse {
393 message: String,
394 }
395
396 Box::pin(async move {
397 log::trace!(
398 "got response {} {:?}",
399 response.status(),
400 response.headers()
401 );
402 let status = response.status();
403
404 use crate::conn::{self, hyper::StatusCode};
405 match status {
406 StatusCode::OK
408 | StatusCode::CREATED
409 | StatusCode::SWITCHING_PROTOCOLS
410 | StatusCode::NO_CONTENT => Ok(response),
411 _ => {
413 let body = response.into_body();
414 let bytes = hyper::body::to_bytes(body)
415 .await
416 .map_err(conn::Error::from)?;
417 let message_body = String::from_utf8(bytes.to_vec()).map_err(conn::Error::from)?;
418 log::trace!("{message_body:#?}");
419 let message = serde_json::from_str::<ErrorResponse>(&message_body)
420 .map(|e| e.message)
421 .unwrap_or_else(|_| {
422 status
423 .canonical_reason()
424 .unwrap_or("unknown error code")
425 .to_owned()
426 });
427 Err(Error::Fault {
428 code: status,
429 message,
430 })
431 }
432 }
433 })
434}
435
#[cfg(feature = "swarm")]
impl Docker {
    /// Returns the services interface, backed by a clone of this client.
    pub fn services(&'_ self) -> Services {
        Services::new(Self::clone(self))
    }

    /// Returns the configs interface, backed by a clone of this client.
    pub fn configs(&'_ self) -> Configs {
        Configs::new(Self::clone(self))
    }

    /// Returns the tasks interface, backed by a clone of this client.
    pub fn tasks(&'_ self) -> Tasks {
        Tasks::new(Self::clone(self))
    }

    /// Returns the secrets interface, backed by a clone of this client.
    pub fn secrets(&'_ self) -> Secrets {
        Secrets::new(Self::clone(self))
    }

    /// Returns the swarm interface, backed by a clone of this client.
    pub fn swarm(&'_ self) -> Swarm {
        Swarm::new(Self::clone(self))
    }

    /// Returns the nodes interface, backed by a clone of this client.
    pub fn nodes(&'_ self) -> Nodes {
        Nodes::new(Self::clone(self))
    }

    /// Returns the plugins interface, backed by a clone of this client.
    pub fn plugins(&'_ self) -> Plugins {
        Plugins::new(Self::clone(self))
    }
}
473
#[cfg(test)]
mod tests {
    use super::{Docker, Error};

    /// Checks which URIs `Docker::new` accepts and which error it reports for the others.
    #[test]
    fn creates_correct_docker() {
        // Both spellings of the plain-TCP scheme must be accepted.
        for uri in ["tcp://127.0.0.1:80", "http://127.0.0.1:80"] {
            Docker::new(uri).unwrap();
        }

        // The unix scheme is accepted on unix targets only.
        #[cfg(unix)]
        {
            Docker::new("unix://127.0.0.1:80").unwrap();
        }
        #[cfg(not(unix))]
        {
            let d = Docker::new("unix://127.0.0.1:80");
            assert!(d.is_err());
            let err = d.unwrap_err();
            assert!(
                matches!(&err, Error::UnsupportedScheme(scheme) if scheme == "unix"),
                r#"Expected Error::UnsupportedScheme("unix"), got {}"#,
                err
            );
        }

        // An unknown scheme is reported back by name.
        let err = Docker::new("rand://127.0.0.1:80").unwrap_err();
        assert!(
            matches!(&err, Error::UnsupportedScheme(scheme) if scheme == "rand"),
            r#"Expected Error::UnsupportedScheme("rand"), got {err}"#
        );

        // Without a `://` separator the whole input is treated as the scheme.
        let err = Docker::new("invalid_uri").unwrap_err();
        assert!(
            matches!(&err, Error::UnsupportedScheme(scheme) if scheme == "invalid_uri"),
            r#"Expected Error::UnsupportedScheme("invalid_uri"), got {err}"#
        );

        // The empty string therefore yields an empty scheme.
        let err = Docker::new("").unwrap_err();
        assert!(
            matches!(&err, Error::UnsupportedScheme(scheme) if scheme.is_empty()),
            r#"Expected Error::UnsupportedScheme(""), got {err}"#
        );
    }
}