rs_docker_containers2arrow_ipc/
lib.rs1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::ContainerSummary;
12use bollard::query_parameters::ListContainersOptions;
13
14pub async fn list_containers(
15 d: &Docker,
16 opts: Option<ListContainersOptions>,
17) -> Result<Vec<ContainerSummary>, io::Error> {
18 d.list_containers(opts).await.map_err(io::Error::other)
19}
20
21use arrow::array::ArrayRef;
22use arrow::array::builder::{Int64Builder, StringBuilder};
23use arrow::datatypes::{DataType, Field, Schema};
24
25pub fn summary_schema() -> Arc<Schema> {
26 Arc::new(Schema::new(vec![
27 Field::new("id", DataType::Utf8, true),
28 Field::new("image", DataType::Utf8, true),
29 Field::new("image_id", DataType::Utf8, true),
30 Field::new("command", DataType::Utf8, true),
31 Field::new("state", DataType::Utf8, true),
32 Field::new("status", DataType::Utf8, true),
33 Field::new("created", DataType::Int64, true),
34 Field::new("size_rw", DataType::Int64, true),
35 Field::new("size_root_fs", DataType::Int64, true),
36 ]))
37}
38
39pub fn containers2batch(
40 containers: Vec<ContainerSummary>,
41 schema: Arc<Schema>,
42) -> Result<RecordBatch, io::Error> {
43 let mut id_builder = StringBuilder::new();
44 let mut image_builder = StringBuilder::new();
45 let mut image_id_builder = StringBuilder::new();
46 let mut command_builder = StringBuilder::new();
47 let mut state_builder = StringBuilder::new();
48 let mut status_builder = StringBuilder::new();
49 let mut created_builder = Int64Builder::new();
50 let mut size_rw_builder = Int64Builder::new();
51 let mut size_root_fs_builder = Int64Builder::new();
52
53 for container in containers {
54 id_builder.append_option(container.id);
55 image_builder.append_option(container.image);
56 image_id_builder.append_option(container.image_id);
57 command_builder.append_option(container.command);
58 state_builder.append_option(container.state);
59 status_builder.append_option(container.status);
60 created_builder.append_option(container.created);
61 size_rw_builder.append_option(container.size_rw);
62 size_root_fs_builder.append_option(container.size_root_fs);
63 }
64
65 let columns: Vec<ArrayRef> = vec![
66 Arc::new(id_builder.finish()),
67 Arc::new(image_builder.finish()),
68 Arc::new(image_id_builder.finish()),
69 Arc::new(command_builder.finish()),
70 Arc::new(state_builder.finish()),
71 Arc::new(status_builder.finish()),
72 Arc::new(created_builder.finish()),
73 Arc::new(size_rw_builder.finish()),
74 Arc::new(size_root_fs_builder.finish()),
75 ];
76
77 RecordBatch::try_new(schema, columns).map_err(io::Error::other)
78}
79
80pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
81where
82 W: Write;
83
84impl<W> IpcStreamWriter<W>
85where
86 W: Write,
87{
88 pub fn finish(&mut self) -> Result<(), io::Error> {
89 self.0.finish().map_err(io::Error::other)
90 }
91
92 pub fn flush(&mut self) -> Result<(), io::Error> {
93 self.0.flush().map_err(io::Error::other)
94 }
95
96 pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
97 self.0.write(b).map_err(io::Error::other)
98 }
99}
100
101pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
102where
103 W: Write,
104{
105 let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
106 .map_err(io::Error::other)?;
107 let mut iw = IpcStreamWriter(swtr);
108 iw.write_batch(b)?;
109 iw.flush()?;
110 iw.finish()?;
111
112 drop(iw);
113
114 wtr.flush()
115}
116
117pub fn containers2writer<W>(
118 containers: Vec<ContainerSummary>,
119 mut wtr: W,
120 sch: Arc<Schema>,
121) -> Result<(), io::Error>
122where
123 W: Write,
124{
125 let batch = containers2batch(containers, sch.clone())?;
126 batch2writer(&batch, &mut wtr, &sch)
127}
128
129pub async fn list_containers_and_write<W>(
130 d: &Docker,
131 mut wtr: W,
132 opts: Option<ListContainersOptions>,
133) -> Result<(), io::Error>
134where
135 W: Write,
136{
137 let containers = list_containers(d, opts).await?;
138 let schema = summary_schema();
139 containers2writer(containers, &mut wtr, schema)
140}
141
142pub fn unix2docker(
143 sock_path: &str,
144 timeout_seconds: u64,
145 client_version: &ClientVersion,
146) -> Result<Docker, io::Error> {
147 Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
148}
149
150pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
151pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
152pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;