1use std::{collections::HashMap, env, io, path::Path};
6
7use futures_util::{stream::Stream, TryStreamExt};
8use hyper::{client::HttpConnector, Body, Client, Method};
9use mime::Mime;
10use serde::{de, Deserialize, Serialize};
11use url::form_urlencoded;
12
13use crate::{
14 container::Containers,
15 errors::{Error, Result},
16 image::Images,
17 network::Networks,
18 service::Services,
19 transport::{Headers, Payload, Transport},
20 volume::Volumes,
21 Uri,
22};
23
24#[cfg(feature = "chrono")]
25use crate::datetime::{datetime_from_nano_timestamp, datetime_from_unix_timestamp};
26#[cfg(feature = "chrono")]
27use chrono::{DateTime, Utc};
28
29#[cfg(feature = "tls")]
30use hyper_openssl::HttpsConnector;
31#[cfg(feature = "tls")]
32use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
33
34#[cfg(feature = "unix-socket")]
35use hyperlocal::UnixConnector;
36
37#[derive(Clone)]
39pub struct Docker {
40 transport: Transport,
41}
42
43fn get_http_connector() -> HttpConnector {
44 let mut http = HttpConnector::new();
45 http.enforce_http(false);
46
47 http
48}
49
50#[cfg(feature = "tls")]
51fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
52 let http = get_http_connector();
53 if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
54 let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
57 connector.set_cipher_list("DEFAULT").unwrap();
58 let cert = &format!("{}/cert.pem", certs);
59 let key = &format!("{}/key.pem", certs);
60 connector
61 .set_certificate_file(&Path::new(cert), SslFiletype::PEM)
62 .unwrap();
63 connector
64 .set_private_key_file(&Path::new(key), SslFiletype::PEM)
65 .unwrap();
66 if env::var("DOCKER_TLS_VERIFY").is_ok() {
67 let ca = &format!("{}/ca.pem", certs);
68 connector.set_ca_file(&Path::new(ca)).unwrap();
69 }
70
71 let tcp_host_str = if tcp_host_str.contains("tcp://") {
76 tcp_host_str.replace("tcp://", "https://")
77 } else {
78 tcp_host_str
79 };
80
81 Docker {
82 transport: Transport::EncryptedTcp {
83 client: Client::builder()
84 .build(HttpsConnector::with_connector(http, connector).unwrap()),
85 host: tcp_host_str,
86 },
87 }
88 } else {
89 Docker {
90 transport: Transport::Tcp {
91 client: Client::builder().build(http),
92 host: tcp_host_str,
93 },
94 }
95 }
96}
97
98#[cfg(not(feature = "tls"))]
99fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
100 let http = get_http_connector();
101 Docker {
102 transport: Transport::Tcp {
103 client: Client::builder().build(http),
104 host: tcp_host_str,
105 },
106 }
107}
108
109impl Docker {
111 pub fn new() -> Docker {
114 match env::var("DOCKER_HOST").ok() {
115 Some(host) => {
116 #[cfg(feature = "unix-socket")]
117 if let Some(path) = host.strip_prefix("unix://") {
118 return Docker::unix(path);
119 }
120 let host = host.parse().expect("invalid url");
121 Docker::host(host)
122 }
123 #[cfg(feature = "unix-socket")]
124 None => Docker::unix("/var/run/docker.sock"),
125 #[cfg(not(feature = "unix-socket"))]
126 None => {
127 let url = "tcp://localhost:2375";
128 let uri = url.parse::<Uri>().unwrap();
129 Docker::host(uri)
130 }
131 }
132 }
133
134 #[cfg(feature = "unix-socket")]
137 pub fn unix<S>(socket_path: S) -> Docker
138 where
139 S: Into<String>,
140 {
141 Docker {
142 transport: Transport::Unix {
143 client: Client::builder()
144 .pool_max_idle_per_host(0)
145 .build(UnixConnector),
146 path: socket_path.into(),
147 },
148 }
149 }
150
151 pub fn host(host: Uri) -> Docker {
153 let tcp_host_str = format!(
154 "{}://{}:{}",
155 host.scheme_str().unwrap(),
156 host.host().unwrap().to_owned(),
157 host.port_u16().unwrap_or(80)
158 );
159
160 match host.scheme_str() {
161 #[cfg(feature = "unix-socket")]
162 Some("unix") => Docker {
163 transport: Transport::Unix {
164 client: Client::builder().build(UnixConnector),
165 path: host.path().to_owned(),
166 },
167 },
168
169 #[cfg(not(feature = "unix-socket"))]
170 Some("unix") => panic!("Unix socket support is disabled"),
171
172 _ => get_docker_for_tcp(tcp_host_str),
173 }
174 }
175
176 pub fn images(&'_ self) -> Images<'_> {
178 Images::new(self)
179 }
180
181 pub fn containers(&'_ self) -> Containers<'_> {
183 Containers::new(self)
184 }
185
186 pub fn services(&'_ self) -> Services<'_> {
188 Services::new(self)
189 }
190
191 pub fn networks(&'_ self) -> Networks<'_> {
192 Networks::new(self)
193 }
194
195 pub fn volumes(&'_ self) -> Volumes<'_> {
196 Volumes::new(self)
197 }
198
199 pub async fn version(&self) -> Result<Version> {
201 self.get_json("/version").await
202 }
203
204 pub async fn info(&self) -> Result<Info> {
206 self.get_json("/info").await
207 }
208
209 pub async fn ping(&self) -> Result<String> {
211 self.get("/_ping").await
212 }
213
214 pub fn events<'docker>(
216 &'docker self,
217 opts: &EventsOptions,
218 ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
219 let mut path = vec!["/events".to_owned()];
220 if let Some(query) = opts.serialize() {
221 path.push(query);
222 }
223 let reader = Box::pin(
224 self.stream_get(path.join("?"))
225 .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
226 )
227 .into_async_read();
228
229 let codec = futures_codec::LinesCodec {};
230
231 Box::pin(
232 futures_codec::FramedRead::new(reader, codec)
233 .map_err(Error::IO)
234 .and_then(|s: String| async move {
235 serde_json::from_str(&s).map_err(Error::SerdeJsonError)
236 }),
237 )
238 }
239
240 pub(crate) async fn get(
245 &self,
246 endpoint: &str,
247 ) -> Result<String> {
248 self.transport
249 .request(Method::GET, endpoint, Payload::None, Headers::None)
250 .await
251 }
252
253 pub(crate) async fn get_json<T: serde::de::DeserializeOwned>(
254 &self,
255 endpoint: &str,
256 ) -> Result<T> {
257 let raw_string = self
258 .transport
259 .request(Method::GET, endpoint, Payload::None, Headers::None)
260 .await?;
261
262 Ok(serde_json::from_str::<T>(&raw_string)?)
263 }
264
265 pub(crate) async fn post(
266 &self,
267 endpoint: &str,
268 body: Option<(Body, Mime)>,
269 ) -> Result<String> {
270 self.transport
271 .request(Method::POST, endpoint, body, Headers::None)
272 .await
273 }
274
275 pub(crate) async fn put(
276 &self,
277 endpoint: &str,
278 body: Option<(Body, Mime)>,
279 ) -> Result<String> {
280 self.transport
281 .request(Method::PUT, endpoint, body, Headers::None)
282 .await
283 }
284
285 pub(crate) async fn post_json<T, B>(
286 &self,
287 endpoint: impl AsRef<str>,
288 body: Option<(B, Mime)>,
289 ) -> Result<T>
290 where
291 T: serde::de::DeserializeOwned,
292 B: Into<Body>,
293 {
294 let string = self
295 .transport
296 .request(Method::POST, endpoint, body, Headers::None)
297 .await?;
298
299 Ok(serde_json::from_str::<T>(&string)?)
300 }
301
302 pub(crate) async fn post_json_headers<'a, T, B, H>(
303 &self,
304 endpoint: impl AsRef<str>,
305 body: Option<(B, Mime)>,
306 headers: Option<H>,
307 ) -> Result<T>
308 where
309 T: serde::de::DeserializeOwned,
310 B: Into<Body>,
311 H: IntoIterator<Item = (&'static str, String)> + 'a,
312 {
313 let string = self
314 .transport
315 .request(Method::POST, endpoint, body, headers)
316 .await?;
317
318 Ok(serde_json::from_str::<T>(&string)?)
319 }
320
321 pub(crate) async fn delete(
322 &self,
323 endpoint: &str,
324 ) -> Result<String> {
325 self.transport
326 .request(Method::DELETE, endpoint, Payload::None, Headers::None)
327 .await
328 }
329
330 pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(
331 &self,
332 endpoint: &str,
333 ) -> Result<T> {
334 let string = self
335 .transport
336 .request(Method::DELETE, endpoint, Payload::None, Headers::None)
337 .await?;
338
339 Ok(serde_json::from_str::<T>(&string)?)
340 }
341
342 pub(crate) fn stream_post<'a, H>(
346 &'a self,
347 endpoint: impl AsRef<str> + 'a,
348 body: Option<(Body, Mime)>,
349 headers: Option<H>,
350 ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
351 where
352 H: IntoIterator<Item = (&'static str, String)> + 'a,
353 {
354 self.transport
355 .stream_chunks(Method::POST, endpoint, body, headers)
356 }
357
358 pub(crate) fn stream_post_into<'a, H, T>(
362 &'a self,
363 endpoint: impl AsRef<str> + 'a,
364 body: Option<(Body, Mime)>,
365 headers: Option<H>,
366 ) -> impl Stream<Item = Result<T>> + 'a
367 where
368 H: IntoIterator<Item = (&'static str, String)> + 'a,
369 T: de::DeserializeOwned,
370 {
371 self.stream_post(endpoint, body, headers)
372 .and_then(|chunk| async move {
373 let stream = futures_util::stream::iter(
374 serde_json::Deserializer::from_slice(&chunk)
375 .into_iter()
376 .collect::<Vec<_>>(),
377 )
378 .map_err(Error::from);
379
380 Ok(stream)
381 })
382 .try_flatten()
383 }
384
385 pub(crate) fn stream_get<'a>(
386 &'a self,
387 endpoint: impl AsRef<str> + Unpin + 'a,
388 ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
389 let headers = Some(Vec::default());
390 self.transport
391 .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
392 }
393
394 pub(crate) async fn stream_post_upgrade<'a>(
395 &'a self,
396 endpoint: impl AsRef<str> + 'a,
397 body: Option<(Body, Mime)>,
398 ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
399 self.transport
400 .stream_upgrade(Method::POST, endpoint, body)
401 .await
402 }
403}
404
405impl Default for Docker {
406 fn default() -> Self {
407 Self::new()
408 }
409}
410
411#[derive(Default, Debug)]
413pub struct EventsOptions {
414 params: HashMap<&'static str, String>,
415}
416
417impl EventsOptions {
418 pub fn builder() -> EventsOptionsBuilder {
419 EventsOptionsBuilder::default()
420 }
421
422 pub fn serialize(&self) -> Option<String> {
424 if self.params.is_empty() {
425 None
426 } else {
427 Some(
428 form_urlencoded::Serializer::new(String::new())
429 .extend_pairs(&self.params)
430 .finish(),
431 )
432 }
433 }
434}
435
436#[derive(Copy, Clone)]
437pub enum EventFilterType {
438 Container,
439 Image,
440 Volume,
441 Network,
442 Daemon,
443}
444
445fn event_filter_type_to_string(filter: EventFilterType) -> &'static str {
446 match filter {
447 EventFilterType::Container => "container",
448 EventFilterType::Image => "image",
449 EventFilterType::Volume => "volume",
450 EventFilterType::Network => "network",
451 EventFilterType::Daemon => "daemon",
452 }
453}
454
455pub enum EventFilter {
457 Container(String),
458 Event(String),
459 Image(String),
460 Label(String),
461 Type(EventFilterType),
462 Volume(String),
463 Network(String),
464 Daemon(String),
465}
466
467#[derive(Default)]
469pub struct EventsOptionsBuilder {
470 params: HashMap<&'static str, String>,
471 events: Vec<String>,
472 containers: Vec<String>,
473 images: Vec<String>,
474 labels: Vec<String>,
475 volumes: Vec<String>,
476 networks: Vec<String>,
477 daemons: Vec<String>,
478 types: Vec<String>,
479}
480
481impl EventsOptionsBuilder {
482 pub fn since(
484 &mut self,
485 ts: &u64,
486 ) -> &mut Self {
487 self.params.insert("since", ts.to_string());
488 self
489 }
490
491 pub fn until(
493 &mut self,
494 ts: &u64,
495 ) -> &mut Self {
496 self.params.insert("until", ts.to_string());
497 self
498 }
499
500 pub fn filter(
501 &mut self,
502 filters: Vec<EventFilter>,
503 ) -> &mut Self {
504 let mut params = HashMap::new();
505 for f in filters {
506 match f {
507 EventFilter::Container(n) => {
508 self.containers.push(n);
509 params.insert("container", self.containers.clone())
510 }
511 EventFilter::Event(n) => {
512 self.events.push(n);
513 params.insert("event", self.events.clone())
514 }
515 EventFilter::Image(n) => {
516 self.images.push(n);
517 params.insert("image", self.images.clone())
518 }
519 EventFilter::Label(n) => {
520 self.labels.push(n);
521 params.insert("label", self.labels.clone())
522 }
523 EventFilter::Volume(n) => {
524 self.volumes.push(n);
525 params.insert("volume", self.volumes.clone())
526 }
527 EventFilter::Network(n) => {
528 self.networks.push(n);
529 params.insert("network", self.networks.clone())
530 }
531 EventFilter::Daemon(n) => {
532 self.daemons.push(n);
533 params.insert("daemon", self.daemons.clone())
534 }
535 EventFilter::Type(n) => {
536 let event_type = event_filter_type_to_string(n).to_string();
537 self.types.push(event_type);
538 params.insert("type", self.types.clone())
539 }
540 };
541 }
542 self.params
543 .insert("filters", serde_json::to_string(¶ms).unwrap());
544 self
545 }
546
547 pub fn build(&self) -> EventsOptions {
548 EventsOptions {
549 params: self.params.clone(),
550 }
551 }
552}
553
554#[derive(Clone, Debug, Serialize, Deserialize)]
555#[serde(rename_all = "PascalCase")]
556pub struct Version {
557 pub version: String,
558 pub api_version: String,
559 pub git_commit: String,
560 pub go_version: String,
561 pub os: String,
562 pub arch: String,
563 pub kernel_version: String,
564 #[cfg(feature = "chrono")]
565 pub build_time: DateTime<Utc>,
566 #[cfg(not(feature = "chrono"))]
567 pub build_time: String,
568}
569
570#[derive(Clone, Debug, Serialize, Deserialize)]
571#[serde(rename_all = "PascalCase")]
572pub struct Info {
573 pub containers: u64,
574 pub images: u64,
575 pub driver: String,
576 pub docker_root_dir: String,
577 pub driver_status: Vec<Vec<String>>,
578 #[serde(rename = "ID")]
579 pub id: String,
580 pub kernel_version: String,
581 pub mem_total: u64,
583 pub memory_limit: bool,
584 #[serde(rename = "NCPU")]
585 pub n_cpu: u64,
586 pub n_events_listener: u64,
587 pub n_goroutines: u64,
588 pub name: String,
589 pub operating_system: String,
590 pub swap_limit: bool,
592 pub system_time: Option<String>,
593}
594
595#[derive(Clone, Debug, Serialize, Deserialize)]
596pub struct Event {
597 #[serde(rename = "Type")]
598 pub typ: String,
599 #[serde(rename = "Action")]
600 pub action: String,
601 #[serde(rename = "Actor")]
602 pub actor: Actor,
603 pub status: Option<String>,
604 pub id: Option<String>,
605 pub from: Option<String>,
606 #[cfg(feature = "chrono")]
607 #[serde(deserialize_with = "datetime_from_unix_timestamp")]
608 pub time: DateTime<Utc>,
609 #[cfg(not(feature = "chrono"))]
610 pub time: u64,
611 #[cfg(feature = "chrono")]
612 #[serde(deserialize_with = "datetime_from_nano_timestamp", rename = "timeNano")]
613 pub time_nano: DateTime<Utc>,
614 #[cfg(not(feature = "chrono"))]
615 #[serde(rename = "timeNano")]
616 pub time_nano: u64,
617}
618
619#[derive(Clone, Debug, Serialize, Deserialize)]
620pub struct Actor {
621 #[serde(rename = "ID")]
622 pub id: String,
623 #[serde(rename = "Attributes")]
624 pub attributes: HashMap<String, String>,
625}
626
627#[cfg(test)]
628mod tests {
629 #[cfg(feature = "unix-socket")]
630 #[test]
631 fn unix_host_env() {
632 use super::Docker;
633 use std::env;
634 env::set_var("DOCKER_HOST", "unix:///docker.sock");
635 let d = Docker::new();
636 match d.transport {
637 crate::transport::Transport::Unix { path, .. } => {
638 assert_eq!(path, "/docker.sock");
639 }
640 _ => {
641 panic!("Expected transport to be unix.");
642 }
643 }
644 env::set_var("DOCKER_HOST", "http://localhost:8000");
645 let d = Docker::new();
646 match d.transport {
647 crate::transport::Transport::Tcp { host, .. } => {
648 assert_eq!(host, "http://localhost:8000");
649 }
650 _ => {
651 panic!("Expected transport to be http.");
652 }
653 }
654 }
655}