rs_nics2arrow_ipc_stream/
lib.rs

1use arrow::array::{
2    Array, ArrayRef, BooleanArray, ListArray, StringArray, StringBuilder, StructArray, UInt8Array,
3    UInt32Array, UInt64Array,
4};
5use arrow::datatypes::{DataType, Field, Fields, Schema};
6use arrow::record_batch::RecordBatch;
7use netdev::Interface;
8use std::sync::Arc;
9
10pub fn ifaces2batch(interfaces: &[Interface]) -> Result<RecordBatch, arrow::error::ArrowError> {
11    let schema = Arc::new(Schema::new(vec![
12        Field::new("index", DataType::UInt32, false),
13        Field::new("name", DataType::Utf8, false),
14        Field::new("friendly_name", DataType::Utf8, true),
15        Field::new("description", DataType::Utf8, true),
16        Field::new("if_type", DataType::Utf8, false),
17        Field::new("mac_addr", DataType::Utf8, true),
18        Field::new(
19            "ipv4",
20            DataType::List(Arc::new(Field::new(
21                "item",
22                DataType::Struct(
23                    vec![
24                        Field::new("addr", DataType::Utf8, false),
25                        Field::new("prefix_len", DataType::UInt8, false),
26                    ]
27                    .into(),
28                ),
29                true,
30            ))),
31            false,
32        ),
33        Field::new(
34            "ipv6",
35            DataType::List(Arc::new(Field::new(
36                "item",
37                DataType::Struct(
38                    vec![
39                        Field::new("addr", DataType::Utf8, false),
40                        Field::new("prefix_len", DataType::UInt8, false),
41                    ]
42                    .into(),
43                ),
44                true,
45            ))),
46            false,
47        ),
48        Field::new("flags", DataType::UInt32, false),
49        Field::new("oper_state", DataType::Utf8, false),
50        Field::new("transmit_speed", DataType::UInt64, true),
51        Field::new("receive_speed", DataType::UInt64, true),
52        Field::new(
53            "stats",
54            DataType::Struct(
55                vec![
56                    Field::new("rx_bytes", DataType::UInt64, false),
57                    Field::new("tx_bytes", DataType::UInt64, false),
58                ]
59                .into(),
60            ),
61            true,
62        ),
63        Field::new(
64            "gateway",
65            DataType::Struct(
66                vec![
67                    Field::new("mac_addr", DataType::Utf8, false),
68                    Field::new(
69                        "ipv4",
70                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
71                        false,
72                    ),
73                    Field::new(
74                        "ipv6",
75                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
76                        false,
77                    ),
78                ]
79                .into(),
80            ),
81            true,
82        ),
83        Field::new(
84            "dns_servers",
85            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
86            false,
87        ),
88        Field::new("mtu", DataType::UInt32, true),
89        Field::new("default", DataType::Boolean, false),
90        Field::new("is_up", DataType::Boolean, false),
91        Field::new("is_loopback", DataType::Boolean, false),
92        Field::new("is_multicast", DataType::Boolean, false),
93        Field::new("is_broadcast", DataType::Boolean, false),
94        Field::new("is_point_to_point", DataType::Boolean, false),
95        Field::new("is_tun", DataType::Boolean, false),
96        Field::new("is_running", DataType::Boolean, false),
97        Field::new("is_physical", DataType::Boolean, false),
98    ]));
99
100    let columns = to_columns(interfaces, &schema)?;
101    RecordBatch::try_new(schema, columns)
102}
103
104fn to_columns(
105    interfaces: &[Interface],
106    schema: &Schema,
107) -> Result<Vec<ArrayRef>, arrow::error::ArrowError> {
108    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
109    columns.push(Arc::new(UInt32Array::from_iter_values(
110        interfaces.iter().map(|iface| iface.index),
111    )));
112    columns.push(Arc::new(StringArray::from_iter_values(
113        interfaces.iter().map(|iface| &iface.name),
114    )));
115    columns.push(Arc::new(StringArray::from_iter(
116        interfaces
117            .iter()
118            .map(|iface| iface.friendly_name.as_deref()),
119    )));
120    columns.push(Arc::new(StringArray::from_iter(
121        interfaces.iter().map(|iface| iface.description.as_deref()),
122    )));
123    columns.push(Arc::new(StringArray::from_iter_values(
124        interfaces
125            .iter()
126            .map(|iface| format!("{:?}", iface.if_type)),
127    )));
128    columns.push(Arc::new(StringArray::from_iter(
129        interfaces
130            .iter()
131            .map(|iface| iface.mac_addr.map(|mac| mac.to_string())),
132    )));
133    columns.push(ipv4_to_arrow(interfaces)?);
134    columns.push(ipv6_to_arrow(interfaces)?);
135    columns.push(Arc::new(UInt32Array::from_iter_values(
136        interfaces.iter().map(|iface| iface.flags),
137    )));
138    columns.push(Arc::new(StringArray::from_iter_values(
139        interfaces
140            .iter()
141            .map(|iface| format!("{:?}", iface.oper_state)),
142    )));
143    columns.push(Arc::new(UInt64Array::from_iter(
144        interfaces.iter().map(|iface| iface.transmit_speed),
145    )));
146    columns.push(Arc::new(UInt64Array::from_iter(
147        interfaces.iter().map(|iface| iface.receive_speed),
148    )));
149    columns.push(stats_to_arrow(interfaces)?);
150    columns.push(gateway_to_arrow(interfaces)?);
151    columns.push(dns_servers_to_arrow(interfaces)?);
152    columns.push(Arc::new(UInt32Array::from_iter(
153        interfaces.iter().map(|iface| iface.mtu),
154    )));
155    columns.push(Arc::new(BooleanArray::from_iter(
156        interfaces.iter().map(|iface| Some(iface.default)),
157    )));
158    columns.push(Arc::new(BooleanArray::from_iter(
159        interfaces.iter().map(|iface| Some(iface.is_up())),
160    )));
161    columns.push(Arc::new(BooleanArray::from_iter(
162        interfaces.iter().map(|iface| Some(iface.is_loopback())),
163    )));
164    columns.push(Arc::new(BooleanArray::from_iter(
165        interfaces.iter().map(|iface| Some(iface.is_multicast())),
166    )));
167    columns.push(Arc::new(BooleanArray::from_iter(
168        interfaces.iter().map(|iface| Some(iface.is_broadcast())),
169    )));
170    columns.push(Arc::new(BooleanArray::from_iter(
171        interfaces
172            .iter()
173            .map(|iface| Some(iface.is_point_to_point())),
174    )));
175    columns.push(Arc::new(BooleanArray::from_iter(
176        interfaces.iter().map(|iface| Some(iface.is_tun())),
177    )));
178    columns.push(Arc::new(BooleanArray::from_iter(
179        interfaces.iter().map(|iface| Some(iface.is_running())),
180    )));
181    columns.push(Arc::new(BooleanArray::from_iter(
182        interfaces.iter().map(|iface| Some(iface.is_physical())),
183    )));
184    Ok(columns)
185}
186
187fn ipv4_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
188    let mut offsets = vec![0];
189    let mut addrs = StringBuilder::new();
190    let mut prefix_lens = UInt8Array::builder(0);
191
192    for iface in interfaces {
193        for ipv4 in &iface.ipv4 {
194            addrs.append_value(ipv4.addr().to_string());
195            prefix_lens.append_value(ipv4.prefix_len());
196        }
197        let last_offset = offsets[offsets.len() - 1];
198        offsets.push(iface.ipv4.len() as i32 + last_offset);
199    }
200
201    let addr_array = Arc::new(addrs.finish());
202    let prefix_len_array = Arc::new(prefix_lens.finish());
203
204    let fields = Fields::from(vec![
205        Field::new("addr", DataType::Utf8, false),
206        Field::new("prefix_len", DataType::UInt8, false),
207    ]);
208
209    let struct_array = StructArray::try_new(fields, vec![addr_array, prefix_len_array], None)?;
210
211    let list_array = ListArray::try_new(
212        Arc::new(Field::new("item", struct_array.data_type().clone(), true)),
213        arrow::buffer::OffsetBuffer::new(offsets.into()),
214        Arc::new(struct_array),
215        None,
216    )?;
217
218    Ok(Arc::new(list_array))
219}
220
221fn ipv6_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
222    let mut offsets = vec![0];
223    let mut addrs = StringBuilder::new();
224    let mut prefix_lens = UInt8Array::builder(0);
225
226    for iface in interfaces {
227        for ipv6 in &iface.ipv6 {
228            addrs.append_value(ipv6.addr().to_string());
229            prefix_lens.append_value(ipv6.prefix_len());
230        }
231        let last_offset = offsets[offsets.len() - 1];
232        offsets.push(iface.ipv6.len() as i32 + last_offset);
233    }
234
235    let addr_array = Arc::new(addrs.finish());
236    let prefix_len_array = Arc::new(prefix_lens.finish());
237
238    let fields = Fields::from(vec![
239        Field::new("addr", DataType::Utf8, false),
240        Field::new("prefix_len", DataType::UInt8, false),
241    ]);
242
243    let struct_array = StructArray::try_new(fields, vec![addr_array, prefix_len_array], None)?;
244
245    let list_array = ListArray::try_new(
246        Arc::new(Field::new("item", struct_array.data_type().clone(), true)),
247        arrow::buffer::OffsetBuffer::new(offsets.into()),
248        Arc::new(struct_array),
249        None,
250    )?;
251
252    Ok(Arc::new(list_array))
253}
254
255fn stats_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
256    let mut rx_bytes = UInt64Array::builder(interfaces.len());
257    let mut tx_bytes = UInt64Array::builder(interfaces.len());
258    let mut nulls = Vec::with_capacity(interfaces.len());
259
260    for iface in interfaces {
261        if let Some(stats) = &iface.stats {
262            rx_bytes.append_value(stats.rx_bytes);
263            tx_bytes.append_value(stats.tx_bytes);
264            nulls.push(true);
265        } else {
266            rx_bytes.append_null();
267            tx_bytes.append_null();
268            nulls.push(false);
269        }
270    }
271
272    let rx_bytes_array = Arc::new(rx_bytes.finish());
273    let tx_bytes_array = Arc::new(tx_bytes.finish());
274
275    let fields = Fields::from(vec![
276        Field::new("rx_bytes", DataType::UInt64, false),
277        Field::new("tx_bytes", DataType::UInt64, false),
278    ]);
279
280    let struct_array = StructArray::try_new(
281        fields,
282        vec![rx_bytes_array, tx_bytes_array],
283        Some(arrow::buffer::NullBuffer::new(
284            arrow::buffer::BooleanBuffer::from_iter(nulls),
285        )),
286    )?;
287
288    Ok(Arc::new(struct_array))
289}
290
291fn gateway_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
292    let mut mac_addrs = StringBuilder::new();
293    let mut ipv4_offsets = vec![0];
294    let mut ipv4_values = StringBuilder::new();
295    let mut ipv6_offsets = vec![0];
296    let mut ipv6_values = StringBuilder::new();
297    let mut nulls = Vec::with_capacity(interfaces.len());
298
299    for iface in interfaces {
300        if let Some(gateway) = &iface.gateway {
301            mac_addrs.append_value(gateway.mac_addr.to_string());
302            for ip in &gateway.ipv4 {
303                ipv4_values.append_value(ip.to_string());
304            }
305            let last_ipv4_offset = ipv4_offsets[ipv4_offsets.len() - 1];
306            ipv4_offsets.push(gateway.ipv4.len() as i32 + last_ipv4_offset);
307            for ip in &gateway.ipv6 {
308                ipv6_values.append_value(ip.to_string());
309            }
310            let last_ipv6_offset = ipv6_offsets[ipv6_offsets.len() - 1];
311            ipv6_offsets.push(gateway.ipv6.len() as i32 + last_ipv6_offset);
312            nulls.push(true);
313        } else {
314            mac_addrs.append_null();
315            let last_ipv4_offset = ipv4_offsets[ipv4_offsets.len() - 1];
316            ipv4_offsets.push(last_ipv4_offset);
317            let last_ipv6_offset = ipv6_offsets[ipv6_offsets.len() - 1];
318            ipv6_offsets.push(last_ipv6_offset);
319            nulls.push(false);
320        }
321    }
322
323    let mac_addrs_array = Arc::new(mac_addrs.finish());
324    let ipv4_values_array = Arc::new(ipv4_values.finish());
325    let ipv6_values_array = Arc::new(ipv6_values.finish());
326
327    let ipv4_list_array = ListArray::try_new(
328        Arc::new(Field::new("item", DataType::Utf8, true)),
329        arrow::buffer::OffsetBuffer::new(ipv4_offsets.into()),
330        ipv4_values_array,
331        None,
332    )?;
333
334    let ipv6_list_array = ListArray::try_new(
335        Arc::new(Field::new("item", DataType::Utf8, true)),
336        arrow::buffer::OffsetBuffer::new(ipv6_offsets.into()),
337        ipv6_values_array,
338        None,
339    )?;
340
341    let fields = Fields::from(vec![
342        Field::new("mac_addr", DataType::Utf8, false),
343        Field::new("ipv4", ipv4_list_array.data_type().clone(), false),
344        Field::new("ipv6", ipv6_list_array.data_type().clone(), false),
345    ]);
346
347    let struct_array = StructArray::try_new(
348        fields,
349        vec![
350            mac_addrs_array,
351            Arc::new(ipv4_list_array),
352            Arc::new(ipv6_list_array),
353        ],
354        Some(arrow::buffer::NullBuffer::new(
355            arrow::buffer::BooleanBuffer::from_iter(nulls),
356        )),
357    )?;
358
359    Ok(Arc::new(struct_array))
360}
361
362fn dns_servers_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
363    let mut offsets = vec![0];
364    let mut values = StringBuilder::new();
365
366    for iface in interfaces {
367        for ip in &iface.dns_servers {
368            values.append_value(ip.to_string());
369        }
370        let last_offset = offsets[offsets.len() - 1];
371        offsets.push(iface.dns_servers.len() as i32 + last_offset);
372    }
373
374    let values_array = Arc::new(values.finish());
375
376    let list_array = ListArray::try_new(
377        Arc::new(Field::new("item", DataType::Utf8, true)),
378        arrow::buffer::OffsetBuffer::new(offsets.into()),
379        values_array,
380        None,
381    )?;
382
383    Ok(Arc::new(list_array))
384}