docker_client_async/
container.rs

1/*
2 * Copyright 2020 Damian Peckett <damian@pecke.tt>.
3 * Copyright 2013-2018 Docker, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! Docker container api implementation.
19
20use crate::error::*;
21use crate::types::client::*;
22use crate::types::container::{Container, ContainerConfig, UpdateConfig, WaitCondition};
23use crate::types::filters;
24use crate::types::stats::Stats;
25use crate::{
26    get_docker_os, get_filters_query, read_response_body, read_response_body_raw, version,
27    AsyncHttpBodyReader, DockerEngineClient,
28};
29use bytes::Bytes;
30use futures::ready;
31use futures::task::{Context, Poll};
32use futures::Stream;
33use hyper::client::connect::Connect;
34use hyper::{Body, Method, Request, Response};
35use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
36use snafu::ensure;
37use snafu::OptionExt;
38use snafu::ResultExt;
39use std::collections::HashMap;
40use std::time::{Duration, SystemTime};
41use tokio::io::AsyncBufReadExt;
42use tokio::io::BufReader;
43use tokio::io::Lines;
44use tokio::macros::support::Pin;
45use tokio::time::timeout;
46
47impl<C: Connect + Clone + Send + Sync + 'static> DockerEngineClient<C> {
48    /// container_list returns the list of containers in the docker host.
49    pub async fn container_list(
50        &self,
51        options: Option<ContainerListOptions>,
52    ) -> Result<Vec<Container>, Error> {
53        let mut query_params: HashMap<String, String> = HashMap::new();
54        if let Some(options) = options {
55            if options.size {
56                query_params.insert("size".into(), "1".into());
57            }
58            if options.all {
59                query_params.insert("all".into(), "1".into());
60            }
61            if let Some(limit) = options.limit {
62                query_params.insert("limit".into(), limit.to_string());
63            }
64            if let Some(filters) = options.filters.as_ref() {
65                query_params.insert(
66                    "filters".into(),
67                    serde_json::to_string(&filters.fields).context(JsonSerializationError {})?,
68                );
69            }
70        }
71        let query_params = if !query_params.is_empty() {
72            Some(query_params)
73        } else {
74            None
75        };
76
77        let request = Request::builder()
78            .method(Method::GET)
79            .uri(self.request_uri("/containers/json", query_params)?)
80            .header("Accept", "application/json")
81            .body(Body::empty())
82            .context(HttpClientRequestBuilderError {})?;
83
84        let client = self.client.as_ref().unwrap();
85        let response = timeout(self.timeout, client.request(request))
86            .await
87            .context(HttpClientTimeoutError {})?
88            .context(HttpClientError {})?;
89        ensure!(
90            response.status().is_success(),
91            HttpClientResponseError {
92                status: response.status().as_u16()
93            }
94        );
95        let response_body = read_response_body(response, self.timeout).await?;
96        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
97    }
98
99    /// container_create creates a new container based in the given configuration.
100    /// It can be associated with a name, but it's not mandatory.
101    pub async fn container_create(
102        &self,
103        mut config: ContainerConfig,
104        container_name: Option<String>,
105    ) -> Result<ContainerCreateCreatedBody, Error> {
106        // When using API 1.24 and under, the client is responsible for removing the container
107        if version::less_than(&self.version, "1.25") {
108            if let Some(mut host_config) = config.host_config.take() {
109                host_config.auto_remove = Some(false);
110                config.host_config = Some(host_config);
111            }
112        }
113
114        let mut query_params: HashMap<String, String> = HashMap::new();
115        if let Some(container_name) = &container_name {
116            query_params.insert("name".into(), container_name.clone());
117        }
118        let query_params = if !query_params.is_empty() {
119            Some(query_params)
120        } else {
121            None
122        };
123
124        let request = Request::builder()
125            .method(Method::POST)
126            .uri(self.request_uri("/containers/create", query_params)?)
127            .header("Content-Type", "application/json")
128            .header("Accept", "application/json")
129            .body(Body::from(
130                serde_json::to_string(&config).context(JsonSerializationError {})?,
131            ))
132            .context(HttpClientRequestBuilderError {})?;
133
134        let client = self.client.as_ref().unwrap();
135        let response = timeout(self.timeout, client.request(request))
136            .await
137            .context(HttpClientTimeoutError {})?
138            .context(HttpClientError {})?;
139        ensure!(
140            response.status().is_success(),
141            HttpClientResponseError {
142                status: response.status().as_u16()
143            }
144        );
145        let response_body = read_response_body(response, self.timeout).await?;
146        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
147    }
148
149    /// container_inspect returns the container information.
150    pub async fn container_inspect(&self, container_id: &str) -> Result<Container, Error> {
151        let request = Request::builder()
152            .method(Method::GET)
153            .uri(self.request_uri(
154                &format!(
155                    "/containers/{}/json",
156                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
157                ),
158                None,
159            )?)
160            .header("Accept", "application/json")
161            .body(Body::empty())
162            .context(HttpClientRequestBuilderError {})?;
163
164        let client = self.client.as_ref().unwrap();
165        let response = timeout(self.timeout, client.request(request))
166            .await
167            .context(HttpClientTimeoutError {})?
168            .context(HttpClientError {})?;
169        ensure!(
170            response.status().is_success(),
171            HttpClientResponseError {
172                status: response.status().as_u16()
173            }
174        );
175        let response_body = read_response_body(response, self.timeout).await?;
176        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
177    }
178
179    /// container_top shows process information from within a container.
180    pub async fn container_top(
181        &self,
182        container_id: &str,
183        arguments: Option<Vec<String>>,
184    ) -> Result<ContainerTopOKBody, Error> {
185        let mut query_params: HashMap<String, String> = HashMap::new();
186        if let Some(arguments) = arguments {
187            query_params.insert("ps_args".into(), arguments.join(" "));
188        }
189        let query_params = if !query_params.is_empty() {
190            Some(query_params)
191        } else {
192            None
193        };
194
195        let request = Request::builder()
196            .method(Method::GET)
197            .uri(self.request_uri(
198                &format!(
199                    "/containers/{}/top",
200                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
201                ),
202                query_params,
203            )?)
204            .header("Accept", "application/json")
205            .body(Body::empty())
206            .context(HttpClientRequestBuilderError {})?;
207
208        let client = self.client.as_ref().unwrap();
209        let response = timeout(self.timeout, client.request(request))
210            .await
211            .context(HttpClientTimeoutError {})?
212            .context(HttpClientError {})?;
213        ensure!(
214            response.status().is_success(),
215            HttpClientResponseError {
216                status: response.status().as_u16()
217            }
218        );
219        let response_body = read_response_body(response, self.timeout).await?;
220        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
221    }
222
223    /// container_logs returns the logs generated by a container.
224    /// Currently only supports TTY containers.
225    pub async fn container_logs(
226        &self,
227        container_id: &str,
228        options: Option<ContainerLogsOptions>,
229    ) -> Result<LogStream, Error> {
230        let mut query_params: HashMap<String, String> = HashMap::new();
231        if let Some(options) = options {
232            if options.show_stdout {
233                query_params.insert("stdout".into(), "1".into());
234            }
235            if options.show_stderr {
236                query_params.insert("stderr".into(), "1".into());
237            }
238            if let Some(since) = options.since {
239                let since_epoch = since.duration_since(SystemTime::UNIX_EPOCH).unwrap();
240                query_params.insert("since".into(), format!("{}", since_epoch.as_secs_f64()));
241            }
242            if let Some(until) = options.until {
243                let until_epoch = until.duration_since(SystemTime::UNIX_EPOCH).unwrap();
244                query_params.insert("until".into(), format!("{}", until_epoch.as_secs_f64()));
245            }
246            if options.timestamps {
247                query_params.insert("timestamps".into(), "1".into());
248            }
249            if options.details {
250                query_params.insert("details".into(), "1".into());
251            }
252            if options.follow {
253                query_params.insert("follow".into(), "1".into());
254            }
255            if let Some(tail) = options.tail {
256                query_params.insert("tail".into(), tail);
257            }
258        }
259        let query_params = if !query_params.is_empty() {
260            Some(query_params)
261        } else {
262            None
263        };
264
265        let request = Request::builder()
266            .method(Method::GET)
267            .uri(self.request_uri(
268                &format!(
269                    "/containers/{}/logs",
270                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
271                ),
272                query_params,
273            )?)
274            .body(Body::empty())
275            .context(HttpClientRequestBuilderError {})?;
276
277        let client = self.client.as_ref().unwrap();
278        let response = timeout(self.timeout, client.request(request))
279            .await
280            .context(HttpClientTimeoutError {})?
281            .context(HttpClientError {})?;
282
283        ensure!(
284            response.status().is_success(),
285            HttpClientResponseError {
286                status: response.status().as_u16()
287            }
288        );
289
290        Ok(LogStream::new(response))
291    }
292
293    /// container_diff shows differences in a container filesystem since it was started.
294    pub async fn container_diff(
295        &self,
296        container_id: &str,
297    ) -> Result<Option<Vec<ContainerChangeResponseItem>>, Error> {
298        let request = Request::builder()
299            .method(Method::GET)
300            .uri(self.request_uri(
301                &format!(
302                    "/containers/{}/changes",
303                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
304                ),
305                None,
306            )?)
307            .header("Accept", "application/json")
308            .body(Body::empty())
309            .context(HttpClientRequestBuilderError {})?;
310
311        let client = self.client.as_ref().unwrap();
312        let response = timeout(self.timeout, client.request(request))
313            .await
314            .context(HttpClientTimeoutError {})?
315            .context(HttpClientError {})?;
316        ensure!(
317            response.status().is_success(),
318            HttpClientResponseError {
319                status: response.status().as_u16()
320            }
321        );
322        let response_body = read_response_body(response, self.timeout).await?;
323        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
324    }
325
326    /// container_export retrieves the raw contents of a container.
327    pub async fn container_export(&self, container_id: &str) -> Result<Vec<u8>, Error> {
328        let request = Request::builder()
329            .method(Method::GET)
330            .uri(self.request_uri(
331                &format!(
332                    "/containers/{}/export",
333                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
334                ),
335                None,
336            )?)
337            .header("Accept", "application/x-tar")
338            .body(Body::empty())
339            .context(HttpClientRequestBuilderError {})?;
340        let client = self.client.as_ref().unwrap();
341        let response = timeout(self.timeout, client.request(request))
342            .await
343            .context(HttpClientTimeoutError {})?
344            .context(HttpClientError {})?;
345        ensure!(
346            response.status().is_success(),
347            HttpClientResponseError {
348                status: response.status().as_u16()
349            }
350        );
351
352        let response_body_raw = read_response_body_raw(response, Duration::from_secs(300)).await?;
353        Ok(response_body_raw)
354    }
355
356    /// container_stats returns near realtime stats for a given container.
357    pub async fn container_stats(
358        &self,
359        container_id: &str,
360        stream: bool,
361    ) -> Result<ContainerStats, Error> {
362        let mut query_params: HashMap<String, String> = HashMap::new();
363        query_params.insert("stream".into(), "0".into());
364        if stream {
365            query_params.insert("stream".into(), "1".into());
366        }
367        let query_params = if !query_params.is_empty() {
368            Some(query_params)
369        } else {
370            None
371        };
372
373        let request = Request::builder()
374            .method(Method::GET)
375            .uri(self.request_uri(
376                &format!(
377                    "/containers/{}/stats",
378                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
379                ),
380                query_params,
381            )?)
382            .header("Accept", "application/json")
383            .body(Body::empty())
384            .context(HttpClientRequestBuilderError {})?;
385        let client = self.client.as_ref().unwrap();
386        let response = timeout(self.timeout, client.request(request))
387            .await
388            .context(HttpClientTimeoutError {})?
389            .context(HttpClientError {})?;
390        ensure!(
391            response.status().is_success(),
392            HttpClientResponseError {
393                status: response.status().as_u16()
394            }
395        );
396
397        let os_type = response
398            .headers()
399            .get("Server")
400            .map(|v| get_docker_os(v.to_str().unwrap()))
401            .unwrap_or_else(String::new);
402
403        Ok(ContainerStats {
404            os_type,
405            body: StatsStream::new(response),
406        })
407    }
408
409    /// container_resize changes the size of the tty for a container.
410    pub async fn container_resize(
411        &self,
412        container_id: &str,
413        options: ResizeOptions,
414    ) -> Result<(), Error> {
415        let mut query_params: HashMap<String, String> = HashMap::new();
416        if let Some(height) = options.height {
417            query_params.insert("h".into(), height.to_string());
418        }
419        if let Some(width) = options.width {
420            query_params.insert("w".into(), width.to_string());
421        }
422        let query_params = if !query_params.is_empty() {
423            Some(query_params)
424        } else {
425            None
426        };
427
428        let request = Request::builder()
429            .method(Method::POST)
430            .uri(self.request_uri(
431                &format!(
432                    "/containers/{}/resize",
433                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
434                ),
435                query_params,
436            )?)
437            .header("Accept", "application/json")
438            .body(Body::empty())
439            .context(HttpClientRequestBuilderError {})?;
440
441        let client = self.client.as_ref().unwrap();
442        let response = timeout(self.timeout, client.request(request))
443            .await
444            .context(HttpClientTimeoutError {})?
445            .context(HttpClientError {})?;
446        ensure!(
447            response.status().is_success(),
448            HttpClientResponseError {
449                status: response.status().as_u16()
450            }
451        );
452
453        Ok(())
454    }
455
456    /// container_start sends a request to the docker daemon to start a container.
457    pub async fn container_start(
458        &self,
459        container_id: &str,
460        options: Option<ContainerStartOptions>,
461    ) -> Result<(), Error> {
462        let mut query_params: HashMap<String, String> = HashMap::new();
463        if let Some(options) = options {
464            if let Some(checkpoint_id) = options.checkpoint_id {
465                query_params.insert("checkpoint".into(), checkpoint_id);
466            }
467            if let Some(checkpoint_dir) = options.checkpoint_dir {
468                query_params.insert("checkpoint-dir".into(), checkpoint_dir);
469            }
470        }
471        let query_params = if !query_params.is_empty() {
472            Some(query_params)
473        } else {
474            None
475        };
476
477        let request = Request::builder()
478            .method(Method::POST)
479            .uri(self.request_uri(
480                &format!(
481                    "/containers/{}/start",
482                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
483                ),
484                query_params,
485            )?)
486            .header("Accept", "application/json")
487            .body(Body::empty())
488            .context(HttpClientRequestBuilderError {})?;
489
490        let client = self.client.as_ref().unwrap();
491        let response = timeout(self.timeout, client.request(request))
492            .await
493            .context(HttpClientTimeoutError {})?
494            .context(HttpClientError {})?;
495        ensure!(
496            response.status().is_success(),
497            HttpClientResponseError {
498                status: response.status().as_u16()
499            }
500        );
501
502        Ok(())
503    }
504
505    /// container_stop stops a container. In case the container fails to stop gracefully within a
506    /// time frame specified by the timeout argument, it is forcefully terminated (killed).
507    pub async fn container_stop(&self, container_id: &str, timeout: Duration) -> Result<(), Error> {
508        let mut query_params: HashMap<String, String> = HashMap::new();
509        query_params.insert("t".into(), timeout.as_secs().to_string());
510        let query_params = if !query_params.is_empty() {
511            Some(query_params)
512        } else {
513            None
514        };
515
516        let request = Request::builder()
517            .method(Method::POST)
518            .uri(self.request_uri(
519                &format!(
520                    "/containers/{}/stop",
521                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
522                ),
523                query_params,
524            )?)
525            .header("Accept", "application/json")
526            .body(Body::empty())
527            .context(HttpClientRequestBuilderError {})?;
528
529        let client = self.client.as_ref().unwrap();
530        let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
531            .await
532            .context(HttpClientTimeoutError {})?
533            .context(HttpClientError {})?;
534        ensure!(
535            response.status().is_success(),
536            HttpClientResponseError {
537                status: response.status().as_u16()
538            }
539        );
540
541        Ok(())
542    }
543
544    /// container_restart stops and starts a container again. It makes the daemon to wait for the
545    /// container to be up again for a specific amount of time, given the timeout.
546    pub async fn container_restart(
547        &self,
548        container_id: &str,
549        timeout: Duration,
550    ) -> Result<(), Error> {
551        let mut query_params: HashMap<String, String> = HashMap::new();
552        query_params.insert("t".into(), timeout.as_secs().to_string());
553        let query_params = if !query_params.is_empty() {
554            Some(query_params)
555        } else {
556            None
557        };
558
559        let request = Request::builder()
560            .method(Method::POST)
561            .uri(self.request_uri(
562                &format!(
563                    "/containers/{}/restart",
564                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
565                ),
566                query_params,
567            )?)
568            .header("Accept", "application/json")
569            .body(Body::empty())
570            .context(HttpClientRequestBuilderError {})?;
571
572        let client = self.client.as_ref().unwrap();
573        let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
574            .await
575            .context(HttpClientTimeoutError {})?
576            .context(HttpClientError {})?;
577        ensure!(
578            response.status().is_success(),
579            HttpClientResponseError {
580                status: response.status().as_u16()
581            }
582        );
583
584        Ok(())
585    }
586
587    /// container_kill terminates the container process but does not remove the container from the
588    /// docker host.
589    pub async fn container_kill(
590        &self,
591        container_id: &str,
592        signal: Option<String>,
593    ) -> Result<(), Error> {
594        let mut query_params: HashMap<String, String> = HashMap::new();
595        if let Some(signal) = signal {
596            query_params.insert("signal".into(), signal);
597        }
598        let query_params = if !query_params.is_empty() {
599            Some(query_params)
600        } else {
601            None
602        };
603
604        let request = Request::builder()
605            .method(Method::POST)
606            .uri(self.request_uri(
607                &format!(
608                    "/containers/{}/kill",
609                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
610                ),
611                query_params,
612            )?)
613            .header("Accept", "application/json")
614            .body(Body::empty())
615            .context(HttpClientRequestBuilderError {})?;
616
617        let client = self.client.as_ref().unwrap();
618        let response = timeout(self.timeout, client.request(request))
619            .await
620            .context(HttpClientTimeoutError {})?
621            .context(HttpClientError {})?;
622        ensure!(
623            response.status().is_success(),
624            HttpClientResponseError {
625                status: response.status().as_u16()
626            }
627        );
628
629        Ok(())
630    }
631
632    /// container_update updates resources of a container.
633    pub async fn container_update(
634        &self,
635        container_id: &str,
636        update_config: UpdateConfig,
637    ) -> Result<ContainerUpdateOKBody, Error> {
638        let request = Request::builder()
639            .method(Method::POST)
640            .uri(self.request_uri(
641                &format!(
642                    "/containers/{}/update",
643                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
644                ),
645                None,
646            )?)
647            .header("Content-Type", "application/json")
648            .header("Accept", "application/json")
649            .body(Body::from(
650                serde_json::to_string(&update_config).context(JsonSerializationError {})?,
651            ))
652            .context(HttpClientRequestBuilderError {})?;
653
654        let client = self.client.as_ref().unwrap();
655        let response = timeout(self.timeout, client.request(request))
656            .await
657            .context(HttpClientTimeoutError {})?
658            .context(HttpClientError {})?;
659        ensure!(
660            response.status().is_success(),
661            HttpClientResponseError {
662                status: response.status().as_u16()
663            }
664        );
665        let response_body = read_response_body(response, self.timeout).await?;
666        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
667    }
668
669    /// container_rename changes the name of a given container.
670    pub async fn container_rename(
671        &self,
672        container_id: &str,
673        new_container_name: &str,
674    ) -> Result<(), Error> {
675        let mut query_params: HashMap<String, String> = HashMap::new();
676        query_params.insert("name".into(), new_container_name.to_string());
677        let query_params = if !query_params.is_empty() {
678            Some(query_params)
679        } else {
680            None
681        };
682
683        let request = Request::builder()
684            .method(Method::POST)
685            .uri(self.request_uri(
686                &format!(
687                    "/containers/{}/rename",
688                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
689                ),
690                query_params,
691            )?)
692            .header("Accept", "application/json")
693            .body(Body::empty())
694            .context(HttpClientRequestBuilderError {})?;
695
696        let client = self.client.as_ref().unwrap();
697        let response = timeout(self.timeout, client.request(request))
698            .await
699            .context(HttpClientTimeoutError {})?
700            .context(HttpClientError {})?;
701        ensure!(
702            response.status().is_success(),
703            HttpClientResponseError {
704                status: response.status().as_u16()
705            }
706        );
707
708        Ok(())
709    }
710
711    /// container_pause pauses the main process of a given container without terminating it.
712    pub async fn container_pause(&self, container_id: &str) -> Result<(), Error> {
713        let request = Request::builder()
714            .method(Method::POST)
715            .uri(self.request_uri(
716                &format!(
717                    "/containers/{}/pause",
718                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
719                ),
720                None,
721            )?)
722            .header("Accept", "application/json")
723            .body(Body::empty())
724            .context(HttpClientRequestBuilderError {})?;
725
726        let client = self.client.as_ref().unwrap();
727        let response = timeout(self.timeout, client.request(request))
728            .await
729            .context(HttpClientTimeoutError {})?
730            .context(HttpClientError {})?;
731        ensure!(
732            response.status().is_success(),
733            HttpClientResponseError {
734                status: response.status().as_u16()
735            }
736        );
737
738        Ok(())
739    }
740
741    /// container_unpause resumes the process execution within a container.
742    pub async fn container_unpause(&self, container_id: &str) -> Result<(), Error> {
743        let request = Request::builder()
744            .method(Method::POST)
745            .uri(self.request_uri(
746                &format!(
747                    "/containers/{}/unpause",
748                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
749                ),
750                None,
751            )?)
752            .header("Accept", "application/json")
753            .body(Body::empty())
754            .context(HttpClientRequestBuilderError {})?;
755
756        let client = self.client.as_ref().unwrap();
757        let response = timeout(self.timeout, client.request(request))
758            .await
759            .context(HttpClientTimeoutError {})?
760            .context(HttpClientError {})?;
761        ensure!(
762            response.status().is_success(),
763            HttpClientResponseError {
764                status: response.status().as_u16()
765            }
766        );
767
768        Ok(())
769    }
770
771    /// container_wait waits until the specified container is in a certain state
772    /// indicated by the given condition, either "not-running" (default),
773    /// "next-exit", or "removed".
774    pub async fn container_wait(
775        &self,
776        container_id: &str,
777        condition: WaitCondition,
778        timeout: Duration,
779    ) -> Result<ContainerWaitOKBody, Error> {
780        let mut query_params: HashMap<String, String> = HashMap::new();
781        query_params.insert("condition".into(), condition.to_string());
782        let query_params = if !query_params.is_empty() {
783            Some(query_params)
784        } else {
785            None
786        };
787
788        let request = Request::builder()
789            .method(Method::POST)
790            .uri(self.request_uri(
791                &format!(
792                    "/containers/{}/wait",
793                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
794                ),
795                query_params,
796            )?)
797            .header("Accept", "application/json")
798            .body(Body::empty())
799            .context(HttpClientRequestBuilderError {})?;
800
801        let client = self.client.as_ref().unwrap();
802        let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
803            .await
804            .context(HttpClientTimeoutError {})?
805            .context(HttpClientError {})?;
806        ensure!(
807            response.status().is_success(),
808            HttpClientResponseError {
809                status: response.status().as_u16()
810            }
811        );
812
813        let response_body = read_response_body(response, timeout).await?;
814        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
815    }
816
817    /// container_remove kills and removes a container from the docker host.
818    pub async fn container_remove(
819        &self,
820        container_id: &str,
821        options: Option<ContainerRemoveOptions>,
822    ) -> Result<(), Error> {
823        let mut query_params: HashMap<String, String> = HashMap::new();
824        if let Some(options) = options {
825            if options.remove_volumes {
826                query_params.insert("v".into(), "1".into());
827            }
828            if options.remove_links {
829                query_params.insert("link".into(), "1".into());
830            }
831            if options.force {
832                query_params.insert("force".into(), "1".into());
833            }
834        }
835        let query_params = if !query_params.is_empty() {
836            Some(query_params)
837        } else {
838            None
839        };
840
841        let request = Request::builder()
842            .method(Method::DELETE)
843            .uri(self.request_uri(
844                &format!(
845                    "/containers/{}",
846                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
847                ),
848                query_params,
849            )?)
850            .header("Accept", "application/json")
851            .body(Body::empty())
852            .context(HttpClientRequestBuilderError {})?;
853
854        let client = self.client.as_ref().unwrap();
855        let response = timeout(self.timeout, client.request(request))
856            .await
857            .context(HttpClientTimeoutError {})?
858            .context(HttpClientError {})?;
859        ensure!(
860            response.status().is_success(),
861            HttpClientResponseError {
862                status: response.status().as_u16()
863            }
864        );
865
866        Ok(())
867    }
868
869    /// container_stat_path returns Stat information about a path inside the container filesystem.
870    pub async fn container_stat_path(
871        &self,
872        container_id: &str,
873        path: &str,
874    ) -> Result<ContainerPathStat, Error> {
875        let mut query_params: HashMap<String, String> = HashMap::new();
876        query_params.insert("path".into(), path.to_string());
877        let query_params = if !query_params.is_empty() {
878            Some(query_params)
879        } else {
880            None
881        };
882
883        let request = Request::builder()
884            .method(Method::HEAD)
885            .uri(self.request_uri(
886                &format!(
887                    "/containers/{}/archive",
888                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
889                ),
890                query_params,
891            )?)
892            .body(Body::empty())
893            .context(HttpClientRequestBuilderError {})?;
894
895        let client = self.client.as_ref().unwrap();
896        let response = timeout(self.timeout, client.request(request))
897            .await
898            .context(HttpClientTimeoutError {})?
899            .context(HttpClientError {})?;
900        ensure!(
901            response.status().is_success(),
902            HttpClientResponseError {
903                status: response.status().as_u16()
904            }
905        );
906
907        response
908            .headers()
909            .get("X-Docker-Container-Path-Stat")
910            .map(|v| get_container_path_stat_from_header(v.to_str().unwrap()))
911            .context(MalformedResponseError {})?
912    }
913
914    /// copy_from_container gets the content from the container and returns it.
915    /// for a TAR archive to manipulate it in the host.
916    pub async fn copy_from_container(
917        &self,
918        container_id: &str,
919        src_path: &str,
920    ) -> Result<(ContainerPathStat, Vec<u8>), Error> {
921        let mut query_params: HashMap<String, String> = HashMap::new();
922        query_params.insert("path".into(), src_path.to_string());
923        let query_params = if !query_params.is_empty() {
924            Some(query_params)
925        } else {
926            None
927        };
928
929        let request = Request::builder()
930            .method(Method::GET)
931            .uri(self.request_uri(
932                &format!(
933                    "/containers/{}/archive",
934                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
935                ),
936                query_params,
937            )?)
938            .header("Accept", "application/x-tar")
939            .body(Body::empty())
940            .context(HttpClientRequestBuilderError {})?;
941
942        let client = self.client.as_ref().unwrap();
943        let response = timeout(self.timeout, client.request(request))
944            .await
945            .context(HttpClientTimeoutError {})?
946            .context(HttpClientError {})?;
947        ensure!(
948            response.status().is_success(),
949            HttpClientResponseError {
950                status: response.status().as_u16()
951            }
952        );
953
954        let container_path_stat = response
955            .headers()
956            .get("X-Docker-Container-Path-Stat")
957            .map(|v| get_container_path_stat_from_header(v.to_str().unwrap()))
958            .context(MalformedResponseError {})??;
959
960        let response_body_raw = read_response_body_raw(response, Duration::from_secs(300)).await?;
961        Ok((container_path_stat, response_body_raw))
962    }
963
964    /// copy_to_container copies content into the container filesystem.
965    /// Note that `content` must be a stream for a TAR archive
966    pub async fn copy_to_container<S, O, E>(
967        &self,
968        container_id: &str,
969        dst_path: &str,
970        content: S,
971        options: Option<CopyToContainerOptions>,
972    ) -> Result<(), Error>
973    where
974        S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
975        O: Into<Bytes> + 'static,
976        E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
977    {
978        let mut query_params: HashMap<String, String> = HashMap::new();
979        query_params.insert("path".into(), dst_path.to_string());
980        if let Some(options) = options {
981            if options.allow_overwrite_dir_with_file {
982                query_params.insert("noOverwriteDirNonDir".into(), "true".into());
983            }
984            if options.copy_uid_gid {
985                query_params.insert("copyUIDGID".into(), "true".into());
986            }
987        }
988        let query_params = if !query_params.is_empty() {
989            Some(query_params)
990        } else {
991            None
992        };
993
994        let request = Request::builder()
995            .method(Method::PUT)
996            .uri(self.request_uri(
997                &format!(
998                    "/containers/{}/archive",
999                    utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
1000                ),
1001                query_params,
1002            )?)
1003            .header("Content-Type", "application/x-tar")
1004            .body(Body::wrap_stream(content))
1005            .context(HttpClientRequestBuilderError {})?;
1006
1007        let client = self.client.as_ref().unwrap();
1008        let response = timeout(self.timeout, client.request(request))
1009            .await
1010            .context(HttpClientTimeoutError {})?
1011            .context(HttpClientError {})?;
1012        ensure!(
1013            response.status().is_success(),
1014            HttpClientResponseError {
1015                status: response.status().as_u16()
1016            }
1017        );
1018
1019        Ok(())
1020    }
1021
1022    /// containers_prune requests the daemon to delete unused data.
1023    pub async fn containers_prune(
1024        &self,
1025        prune_filters: Option<filters::Args>,
1026    ) -> Result<ContainersPruneReport, Error> {
1027        let query_params = if let Some(prune_filters) = prune_filters {
1028            Some(get_filters_query(prune_filters)?)
1029        } else {
1030            None
1031        };
1032        let request = Request::builder()
1033            .method(Method::POST)
1034            .uri(self.request_uri("/containers/prune", query_params)?)
1035            .header("Accept", "application/json")
1036            .body(Body::empty())
1037            .context(HttpClientRequestBuilderError {})?;
1038
1039        let client = self.client.as_ref().unwrap();
1040        let response = timeout(self.timeout, client.request(request))
1041            .await
1042            .context(HttpClientTimeoutError {})?
1043            .context(HttpClientError {})?;
1044        ensure!(
1045            response.status().is_success(),
1046            HttpClientResponseError {
1047                status: response.status().as_u16()
1048            }
1049        );
1050        let response_body = read_response_body(response, self.timeout).await?;
1051        Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
1052    }
1053}
1054
1055fn get_container_path_stat_from_header(header: &str) -> Result<ContainerPathStat, Error> {
1056    let contents = base64::decode(header).context(B64DecodingError)?;
1057    Ok(serde_json::from_str(&String::from_utf8_lossy(&contents))
1058        .context(JsonDeserializationError {})?)
1059}
1060
1061/// A stream of container logs.
1062pub struct LogStream {
1063    inner: Pin<Box<Lines<BufReader<AsyncHttpBodyReader>>>>,
1064}
1065
1066impl LogStream {
1067    pub fn new(response: Response<Body>) -> Self {
1068        Self {
1069            inner: Box::pin(BufReader::new(AsyncHttpBodyReader::new(response)).lines()),
1070        }
1071    }
1072}
1073
1074impl Stream for LogStream {
1075    type Item = Result<String, Error>;
1076
1077    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1078        if let Some(line) = ready!(self.inner.as_mut().poll_next(cx)) {
1079            match line {
1080                Ok(line) => Poll::Ready(Some(Ok(line))),
1081                Err(err) => Poll::Ready(Some(Err(err).context(IoError {}))),
1082            }
1083        } else {
1084            Poll::Ready(None)
1085        }
1086    }
1087}
1088
1089/// A stream of container statistics.
1090pub struct StatsStream {
1091    inner: Pin<Box<Lines<BufReader<AsyncHttpBodyReader>>>>,
1092}
1093
1094impl StatsStream {
1095    pub fn new(response: Response<Body>) -> Self {
1096        Self {
1097            inner: Box::pin(BufReader::new(AsyncHttpBodyReader::new(response)).lines()),
1098        }
1099    }
1100}
1101
1102impl Stream for StatsStream {
1103    type Item = Result<Stats, Error>;
1104
1105    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1106        if let Some(line) = ready!(self.inner.as_mut().poll_next(cx)) {
1107            match line {
1108                Ok(line) => Poll::Ready(Some(
1109                    serde_json::from_str(&line).context(JsonDeserializationError),
1110                )),
1111                Err(err) => Poll::Ready(Some(Err(err).context(IoError {}))),
1112            }
1113        } else {
1114            Poll::Ready(None)
1115        }
1116    }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121    use super::*;
1122    use crate::types::container::*;
1123    use crate::{opts, LocalDockerEngineClient};
1124    use futures::StreamExt;
1125    use maplit::hashmap;
1126    use std::collections::HashSet;
1127    use std::io::Read;
1128    use tar::Archive;
1129
1130    #[tokio::test]
1131    async fn test_container_api() {
1132        let docker_client =
1133            LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1134                .unwrap();
1135
1136        // Create a new ephemeral container.
1137        let create_response = docker_client
1138            .container_create(
1139                ContainerConfigBuilder::default()
1140                    .image(Some("busybox".into()))
1141                    .tty(Some(true))
1142                    .cmd(Some(vec![
1143                        "/bin/sh".into(),
1144                        "-c".into(),
1145                        "while true; do echo 'hello test_container_api'; sleep 1; done".into(),
1146                    ]))
1147                    .build()
1148                    .unwrap(),
1149                Some("test_container_api".into()),
1150            )
1151            .await
1152            .unwrap();
1153
1154        // Start the ephemeral container.
1155        docker_client
1156            .container_start(&create_response.id, None)
1157            .await
1158            .unwrap();
1159
1160        // Resize the console.
1161        docker_client
1162            .container_resize(
1163                &create_response.id,
1164                ResizeOptionsBuilder::default()
1165                    .width(Some(100))
1166                    .height(Some(20))
1167                    .build()
1168                    .unwrap(),
1169            )
1170            .await
1171            .unwrap();
1172
1173        // Rename the container.
1174        docker_client
1175            .container_rename(&create_response.id, "test_container_api_renamed")
1176            .await
1177            .unwrap();
1178
1179        // List the ephemeral container.
1180        let containers = docker_client
1181            .container_list(Some(
1182                ContainerListOptionsBuilder::default()
1183                    .filters(
1184                        filters::ArgsBuilder::default()
1185                            .fields(hashmap! {
1186                                "id".into() => vec![create_response.id.clone()],
1187                            })
1188                            .build()
1189                            .unwrap(),
1190                    )
1191                    .build()
1192                    .unwrap(),
1193            ))
1194            .await
1195            .unwrap();
1196
1197        assert_eq!(containers.len(), 1);
1198        assert_eq!(
1199            &containers[0].names.as_ref().unwrap()[0],
1200            "/test_container_api_renamed"
1201        );
1202        assert_eq!(&containers[0].image, "busybox");
1203
1204        // Set a memory limit on the container.
1205        docker_client
1206            .container_update(
1207                &create_response.id,
1208                UpdateConfigBuilder::default()
1209                    .memory(Some(10_000_000))
1210                    .build()
1211                    .unwrap(),
1212            )
1213            .await
1214            .unwrap();
1215
1216        // Get detailed container information.
1217        let container = docker_client
1218            .container_inspect(&create_response.id)
1219            .await
1220            .unwrap();
1221
1222        assert_eq!(container.host_config.unwrap().memory.unwrap(), 10_000_000);
1223
1224        // Get container statistics.
1225        let mut stats = docker_client
1226            .container_stats(&create_response.id, false)
1227            .await
1228            .unwrap();
1229
1230        let stats = stats.body.next().await.unwrap().unwrap();
1231
1232        // Reporting a reasonable quantity of memory?
1233        let memory_stats = stats.memory_stats.unwrap();
1234        assert!(memory_stats.usage.unwrap() > 500_000);
1235        assert!(memory_stats.usage.unwrap() < 5_000_000);
1236
1237        // Get the first log line from the container.
1238        let mut logs = docker_client
1239            .container_logs(
1240                &create_response.id,
1241                Some(
1242                    ContainerLogsOptionsBuilder::default()
1243                        .show_stdout(true)
1244                        .follow(true)
1245                        .build()
1246                        .unwrap(),
1247                ),
1248            )
1249            .await
1250            .unwrap();
1251
1252        let log_line = logs.next().await.unwrap().unwrap();
1253        assert_eq!(&log_line, "hello test_container_api");
1254
1255        // Get the list of processes inside the ephemeral container.
1256        let top_response = docker_client
1257            .container_top(&create_response.id, None)
1258            .await
1259            .unwrap();
1260
1261        let cmd_index = top_response
1262            .titles
1263            .iter()
1264            .position(|title| title.eq("CMD"))
1265            .unwrap();
1266        let commands = top_response
1267            .processes
1268            .iter()
1269            .map(|process| process[cmd_index].clone())
1270            .collect::<Vec<_>>();
1271        let shell_command = commands
1272            .iter()
1273            .find(|command| command.starts_with("/bin/sh"));
1274
1275        assert!(shell_command.is_some());
1276
1277        // Stop the ephemeral container.
1278        docker_client
1279            .container_stop(&create_response.id, Duration::from_secs(0))
1280            .await
1281            .unwrap();
1282
1283        // Cleanup the ephemeral container.
1284        docker_client
1285            .container_remove(
1286                &create_response.id,
1287                Some(
1288                    ContainerRemoveOptionsBuilder::default()
1289                        .force(true)
1290                        .build()
1291                        .unwrap(),
1292                ),
1293            )
1294            .await
1295            .unwrap();
1296    }
1297
1298    #[tokio::test]
1299    async fn test_container_lifecycle_api() {
1300        let docker_client =
1301            LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1302                .unwrap();
1303
1304        // Create a new ephemeral container.
1305        let create_response = docker_client
1306            .container_create(
1307                ContainerConfigBuilder::default()
1308                    .image(Some("busybox".into()))
1309                    .tty(Some(true))
1310                    .cmd(Some(vec![
1311                        "/bin/sh".into(),
1312                        "-c".into(),
1313                        "for i in 1 2 3; do echo \"$i\"; sleep 1; done".into(),
1314                    ]))
1315                    .build()
1316                    .unwrap(),
1317                Some("test_container_lifecycle_api".into()),
1318            )
1319            .await
1320            .unwrap();
1321
1322        // Start the ephemeral container.
1323        docker_client
1324            .container_start(&create_response.id, None)
1325            .await
1326            .unwrap();
1327
1328        // Get the first log line from the container.
1329        let mut logs = docker_client
1330            .container_logs(
1331                &create_response.id,
1332                Some(
1333                    ContainerLogsOptionsBuilder::default()
1334                        .show_stdout(true)
1335                        .follow(true)
1336                        .build()
1337                        .unwrap(),
1338                ),
1339            )
1340            .await
1341            .unwrap();
1342
1343        let log_line = logs.next().await.unwrap().unwrap();
1344        let last_log_time = SystemTime::now();
1345        assert_eq!(&log_line, "1");
1346
1347        // Restart the ephemeral container.
1348        docker_client
1349            .container_restart(&create_response.id, Duration::from_secs(0))
1350            .await
1351            .unwrap();
1352
1353        // Get the first log line from the container.
1354        let mut logs = docker_client
1355            .container_logs(
1356                &create_response.id,
1357                Some(
1358                    ContainerLogsOptionsBuilder::default()
1359                        .show_stdout(true)
1360                        .follow(true)
1361                        .since(Some(last_log_time))
1362                        .build()
1363                        .unwrap(),
1364                ),
1365            )
1366            .await
1367            .unwrap();
1368
1369        let log_line = logs.next().await.unwrap().unwrap();
1370        assert_eq!(&log_line, "1");
1371
1372        // Pause the ephemeral container.
1373        docker_client
1374            .container_pause(&create_response.id)
1375            .await
1376            .unwrap();
1377
1378        // Should timeout as no logs are written while paused.
1379        let log_line = timeout(Duration::from_secs(2), logs.next()).await;
1380        assert!(log_line.is_err());
1381
1382        // Unpause the ephemeral container.
1383        docker_client
1384            .container_unpause(&create_response.id)
1385            .await
1386            .unwrap();
1387
1388        let log_line = logs.next().await.unwrap().unwrap();
1389        assert_eq!(&log_line, "2");
1390
1391        // Ephemeral container should exit successfully within one second.
1392        let exit_status = docker_client
1393            .container_wait(
1394                &create_response.id,
1395                WaitCondition::NextExit,
1396                Duration::from_secs(5),
1397            )
1398            .await
1399            .unwrap();
1400
1401        assert_eq!(exit_status.status_code, 0);
1402
1403        // Cleanup the ephemeral container.
1404        docker_client
1405            .container_remove(
1406                &create_response.id,
1407                Some(
1408                    ContainerRemoveOptionsBuilder::default()
1409                        .force(true)
1410                        .build()
1411                        .unwrap(),
1412                ),
1413            )
1414            .await
1415            .unwrap();
1416    }
1417
1418    #[tokio::test]
1419    async fn test_container_filesystem_api() {
1420        let docker_client =
1421            LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1422                .unwrap();
1423
1424        // Create a new ephemeral container.
1425        let create_response = docker_client
1426            .container_create(
1427                ContainerConfigBuilder::default()
1428                    .image(Some("busybox".into()))
1429                    .cmd(Some(vec!["sleep".into(), "60".into()]))
1430                    .build()
1431                    .unwrap(),
1432                Some("test_container_filesystem_api".into()),
1433            )
1434            .await
1435            .unwrap();
1436
1437        let chunks: Vec<Result<_, std::io::Error>> =
1438            vec![std::fs::read_to_string("src/test_fixture/hello_world.tar")];
1439        let content = futures::stream::iter(chunks);
1440
1441        // Copy a simple hello world text file into the ephemeral container.
1442        docker_client
1443            .copy_to_container(&create_response.id, "/", content, None)
1444            .await
1445            .unwrap();
1446
1447        // Confirm the hello text file exists in the ephemeral container.
1448        let stat_path = docker_client
1449            .container_stat_path(&create_response.id, "/hello.txt")
1450            .await
1451            .unwrap();
1452
1453        assert_eq!(stat_path.size, 12);
1454
1455        // Copy the hello text file back out of the ephemeral container.
1456        let (stat_path, hello_tar) = docker_client
1457            .copy_from_container(&create_response.id, "/hello.txt")
1458            .await
1459            .unwrap();
1460
1461        assert_eq!(stat_path.size, 12);
1462
1463        let mut hello_tar = Archive::new(hello_tar.as_slice());
1464        let mut hello_txt_entry = hello_tar.entries().unwrap().next().unwrap().unwrap();
1465
1466        // Verify the echoed hello text file contains the expected content.
1467        let mut s = String::new();
1468        hello_txt_entry.read_to_string(&mut s).unwrap();
1469        assert_eq!(&s, "Hello World\n");
1470
1471        let changes = docker_client
1472            .container_diff(&create_response.id)
1473            .await
1474            .unwrap()
1475            .unwrap();
1476
1477        assert_eq!(&changes[0].path, "/hello.txt");
1478
1479        // Export the contents of the ephemeral container.
1480        let container_tar = docker_client
1481            .container_export(&create_response.id)
1482            .await
1483            .unwrap();
1484
1485        // Get the complete list of files from the ephemeral containers bundle.
1486        let mut container_tar = Archive::new(container_tar.as_slice());
1487        let container_files = container_tar
1488            .entries()
1489            .unwrap()
1490            .map(|entry| String::from(entry.unwrap().path().unwrap().to_string_lossy()))
1491            .collect::<HashSet<_>>();
1492
1493        // Ensure the exported bundle contained our hello world file.
1494        assert!(container_files.contains("hello.txt"));
1495
1496        // Cleanup the ephemeral container.
1497        docker_client
1498            .container_remove(
1499                &create_response.id,
1500                Some(
1501                    ContainerRemoveOptionsBuilder::default()
1502                        .force(true)
1503                        .build()
1504                        .unwrap(),
1505                ),
1506            )
1507            .await
1508            .unwrap();
1509    }
1510}