Skip to main content

chord_util/docker/
engine.rs

1use std::collections::VecDeque;
2use std::str::FromStr;
3
4use futures::StreamExt;
5use log::trace;
6use reqwest::header::{HeaderName, HeaderValue};
7use reqwest::{Client, Method, Response, Url};
8
9use chord_core::value::Value;
10
11use crate::docker::Error;
12use crate::docker::Error::*;
13
14pub struct Engine {
15    address: String,
16    client: Client,
17}
18
19impl Engine {
20    pub async fn new(address: String) -> Result<Engine, Error> {
21        trace!("docker info {}", address);
22        let client = Client::new();
23        call0(
24            client.clone(),
25            address.as_str(),
26            "info",
27            Method::GET,
28            None,
29            999,
30            |buf| String::from_utf8_lossy(buf).to_string(),
31        )
32        .await
33        .map_err(|e| e.into())
34        .map(|_| Engine { address, client })
35    }
36
37    pub async fn call(
38        &self,
39        uri: &str,
40        method: Method,
41        data: Option<Value>,
42        tail_size: usize,
43    ) -> Result<Vec<String>, Error> {
44        self.call_with_op(uri, method, data, tail_size, |buf| {
45            String::from_utf8_lossy(buf).to_string()
46        })
47        .await
48    }
49
50    pub async fn call_with_op<O: Fn(&Vec<u8>) -> String>(
51        &self,
52        uri: &str,
53        method: Method,
54        data: Option<Value>,
55        tail_size: usize,
56        op: O,
57    ) -> Result<Vec<String>, Error> {
58        call0(
59            self.client.clone(),
60            self.address.as_str(),
61            uri,
62            method,
63            data,
64            tail_size,
65            op,
66        )
67        .await
68        .map_err(|e| e.into())
69    }
70}
71
72const CR: u8 = 0x0D;
73const LF: u8 = 0x0A;
74
75async fn call0<O: Fn(&Vec<u8>) -> String>(
76    client: Client,
77    address: &str,
78    uri: &str,
79    method: Method,
80    data: Option<Value>,
81    tail_size: usize,
82    op: O,
83) -> Result<Vec<String>, Error> {
84    let url = format!("http://{}/{}", address, uri);
85    let url = Url::from_str(url.as_str()).or(Err(Error::Url(url)))?;
86    let mut rb = client.request(method, url);
87    rb = rb.header(
88        HeaderName::from_str("Content-Type").unwrap(),
89        HeaderValue::from_str("application/json").unwrap(),
90    );
91    if let Some(d) = data {
92        rb = rb.body(d.to_string());
93    }
94
95    let res: Response = rb.send().await.map_err(|e| Error::Io(e.to_string()))?;
96
97    let mut tail_lines: VecDeque<String> = VecDeque::with_capacity(tail_size);
98
99    let status = res.status();
100
101    let mut stream = res.bytes_stream();
102    let mut line_buf = Vec::new();
103    while let Some(bytes) = stream.next().await {
104        let bytes = bytes.map_err(|e| Error::Io(e.to_string()))?;
105        for b in bytes {
106            if b == CR {
107                // ignore
108            } else if b == LF {
109                end_of_line(&mut line_buf, &mut tail_lines, tail_size, &op);
110            } else {
111                line_buf.push(b);
112            }
113        }
114    }
115    end_of_line(&mut line_buf, &mut tail_lines, tail_size, &op);
116
117    return if !status.is_success() {
118        Err(Status(status.into()))
119    } else {
120        Ok(tail_lines.into())
121    };
122}
123
124fn end_of_line<O: Fn(&Vec<u8>) -> String>(
125    line_buf: &mut Vec<u8>,
126    tail_lines: &mut VecDeque<String>,
127    tail_size: usize,
128    op: &O,
129) {
130    if line_buf.is_empty() {
131        return;
132    }
133    let line: String = op(line_buf);
134    line_buf.clear();
135
136    trace!("{}", line);
137
138    tail_lines.push_back(line.clone());
139    if tail_lines.len() > tail_size {
140        tail_lines.pop_front();
141    }
142}