chord_util/docker/
engine.rs1use std::collections::VecDeque;
2use std::str::FromStr;
3
4use futures::StreamExt;
5use log::trace;
6use reqwest::header::{HeaderName, HeaderValue};
7use reqwest::{Client, Method, Response, Url};
8
9use chord_core::value::Value;
10
11use crate::docker::Error;
12use crate::docker::Error::*;
13
14pub struct Engine {
15 address: String,
16 client: Client,
17}
18
19impl Engine {
20 pub async fn new(address: String) -> Result<Engine, Error> {
21 trace!("docker info {}", address);
22 let client = Client::new();
23 call0(
24 client.clone(),
25 address.as_str(),
26 "info",
27 Method::GET,
28 None,
29 999,
30 |buf| String::from_utf8_lossy(buf).to_string(),
31 )
32 .await
33 .map_err(|e| e.into())
34 .map(|_| Engine { address, client })
35 }
36
37 pub async fn call(
38 &self,
39 uri: &str,
40 method: Method,
41 data: Option<Value>,
42 tail_size: usize,
43 ) -> Result<Vec<String>, Error> {
44 self.call_with_op(uri, method, data, tail_size, |buf| {
45 String::from_utf8_lossy(buf).to_string()
46 })
47 .await
48 }
49
50 pub async fn call_with_op<O: Fn(&Vec<u8>) -> String>(
51 &self,
52 uri: &str,
53 method: Method,
54 data: Option<Value>,
55 tail_size: usize,
56 op: O,
57 ) -> Result<Vec<String>, Error> {
58 call0(
59 self.client.clone(),
60 self.address.as_str(),
61 uri,
62 method,
63 data,
64 tail_size,
65 op,
66 )
67 .await
68 .map_err(|e| e.into())
69 }
70}
71
/// Carriage return (`\r`); skipped while splitting the response into lines.
const CR: u8 = b'\r';
/// Line feed (`\n`); marks the end of a line in the response body.
const LF: u8 = b'\n';
74
75async fn call0<O: Fn(&Vec<u8>) -> String>(
76 client: Client,
77 address: &str,
78 uri: &str,
79 method: Method,
80 data: Option<Value>,
81 tail_size: usize,
82 op: O,
83) -> Result<Vec<String>, Error> {
84 let url = format!("http://{}/{}", address, uri);
85 let url = Url::from_str(url.as_str()).or(Err(Error::Url(url)))?;
86 let mut rb = client.request(method, url);
87 rb = rb.header(
88 HeaderName::from_str("Content-Type").unwrap(),
89 HeaderValue::from_str("application/json").unwrap(),
90 );
91 if let Some(d) = data {
92 rb = rb.body(d.to_string());
93 }
94
95 let res: Response = rb.send().await.map_err(|e| Error::Io(e.to_string()))?;
96
97 let mut tail_lines: VecDeque<String> = VecDeque::with_capacity(tail_size);
98
99 let status = res.status();
100
101 let mut stream = res.bytes_stream();
102 let mut line_buf = Vec::new();
103 while let Some(bytes) = stream.next().await {
104 let bytes = bytes.map_err(|e| Error::Io(e.to_string()))?;
105 for b in bytes {
106 if b == CR {
107 } else if b == LF {
109 end_of_line(&mut line_buf, &mut tail_lines, tail_size, &op);
110 } else {
111 line_buf.push(b);
112 }
113 }
114 }
115 end_of_line(&mut line_buf, &mut tail_lines, tail_size, &op);
116
117 return if !status.is_success() {
118 Err(Status(status.into()))
119 } else {
120 Ok(tail_lines.into())
121 };
122}
123
124fn end_of_line<O: Fn(&Vec<u8>) -> String>(
125 line_buf: &mut Vec<u8>,
126 tail_lines: &mut VecDeque<String>,
127 tail_size: usize,
128 op: &O,
129) {
130 if line_buf.is_empty() {
131 return;
132 }
133 let line: String = op(line_buf);
134 line_buf.clear();
135
136 trace!("{}", line);
137
138 tail_lines.push_back(line.clone());
139 if tail_lines.len() > tail_size {
140 tail_lines.pop_front();
141 }
142}