docker_sdk/
docker.rs

1//! Main entrypoint for interacting with the Docker API.
2//!
3//! API Reference: <https://docs.docker.com/engine/api/v1.41/>
4
5use std::{collections::HashMap, env, io, path::Path};
6
7use futures_util::{stream::Stream, TryStreamExt};
8use hyper::{client::HttpConnector, Body, Client, Method};
9use mime::Mime;
10use serde::{de, Deserialize, Serialize};
11use url::form_urlencoded;
12
13use crate::{
14    container::Containers,
15    errors::{Error, Result},
16    image::Images,
17    network::Networks,
18    service::Services,
19    transport::{Headers, Payload, Transport},
20    volume::Volumes,
21    Uri,
22};
23
24#[cfg(feature = "chrono")]
25use crate::datetime::{datetime_from_nano_timestamp, datetime_from_unix_timestamp};
26#[cfg(feature = "chrono")]
27use chrono::{DateTime, Utc};
28
29#[cfg(feature = "tls")]
30use hyper_openssl::HttpsConnector;
31#[cfg(feature = "tls")]
32use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
33
34#[cfg(feature = "unix-socket")]
35use hyperlocal::UnixConnector;
36
37/// Entrypoint interface for communicating with docker daemon
38#[derive(Clone)]
39pub struct Docker {
40    transport: Transport,
41}
42
43fn get_http_connector() -> HttpConnector {
44    let mut http = HttpConnector::new();
45    http.enforce_http(false);
46
47    http
48}
49
50#[cfg(feature = "tls")]
51fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
52    let http = get_http_connector();
53    if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
54        // fixme: don't unwrap before you know what's in the box
55        // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428
56        let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
57        connector.set_cipher_list("DEFAULT").unwrap();
58        let cert = &format!("{}/cert.pem", certs);
59        let key = &format!("{}/key.pem", certs);
60        connector
61            .set_certificate_file(&Path::new(cert), SslFiletype::PEM)
62            .unwrap();
63        connector
64            .set_private_key_file(&Path::new(key), SslFiletype::PEM)
65            .unwrap();
66        if env::var("DOCKER_TLS_VERIFY").is_ok() {
67            let ca = &format!("{}/ca.pem", certs);
68            connector.set_ca_file(&Path::new(ca)).unwrap();
69        }
70
71        // If we are attempting to connec to the docker daemon via tcp
72        // we need to convert the scheme to `https` to let hyper connect.
73        // Otherwise, hyper will reject the connection since it does not
74        // recongnize `tcp` as a valid `http` scheme.
75        let tcp_host_str = if tcp_host_str.contains("tcp://") {
76            tcp_host_str.replace("tcp://", "https://")
77        } else {
78            tcp_host_str
79        };
80
81        Docker {
82            transport: Transport::EncryptedTcp {
83                client: Client::builder()
84                    .build(HttpsConnector::with_connector(http, connector).unwrap()),
85                host: tcp_host_str,
86            },
87        }
88    } else {
89        Docker {
90            transport: Transport::Tcp {
91                client: Client::builder().build(http),
92                host: tcp_host_str,
93            },
94        }
95    }
96}
97
98#[cfg(not(feature = "tls"))]
99fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
100    let http = get_http_connector();
101    Docker {
102        transport: Transport::Tcp {
103            client: Client::builder().build(http),
104            host: tcp_host_str,
105        },
106    }
107}
108
109// https://docs.docker.com/reference/api/docker_remote_api_v1.17/
110impl Docker {
111    /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`,
112    /// falling back on unix:///var/run/docker.sock
113    pub fn new() -> Docker {
114        match env::var("DOCKER_HOST").ok() {
115            Some(host) => {
116                #[cfg(feature = "unix-socket")]
117                if let Some(path) = host.strip_prefix("unix://") {
118                    return Docker::unix(path);
119                }
120                let host = host.parse().expect("invalid url");
121                Docker::host(host)
122            }
123            #[cfg(feature = "unix-socket")]
124            None => Docker::unix("/var/run/docker.sock"),
125            #[cfg(not(feature = "unix-socket"))]
126            None => {
127                let url = "tcp://localhost:2375";
128                let uri = url.parse::<Uri>().unwrap();
129                Docker::host(uri)
130            }
131        }
132    }
133
134    /// Creates a new docker instance for a docker host
135    /// listening on a given Unix socket.
136    #[cfg(feature = "unix-socket")]
137    pub fn unix<S>(socket_path: S) -> Docker
138    where
139        S: Into<String>,
140    {
141        Docker {
142            transport: Transport::Unix {
143                client: Client::builder()
144                    .pool_max_idle_per_host(0)
145                    .build(UnixConnector),
146                path: socket_path.into(),
147            },
148        }
149    }
150
151    /// constructs a new Docker instance for docker host listening at the given host url
152    pub fn host(host: Uri) -> Docker {
153        let tcp_host_str = format!(
154            "{}://{}:{}",
155            host.scheme_str().unwrap(),
156            host.host().unwrap().to_owned(),
157            host.port_u16().unwrap_or(80)
158        );
159
160        match host.scheme_str() {
161            #[cfg(feature = "unix-socket")]
162            Some("unix") => Docker {
163                transport: Transport::Unix {
164                    client: Client::builder().build(UnixConnector),
165                    path: host.path().to_owned(),
166                },
167            },
168
169            #[cfg(not(feature = "unix-socket"))]
170            Some("unix") => panic!("Unix socket support is disabled"),
171
172            _ => get_docker_for_tcp(tcp_host_str),
173        }
174    }
175
176    /// Exports an interface for interacting with docker images
177    pub fn images(&'_ self) -> Images<'_> {
178        Images::new(self)
179    }
180
181    /// Exports an interface for interacting with docker containers
182    pub fn containers(&'_ self) -> Containers<'_> {
183        Containers::new(self)
184    }
185
186    /// Exports an interface for interacting with docker services
187    pub fn services(&'_ self) -> Services<'_> {
188        Services::new(self)
189    }
190
191    pub fn networks(&'_ self) -> Networks<'_> {
192        Networks::new(self)
193    }
194
195    pub fn volumes(&'_ self) -> Volumes<'_> {
196        Volumes::new(self)
197    }
198
199    /// Returns version information associated with the docker daemon
200    pub async fn version(&self) -> Result<Version> {
201        self.get_json("/version").await
202    }
203
204    /// Returns information associated with the docker daemon
205    pub async fn info(&self) -> Result<Info> {
206        self.get_json("/info").await
207    }
208
209    /// Returns a simple ping response indicating the docker daemon is accessible
210    pub async fn ping(&self) -> Result<String> {
211        self.get("/_ping").await
212    }
213
214    /// Returns a stream of docker events
215    pub fn events<'docker>(
216        &'docker self,
217        opts: &EventsOptions,
218    ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
219        let mut path = vec!["/events".to_owned()];
220        if let Some(query) = opts.serialize() {
221            path.push(query);
222        }
223        let reader = Box::pin(
224            self.stream_get(path.join("?"))
225                .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
226        )
227        .into_async_read();
228
229        let codec = futures_codec::LinesCodec {};
230
231        Box::pin(
232            futures_codec::FramedRead::new(reader, codec)
233                .map_err(Error::IO)
234                .and_then(|s: String| async move {
235                    serde_json::from_str(&s).map_err(Error::SerdeJsonError)
236                }),
237        )
238    }
239
240    //
241    // Utility functions to make requests
242    //
243
244    pub(crate) async fn get(
245        &self,
246        endpoint: &str,
247    ) -> Result<String> {
248        self.transport
249            .request(Method::GET, endpoint, Payload::None, Headers::None)
250            .await
251    }
252
253    pub(crate) async fn get_json<T: serde::de::DeserializeOwned>(
254        &self,
255        endpoint: &str,
256    ) -> Result<T> {
257        let raw_string = self
258            .transport
259            .request(Method::GET, endpoint, Payload::None, Headers::None)
260            .await?;
261
262        Ok(serde_json::from_str::<T>(&raw_string)?)
263    }
264
265    pub(crate) async fn post(
266        &self,
267        endpoint: &str,
268        body: Option<(Body, Mime)>,
269    ) -> Result<String> {
270        self.transport
271            .request(Method::POST, endpoint, body, Headers::None)
272            .await
273    }
274
275    pub(crate) async fn put(
276        &self,
277        endpoint: &str,
278        body: Option<(Body, Mime)>,
279    ) -> Result<String> {
280        self.transport
281            .request(Method::PUT, endpoint, body, Headers::None)
282            .await
283    }
284
285    pub(crate) async fn post_json<T, B>(
286        &self,
287        endpoint: impl AsRef<str>,
288        body: Option<(B, Mime)>,
289    ) -> Result<T>
290    where
291        T: serde::de::DeserializeOwned,
292        B: Into<Body>,
293    {
294        let string = self
295            .transport
296            .request(Method::POST, endpoint, body, Headers::None)
297            .await?;
298
299        Ok(serde_json::from_str::<T>(&string)?)
300    }
301
302    pub(crate) async fn post_json_headers<'a, T, B, H>(
303        &self,
304        endpoint: impl AsRef<str>,
305        body: Option<(B, Mime)>,
306        headers: Option<H>,
307    ) -> Result<T>
308    where
309        T: serde::de::DeserializeOwned,
310        B: Into<Body>,
311        H: IntoIterator<Item = (&'static str, String)> + 'a,
312    {
313        let string = self
314            .transport
315            .request(Method::POST, endpoint, body, headers)
316            .await?;
317
318        Ok(serde_json::from_str::<T>(&string)?)
319    }
320
321    pub(crate) async fn delete(
322        &self,
323        endpoint: &str,
324    ) -> Result<String> {
325        self.transport
326            .request(Method::DELETE, endpoint, Payload::None, Headers::None)
327            .await
328    }
329
330    pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(
331        &self,
332        endpoint: &str,
333    ) -> Result<T> {
334        let string = self
335            .transport
336            .request(Method::DELETE, endpoint, Payload::None, Headers::None)
337            .await?;
338
339        Ok(serde_json::from_str::<T>(&string)?)
340    }
341
342    /// Send a streaming post request.
343    ///
344    /// Use stream_post_into_values if the endpoint returns JSON values
345    pub(crate) fn stream_post<'a, H>(
346        &'a self,
347        endpoint: impl AsRef<str> + 'a,
348        body: Option<(Body, Mime)>,
349        headers: Option<H>,
350    ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
351    where
352        H: IntoIterator<Item = (&'static str, String)> + 'a,
353    {
354        self.transport
355            .stream_chunks(Method::POST, endpoint, body, headers)
356    }
357
358    /// Send a streaming post request that returns a stream of JSON values
359    ///
360    /// Assumes that each received chunk contains one or more JSON values
361    pub(crate) fn stream_post_into<'a, H, T>(
362        &'a self,
363        endpoint: impl AsRef<str> + 'a,
364        body: Option<(Body, Mime)>,
365        headers: Option<H>,
366    ) -> impl Stream<Item = Result<T>> + 'a
367    where
368        H: IntoIterator<Item = (&'static str, String)> + 'a,
369        T: de::DeserializeOwned,
370    {
371        self.stream_post(endpoint, body, headers)
372            .and_then(|chunk| async move {
373                let stream = futures_util::stream::iter(
374                    serde_json::Deserializer::from_slice(&chunk)
375                        .into_iter()
376                        .collect::<Vec<_>>(),
377                )
378                .map_err(Error::from);
379
380                Ok(stream)
381            })
382            .try_flatten()
383    }
384
385    pub(crate) fn stream_get<'a>(
386        &'a self,
387        endpoint: impl AsRef<str> + Unpin + 'a,
388    ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
389        let headers = Some(Vec::default());
390        self.transport
391            .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
392    }
393
394    pub(crate) async fn stream_post_upgrade<'a>(
395        &'a self,
396        endpoint: impl AsRef<str> + 'a,
397        body: Option<(Body, Mime)>,
398    ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
399        self.transport
400            .stream_upgrade(Method::POST, endpoint, body)
401            .await
402    }
403}
404
405impl Default for Docker {
406    fn default() -> Self {
407        Self::new()
408    }
409}
410
411/// Options for filtering streams of Docker events
412#[derive(Default, Debug)]
413pub struct EventsOptions {
414    params: HashMap<&'static str, String>,
415}
416
417impl EventsOptions {
418    pub fn builder() -> EventsOptionsBuilder {
419        EventsOptionsBuilder::default()
420    }
421
422    /// serialize options as a string. returns None if no options are defined
423    pub fn serialize(&self) -> Option<String> {
424        if self.params.is_empty() {
425            None
426        } else {
427            Some(
428                form_urlencoded::Serializer::new(String::new())
429                    .extend_pairs(&self.params)
430                    .finish(),
431            )
432        }
433    }
434}
435
436#[derive(Copy, Clone)]
437pub enum EventFilterType {
438    Container,
439    Image,
440    Volume,
441    Network,
442    Daemon,
443}
444
445fn event_filter_type_to_string(filter: EventFilterType) -> &'static str {
446    match filter {
447        EventFilterType::Container => "container",
448        EventFilterType::Image => "image",
449        EventFilterType::Volume => "volume",
450        EventFilterType::Network => "network",
451        EventFilterType::Daemon => "daemon",
452    }
453}
454
455/// Filter options for image listings
456pub enum EventFilter {
457    Container(String),
458    Event(String),
459    Image(String),
460    Label(String),
461    Type(EventFilterType),
462    Volume(String),
463    Network(String),
464    Daemon(String),
465}
466
467/// Builder interface for `EventOptions`
468#[derive(Default)]
469pub struct EventsOptionsBuilder {
470    params: HashMap<&'static str, String>,
471    events: Vec<String>,
472    containers: Vec<String>,
473    images: Vec<String>,
474    labels: Vec<String>,
475    volumes: Vec<String>,
476    networks: Vec<String>,
477    daemons: Vec<String>,
478    types: Vec<String>,
479}
480
481impl EventsOptionsBuilder {
482    /// Filter events since a given timestamp
483    pub fn since(
484        &mut self,
485        ts: &u64,
486    ) -> &mut Self {
487        self.params.insert("since", ts.to_string());
488        self
489    }
490
491    /// Filter events until a given timestamp
492    pub fn until(
493        &mut self,
494        ts: &u64,
495    ) -> &mut Self {
496        self.params.insert("until", ts.to_string());
497        self
498    }
499
500    pub fn filter(
501        &mut self,
502        filters: Vec<EventFilter>,
503    ) -> &mut Self {
504        let mut params = HashMap::new();
505        for f in filters {
506            match f {
507                EventFilter::Container(n) => {
508                    self.containers.push(n);
509                    params.insert("container", self.containers.clone())
510                }
511                EventFilter::Event(n) => {
512                    self.events.push(n);
513                    params.insert("event", self.events.clone())
514                }
515                EventFilter::Image(n) => {
516                    self.images.push(n);
517                    params.insert("image", self.images.clone())
518                }
519                EventFilter::Label(n) => {
520                    self.labels.push(n);
521                    params.insert("label", self.labels.clone())
522                }
523                EventFilter::Volume(n) => {
524                    self.volumes.push(n);
525                    params.insert("volume", self.volumes.clone())
526                }
527                EventFilter::Network(n) => {
528                    self.networks.push(n);
529                    params.insert("network", self.networks.clone())
530                }
531                EventFilter::Daemon(n) => {
532                    self.daemons.push(n);
533                    params.insert("daemon", self.daemons.clone())
534                }
535                EventFilter::Type(n) => {
536                    let event_type = event_filter_type_to_string(n).to_string();
537                    self.types.push(event_type);
538                    params.insert("type", self.types.clone())
539                }
540            };
541        }
542        self.params
543            .insert("filters", serde_json::to_string(&params).unwrap());
544        self
545    }
546
547    pub fn build(&self) -> EventsOptions {
548        EventsOptions {
549            params: self.params.clone(),
550        }
551    }
552}
553
554#[derive(Clone, Debug, Serialize, Deserialize)]
555#[serde(rename_all = "PascalCase")]
556pub struct Version {
557    pub version: String,
558    pub api_version: String,
559    pub git_commit: String,
560    pub go_version: String,
561    pub os: String,
562    pub arch: String,
563    pub kernel_version: String,
564    #[cfg(feature = "chrono")]
565    pub build_time: DateTime<Utc>,
566    #[cfg(not(feature = "chrono"))]
567    pub build_time: String,
568}
569
570#[derive(Clone, Debug, Serialize, Deserialize)]
571#[serde(rename_all = "PascalCase")]
572pub struct Info {
573    pub containers: u64,
574    pub images: u64,
575    pub driver: String,
576    pub docker_root_dir: String,
577    pub driver_status: Vec<Vec<String>>,
578    #[serde(rename = "ID")]
579    pub id: String,
580    pub kernel_version: String,
581    // pub Labels: Option<???>,
582    pub mem_total: u64,
583    pub memory_limit: bool,
584    #[serde(rename = "NCPU")]
585    pub n_cpu: u64,
586    pub n_events_listener: u64,
587    pub n_goroutines: u64,
588    pub name: String,
589    pub operating_system: String,
590    // pub RegistryConfig:???
591    pub swap_limit: bool,
592    pub system_time: Option<String>,
593}
594
595#[derive(Clone, Debug, Serialize, Deserialize)]
596pub struct Event {
597    #[serde(rename = "Type")]
598    pub typ: String,
599    #[serde(rename = "Action")]
600    pub action: String,
601    #[serde(rename = "Actor")]
602    pub actor: Actor,
603    pub status: Option<String>,
604    pub id: Option<String>,
605    pub from: Option<String>,
606    #[cfg(feature = "chrono")]
607    #[serde(deserialize_with = "datetime_from_unix_timestamp")]
608    pub time: DateTime<Utc>,
609    #[cfg(not(feature = "chrono"))]
610    pub time: u64,
611    #[cfg(feature = "chrono")]
612    #[serde(deserialize_with = "datetime_from_nano_timestamp", rename = "timeNano")]
613    pub time_nano: DateTime<Utc>,
614    #[cfg(not(feature = "chrono"))]
615    #[serde(rename = "timeNano")]
616    pub time_nano: u64,
617}
618
619#[derive(Clone, Debug, Serialize, Deserialize)]
620pub struct Actor {
621    #[serde(rename = "ID")]
622    pub id: String,
623    #[serde(rename = "Attributes")]
624    pub attributes: HashMap<String, String>,
625}
626
627#[cfg(test)]
628mod tests {
629    #[cfg(feature = "unix-socket")]
630    #[test]
631    fn unix_host_env() {
632        use super::Docker;
633        use std::env;
634        env::set_var("DOCKER_HOST", "unix:///docker.sock");
635        let d = Docker::new();
636        match d.transport {
637            crate::transport::Transport::Unix { path, .. } => {
638                assert_eq!(path, "/docker.sock");
639            }
640            _ => {
641                panic!("Expected transport to be unix.");
642            }
643        }
644        env::set_var("DOCKER_HOST", "http://localhost:8000");
645        let d = Docker::new();
646        match d.transport {
647            crate::transport::Transport::Tcp { host, .. } => {
648                assert_eq!(host, "http://localhost:8000");
649            }
650            _ => {
651                panic!("Expected transport to be http.");
652            }
653        }
654    }
655}