1use crate::error::*;
21use crate::types::client::*;
22use crate::types::container::{Container, ContainerConfig, UpdateConfig, WaitCondition};
23use crate::types::filters;
24use crate::types::stats::Stats;
25use crate::{
26 get_docker_os, get_filters_query, read_response_body, read_response_body_raw, version,
27 AsyncHttpBodyReader, DockerEngineClient,
28};
29use bytes::Bytes;
30use futures::ready;
31use futures::task::{Context, Poll};
32use futures::Stream;
33use hyper::client::connect::Connect;
34use hyper::{Body, Method, Request, Response};
35use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
36use snafu::ensure;
37use snafu::OptionExt;
38use snafu::ResultExt;
39use std::collections::HashMap;
40use std::time::{Duration, SystemTime};
41use tokio::io::AsyncBufReadExt;
42use tokio::io::BufReader;
43use tokio::io::Lines;
44use tokio::macros::support::Pin;
45use tokio::time::timeout;
46
47impl<C: Connect + Clone + Send + Sync + 'static> DockerEngineClient<C> {
48 pub async fn container_list(
50 &self,
51 options: Option<ContainerListOptions>,
52 ) -> Result<Vec<Container>, Error> {
53 let mut query_params: HashMap<String, String> = HashMap::new();
54 if let Some(options) = options {
55 if options.size {
56 query_params.insert("size".into(), "1".into());
57 }
58 if options.all {
59 query_params.insert("all".into(), "1".into());
60 }
61 if let Some(limit) = options.limit {
62 query_params.insert("limit".into(), limit.to_string());
63 }
64 if let Some(filters) = options.filters.as_ref() {
65 query_params.insert(
66 "filters".into(),
67 serde_json::to_string(&filters.fields).context(JsonSerializationError {})?,
68 );
69 }
70 }
71 let query_params = if !query_params.is_empty() {
72 Some(query_params)
73 } else {
74 None
75 };
76
77 let request = Request::builder()
78 .method(Method::GET)
79 .uri(self.request_uri("/containers/json", query_params)?)
80 .header("Accept", "application/json")
81 .body(Body::empty())
82 .context(HttpClientRequestBuilderError {})?;
83
84 let client = self.client.as_ref().unwrap();
85 let response = timeout(self.timeout, client.request(request))
86 .await
87 .context(HttpClientTimeoutError {})?
88 .context(HttpClientError {})?;
89 ensure!(
90 response.status().is_success(),
91 HttpClientResponseError {
92 status: response.status().as_u16()
93 }
94 );
95 let response_body = read_response_body(response, self.timeout).await?;
96 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
97 }
98
99 pub async fn container_create(
102 &self,
103 mut config: ContainerConfig,
104 container_name: Option<String>,
105 ) -> Result<ContainerCreateCreatedBody, Error> {
106 if version::less_than(&self.version, "1.25") {
108 if let Some(mut host_config) = config.host_config.take() {
109 host_config.auto_remove = Some(false);
110 config.host_config = Some(host_config);
111 }
112 }
113
114 let mut query_params: HashMap<String, String> = HashMap::new();
115 if let Some(container_name) = &container_name {
116 query_params.insert("name".into(), container_name.clone());
117 }
118 let query_params = if !query_params.is_empty() {
119 Some(query_params)
120 } else {
121 None
122 };
123
124 let request = Request::builder()
125 .method(Method::POST)
126 .uri(self.request_uri("/containers/create", query_params)?)
127 .header("Content-Type", "application/json")
128 .header("Accept", "application/json")
129 .body(Body::from(
130 serde_json::to_string(&config).context(JsonSerializationError {})?,
131 ))
132 .context(HttpClientRequestBuilderError {})?;
133
134 let client = self.client.as_ref().unwrap();
135 let response = timeout(self.timeout, client.request(request))
136 .await
137 .context(HttpClientTimeoutError {})?
138 .context(HttpClientError {})?;
139 ensure!(
140 response.status().is_success(),
141 HttpClientResponseError {
142 status: response.status().as_u16()
143 }
144 );
145 let response_body = read_response_body(response, self.timeout).await?;
146 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
147 }
148
149 pub async fn container_inspect(&self, container_id: &str) -> Result<Container, Error> {
151 let request = Request::builder()
152 .method(Method::GET)
153 .uri(self.request_uri(
154 &format!(
155 "/containers/{}/json",
156 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
157 ),
158 None,
159 )?)
160 .header("Accept", "application/json")
161 .body(Body::empty())
162 .context(HttpClientRequestBuilderError {})?;
163
164 let client = self.client.as_ref().unwrap();
165 let response = timeout(self.timeout, client.request(request))
166 .await
167 .context(HttpClientTimeoutError {})?
168 .context(HttpClientError {})?;
169 ensure!(
170 response.status().is_success(),
171 HttpClientResponseError {
172 status: response.status().as_u16()
173 }
174 );
175 let response_body = read_response_body(response, self.timeout).await?;
176 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
177 }
178
179 pub async fn container_top(
181 &self,
182 container_id: &str,
183 arguments: Option<Vec<String>>,
184 ) -> Result<ContainerTopOKBody, Error> {
185 let mut query_params: HashMap<String, String> = HashMap::new();
186 if let Some(arguments) = arguments {
187 query_params.insert("ps_args".into(), arguments.join(" "));
188 }
189 let query_params = if !query_params.is_empty() {
190 Some(query_params)
191 } else {
192 None
193 };
194
195 let request = Request::builder()
196 .method(Method::GET)
197 .uri(self.request_uri(
198 &format!(
199 "/containers/{}/top",
200 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
201 ),
202 query_params,
203 )?)
204 .header("Accept", "application/json")
205 .body(Body::empty())
206 .context(HttpClientRequestBuilderError {})?;
207
208 let client = self.client.as_ref().unwrap();
209 let response = timeout(self.timeout, client.request(request))
210 .await
211 .context(HttpClientTimeoutError {})?
212 .context(HttpClientError {})?;
213 ensure!(
214 response.status().is_success(),
215 HttpClientResponseError {
216 status: response.status().as_u16()
217 }
218 );
219 let response_body = read_response_body(response, self.timeout).await?;
220 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
221 }
222
223 pub async fn container_logs(
226 &self,
227 container_id: &str,
228 options: Option<ContainerLogsOptions>,
229 ) -> Result<LogStream, Error> {
230 let mut query_params: HashMap<String, String> = HashMap::new();
231 if let Some(options) = options {
232 if options.show_stdout {
233 query_params.insert("stdout".into(), "1".into());
234 }
235 if options.show_stderr {
236 query_params.insert("stderr".into(), "1".into());
237 }
238 if let Some(since) = options.since {
239 let since_epoch = since.duration_since(SystemTime::UNIX_EPOCH).unwrap();
240 query_params.insert("since".into(), format!("{}", since_epoch.as_secs_f64()));
241 }
242 if let Some(until) = options.until {
243 let until_epoch = until.duration_since(SystemTime::UNIX_EPOCH).unwrap();
244 query_params.insert("until".into(), format!("{}", until_epoch.as_secs_f64()));
245 }
246 if options.timestamps {
247 query_params.insert("timestamps".into(), "1".into());
248 }
249 if options.details {
250 query_params.insert("details".into(), "1".into());
251 }
252 if options.follow {
253 query_params.insert("follow".into(), "1".into());
254 }
255 if let Some(tail) = options.tail {
256 query_params.insert("tail".into(), tail);
257 }
258 }
259 let query_params = if !query_params.is_empty() {
260 Some(query_params)
261 } else {
262 None
263 };
264
265 let request = Request::builder()
266 .method(Method::GET)
267 .uri(self.request_uri(
268 &format!(
269 "/containers/{}/logs",
270 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
271 ),
272 query_params,
273 )?)
274 .body(Body::empty())
275 .context(HttpClientRequestBuilderError {})?;
276
277 let client = self.client.as_ref().unwrap();
278 let response = timeout(self.timeout, client.request(request))
279 .await
280 .context(HttpClientTimeoutError {})?
281 .context(HttpClientError {})?;
282
283 ensure!(
284 response.status().is_success(),
285 HttpClientResponseError {
286 status: response.status().as_u16()
287 }
288 );
289
290 Ok(LogStream::new(response))
291 }
292
293 pub async fn container_diff(
295 &self,
296 container_id: &str,
297 ) -> Result<Option<Vec<ContainerChangeResponseItem>>, Error> {
298 let request = Request::builder()
299 .method(Method::GET)
300 .uri(self.request_uri(
301 &format!(
302 "/containers/{}/changes",
303 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
304 ),
305 None,
306 )?)
307 .header("Accept", "application/json")
308 .body(Body::empty())
309 .context(HttpClientRequestBuilderError {})?;
310
311 let client = self.client.as_ref().unwrap();
312 let response = timeout(self.timeout, client.request(request))
313 .await
314 .context(HttpClientTimeoutError {})?
315 .context(HttpClientError {})?;
316 ensure!(
317 response.status().is_success(),
318 HttpClientResponseError {
319 status: response.status().as_u16()
320 }
321 );
322 let response_body = read_response_body(response, self.timeout).await?;
323 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
324 }
325
326 pub async fn container_export(&self, container_id: &str) -> Result<Vec<u8>, Error> {
328 let request = Request::builder()
329 .method(Method::GET)
330 .uri(self.request_uri(
331 &format!(
332 "/containers/{}/export",
333 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
334 ),
335 None,
336 )?)
337 .header("Accept", "application/x-tar")
338 .body(Body::empty())
339 .context(HttpClientRequestBuilderError {})?;
340 let client = self.client.as_ref().unwrap();
341 let response = timeout(self.timeout, client.request(request))
342 .await
343 .context(HttpClientTimeoutError {})?
344 .context(HttpClientError {})?;
345 ensure!(
346 response.status().is_success(),
347 HttpClientResponseError {
348 status: response.status().as_u16()
349 }
350 );
351
352 let response_body_raw = read_response_body_raw(response, Duration::from_secs(300)).await?;
353 Ok(response_body_raw)
354 }
355
356 pub async fn container_stats(
358 &self,
359 container_id: &str,
360 stream: bool,
361 ) -> Result<ContainerStats, Error> {
362 let mut query_params: HashMap<String, String> = HashMap::new();
363 query_params.insert("stream".into(), "0".into());
364 if stream {
365 query_params.insert("stream".into(), "1".into());
366 }
367 let query_params = if !query_params.is_empty() {
368 Some(query_params)
369 } else {
370 None
371 };
372
373 let request = Request::builder()
374 .method(Method::GET)
375 .uri(self.request_uri(
376 &format!(
377 "/containers/{}/stats",
378 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
379 ),
380 query_params,
381 )?)
382 .header("Accept", "application/json")
383 .body(Body::empty())
384 .context(HttpClientRequestBuilderError {})?;
385 let client = self.client.as_ref().unwrap();
386 let response = timeout(self.timeout, client.request(request))
387 .await
388 .context(HttpClientTimeoutError {})?
389 .context(HttpClientError {})?;
390 ensure!(
391 response.status().is_success(),
392 HttpClientResponseError {
393 status: response.status().as_u16()
394 }
395 );
396
397 let os_type = response
398 .headers()
399 .get("Server")
400 .map(|v| get_docker_os(v.to_str().unwrap()))
401 .unwrap_or_else(String::new);
402
403 Ok(ContainerStats {
404 os_type,
405 body: StatsStream::new(response),
406 })
407 }
408
409 pub async fn container_resize(
411 &self,
412 container_id: &str,
413 options: ResizeOptions,
414 ) -> Result<(), Error> {
415 let mut query_params: HashMap<String, String> = HashMap::new();
416 if let Some(height) = options.height {
417 query_params.insert("h".into(), height.to_string());
418 }
419 if let Some(width) = options.width {
420 query_params.insert("w".into(), width.to_string());
421 }
422 let query_params = if !query_params.is_empty() {
423 Some(query_params)
424 } else {
425 None
426 };
427
428 let request = Request::builder()
429 .method(Method::POST)
430 .uri(self.request_uri(
431 &format!(
432 "/containers/{}/resize",
433 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
434 ),
435 query_params,
436 )?)
437 .header("Accept", "application/json")
438 .body(Body::empty())
439 .context(HttpClientRequestBuilderError {})?;
440
441 let client = self.client.as_ref().unwrap();
442 let response = timeout(self.timeout, client.request(request))
443 .await
444 .context(HttpClientTimeoutError {})?
445 .context(HttpClientError {})?;
446 ensure!(
447 response.status().is_success(),
448 HttpClientResponseError {
449 status: response.status().as_u16()
450 }
451 );
452
453 Ok(())
454 }
455
456 pub async fn container_start(
458 &self,
459 container_id: &str,
460 options: Option<ContainerStartOptions>,
461 ) -> Result<(), Error> {
462 let mut query_params: HashMap<String, String> = HashMap::new();
463 if let Some(options) = options {
464 if let Some(checkpoint_id) = options.checkpoint_id {
465 query_params.insert("checkpoint".into(), checkpoint_id);
466 }
467 if let Some(checkpoint_dir) = options.checkpoint_dir {
468 query_params.insert("checkpoint-dir".into(), checkpoint_dir);
469 }
470 }
471 let query_params = if !query_params.is_empty() {
472 Some(query_params)
473 } else {
474 None
475 };
476
477 let request = Request::builder()
478 .method(Method::POST)
479 .uri(self.request_uri(
480 &format!(
481 "/containers/{}/start",
482 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
483 ),
484 query_params,
485 )?)
486 .header("Accept", "application/json")
487 .body(Body::empty())
488 .context(HttpClientRequestBuilderError {})?;
489
490 let client = self.client.as_ref().unwrap();
491 let response = timeout(self.timeout, client.request(request))
492 .await
493 .context(HttpClientTimeoutError {})?
494 .context(HttpClientError {})?;
495 ensure!(
496 response.status().is_success(),
497 HttpClientResponseError {
498 status: response.status().as_u16()
499 }
500 );
501
502 Ok(())
503 }
504
505 pub async fn container_stop(&self, container_id: &str, timeout: Duration) -> Result<(), Error> {
508 let mut query_params: HashMap<String, String> = HashMap::new();
509 query_params.insert("t".into(), timeout.as_secs().to_string());
510 let query_params = if !query_params.is_empty() {
511 Some(query_params)
512 } else {
513 None
514 };
515
516 let request = Request::builder()
517 .method(Method::POST)
518 .uri(self.request_uri(
519 &format!(
520 "/containers/{}/stop",
521 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
522 ),
523 query_params,
524 )?)
525 .header("Accept", "application/json")
526 .body(Body::empty())
527 .context(HttpClientRequestBuilderError {})?;
528
529 let client = self.client.as_ref().unwrap();
530 let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
531 .await
532 .context(HttpClientTimeoutError {})?
533 .context(HttpClientError {})?;
534 ensure!(
535 response.status().is_success(),
536 HttpClientResponseError {
537 status: response.status().as_u16()
538 }
539 );
540
541 Ok(())
542 }
543
544 pub async fn container_restart(
547 &self,
548 container_id: &str,
549 timeout: Duration,
550 ) -> Result<(), Error> {
551 let mut query_params: HashMap<String, String> = HashMap::new();
552 query_params.insert("t".into(), timeout.as_secs().to_string());
553 let query_params = if !query_params.is_empty() {
554 Some(query_params)
555 } else {
556 None
557 };
558
559 let request = Request::builder()
560 .method(Method::POST)
561 .uri(self.request_uri(
562 &format!(
563 "/containers/{}/restart",
564 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
565 ),
566 query_params,
567 )?)
568 .header("Accept", "application/json")
569 .body(Body::empty())
570 .context(HttpClientRequestBuilderError {})?;
571
572 let client = self.client.as_ref().unwrap();
573 let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
574 .await
575 .context(HttpClientTimeoutError {})?
576 .context(HttpClientError {})?;
577 ensure!(
578 response.status().is_success(),
579 HttpClientResponseError {
580 status: response.status().as_u16()
581 }
582 );
583
584 Ok(())
585 }
586
587 pub async fn container_kill(
590 &self,
591 container_id: &str,
592 signal: Option<String>,
593 ) -> Result<(), Error> {
594 let mut query_params: HashMap<String, String> = HashMap::new();
595 if let Some(signal) = signal {
596 query_params.insert("signal".into(), signal);
597 }
598 let query_params = if !query_params.is_empty() {
599 Some(query_params)
600 } else {
601 None
602 };
603
604 let request = Request::builder()
605 .method(Method::POST)
606 .uri(self.request_uri(
607 &format!(
608 "/containers/{}/kill",
609 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
610 ),
611 query_params,
612 )?)
613 .header("Accept", "application/json")
614 .body(Body::empty())
615 .context(HttpClientRequestBuilderError {})?;
616
617 let client = self.client.as_ref().unwrap();
618 let response = timeout(self.timeout, client.request(request))
619 .await
620 .context(HttpClientTimeoutError {})?
621 .context(HttpClientError {})?;
622 ensure!(
623 response.status().is_success(),
624 HttpClientResponseError {
625 status: response.status().as_u16()
626 }
627 );
628
629 Ok(())
630 }
631
632 pub async fn container_update(
634 &self,
635 container_id: &str,
636 update_config: UpdateConfig,
637 ) -> Result<ContainerUpdateOKBody, Error> {
638 let request = Request::builder()
639 .method(Method::POST)
640 .uri(self.request_uri(
641 &format!(
642 "/containers/{}/update",
643 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
644 ),
645 None,
646 )?)
647 .header("Content-Type", "application/json")
648 .header("Accept", "application/json")
649 .body(Body::from(
650 serde_json::to_string(&update_config).context(JsonSerializationError {})?,
651 ))
652 .context(HttpClientRequestBuilderError {})?;
653
654 let client = self.client.as_ref().unwrap();
655 let response = timeout(self.timeout, client.request(request))
656 .await
657 .context(HttpClientTimeoutError {})?
658 .context(HttpClientError {})?;
659 ensure!(
660 response.status().is_success(),
661 HttpClientResponseError {
662 status: response.status().as_u16()
663 }
664 );
665 let response_body = read_response_body(response, self.timeout).await?;
666 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
667 }
668
669 pub async fn container_rename(
671 &self,
672 container_id: &str,
673 new_container_name: &str,
674 ) -> Result<(), Error> {
675 let mut query_params: HashMap<String, String> = HashMap::new();
676 query_params.insert("name".into(), new_container_name.to_string());
677 let query_params = if !query_params.is_empty() {
678 Some(query_params)
679 } else {
680 None
681 };
682
683 let request = Request::builder()
684 .method(Method::POST)
685 .uri(self.request_uri(
686 &format!(
687 "/containers/{}/rename",
688 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
689 ),
690 query_params,
691 )?)
692 .header("Accept", "application/json")
693 .body(Body::empty())
694 .context(HttpClientRequestBuilderError {})?;
695
696 let client = self.client.as_ref().unwrap();
697 let response = timeout(self.timeout, client.request(request))
698 .await
699 .context(HttpClientTimeoutError {})?
700 .context(HttpClientError {})?;
701 ensure!(
702 response.status().is_success(),
703 HttpClientResponseError {
704 status: response.status().as_u16()
705 }
706 );
707
708 Ok(())
709 }
710
711 pub async fn container_pause(&self, container_id: &str) -> Result<(), Error> {
713 let request = Request::builder()
714 .method(Method::POST)
715 .uri(self.request_uri(
716 &format!(
717 "/containers/{}/pause",
718 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
719 ),
720 None,
721 )?)
722 .header("Accept", "application/json")
723 .body(Body::empty())
724 .context(HttpClientRequestBuilderError {})?;
725
726 let client = self.client.as_ref().unwrap();
727 let response = timeout(self.timeout, client.request(request))
728 .await
729 .context(HttpClientTimeoutError {})?
730 .context(HttpClientError {})?;
731 ensure!(
732 response.status().is_success(),
733 HttpClientResponseError {
734 status: response.status().as_u16()
735 }
736 );
737
738 Ok(())
739 }
740
741 pub async fn container_unpause(&self, container_id: &str) -> Result<(), Error> {
743 let request = Request::builder()
744 .method(Method::POST)
745 .uri(self.request_uri(
746 &format!(
747 "/containers/{}/unpause",
748 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
749 ),
750 None,
751 )?)
752 .header("Accept", "application/json")
753 .body(Body::empty())
754 .context(HttpClientRequestBuilderError {})?;
755
756 let client = self.client.as_ref().unwrap();
757 let response = timeout(self.timeout, client.request(request))
758 .await
759 .context(HttpClientTimeoutError {})?
760 .context(HttpClientError {})?;
761 ensure!(
762 response.status().is_success(),
763 HttpClientResponseError {
764 status: response.status().as_u16()
765 }
766 );
767
768 Ok(())
769 }
770
771 pub async fn container_wait(
775 &self,
776 container_id: &str,
777 condition: WaitCondition,
778 timeout: Duration,
779 ) -> Result<ContainerWaitOKBody, Error> {
780 let mut query_params: HashMap<String, String> = HashMap::new();
781 query_params.insert("condition".into(), condition.to_string());
782 let query_params = if !query_params.is_empty() {
783 Some(query_params)
784 } else {
785 None
786 };
787
788 let request = Request::builder()
789 .method(Method::POST)
790 .uri(self.request_uri(
791 &format!(
792 "/containers/{}/wait",
793 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
794 ),
795 query_params,
796 )?)
797 .header("Accept", "application/json")
798 .body(Body::empty())
799 .context(HttpClientRequestBuilderError {})?;
800
801 let client = self.client.as_ref().unwrap();
802 let response = tokio::time::timeout(timeout + self.timeout, client.request(request))
803 .await
804 .context(HttpClientTimeoutError {})?
805 .context(HttpClientError {})?;
806 ensure!(
807 response.status().is_success(),
808 HttpClientResponseError {
809 status: response.status().as_u16()
810 }
811 );
812
813 let response_body = read_response_body(response, timeout).await?;
814 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
815 }
816
817 pub async fn container_remove(
819 &self,
820 container_id: &str,
821 options: Option<ContainerRemoveOptions>,
822 ) -> Result<(), Error> {
823 let mut query_params: HashMap<String, String> = HashMap::new();
824 if let Some(options) = options {
825 if options.remove_volumes {
826 query_params.insert("v".into(), "1".into());
827 }
828 if options.remove_links {
829 query_params.insert("link".into(), "1".into());
830 }
831 if options.force {
832 query_params.insert("force".into(), "1".into());
833 }
834 }
835 let query_params = if !query_params.is_empty() {
836 Some(query_params)
837 } else {
838 None
839 };
840
841 let request = Request::builder()
842 .method(Method::DELETE)
843 .uri(self.request_uri(
844 &format!(
845 "/containers/{}",
846 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
847 ),
848 query_params,
849 )?)
850 .header("Accept", "application/json")
851 .body(Body::empty())
852 .context(HttpClientRequestBuilderError {})?;
853
854 let client = self.client.as_ref().unwrap();
855 let response = timeout(self.timeout, client.request(request))
856 .await
857 .context(HttpClientTimeoutError {})?
858 .context(HttpClientError {})?;
859 ensure!(
860 response.status().is_success(),
861 HttpClientResponseError {
862 status: response.status().as_u16()
863 }
864 );
865
866 Ok(())
867 }
868
869 pub async fn container_stat_path(
871 &self,
872 container_id: &str,
873 path: &str,
874 ) -> Result<ContainerPathStat, Error> {
875 let mut query_params: HashMap<String, String> = HashMap::new();
876 query_params.insert("path".into(), path.to_string());
877 let query_params = if !query_params.is_empty() {
878 Some(query_params)
879 } else {
880 None
881 };
882
883 let request = Request::builder()
884 .method(Method::HEAD)
885 .uri(self.request_uri(
886 &format!(
887 "/containers/{}/archive",
888 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
889 ),
890 query_params,
891 )?)
892 .body(Body::empty())
893 .context(HttpClientRequestBuilderError {})?;
894
895 let client = self.client.as_ref().unwrap();
896 let response = timeout(self.timeout, client.request(request))
897 .await
898 .context(HttpClientTimeoutError {})?
899 .context(HttpClientError {})?;
900 ensure!(
901 response.status().is_success(),
902 HttpClientResponseError {
903 status: response.status().as_u16()
904 }
905 );
906
907 response
908 .headers()
909 .get("X-Docker-Container-Path-Stat")
910 .map(|v| get_container_path_stat_from_header(v.to_str().unwrap()))
911 .context(MalformedResponseError {})?
912 }
913
914 pub async fn copy_from_container(
917 &self,
918 container_id: &str,
919 src_path: &str,
920 ) -> Result<(ContainerPathStat, Vec<u8>), Error> {
921 let mut query_params: HashMap<String, String> = HashMap::new();
922 query_params.insert("path".into(), src_path.to_string());
923 let query_params = if !query_params.is_empty() {
924 Some(query_params)
925 } else {
926 None
927 };
928
929 let request = Request::builder()
930 .method(Method::GET)
931 .uri(self.request_uri(
932 &format!(
933 "/containers/{}/archive",
934 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
935 ),
936 query_params,
937 )?)
938 .header("Accept", "application/x-tar")
939 .body(Body::empty())
940 .context(HttpClientRequestBuilderError {})?;
941
942 let client = self.client.as_ref().unwrap();
943 let response = timeout(self.timeout, client.request(request))
944 .await
945 .context(HttpClientTimeoutError {})?
946 .context(HttpClientError {})?;
947 ensure!(
948 response.status().is_success(),
949 HttpClientResponseError {
950 status: response.status().as_u16()
951 }
952 );
953
954 let container_path_stat = response
955 .headers()
956 .get("X-Docker-Container-Path-Stat")
957 .map(|v| get_container_path_stat_from_header(v.to_str().unwrap()))
958 .context(MalformedResponseError {})??;
959
960 let response_body_raw = read_response_body_raw(response, Duration::from_secs(300)).await?;
961 Ok((container_path_stat, response_body_raw))
962 }
963
964 pub async fn copy_to_container<S, O, E>(
967 &self,
968 container_id: &str,
969 dst_path: &str,
970 content: S,
971 options: Option<CopyToContainerOptions>,
972 ) -> Result<(), Error>
973 where
974 S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
975 O: Into<Bytes> + 'static,
976 E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
977 {
978 let mut query_params: HashMap<String, String> = HashMap::new();
979 query_params.insert("path".into(), dst_path.to_string());
980 if let Some(options) = options {
981 if options.allow_overwrite_dir_with_file {
982 query_params.insert("noOverwriteDirNonDir".into(), "true".into());
983 }
984 if options.copy_uid_gid {
985 query_params.insert("copyUIDGID".into(), "true".into());
986 }
987 }
988 let query_params = if !query_params.is_empty() {
989 Some(query_params)
990 } else {
991 None
992 };
993
994 let request = Request::builder()
995 .method(Method::PUT)
996 .uri(self.request_uri(
997 &format!(
998 "/containers/{}/archive",
999 utf8_percent_encode(container_id, NON_ALPHANUMERIC).to_string()
1000 ),
1001 query_params,
1002 )?)
1003 .header("Content-Type", "application/x-tar")
1004 .body(Body::wrap_stream(content))
1005 .context(HttpClientRequestBuilderError {})?;
1006
1007 let client = self.client.as_ref().unwrap();
1008 let response = timeout(self.timeout, client.request(request))
1009 .await
1010 .context(HttpClientTimeoutError {})?
1011 .context(HttpClientError {})?;
1012 ensure!(
1013 response.status().is_success(),
1014 HttpClientResponseError {
1015 status: response.status().as_u16()
1016 }
1017 );
1018
1019 Ok(())
1020 }
1021
1022 pub async fn containers_prune(
1024 &self,
1025 prune_filters: Option<filters::Args>,
1026 ) -> Result<ContainersPruneReport, Error> {
1027 let query_params = if let Some(prune_filters) = prune_filters {
1028 Some(get_filters_query(prune_filters)?)
1029 } else {
1030 None
1031 };
1032 let request = Request::builder()
1033 .method(Method::POST)
1034 .uri(self.request_uri("/containers/prune", query_params)?)
1035 .header("Accept", "application/json")
1036 .body(Body::empty())
1037 .context(HttpClientRequestBuilderError {})?;
1038
1039 let client = self.client.as_ref().unwrap();
1040 let response = timeout(self.timeout, client.request(request))
1041 .await
1042 .context(HttpClientTimeoutError {})?
1043 .context(HttpClientError {})?;
1044 ensure!(
1045 response.status().is_success(),
1046 HttpClientResponseError {
1047 status: response.status().as_u16()
1048 }
1049 );
1050 let response_body = read_response_body(response, self.timeout).await?;
1051 Ok(serde_json::from_str(&response_body).context(JsonDeserializationError {})?)
1052 }
1053}
1054
1055fn get_container_path_stat_from_header(header: &str) -> Result<ContainerPathStat, Error> {
1056 let contents = base64::decode(header).context(B64DecodingError)?;
1057 Ok(serde_json::from_str(&String::from_utf8_lossy(&contents))
1058 .context(JsonDeserializationError {})?)
1059}
1060
1061pub struct LogStream {
1063 inner: Pin<Box<Lines<BufReader<AsyncHttpBodyReader>>>>,
1064}
1065
1066impl LogStream {
1067 pub fn new(response: Response<Body>) -> Self {
1068 Self {
1069 inner: Box::pin(BufReader::new(AsyncHttpBodyReader::new(response)).lines()),
1070 }
1071 }
1072}
1073
1074impl Stream for LogStream {
1075 type Item = Result<String, Error>;
1076
1077 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1078 if let Some(line) = ready!(self.inner.as_mut().poll_next(cx)) {
1079 match line {
1080 Ok(line) => Poll::Ready(Some(Ok(line))),
1081 Err(err) => Poll::Ready(Some(Err(err).context(IoError {}))),
1082 }
1083 } else {
1084 Poll::Ready(None)
1085 }
1086 }
1087}
1088
1089pub struct StatsStream {
1091 inner: Pin<Box<Lines<BufReader<AsyncHttpBodyReader>>>>,
1092}
1093
1094impl StatsStream {
1095 pub fn new(response: Response<Body>) -> Self {
1096 Self {
1097 inner: Box::pin(BufReader::new(AsyncHttpBodyReader::new(response)).lines()),
1098 }
1099 }
1100}
1101
1102impl Stream for StatsStream {
1103 type Item = Result<Stats, Error>;
1104
1105 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1106 if let Some(line) = ready!(self.inner.as_mut().poll_next(cx)) {
1107 match line {
1108 Ok(line) => Poll::Ready(Some(
1109 serde_json::from_str(&line).context(JsonDeserializationError),
1110 )),
1111 Err(err) => Poll::Ready(Some(Err(err).context(IoError {}))),
1112 }
1113 } else {
1114 Poll::Ready(None)
1115 }
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use super::*;
1122 use crate::types::container::*;
1123 use crate::{opts, LocalDockerEngineClient};
1124 use futures::StreamExt;
1125 use maplit::hashmap;
1126 use std::collections::HashSet;
1127 use std::io::Read;
1128 use tar::Archive;
1129
1130 #[tokio::test]
1131 async fn test_container_api() {
1132 let docker_client =
1133 LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1134 .unwrap();
1135
1136 let create_response = docker_client
1138 .container_create(
1139 ContainerConfigBuilder::default()
1140 .image(Some("busybox".into()))
1141 .tty(Some(true))
1142 .cmd(Some(vec![
1143 "/bin/sh".into(),
1144 "-c".into(),
1145 "while true; do echo 'hello test_container_api'; sleep 1; done".into(),
1146 ]))
1147 .build()
1148 .unwrap(),
1149 Some("test_container_api".into()),
1150 )
1151 .await
1152 .unwrap();
1153
1154 docker_client
1156 .container_start(&create_response.id, None)
1157 .await
1158 .unwrap();
1159
1160 docker_client
1162 .container_resize(
1163 &create_response.id,
1164 ResizeOptionsBuilder::default()
1165 .width(Some(100))
1166 .height(Some(20))
1167 .build()
1168 .unwrap(),
1169 )
1170 .await
1171 .unwrap();
1172
1173 docker_client
1175 .container_rename(&create_response.id, "test_container_api_renamed")
1176 .await
1177 .unwrap();
1178
1179 let containers = docker_client
1181 .container_list(Some(
1182 ContainerListOptionsBuilder::default()
1183 .filters(
1184 filters::ArgsBuilder::default()
1185 .fields(hashmap! {
1186 "id".into() => vec![create_response.id.clone()],
1187 })
1188 .build()
1189 .unwrap(),
1190 )
1191 .build()
1192 .unwrap(),
1193 ))
1194 .await
1195 .unwrap();
1196
1197 assert_eq!(containers.len(), 1);
1198 assert_eq!(
1199 &containers[0].names.as_ref().unwrap()[0],
1200 "/test_container_api_renamed"
1201 );
1202 assert_eq!(&containers[0].image, "busybox");
1203
1204 docker_client
1206 .container_update(
1207 &create_response.id,
1208 UpdateConfigBuilder::default()
1209 .memory(Some(10_000_000))
1210 .build()
1211 .unwrap(),
1212 )
1213 .await
1214 .unwrap();
1215
1216 let container = docker_client
1218 .container_inspect(&create_response.id)
1219 .await
1220 .unwrap();
1221
1222 assert_eq!(container.host_config.unwrap().memory.unwrap(), 10_000_000);
1223
1224 let mut stats = docker_client
1226 .container_stats(&create_response.id, false)
1227 .await
1228 .unwrap();
1229
1230 let stats = stats.body.next().await.unwrap().unwrap();
1231
1232 let memory_stats = stats.memory_stats.unwrap();
1234 assert!(memory_stats.usage.unwrap() > 500_000);
1235 assert!(memory_stats.usage.unwrap() < 5_000_000);
1236
1237 let mut logs = docker_client
1239 .container_logs(
1240 &create_response.id,
1241 Some(
1242 ContainerLogsOptionsBuilder::default()
1243 .show_stdout(true)
1244 .follow(true)
1245 .build()
1246 .unwrap(),
1247 ),
1248 )
1249 .await
1250 .unwrap();
1251
1252 let log_line = logs.next().await.unwrap().unwrap();
1253 assert_eq!(&log_line, "hello test_container_api");
1254
1255 let top_response = docker_client
1257 .container_top(&create_response.id, None)
1258 .await
1259 .unwrap();
1260
1261 let cmd_index = top_response
1262 .titles
1263 .iter()
1264 .position(|title| title.eq("CMD"))
1265 .unwrap();
1266 let commands = top_response
1267 .processes
1268 .iter()
1269 .map(|process| process[cmd_index].clone())
1270 .collect::<Vec<_>>();
1271 let shell_command = commands
1272 .iter()
1273 .find(|command| command.starts_with("/bin/sh"));
1274
1275 assert!(shell_command.is_some());
1276
1277 docker_client
1279 .container_stop(&create_response.id, Duration::from_secs(0))
1280 .await
1281 .unwrap();
1282
1283 docker_client
1285 .container_remove(
1286 &create_response.id,
1287 Some(
1288 ContainerRemoveOptionsBuilder::default()
1289 .force(true)
1290 .build()
1291 .unwrap(),
1292 ),
1293 )
1294 .await
1295 .unwrap();
1296 }
1297
1298 #[tokio::test]
1299 async fn test_container_lifecycle_api() {
1300 let docker_client =
1301 LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1302 .unwrap();
1303
1304 let create_response = docker_client
1306 .container_create(
1307 ContainerConfigBuilder::default()
1308 .image(Some("busybox".into()))
1309 .tty(Some(true))
1310 .cmd(Some(vec![
1311 "/bin/sh".into(),
1312 "-c".into(),
1313 "for i in 1 2 3; do echo \"$i\"; sleep 1; done".into(),
1314 ]))
1315 .build()
1316 .unwrap(),
1317 Some("test_container_lifecycle_api".into()),
1318 )
1319 .await
1320 .unwrap();
1321
1322 docker_client
1324 .container_start(&create_response.id, None)
1325 .await
1326 .unwrap();
1327
1328 let mut logs = docker_client
1330 .container_logs(
1331 &create_response.id,
1332 Some(
1333 ContainerLogsOptionsBuilder::default()
1334 .show_stdout(true)
1335 .follow(true)
1336 .build()
1337 .unwrap(),
1338 ),
1339 )
1340 .await
1341 .unwrap();
1342
1343 let log_line = logs.next().await.unwrap().unwrap();
1344 let last_log_time = SystemTime::now();
1345 assert_eq!(&log_line, "1");
1346
1347 docker_client
1349 .container_restart(&create_response.id, Duration::from_secs(0))
1350 .await
1351 .unwrap();
1352
1353 let mut logs = docker_client
1355 .container_logs(
1356 &create_response.id,
1357 Some(
1358 ContainerLogsOptionsBuilder::default()
1359 .show_stdout(true)
1360 .follow(true)
1361 .since(Some(last_log_time))
1362 .build()
1363 .unwrap(),
1364 ),
1365 )
1366 .await
1367 .unwrap();
1368
1369 let log_line = logs.next().await.unwrap().unwrap();
1370 assert_eq!(&log_line, "1");
1371
1372 docker_client
1374 .container_pause(&create_response.id)
1375 .await
1376 .unwrap();
1377
1378 let log_line = timeout(Duration::from_secs(2), logs.next()).await;
1380 assert!(log_line.is_err());
1381
1382 docker_client
1384 .container_unpause(&create_response.id)
1385 .await
1386 .unwrap();
1387
1388 let log_line = logs.next().await.unwrap().unwrap();
1389 assert_eq!(&log_line, "2");
1390
1391 let exit_status = docker_client
1393 .container_wait(
1394 &create_response.id,
1395 WaitCondition::NextExit,
1396 Duration::from_secs(5),
1397 )
1398 .await
1399 .unwrap();
1400
1401 assert_eq!(exit_status.status_code, 0);
1402
1403 docker_client
1405 .container_remove(
1406 &create_response.id,
1407 Some(
1408 ContainerRemoveOptionsBuilder::default()
1409 .force(true)
1410 .build()
1411 .unwrap(),
1412 ),
1413 )
1414 .await
1415 .unwrap();
1416 }
1417
1418 #[tokio::test]
1419 async fn test_container_filesystem_api() {
1420 let docker_client =
1421 LocalDockerEngineClient::new_client_with_opts(Some(vec![Box::new(opts::from_env)]))
1422 .unwrap();
1423
1424 let create_response = docker_client
1426 .container_create(
1427 ContainerConfigBuilder::default()
1428 .image(Some("busybox".into()))
1429 .cmd(Some(vec!["sleep".into(), "60".into()]))
1430 .build()
1431 .unwrap(),
1432 Some("test_container_filesystem_api".into()),
1433 )
1434 .await
1435 .unwrap();
1436
1437 let chunks: Vec<Result<_, std::io::Error>> =
1438 vec![std::fs::read_to_string("src/test_fixture/hello_world.tar")];
1439 let content = futures::stream::iter(chunks);
1440
1441 docker_client
1443 .copy_to_container(&create_response.id, "/", content, None)
1444 .await
1445 .unwrap();
1446
1447 let stat_path = docker_client
1449 .container_stat_path(&create_response.id, "/hello.txt")
1450 .await
1451 .unwrap();
1452
1453 assert_eq!(stat_path.size, 12);
1454
1455 let (stat_path, hello_tar) = docker_client
1457 .copy_from_container(&create_response.id, "/hello.txt")
1458 .await
1459 .unwrap();
1460
1461 assert_eq!(stat_path.size, 12);
1462
1463 let mut hello_tar = Archive::new(hello_tar.as_slice());
1464 let mut hello_txt_entry = hello_tar.entries().unwrap().next().unwrap().unwrap();
1465
1466 let mut s = String::new();
1468 hello_txt_entry.read_to_string(&mut s).unwrap();
1469 assert_eq!(&s, "Hello World\n");
1470
1471 let changes = docker_client
1472 .container_diff(&create_response.id)
1473 .await
1474 .unwrap()
1475 .unwrap();
1476
1477 assert_eq!(&changes[0].path, "/hello.txt");
1478
1479 let container_tar = docker_client
1481 .container_export(&create_response.id)
1482 .await
1483 .unwrap();
1484
1485 let mut container_tar = Archive::new(container_tar.as_slice());
1487 let container_files = container_tar
1488 .entries()
1489 .unwrap()
1490 .map(|entry| String::from(entry.unwrap().path().unwrap().to_string_lossy()))
1491 .collect::<HashSet<_>>();
1492
1493 assert!(container_files.contains("hello.txt"));
1495
1496 docker_client
1498 .container_remove(
1499 &create_response.id,
1500 Some(
1501 ContainerRemoveOptionsBuilder::default()
1502 .force(true)
1503 .build()
1504 .unwrap(),
1505 ),
1506 )
1507 .await
1508 .unwrap();
1509 }
1510}