1use arrow::array::{
2 Array, ArrayRef, BooleanArray, ListArray, StringArray, StringBuilder, StructArray, UInt8Array,
3 UInt32Array, UInt64Array,
4};
5use arrow::datatypes::{DataType, Field, Fields, Schema};
6use arrow::record_batch::RecordBatch;
7use netdev::Interface;
8use std::sync::Arc;
9
10pub fn ifaces2batch(interfaces: &[Interface]) -> Result<RecordBatch, arrow::error::ArrowError> {
11 let schema = Arc::new(Schema::new(vec![
12 Field::new("index", DataType::UInt32, false),
13 Field::new("name", DataType::Utf8, false),
14 Field::new("friendly_name", DataType::Utf8, true),
15 Field::new("description", DataType::Utf8, true),
16 Field::new("if_type", DataType::Utf8, false),
17 Field::new("mac_addr", DataType::Utf8, true),
18 Field::new(
19 "ipv4",
20 DataType::List(Arc::new(Field::new(
21 "item",
22 DataType::Struct(
23 vec![
24 Field::new("addr", DataType::Utf8, false),
25 Field::new("prefix_len", DataType::UInt8, false),
26 ]
27 .into(),
28 ),
29 true,
30 ))),
31 false,
32 ),
33 Field::new(
34 "ipv6",
35 DataType::List(Arc::new(Field::new(
36 "item",
37 DataType::Struct(
38 vec![
39 Field::new("addr", DataType::Utf8, false),
40 Field::new("prefix_len", DataType::UInt8, false),
41 ]
42 .into(),
43 ),
44 true,
45 ))),
46 false,
47 ),
48 Field::new("flags", DataType::UInt32, false),
49 Field::new("oper_state", DataType::Utf8, false),
50 Field::new("transmit_speed", DataType::UInt64, true),
51 Field::new("receive_speed", DataType::UInt64, true),
52 Field::new(
53 "stats",
54 DataType::Struct(
55 vec![
56 Field::new("rx_bytes", DataType::UInt64, false),
57 Field::new("tx_bytes", DataType::UInt64, false),
58 ]
59 .into(),
60 ),
61 true,
62 ),
63 Field::new(
64 "gateway",
65 DataType::Struct(
66 vec![
67 Field::new("mac_addr", DataType::Utf8, false),
68 Field::new(
69 "ipv4",
70 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
71 false,
72 ),
73 Field::new(
74 "ipv6",
75 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
76 false,
77 ),
78 ]
79 .into(),
80 ),
81 true,
82 ),
83 Field::new(
84 "dns_servers",
85 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
86 false,
87 ),
88 Field::new("mtu", DataType::UInt32, true),
89 Field::new("default", DataType::Boolean, false),
90 Field::new("is_up", DataType::Boolean, false),
91 Field::new("is_loopback", DataType::Boolean, false),
92 Field::new("is_multicast", DataType::Boolean, false),
93 Field::new("is_broadcast", DataType::Boolean, false),
94 Field::new("is_point_to_point", DataType::Boolean, false),
95 Field::new("is_tun", DataType::Boolean, false),
96 Field::new("is_running", DataType::Boolean, false),
97 Field::new("is_physical", DataType::Boolean, false),
98 ]));
99
100 let columns = to_columns(interfaces, &schema)?;
101 RecordBatch::try_new(schema, columns)
102}
103
104fn to_columns(
105 interfaces: &[Interface],
106 schema: &Schema,
107) -> Result<Vec<ArrayRef>, arrow::error::ArrowError> {
108 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
109 columns.push(Arc::new(UInt32Array::from_iter_values(
110 interfaces.iter().map(|iface| iface.index),
111 )));
112 columns.push(Arc::new(StringArray::from_iter_values(
113 interfaces.iter().map(|iface| &iface.name),
114 )));
115 columns.push(Arc::new(StringArray::from_iter(
116 interfaces
117 .iter()
118 .map(|iface| iface.friendly_name.as_deref()),
119 )));
120 columns.push(Arc::new(StringArray::from_iter(
121 interfaces.iter().map(|iface| iface.description.as_deref()),
122 )));
123 columns.push(Arc::new(StringArray::from_iter_values(
124 interfaces
125 .iter()
126 .map(|iface| format!("{:?}", iface.if_type)),
127 )));
128 columns.push(Arc::new(StringArray::from_iter(
129 interfaces
130 .iter()
131 .map(|iface| iface.mac_addr.map(|mac| mac.to_string())),
132 )));
133 columns.push(ipv4_to_arrow(interfaces)?);
134 columns.push(ipv6_to_arrow(interfaces)?);
135 columns.push(Arc::new(UInt32Array::from_iter_values(
136 interfaces.iter().map(|iface| iface.flags),
137 )));
138 columns.push(Arc::new(StringArray::from_iter_values(
139 interfaces
140 .iter()
141 .map(|iface| format!("{:?}", iface.oper_state)),
142 )));
143 columns.push(Arc::new(UInt64Array::from_iter(
144 interfaces.iter().map(|iface| iface.transmit_speed),
145 )));
146 columns.push(Arc::new(UInt64Array::from_iter(
147 interfaces.iter().map(|iface| iface.receive_speed),
148 )));
149 columns.push(stats_to_arrow(interfaces)?);
150 columns.push(gateway_to_arrow(interfaces)?);
151 columns.push(dns_servers_to_arrow(interfaces)?);
152 columns.push(Arc::new(UInt32Array::from_iter(
153 interfaces.iter().map(|iface| iface.mtu),
154 )));
155 columns.push(Arc::new(BooleanArray::from_iter(
156 interfaces.iter().map(|iface| Some(iface.default)),
157 )));
158 columns.push(Arc::new(BooleanArray::from_iter(
159 interfaces.iter().map(|iface| Some(iface.is_up())),
160 )));
161 columns.push(Arc::new(BooleanArray::from_iter(
162 interfaces.iter().map(|iface| Some(iface.is_loopback())),
163 )));
164 columns.push(Arc::new(BooleanArray::from_iter(
165 interfaces.iter().map(|iface| Some(iface.is_multicast())),
166 )));
167 columns.push(Arc::new(BooleanArray::from_iter(
168 interfaces.iter().map(|iface| Some(iface.is_broadcast())),
169 )));
170 columns.push(Arc::new(BooleanArray::from_iter(
171 interfaces
172 .iter()
173 .map(|iface| Some(iface.is_point_to_point())),
174 )));
175 columns.push(Arc::new(BooleanArray::from_iter(
176 interfaces.iter().map(|iface| Some(iface.is_tun())),
177 )));
178 columns.push(Arc::new(BooleanArray::from_iter(
179 interfaces.iter().map(|iface| Some(iface.is_running())),
180 )));
181 columns.push(Arc::new(BooleanArray::from_iter(
182 interfaces.iter().map(|iface| Some(iface.is_physical())),
183 )));
184 Ok(columns)
185}
186
187fn ipv4_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
188 let mut offsets = vec![0];
189 let mut addrs = StringBuilder::new();
190 let mut prefix_lens = UInt8Array::builder(0);
191
192 for iface in interfaces {
193 for ipv4 in &iface.ipv4 {
194 addrs.append_value(ipv4.addr().to_string());
195 prefix_lens.append_value(ipv4.prefix_len());
196 }
197 let last_offset = offsets[offsets.len() - 1];
198 offsets.push(iface.ipv4.len() as i32 + last_offset);
199 }
200
201 let addr_array = Arc::new(addrs.finish());
202 let prefix_len_array = Arc::new(prefix_lens.finish());
203
204 let fields = Fields::from(vec![
205 Field::new("addr", DataType::Utf8, false),
206 Field::new("prefix_len", DataType::UInt8, false),
207 ]);
208
209 let struct_array = StructArray::try_new(fields, vec![addr_array, prefix_len_array], None)?;
210
211 let list_array = ListArray::try_new(
212 Arc::new(Field::new("item", struct_array.data_type().clone(), true)),
213 arrow::buffer::OffsetBuffer::new(offsets.into()),
214 Arc::new(struct_array),
215 None,
216 )?;
217
218 Ok(Arc::new(list_array))
219}
220
221fn ipv6_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
222 let mut offsets = vec![0];
223 let mut addrs = StringBuilder::new();
224 let mut prefix_lens = UInt8Array::builder(0);
225
226 for iface in interfaces {
227 for ipv6 in &iface.ipv6 {
228 addrs.append_value(ipv6.addr().to_string());
229 prefix_lens.append_value(ipv6.prefix_len());
230 }
231 let last_offset = offsets[offsets.len() - 1];
232 offsets.push(iface.ipv6.len() as i32 + last_offset);
233 }
234
235 let addr_array = Arc::new(addrs.finish());
236 let prefix_len_array = Arc::new(prefix_lens.finish());
237
238 let fields = Fields::from(vec![
239 Field::new("addr", DataType::Utf8, false),
240 Field::new("prefix_len", DataType::UInt8, false),
241 ]);
242
243 let struct_array = StructArray::try_new(fields, vec![addr_array, prefix_len_array], None)?;
244
245 let list_array = ListArray::try_new(
246 Arc::new(Field::new("item", struct_array.data_type().clone(), true)),
247 arrow::buffer::OffsetBuffer::new(offsets.into()),
248 Arc::new(struct_array),
249 None,
250 )?;
251
252 Ok(Arc::new(list_array))
253}
254
255fn stats_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
256 let mut rx_bytes = UInt64Array::builder(interfaces.len());
257 let mut tx_bytes = UInt64Array::builder(interfaces.len());
258 let mut nulls = Vec::with_capacity(interfaces.len());
259
260 for iface in interfaces {
261 if let Some(stats) = &iface.stats {
262 rx_bytes.append_value(stats.rx_bytes);
263 tx_bytes.append_value(stats.tx_bytes);
264 nulls.push(true);
265 } else {
266 rx_bytes.append_null();
267 tx_bytes.append_null();
268 nulls.push(false);
269 }
270 }
271
272 let rx_bytes_array = Arc::new(rx_bytes.finish());
273 let tx_bytes_array = Arc::new(tx_bytes.finish());
274
275 let fields = Fields::from(vec![
276 Field::new("rx_bytes", DataType::UInt64, false),
277 Field::new("tx_bytes", DataType::UInt64, false),
278 ]);
279
280 let struct_array = StructArray::try_new(
281 fields,
282 vec![rx_bytes_array, tx_bytes_array],
283 Some(arrow::buffer::NullBuffer::new(
284 arrow::buffer::BooleanBuffer::from_iter(nulls),
285 )),
286 )?;
287
288 Ok(Arc::new(struct_array))
289}
290
291fn gateway_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
292 let mut mac_addrs = StringBuilder::new();
293 let mut ipv4_offsets = vec![0];
294 let mut ipv4_values = StringBuilder::new();
295 let mut ipv6_offsets = vec![0];
296 let mut ipv6_values = StringBuilder::new();
297 let mut nulls = Vec::with_capacity(interfaces.len());
298
299 for iface in interfaces {
300 if let Some(gateway) = &iface.gateway {
301 mac_addrs.append_value(gateway.mac_addr.to_string());
302 for ip in &gateway.ipv4 {
303 ipv4_values.append_value(ip.to_string());
304 }
305 let last_ipv4_offset = ipv4_offsets[ipv4_offsets.len() - 1];
306 ipv4_offsets.push(gateway.ipv4.len() as i32 + last_ipv4_offset);
307 for ip in &gateway.ipv6 {
308 ipv6_values.append_value(ip.to_string());
309 }
310 let last_ipv6_offset = ipv6_offsets[ipv6_offsets.len() - 1];
311 ipv6_offsets.push(gateway.ipv6.len() as i32 + last_ipv6_offset);
312 nulls.push(true);
313 } else {
314 mac_addrs.append_null();
315 let last_ipv4_offset = ipv4_offsets[ipv4_offsets.len() - 1];
316 ipv4_offsets.push(last_ipv4_offset);
317 let last_ipv6_offset = ipv6_offsets[ipv6_offsets.len() - 1];
318 ipv6_offsets.push(last_ipv6_offset);
319 nulls.push(false);
320 }
321 }
322
323 let mac_addrs_array = Arc::new(mac_addrs.finish());
324 let ipv4_values_array = Arc::new(ipv4_values.finish());
325 let ipv6_values_array = Arc::new(ipv6_values.finish());
326
327 let ipv4_list_array = ListArray::try_new(
328 Arc::new(Field::new("item", DataType::Utf8, true)),
329 arrow::buffer::OffsetBuffer::new(ipv4_offsets.into()),
330 ipv4_values_array,
331 None,
332 )?;
333
334 let ipv6_list_array = ListArray::try_new(
335 Arc::new(Field::new("item", DataType::Utf8, true)),
336 arrow::buffer::OffsetBuffer::new(ipv6_offsets.into()),
337 ipv6_values_array,
338 None,
339 )?;
340
341 let fields = Fields::from(vec![
342 Field::new("mac_addr", DataType::Utf8, false),
343 Field::new("ipv4", ipv4_list_array.data_type().clone(), false),
344 Field::new("ipv6", ipv6_list_array.data_type().clone(), false),
345 ]);
346
347 let struct_array = StructArray::try_new(
348 fields,
349 vec![
350 mac_addrs_array,
351 Arc::new(ipv4_list_array),
352 Arc::new(ipv6_list_array),
353 ],
354 Some(arrow::buffer::NullBuffer::new(
355 arrow::buffer::BooleanBuffer::from_iter(nulls),
356 )),
357 )?;
358
359 Ok(Arc::new(struct_array))
360}
361
362fn dns_servers_to_arrow(interfaces: &[Interface]) -> Result<ArrayRef, arrow::error::ArrowError> {
363 let mut offsets = vec![0];
364 let mut values = StringBuilder::new();
365
366 for iface in interfaces {
367 for ip in &iface.dns_servers {
368 values.append_value(ip.to_string());
369 }
370 let last_offset = offsets[offsets.len() - 1];
371 offsets.push(iface.dns_servers.len() as i32 + last_offset);
372 }
373
374 let values_array = Arc::new(values.finish());
375
376 let list_array = ListArray::try_new(
377 Arc::new(Field::new("item", DataType::Utf8, true)),
378 arrow::buffer::OffsetBuffer::new(offsets.into()),
379 values_array,
380 None,
381 )?;
382
383 Ok(Arc::new(list_array))
384}