Skip to main content

rs_docker_volumes2arrow_ipc/
lib.rs

1use std::io;
2use std::sync::Arc;
3
4use io::BufWriter;
5use io::Write;
6
7use arrow::record_batch::RecordBatch;
8
9use bollard::ClientVersion;
10use bollard::Docker;
11use bollard::models::Volume;
12use bollard::query_parameters::ListVolumesOptions;
13
14pub async fn list_volumes(
15    d: &Docker,
16    opts: Option<ListVolumesOptions>,
17) -> Result<Vec<Volume>, io::Error> {
18    let res = d.list_volumes(opts).await.map_err(io::Error::other)?;
19    Ok(res.volumes.unwrap_or_default())
20}
21
22use arrow::array::ArrayRef;
23use arrow::array::builder::StringBuilder;
24use arrow::datatypes::{DataType, Field, Schema};
25
26pub fn a_schema() -> Arc<Schema> {
27    Arc::new(Schema::new(vec![
28        Field::new("name", DataType::Utf8, false),
29        Field::new("driver", DataType::Utf8, false),
30        Field::new("mountpoint", DataType::Utf8, false),
31        Field::new("created_at", DataType::Utf8, true),
32        // Field::new("scope", DataType::Utf8, true),
33    ]))
34}
35
36pub fn volumes2batch(volumes: Vec<Volume>, schema: Arc<Schema>) -> Result<RecordBatch, io::Error> {
37    let mut name_builder = StringBuilder::new();
38    let mut driver_builder = StringBuilder::new();
39    let mut mountpoint_builder = StringBuilder::new();
40    let mut created_at_builder = StringBuilder::new();
41    // let mut scope_builder = StringBuilder::new();
42
43    for v in volumes {
44        name_builder.append_value(v.name);
45        driver_builder.append_value(v.driver);
46        mountpoint_builder.append_value(v.mountpoint);
47        created_at_builder.append_option(v.created_at);
48        // scope_builder.append_option(v.scope);
49    }
50
51    let columns: Vec<ArrayRef> = vec![
52        Arc::new(name_builder.finish()),
53        Arc::new(driver_builder.finish()),
54        Arc::new(mountpoint_builder.finish()),
55        Arc::new(created_at_builder.finish()),
56        // Arc::new(scope_builder.finish()),
57    ];
58
59    RecordBatch::try_new(schema, columns).map_err(io::Error::other)
60}
61
62pub struct IpcStreamWriter<W>(pub arrow::ipc::writer::StreamWriter<BufWriter<W>>)
63where
64    W: Write;
65
66impl<W> IpcStreamWriter<W>
67where
68    W: Write,
69{
70    pub fn finish(&mut self) -> Result<(), io::Error> {
71        self.0.finish().map_err(io::Error::other)
72    }
73
74    pub fn flush(&mut self) -> Result<(), io::Error> {
75        self.0.flush().map_err(io::Error::other)
76    }
77
78    pub fn write_batch(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
79        self.0.write(b).map_err(io::Error::other)
80    }
81}
82
83pub fn batch2writer<W>(b: &RecordBatch, mut wtr: W, sch: &Schema) -> Result<(), io::Error>
84where
85    W: Write,
86{
87    let swtr = arrow::ipc::writer::StreamWriter::try_new_buffered(&mut wtr, sch)
88        .map_err(io::Error::other)?;
89    let mut iw = IpcStreamWriter(swtr);
90    iw.write_batch(b)?;
91    iw.flush()?;
92    iw.finish()?;
93
94    drop(iw);
95
96    wtr.flush()
97}
98
99pub fn volumes2writer<W>(
100    volumes: Vec<Volume>,
101    mut wtr: W,
102    sch: Arc<Schema>,
103) -> Result<(), io::Error>
104where
105    W: Write,
106{
107    let batch = volumes2batch(volumes, sch.clone())?;
108    batch2writer(&batch, &mut wtr, &sch)
109}
110
111pub async fn list_volumes_and_write<W>(
112    d: &Docker,
113    mut wtr: W,
114    opts: Option<ListVolumesOptions>,
115) -> Result<(), io::Error>
116where
117    W: Write,
118{
119    let volumes = list_volumes(d, opts).await?;
120    let schema = a_schema();
121    volumes2writer(volumes, &mut wtr, schema)
122}
123
124pub fn unix2docker(
125    sock_path: &str,
126    timeout_seconds: u64,
127    client_version: &ClientVersion,
128) -> Result<Docker, io::Error> {
129    Docker::connect_with_unix(sock_path, timeout_seconds, client_version).map_err(io::Error::other)
130}
131
132pub const DOCKER_UNIX_PATH_DEFAULT: &str = "/var/run/docker.sock";
133pub const DOCKER_CON_TIMEOUT_SECONDS_DEFAULT: u64 = 30;
134pub const DOCKER_CLIENT_VERSION_DEFAULT: &ClientVersion = bollard::API_DEFAULT_VERSION;