rs_docker_networks2arrow_ipc/
lib.rs1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::Docker;
10use bollard::errors::Error;
11use bollard::models::Network;
12use bollard::query_parameters::ListNetworksOptions;
13
14pub async fn list_networks(
15 d: &Docker,
16
17 opts: Option<ListNetworksOptions>,
18) -> Result<Vec<Network>, Error> {
19 d.list_networks(opts).await
20}
21
22use arrow::array::ArrayRef;
23use arrow::array::builder::{BooleanBuilder, StringBuilder};
24use arrow::datatypes::{DataType, Field, Schema};
25
26pub fn network_schema() -> Arc<Schema> {
27 Arc::new(Schema::new(vec![
28 Field::new("id", DataType::Utf8, true),
29 Field::new("name", DataType::Utf8, true),
30 Field::new("created", DataType::Utf8, true),
31 Field::new("scope", DataType::Utf8, true),
32 Field::new("driver", DataType::Utf8, true),
33 Field::new("enable_ipv6", DataType::Boolean, true),
34 Field::new("internal", DataType::Boolean, true),
35 Field::new("attachable", DataType::Boolean, true),
36 Field::new("ingress", DataType::Boolean, true),
37 ]))
38}
39
40pub fn networks2batch(
41 networks: Vec<Network>,
42 schema: Arc<Schema>,
43) -> Result<RecordBatch, io::Error> {
44 let mut id_builder = StringBuilder::new();
45 let mut name_builder = StringBuilder::new();
46 let mut created_builder = StringBuilder::new();
47 let mut scope_builder = StringBuilder::new();
48 let mut driver_builder = StringBuilder::new();
49 let mut enable_ipv6_builder = BooleanBuilder::new();
50 let mut internal_builder = BooleanBuilder::new();
51 let mut attachable_builder = BooleanBuilder::new();
52 let mut ingress_builder = BooleanBuilder::new();
53
54 for network in networks {
55 id_builder.append_option(network.id);
56 name_builder.append_option(network.name);
57 created_builder.append_option(network.created);
58 scope_builder.append_option(network.scope);
59 driver_builder.append_option(network.driver);
60 enable_ipv6_builder.append_option(network.enable_ipv6);
61 internal_builder.append_option(network.internal);
62 attachable_builder.append_option(network.attachable);
63 ingress_builder.append_option(network.ingress);
64 }
65
66 let columns: Vec<ArrayRef> = vec![
67 Arc::new(id_builder.finish()),
68 Arc::new(name_builder.finish()),
69 Arc::new(created_builder.finish()),
70 Arc::new(scope_builder.finish()),
71 Arc::new(driver_builder.finish()),
72 Arc::new(enable_ipv6_builder.finish()),
73 Arc::new(internal_builder.finish()),
74 Arc::new(attachable_builder.finish()),
75 Arc::new(ingress_builder.finish()),
76 ];
77
78 RecordBatch::try_new(schema, columns).map_err(io::Error::other)
79}
80
81pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
82where
83 W: Write;
84
85impl<W> IpcStreamWriter<W>
86where
87 W: Write,
88{
89 pub fn finish(&mut self) -> Result<(), io::Error> {
90 self.0.finish().map_err(io::Error::other)
91 }
92
93 pub fn flush(&mut self) -> Result<(), io::Error> {
94 self.0.get_mut().flush()
95 }
96
97 pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
98 self.0.write(b).map_err(io::Error::other)
99 }
100}
101
102pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
103where
104 W: Write,
105{
106 let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
107 .map_err(io::Error::other)?;
108 let mut iw = IpcStreamWriter(swtr);
109 iw.write_batch(b)?;
110 iw.flush()?;
111 iw.finish()?;
112
113 drop(iw);
114
115 wtr.flush()
116}
117
118pub fn networks_to_writer<W>(
119 networks: Vec<Network>,
120 mut wtr: W,
121 sch: Arc<Schema>,
122) -> Result<(), io::Error>
123where
124 W: Write,
125{
126 let batch = networks2batch(networks, sch.clone())?;
127 batch2writer(&batch, &mut wtr, &sch)
128}
129
130pub async fn networks2writer<W>(
131 d: &Docker,
132 mut wtr: W,
133 opts: Option<ListNetworksOptions>,
134) -> Result<(), io::Error>
135where
136 W: Write,
137{
138 let networks = list_networks(d, opts).await.map_err(io::Error::other)?;
139 let schema = network_schema();
140 networks_to_writer(networks, &mut wtr, schema)
141}
142
143use bollard::ClientVersion;
144
145pub fn unix2docker(
146 sock_path: &str,
147 timeout_seconds: u64,
148 client_version: &ClientVersion,
149) -> Result<Docker, io::Error> {
150 Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
151}
152
153pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
154pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 120;
155pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;