rs_docker_containers2arrow_ipc/
lib.rs

1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::ContainerSummary;
12use bollard::query_parameters::ListContainersOptions;
13
14pub async fn list_containers(
15    d: &Docker,
16    opts: Option<ListContainersOptions>,
17) -> Result<Vec<ContainerSummary>, io::Error> {
18    d.list_containers(opts).await.map_err(io::Error::other)
19}
20
21use arrow::array::ArrayRef;
22use arrow::array::builder::{Int64Builder, StringBuilder};
23use arrow::datatypes::{DataType, Field, Schema};
24
25pub fn summary_schema() -> Arc<Schema> {
26    Arc::new(Schema::new(vec![
27        Field::new("id", DataType::Utf8, true),
28        Field::new("image", DataType::Utf8, true),
29        Field::new("image_id", DataType::Utf8, true),
30        Field::new("command", DataType::Utf8, true),
31        Field::new("state", DataType::Utf8, true),
32        Field::new("status", DataType::Utf8, true),
33        Field::new("created", DataType::Int64, true),
34        Field::new("size_rw", DataType::Int64, true),
35        Field::new("size_root_fs", DataType::Int64, true),
36    ]))
37}
38
39pub fn containers2batch(
40    containers: Vec<ContainerSummary>,
41    schema: Arc<Schema>,
42) -> Result<RecordBatch, io::Error> {
43    let mut id_builder = StringBuilder::new();
44    let mut image_builder = StringBuilder::new();
45    let mut image_id_builder = StringBuilder::new();
46    let mut command_builder = StringBuilder::new();
47    let mut state_builder = StringBuilder::new();
48    let mut status_builder = StringBuilder::new();
49    let mut created_builder = Int64Builder::new();
50    let mut size_rw_builder = Int64Builder::new();
51    let mut size_root_fs_builder = Int64Builder::new();
52
53    for container in containers {
54        id_builder.append_option(container.id);
55        image_builder.append_option(container.image);
56        image_id_builder.append_option(container.image_id);
57        command_builder.append_option(container.command);
58        state_builder.append_option(container.state);
59        status_builder.append_option(container.status);
60        created_builder.append_option(container.created);
61        size_rw_builder.append_option(container.size_rw);
62        size_root_fs_builder.append_option(container.size_root_fs);
63    }
64
65    let columns: Vec<ArrayRef> = vec![
66        Arc::new(id_builder.finish()),
67        Arc::new(image_builder.finish()),
68        Arc::new(image_id_builder.finish()),
69        Arc::new(command_builder.finish()),
70        Arc::new(state_builder.finish()),
71        Arc::new(status_builder.finish()),
72        Arc::new(created_builder.finish()),
73        Arc::new(size_rw_builder.finish()),
74        Arc::new(size_root_fs_builder.finish()),
75    ];
76
77    RecordBatch::try_new(schema, columns).map_err(io::Error::other)
78}
79
80pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
81where
82    W: Write;
83
84impl<W> IpcStreamWriter<W>
85where
86    W: Write,
87{
88    pub fn finish(&mut self) -> Result<(), io::Error> {
89        self.0.finish().map_err(io::Error::other)
90    }
91
92    pub fn flush(&mut self) -> Result<(), io::Error> {
93        self.0.flush().map_err(io::Error::other)
94    }
95
96    pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
97        self.0.write(b).map_err(io::Error::other)
98    }
99}
100
101pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
102where
103    W: Write,
104{
105    let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
106        .map_err(io::Error::other)?;
107    let mut iw = IpcStreamWriter(swtr);
108    iw.write_batch(b)?;
109    iw.flush()?;
110    iw.finish()?;
111
112    drop(iw);
113
114    wtr.flush()
115}
116
117pub fn containers2writer<W>(
118    containers: Vec<ContainerSummary>,
119    mut wtr: W,
120    sch: Arc<Schema>,
121) -> Result<(), io::Error>
122where
123    W: Write,
124{
125    let batch = containers2batch(containers, sch.clone())?;
126    batch2writer(&batch, &mut wtr, &sch)
127}
128
129pub async fn list_containers_and_write<W>(
130    d: &Docker,
131    mut wtr: W,
132    opts: Option<ListContainersOptions>,
133) -> Result<(), io::Error>
134where
135    W: Write,
136{
137    let containers = list_containers(d, opts).await?;
138    let schema = summary_schema();
139    containers2writer(containers, &mut wtr, schema)
140}
141
142pub fn unix2docker(
143    sock_path: &str,
144    timeout_seconds: u64,
145    client_version: &ClientVersion,
146) -> Result<Docker, io::Error> {
147    Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
148}
149
150pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
151pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
152pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;