rs_docker_images2arrow_ipc/
lib.rs

1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::ImageSummary;
12use bollard::query_parameters::ListImagesOptions;
13
14pub async fn list_images(
15    d: &Docker,
16    opts: Option<ListImagesOptions>,
17) -> Result<Vec<ImageSummary>, io::Error> {
18    d.list_images(opts).await.map_err(io::Error::other)
19}
20
21use arrow::array::ArrayRef;
22use arrow::array::builder::{Int64Builder, ListBuilder, StringBuilder};
23use arrow::datatypes::{DataType, Field, Schema};
24
25pub fn summary_schema() -> Arc<Schema> {
26    Arc::new(Schema::new(vec![
27        Field::new("id", DataType::Utf8, false),
28        Field::new("parent_id", DataType::Utf8, false),
29        Field::new("repo_tags", DataType::new_list(DataType::Utf8, true), false),
30        Field::new(
31            "repo_digests",
32            DataType::new_list(DataType::Utf8, true),
33            false,
34        ),
35        Field::new("created", DataType::Int64, false),
36        Field::new("size", DataType::Int64, false),
37        Field::new("shared_size", DataType::Int64, false),
38        Field::new("virtual_size", DataType::Int64, true), // Option<i64>
39        // Field::new("labels", DataType::Utf8, false), // HashMap<String, String> - complex type, handle later
40        Field::new("containers", DataType::Int64, false),
41        // Field::new("manifests", DataType::Utf8, true), // Option<Vec<ImageManifestSummary>> - complex type, handle later
42        // Field::new("descriptor", DataType::Utf8, true), // Option<OciDescriptor> - complex type, handle later
43    ]))
44}
45
46pub fn images2batch(
47    imgs: Vec<ImageSummary>,
48    schema: Arc<Schema>,
49) -> Result<RecordBatch, io::Error> {
50    let mut id_builder = StringBuilder::new();
51    let mut parent_id_builder = StringBuilder::new();
52    let mut repo_tags_builder = ListBuilder::new(StringBuilder::new());
53    let mut repo_digests_builder = ListBuilder::new(StringBuilder::new());
54    let mut created_builder = Int64Builder::new();
55    let mut size_builder = Int64Builder::new();
56    let mut shared_size_builder = Int64Builder::new();
57    let mut virtual_size_builder = Int64Builder::new();
58    let mut containers_builder = Int64Builder::new();
59
60    for img in imgs {
61        id_builder.append_value(img.id);
62        parent_id_builder.append_value(img.parent_id);
63
64        repo_tags_builder.append(true);
65        for tag in img.repo_tags {
66            repo_tags_builder.values().append_value(tag);
67        }
68
69        repo_digests_builder.append(true);
70        for digest in img.repo_digests {
71            repo_digests_builder.values().append_value(digest);
72        }
73
74        created_builder.append_value(img.created);
75        size_builder.append_value(img.size);
76        shared_size_builder.append_value(img.shared_size);
77        containers_builder.append_value(img.containers);
78
79        if let Some(vs) = img.virtual_size {
80            virtual_size_builder.append_value(vs);
81        } else {
82            virtual_size_builder.append_null();
83        }
84    }
85
86    let id_array = id_builder.finish();
87    let parent_id_array = parent_id_builder.finish();
88    let repo_tags_array = repo_tags_builder.finish();
89    let repo_digests_array = repo_digests_builder.finish();
90    let created_array = created_builder.finish();
91    let size_array = size_builder.finish();
92    let shared_size_array = shared_size_builder.finish();
93    let virtual_size_array = virtual_size_builder.finish();
94    let containers_array = containers_builder.finish();
95
96    let columns: Vec<ArrayRef> = vec![
97        std::sync::Arc::new(id_array),
98        std::sync::Arc::new(parent_id_array),
99        std::sync::Arc::new(repo_tags_array),
100        std::sync::Arc::new(repo_digests_array),
101        std::sync::Arc::new(created_array),
102        std::sync::Arc::new(size_array),
103        std::sync::Arc::new(shared_size_array),
104        std::sync::Arc::new(virtual_size_array),
105        std::sync::Arc::new(containers_array),
106    ];
107
108    RecordBatch::try_new(schema, columns).map_err(io::Error::other)
109}
110
111pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
112where
113    W: Write;
114
115impl<W> IpcStreamWriter<W>
116where
117    W: Write,
118{
119    pub fn finish(&mut self) -> Result<(), io::Error> {
120        self.0.finish().map_err(io::Error::other)
121    }
122
123    pub fn flush(&mut self) -> Result<(), io::Error> {
124        self.0.flush().map_err(io::Error::other)
125    }
126
127    pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
128        self.0.write(b).map_err(io::Error::other)
129    }
130}
131
132pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
133where
134    W: Write,
135{
136    let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
137        .map_err(io::Error::other)?;
138    let mut iw = IpcStreamWriter(swtr);
139    iw.write_batch(b)?;
140    iw.flush()?;
141    iw.finish()?;
142
143    drop(iw);
144
145    wtr.flush()
146}
147
148pub fn imgs2writer<W>(
149    imgs: Vec<ImageSummary>,
150    mut wtr: W,
151    sch: Arc<Schema>,
152) -> Result<(), io::Error>
153where
154    W: Write,
155{
156    let batch = images2batch(imgs, sch.clone())?;
157    batch2writer(&batch, &mut wtr, &sch)
158}
159
160pub async fn images2writer<W>(
161    d: &Docker,
162    mut wtr: W,
163    opts: Option<ListImagesOptions>,
164) -> Result<(), io::Error>
165where
166    W: Write,
167{
168    let imgs = list_images(d, opts).await?;
169    let schema = summary_schema();
170    imgs2writer(imgs, &mut wtr, schema)
171}
172
173pub fn unix2docker(
174    sock_path: &str,
175    timeout_seconds: u64,
176    client_version: &ClientVersion,
177) -> Result<Docker, io::Error> {
178    Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
179}
180
181pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
182pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
183pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;