rs_docker_volumes2arrow_ipc/
lib.rs1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::Volume;
12use bollard::query_parameters::ListVolumesOptions;
13
14pub async fn list_volumes(
15 d: &Docker,
16 opts: Option<ListVolumesOptions>,
17) -> Result<Vec<Volume>, io::Error> {
18 let res = d.list_volumes(opts).await.map_err(io::Error::other)?;
19 Ok(res.volumes.unwrap_or_default())
20}
21
22use arrow::array::ArrayRef;
23use arrow::array::builder::StringBuilder;
24use arrow::datatypes::{DataType, Field, Schema};
25
26pub fn a_schema() -> Arc<Schema> {
27 Arc::new(Schema::new(vec![
28 Field::new("name", DataType::Utf8, false),
29 Field::new("driver", DataType::Utf8, false),
30 Field::new("mountpoint", DataType::Utf8, false),
31 Field::new("created_at", DataType::Utf8, true),
32 ]))
34}
35
36pub fn volumes2batch(volumes: Vec<Volume>, schema: Arc<Schema>) -> Result<RecordBatch, io::Error> {
37 let mut name_builder = StringBuilder::new();
38 let mut driver_builder = StringBuilder::new();
39 let mut mountpoint_builder = StringBuilder::new();
40 let mut created_at_builder = StringBuilder::new();
41 for v in volumes {
44 name_builder.append_value(v.name);
45 driver_builder.append_value(v.driver);
46 mountpoint_builder.append_value(v.mountpoint);
47 created_at_builder.append_option(v.created_at);
48 }
50
51 let columns: Vec<ArrayRef> = vec![
52 Arc::new(name_builder.finish()),
53 Arc::new(driver_builder.finish()),
54 Arc::new(mountpoint_builder.finish()),
55 Arc::new(created_at_builder.finish()),
56 ];
58
59 RecordBatch::try_new(schema, columns).map_err(io::Error::other)
60}
61
62pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
63where
64 W: Write;
65
66impl<W> IpcStreamWriter<W>
67where
68 W: Write,
69{
70 pub fn finish(&mut self) -> Result<(), io::Error> {
71 self.0.finish().map_err(io::Error::other)
72 }
73
74 pub fn flush(&mut self) -> Result<(), io::Error> {
75 self.0.flush().map_err(io::Error::other)
76 }
77
78 pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
79 self.0.write(b).map_err(io::Error::other)
80 }
81}
82
83pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
84where
85 W: Write,
86{
87 let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
88 .map_err(io::Error::other)?;
89 let mut iw = IpcStreamWriter(swtr);
90 iw.write_batch(b)?;
91 iw.flush()?;
92 iw.finish()?;
93
94 drop(iw);
95
96 wtr.flush()
97}
98
99pub fn volumes2writer<W>(
100 volumes: Vec<Volume>,
101 mut wtr: W,
102 sch: Arc<Schema>,
103) -> Result<(), io::Error>
104where
105 W: Write,
106{
107 let batch = volumes2batch(volumes, sch.clone())?;
108 batch2writer(&batch, &mut wtr, &sch)
109}
110
111pub async fn list_volumes_and_write<W>(
112 d: &Docker,
113 mut wtr: W,
114 opts: Option<ListVolumesOptions>,
115) -> Result<(), io::Error>
116where
117 W: Write,
118{
119 let volumes = list_volumes(d, opts).await?;
120 let schema = a_schema();
121 volumes2writer(volumes, &mut wtr, schema)
122}
123
124pub fn unix2docker(
125 sock_path: &str,
126 timeout_seconds: u64,
127 client_version: &ClientVersion,
128) -> Result<Docker, io::Error> {
129 Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
130}
131
132pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
133pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
134pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;