rs_docker_images2arrow_ipc/
lib.rs1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::ImageSummary;
12use bollard::query_parameters::ListImagesOptions;
13
14pub async fn list_images(
15 d: &Docker,
16 opts: Option<ListImagesOptions>,
17) -> Result<Vec<ImageSummary>, io::Error> {
18 d.list_images(opts).await.map_err(io::Error::other)
19}
20
21use arrow::array::ArrayRef;
22use arrow::array::builder::{Int64Builder, ListBuilder, StringBuilder};
23use arrow::datatypes::{DataType, Field, Schema};
24
25pub fn summary_schema() -> Arc<Schema> {
26 Arc::new(Schema::new(vec![
27 Field::new("id", DataType::Utf8, false),
28 Field::new("parent_id", DataType::Utf8, false),
29 Field::new("repo_tags", DataType::new_list(DataType::Utf8, true), false),
30 Field::new(
31 "repo_digests",
32 DataType::new_list(DataType::Utf8, true),
33 false,
34 ),
35 Field::new("created", DataType::Int64, false),
36 Field::new("size", DataType::Int64, false),
37 Field::new("shared_size", DataType::Int64, false),
38 Field::new("virtual_size", DataType::Int64, true), Field::new("containers", DataType::Int64, false),
41 ]))
44}
45
46pub fn images2batch(
47 imgs: Vec<ImageSummary>,
48 schema: Arc<Schema>,
49) -> Result<RecordBatch, io::Error> {
50 let mut id_builder = StringBuilder::new();
51 let mut parent_id_builder = StringBuilder::new();
52 let mut repo_tags_builder = ListBuilder::new(StringBuilder::new());
53 let mut repo_digests_builder = ListBuilder::new(StringBuilder::new());
54 let mut created_builder = Int64Builder::new();
55 let mut size_builder = Int64Builder::new();
56 let mut shared_size_builder = Int64Builder::new();
57 let mut virtual_size_builder = Int64Builder::new();
58 let mut containers_builder = Int64Builder::new();
59
60 for img in imgs {
61 id_builder.append_value(img.id);
62 parent_id_builder.append_value(img.parent_id);
63
64 repo_tags_builder.append(true);
65 for tag in img.repo_tags {
66 repo_tags_builder.values().append_value(tag);
67 }
68
69 repo_digests_builder.append(true);
70 for digest in img.repo_digests {
71 repo_digests_builder.values().append_value(digest);
72 }
73
74 created_builder.append_value(img.created);
75 size_builder.append_value(img.size);
76 shared_size_builder.append_value(img.shared_size);
77 containers_builder.append_value(img.containers);
78
79 if let Some(vs) = img.virtual_size {
80 virtual_size_builder.append_value(vs);
81 } else {
82 virtual_size_builder.append_null();
83 }
84 }
85
86 let id_array = id_builder.finish();
87 let parent_id_array = parent_id_builder.finish();
88 let repo_tags_array = repo_tags_builder.finish();
89 let repo_digests_array = repo_digests_builder.finish();
90 let created_array = created_builder.finish();
91 let size_array = size_builder.finish();
92 let shared_size_array = shared_size_builder.finish();
93 let virtual_size_array = virtual_size_builder.finish();
94 let containers_array = containers_builder.finish();
95
96 let columns: Vec<ArrayRef> = vec![
97 std::sync::Arc::new(id_array),
98 std::sync::Arc::new(parent_id_array),
99 std::sync::Arc::new(repo_tags_array),
100 std::sync::Arc::new(repo_digests_array),
101 std::sync::Arc::new(created_array),
102 std::sync::Arc::new(size_array),
103 std::sync::Arc::new(shared_size_array),
104 std::sync::Arc::new(virtual_size_array),
105 std::sync::Arc::new(containers_array),
106 ];
107
108 RecordBatch::try_new(schema, columns).map_err(io::Error::other)
109}
110
111pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
112where
113 W: Write;
114
115impl<W> IpcStreamWriter<W>
116where
117 W: Write,
118{
119 pub fn finish(&mut self) -> Result<(), io::Error> {
120 self.0.finish().map_err(io::Error::other)
121 }
122
123 pub fn flush(&mut self) -> Result<(), io::Error> {
124 self.0.flush().map_err(io::Error::other)
125 }
126
127 pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
128 self.0.write(b).map_err(io::Error::other)
129 }
130}
131
132pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
133where
134 W: Write,
135{
136 let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
137 .map_err(io::Error::other)?;
138 let mut iw = IpcStreamWriter(swtr);
139 iw.write_batch(b)?;
140 iw.flush()?;
141 iw.finish()?;
142
143 drop(iw);
144
145 wtr.flush()
146}
147
148pub fn imgs2writer<W>(
149 imgs: Vec<ImageSummary>,
150 mut wtr: W,
151 sch: Arc<Schema>,
152) -> Result<(), io::Error>
153where
154 W: Write,
155{
156 let batch = images2batch(imgs, sch.clone())?;
157 batch2writer(&batch, &mut wtr, &sch)
158}
159
160pub async fn images2writer<W>(
161 d: &Docker,
162 mut wtr: W,
163 opts: Option<ListImagesOptions>,
164) -> Result<(), io::Error>
165where
166 W: Write,
167{
168 let imgs = list_images(d, opts).await?;
169 let schema = summary_schema();
170 imgs2writer(imgs, &mut wtr, schema)
171}
172
173pub fn unix2docker(
174 sock_path: &str,
175 timeout_seconds: u64,
176 client_version: &ClientVersion,
177) -> Result<Docker, io::Error> {
178 Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
179}
180
181pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
182pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
183pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;