Skip to main content

flowparser_sflow/counter_records/
mod.rs

1pub mod app_operations;
2pub mod app_resources;
3pub mod app_workers;
4pub mod energy;
5pub mod ethernet_interface;
6pub mod fans;
7pub mod generic_interface;
8pub mod host_adapters;
9pub mod host_cpu;
10pub mod host_descr;
11pub mod host_disk_io;
12pub mod host_memory;
13pub mod host_net_io;
14pub mod host_parent;
15pub mod http_counters;
16pub mod humidity;
17pub mod ib_counters;
18pub mod ieee80211_counters;
19pub mod jmx_runtime;
20pub mod jvm_statistics;
21pub mod lag_port_stats;
22pub mod memcache_counters;
23pub mod mib2_icmp_group;
24pub mod mib2_ip_group;
25pub mod mib2_tcp_group;
26pub mod mib2_udp_group;
27pub mod of_port;
28pub mod ovs_dp_stats;
29pub mod port_name;
30pub mod processor;
31pub mod queue_length;
32pub mod radio_utilization;
33pub mod sfp;
34pub mod slow_path_counts;
35pub mod temperature;
36pub mod token_ring;
37pub mod vg_counters;
38pub mod virt_cpu;
39pub mod virt_disk_io;
40pub mod virt_memory;
41pub mod virt_net_io;
42pub mod virt_node;
43pub mod vlan;
44pub mod xen_vif;
45
46use nom::IResult;
47use nom::number::complete::be_u32;
48use serde::{Deserialize, Serialize};
49
50pub use app_operations::AppOperations;
51pub use app_resources::AppResources;
52pub use app_workers::AppWorkers;
53pub use energy::Energy;
54pub use ethernet_interface::EthernetInterface;
55pub use fans::Fans;
56pub use generic_interface::GenericInterface;
57pub use host_adapters::HostAdapters;
58pub use host_cpu::HostCpu;
59pub use host_descr::{HostDescr, MachineType, OsName};
60pub use host_disk_io::HostDiskIo;
61pub use host_memory::HostMemory;
62pub use host_net_io::HostNetIo;
63pub use host_parent::HostParent;
64pub use http_counters::HttpCounters;
65pub use humidity::Humidity;
66pub use ib_counters::IbCounters;
67pub use ieee80211_counters::Ieee80211Counters;
68pub use jmx_runtime::JmxRuntime;
69pub use jvm_statistics::JvmStatistics;
70pub use lag_port_stats::LagPortStats;
71pub use memcache_counters::MemcacheCounters;
72pub use mib2_icmp_group::Mib2IcmpGroup;
73pub use mib2_ip_group::Mib2IpGroup;
74pub use mib2_tcp_group::Mib2TcpGroup;
75pub use mib2_udp_group::Mib2UdpGroup;
76pub use of_port::OfPort;
77pub use ovs_dp_stats::OvsDpStats;
78pub use port_name::PortName;
79pub use processor::Processor;
80pub use queue_length::QueueLength;
81pub use radio_utilization::RadioUtilization;
82pub use sfp::Sfp;
83pub use slow_path_counts::SlowPathCounts;
84pub use temperature::Temperature;
85pub use token_ring::TokenRing;
86pub use vg_counters::VgCounters;
87pub use virt_cpu::{VirtCpu, VirtDomainState};
88pub use virt_disk_io::VirtDiskIo;
89pub use virt_memory::VirtMemory;
90pub use virt_net_io::VirtNetIo;
91pub use virt_node::VirtNode;
92pub use vlan::Vlan;
93pub use xen_vif::XenVif;
94
95/// A counter record within a counter sample.
96///
97/// Counter records contain periodic interface and system statistics
98/// reported by the sFlow agent.
99#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
100pub enum CounterRecord {
101    /// Generic interface counters (enterprise=0, format=1).
102    GenericInterface(GenericInterface),
103    /// Ethernet-specific interface counters (enterprise=0, format=2).
104    EthernetInterface(EthernetInterface),
105    /// Token Ring interface counters (enterprise=0, format=3).
106    TokenRing(TokenRing),
107    /// 100VG-AnyLAN counters (enterprise=0, format=4).
108    VgCounters(VgCounters),
109    /// VLAN counters (enterprise=0, format=5).
110    Vlan(Vlan),
111    /// IEEE 802.11 counters (enterprise=0, format=6).
112    Ieee80211Counters(Ieee80211Counters),
113    /// LAG port statistics (enterprise=0, format=7).
114    LagPortStats(LagPortStats),
115    /// Slow path counts (enterprise=0, format=8).
116    SlowPathCounts(SlowPathCounts),
117    /// InfiniBand counters (enterprise=0, format=9).
118    IbCounters(IbCounters),
119    /// SFP/optical transceiver counters (enterprise=0, format=10).
120    Sfp(Sfp),
121    /// Processor/CPU counters (enterprise=0, format=1001).
122    Processor(Processor),
123    /// Radio utilization counters (enterprise=0, format=1002).
124    RadioUtilization(RadioUtilization),
125    /// Queue length histogram (enterprise=0, format=1003).
126    QueueLength(QueueLength),
127    /// OpenFlow port mapping (enterprise=0, format=1004).
128    OfPort(OfPort),
129    /// Port name (enterprise=0, format=1005).
130    PortName(PortName),
131    /// Host description (enterprise=0, format=2000).
132    HostDescr(HostDescr),
133    /// Host network adapters (enterprise=0, format=2001).
134    HostAdapters(HostAdapters),
135    /// Host parent (virtualization) (enterprise=0, format=2002).
136    HostParent(HostParent),
137    /// Host CPU counters (enterprise=0, format=2003).
138    HostCpu(HostCpu),
139    /// Host memory counters (enterprise=0, format=2004).
140    HostMemory(HostMemory),
141    /// Host disk I/O counters (enterprise=0, format=2005).
142    HostDiskIo(HostDiskIo),
143    /// Host network I/O counters (enterprise=0, format=2006).
144    HostNetIo(HostNetIo),
145    /// Virtual node statistics (enterprise=0, format=2100).
146    VirtNode(VirtNode),
147    /// Virtual CPU statistics (enterprise=0, format=2101).
148    VirtCpu(VirtCpu),
149    /// Virtual memory statistics (enterprise=0, format=2102).
150    VirtMemory(VirtMemory),
151    /// Virtual disk I/O statistics (enterprise=0, format=2103).
152    VirtDiskIo(VirtDiskIo),
153    /// Virtual network I/O statistics (enterprise=0, format=2104).
154    VirtNetIo(VirtNetIo),
155    /// MIB-II IP group counters (enterprise=0, format=2007).
156    Mib2IpGroup(Mib2IpGroup),
157    /// MIB-II ICMP group counters (enterprise=0, format=2008).
158    Mib2IcmpGroup(Mib2IcmpGroup),
159    /// MIB-II TCP group counters (enterprise=0, format=2009).
160    Mib2TcpGroup(Mib2TcpGroup),
161    /// MIB-II UDP group counters (enterprise=0, format=2010).
162    Mib2UdpGroup(Mib2UdpGroup),
163    /// JMX runtime information (enterprise=0, format=2105).
164    JmxRuntime(JmxRuntime),
165    /// JVM statistics counters (enterprise=0, format=2106).
166    JvmStatistics(JvmStatistics),
167    /// HTTP method and status counters (enterprise=0, format=2201).
168    HttpCounters(HttpCounters),
169    /// Application operations counters (enterprise=0, format=2202).
170    AppOperations(AppOperations),
171    /// Application resource counters (enterprise=0, format=2203).
172    AppResources(AppResources),
173    /// Memcache counters (enterprise=0, format=2204).
174    MemcacheCounters(MemcacheCounters),
175    /// Application worker counters (enterprise=0, format=2206).
176    AppWorkers(AppWorkers),
177    /// Open vSwitch datapath statistics (enterprise=0, format=2207).
178    OvsDpStats(OvsDpStats),
179    /// Energy consumption counters (enterprise=0, format=3000).
180    Energy(Energy),
181    /// Temperature sensor counters (enterprise=0, format=3001).
182    Temperature(Temperature),
183    /// Humidity sensor counters (enterprise=0, format=3002).
184    Humidity(Humidity),
185    /// Fan status counters (enterprise=0, format=3003).
186    Fans(Fans),
187    /// XenServer virtual interface metadata (enterprise=4300, format=2).
188    XenVif(XenVif),
189    /// Unrecognized counter record type, preserved as raw bytes.
190    Unknown {
191        /// Enterprise code from the record header.
192        enterprise: u32,
193        /// Format code from the record header.
194        format: u32,
195        /// Raw record data.
196        data: Vec<u8>,
197    },
198}
199
200pub(crate) fn parse_counter_records(
201    mut input: &[u8],
202    num_records: u32,
203) -> IResult<&[u8], Vec<CounterRecord>> {
204    // Cap capacity to prevent DoS: each record needs at least 8 bytes (format + length)
205    let cap = (num_records as usize).min(input.len() / 8);
206    let mut records = Vec::with_capacity(cap);
207
208    for _ in 0..num_records {
209        let (rest, data_format) = be_u32(input)?;
210        let enterprise = data_format >> 12;
211        let format = data_format & 0xFFF;
212
213        let (rest, record_length) = be_u32(rest)?;
214        let record_length = record_length as usize;
215
216        if rest.len() < record_length {
217            return Err(nom::Err::Error(nom::error::Error::new(
218                rest,
219                nom::error::ErrorKind::Eof,
220            )));
221        }
222
223        let record_data = &rest[..record_length];
224        let after_record = &rest[record_length..];
225
226        let record = if enterprise == 0 {
227            match format {
228                1 => {
229                    let (_, r) = generic_interface::parse_generic_interface(record_data)?;
230                    CounterRecord::GenericInterface(r)
231                }
232                2 => {
233                    let (_, r) = ethernet_interface::parse_ethernet_interface(record_data)?;
234                    CounterRecord::EthernetInterface(r)
235                }
236                3 => {
237                    let (_, r) = token_ring::parse_token_ring(record_data)?;
238                    CounterRecord::TokenRing(r)
239                }
240                4 => {
241                    let (_, r) = vg_counters::parse_vg_counters(record_data)?;
242                    CounterRecord::VgCounters(r)
243                }
244                5 => {
245                    let (_, r) = vlan::parse_vlan(record_data)?;
246                    CounterRecord::Vlan(r)
247                }
248                6 => {
249                    let (_, r) = ieee80211_counters::parse_ieee80211_counters(record_data)?;
250                    CounterRecord::Ieee80211Counters(r)
251                }
252                7 => {
253                    let (_, r) = lag_port_stats::parse_lag_port_stats(record_data)?;
254                    CounterRecord::LagPortStats(r)
255                }
256                8 => {
257                    let (_, r) = slow_path_counts::parse_slow_path_counts(record_data)?;
258                    CounterRecord::SlowPathCounts(r)
259                }
260                9 => {
261                    let (_, r) = ib_counters::parse_ib_counters(record_data)?;
262                    CounterRecord::IbCounters(r)
263                }
264                10 => {
265                    let (_, r) = sfp::parse_sfp(record_data)?;
266                    CounterRecord::Sfp(r)
267                }
268                1001 => {
269                    let (_, r) = processor::parse_processor(record_data)?;
270                    CounterRecord::Processor(r)
271                }
272                1003 => {
273                    let (_, r) = queue_length::parse_queue_length(record_data)?;
274                    CounterRecord::QueueLength(r)
275                }
276                1002 => {
277                    let (_, r) = radio_utilization::parse_radio_utilization(record_data)?;
278                    CounterRecord::RadioUtilization(r)
279                }
280                1004 => {
281                    let (_, r) = of_port::parse_of_port(record_data)?;
282                    CounterRecord::OfPort(r)
283                }
284                1005 => {
285                    let (_, r) = port_name::parse_port_name(record_data)?;
286                    CounterRecord::PortName(r)
287                }
288                2000 => {
289                    let (_, r) = host_descr::parse_host_descr(record_data)?;
290                    CounterRecord::HostDescr(r)
291                }
292                2001 => {
293                    let (_, r) = host_adapters::parse_host_adapters(record_data)?;
294                    CounterRecord::HostAdapters(r)
295                }
296                2002 => {
297                    let (_, r) = host_parent::parse_host_parent(record_data)?;
298                    CounterRecord::HostParent(r)
299                }
300                2003 => {
301                    let (_, r) = host_cpu::parse_host_cpu(record_data)?;
302                    CounterRecord::HostCpu(r)
303                }
304                2004 => {
305                    let (_, r) = host_memory::parse_host_memory(record_data)?;
306                    CounterRecord::HostMemory(r)
307                }
308                2005 => {
309                    let (_, r) = host_disk_io::parse_host_disk_io(record_data)?;
310                    CounterRecord::HostDiskIo(r)
311                }
312                2006 => {
313                    let (_, r) = host_net_io::parse_host_net_io(record_data)?;
314                    CounterRecord::HostNetIo(r)
315                }
316                2100 => {
317                    let (_, r) = virt_node::parse_virt_node(record_data)?;
318                    CounterRecord::VirtNode(r)
319                }
320                2101 => {
321                    let (_, r) = virt_cpu::parse_virt_cpu(record_data)?;
322                    CounterRecord::VirtCpu(r)
323                }
324                2102 => {
325                    let (_, r) = virt_memory::parse_virt_memory(record_data)?;
326                    CounterRecord::VirtMemory(r)
327                }
328                2103 => {
329                    let (_, r) = virt_disk_io::parse_virt_disk_io(record_data)?;
330                    CounterRecord::VirtDiskIo(r)
331                }
332                2104 => {
333                    let (_, r) = virt_net_io::parse_virt_net_io(record_data)?;
334                    CounterRecord::VirtNetIo(r)
335                }
336                2007 => {
337                    let (_, r) = mib2_ip_group::parse_mib2_ip_group(record_data)?;
338                    CounterRecord::Mib2IpGroup(r)
339                }
340                2008 => {
341                    let (_, r) = mib2_icmp_group::parse_mib2_icmp_group(record_data)?;
342                    CounterRecord::Mib2IcmpGroup(r)
343                }
344                2009 => {
345                    let (_, r) = mib2_tcp_group::parse_mib2_tcp_group(record_data)?;
346                    CounterRecord::Mib2TcpGroup(r)
347                }
348                2010 => {
349                    let (_, r) = mib2_udp_group::parse_mib2_udp_group(record_data)?;
350                    CounterRecord::Mib2UdpGroup(r)
351                }
352                2105 => {
353                    let (_, r) = jmx_runtime::parse_jmx_runtime(record_data)?;
354                    CounterRecord::JmxRuntime(r)
355                }
356                2106 => {
357                    let (_, r) = jvm_statistics::parse_jvm_statistics(record_data)?;
358                    CounterRecord::JvmStatistics(r)
359                }
360                2201 => {
361                    let (_, r) = http_counters::parse_http_counters(record_data)?;
362                    CounterRecord::HttpCounters(r)
363                }
364                2202 => {
365                    let (_, r) = app_operations::parse_app_operations(record_data)?;
366                    CounterRecord::AppOperations(r)
367                }
368                2203 => {
369                    let (_, r) = app_resources::parse_app_resources(record_data)?;
370                    CounterRecord::AppResources(r)
371                }
372                2204 => {
373                    let (_, r) = memcache_counters::parse_memcache_counters(record_data)?;
374                    CounterRecord::MemcacheCounters(r)
375                }
376                2206 => {
377                    let (_, r) = app_workers::parse_app_workers(record_data)?;
378                    CounterRecord::AppWorkers(r)
379                }
380                2207 => {
381                    let (_, r) = ovs_dp_stats::parse_ovs_dp_stats(record_data)?;
382                    CounterRecord::OvsDpStats(r)
383                }
384                3000 => {
385                    let (_, r) = energy::parse_energy(record_data)?;
386                    CounterRecord::Energy(r)
387                }
388                3001 => {
389                    let (_, r) = temperature::parse_temperature(record_data)?;
390                    CounterRecord::Temperature(r)
391                }
392                3002 => {
393                    let (_, r) = humidity::parse_humidity(record_data)?;
394                    CounterRecord::Humidity(r)
395                }
396                3003 => {
397                    let (_, r) = fans::parse_fans(record_data)?;
398                    CounterRecord::Fans(r)
399                }
400                _ => CounterRecord::Unknown {
401                    enterprise,
402                    format,
403                    data: record_data.to_vec(),
404                },
405            }
406        } else {
407            match (enterprise, format) {
408                (4300, 2) => {
409                    let (_, r) = xen_vif::parse_xen_vif(record_data)?;
410                    CounterRecord::XenVif(r)
411                }
412                _ => CounterRecord::Unknown {
413                    enterprise,
414                    format,
415                    data: record_data.to_vec(),
416                },
417            }
418        };
419
420        records.push(record);
421        input = after_record;
422    }
423
424    Ok((input, records))
425}