rs_docker_networks2arrow_ipc/
lib.rs

1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::Docker;
10use bollard::errors::Error;
11use bollard::models::Network;
12use bollard::query_parameters::ListNetworksOptions;
13
14pub async fn list_networks(
15    d: &Docker,
16
17    opts: Option<ListNetworksOptions>,
18) -> Result<Vec<Network>, Error> {
19    d.list_networks(opts).await
20}
21
22use arrow::array::ArrayRef;
23use arrow::array::builder::{BooleanBuilder, StringBuilder};
24use arrow::datatypes::{DataType, Field, Schema};
25
26pub fn network_schema() -> Arc<Schema> {
27    Arc::new(Schema::new(vec![
28        Field::new("id", DataType::Utf8, true),
29        Field::new("name", DataType::Utf8, true),
30        Field::new("created", DataType::Utf8, true),
31        Field::new("scope", DataType::Utf8, true),
32        Field::new("driver", DataType::Utf8, true),
33        Field::new("enable_ipv6", DataType::Boolean, true),
34        Field::new("internal", DataType::Boolean, true),
35        Field::new("attachable", DataType::Boolean, true),
36        Field::new("ingress", DataType::Boolean, true),
37    ]))
38}
39
40pub fn networks2batch(
41    networks: Vec<Network>,
42    schema: Arc<Schema>,
43) -> Result<RecordBatch, io::Error> {
44    let mut id_builder = StringBuilder::new();
45    let mut name_builder = StringBuilder::new();
46    let mut created_builder = StringBuilder::new();
47    let mut scope_builder = StringBuilder::new();
48    let mut driver_builder = StringBuilder::new();
49    let mut enable_ipv6_builder = BooleanBuilder::new();
50    let mut internal_builder = BooleanBuilder::new();
51    let mut attachable_builder = BooleanBuilder::new();
52    let mut ingress_builder = BooleanBuilder::new();
53
54    for network in networks {
55        id_builder.append_option(network.id);
56        name_builder.append_option(network.name);
57        created_builder.append_option(network.created);
58        scope_builder.append_option(network.scope);
59        driver_builder.append_option(network.driver);
60        enable_ipv6_builder.append_option(network.enable_ipv6);
61        internal_builder.append_option(network.internal);
62        attachable_builder.append_option(network.attachable);
63        ingress_builder.append_option(network.ingress);
64    }
65
66    let columns: Vec<ArrayRef> = vec![
67        Arc::new(id_builder.finish()),
68        Arc::new(name_builder.finish()),
69        Arc::new(created_builder.finish()),
70        Arc::new(scope_builder.finish()),
71        Arc::new(driver_builder.finish()),
72        Arc::new(enable_ipv6_builder.finish()),
73        Arc::new(internal_builder.finish()),
74        Arc::new(attachable_builder.finish()),
75        Arc::new(ingress_builder.finish()),
76    ];
77
78    RecordBatch::try_new(schema, columns).map_err(io::Error::other)
79}
80
81pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
82where
83    W: Write;
84
85impl<W> IpcStreamWriter<W>
86where
87    W: Write,
88{
89    pub fn finish(&mut self) -> Result<(), io::Error> {
90        self.0.finish().map_err(io::Error::other)
91    }
92
93    pub fn flush(&mut self) -> Result<(), io::Error> {
94        self.0.get_mut().flush()
95    }
96
97    pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
98        self.0.write(b).map_err(io::Error::other)
99    }
100}
101
102pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
103where
104    W: Write,
105{
106    let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
107        .map_err(io::Error::other)?;
108    let mut iw = IpcStreamWriter(swtr);
109    iw.write_batch(b)?;
110    iw.flush()?;
111    iw.finish()?;
112
113    drop(iw);
114
115    wtr.flush()
116}
117
118pub fn networks_to_writer<W>(
119    networks: Vec<Network>,
120    mut wtr: W,
121    sch: Arc<Schema>,
122) -> Result<(), io::Error>
123where
124    W: Write,
125{
126    let batch = networks2batch(networks, sch.clone())?;
127    batch2writer(&batch, &mut wtr, &sch)
128}
129
130pub async fn networks2writer<W>(
131    d: &Docker,
132    mut wtr: W,
133    opts: Option<ListNetworksOptions>,
134) -> Result<(), io::Error>
135where
136    W: Write,
137{
138    let networks = list_networks(d, opts).await.map_err(io::Error::other)?;
139    let schema = network_schema();
140    networks_to_writer(networks, &mut wtr, schema)
141}
142
143use bollard::ClientVersion;
144
145pub fn unix2docker(
146    sock_path: &str,
147    timeout_seconds: u64,
148    client_version: &ClientVersion,
149) -> Result<Docker, io::Error> {
150    Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
151}
152
153pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
154pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 120;
155pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;