netgauze_pcap_decoder/handlers/
udp_notif.rs

1// Copyright (C) 2025-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::{decode_buffer, serialize_error, serialize_success};
17use crate::protocol_handler::{DecodeOutcome, ProtocolHandler};
18use bytes::BytesMut;
19use netgauze_pcap_reader::TransportProtocol;
20use netgauze_udp_notif_pkt::codec::{UdpPacketCodec, UdpPacketCodecError};
21use netgauze_udp_notif_pkt::raw::{MediaType, UdpNotifPacket};
22use std::collections::HashMap;
23use std::net::IpAddr;
24
25pub struct UdpNotifProtocolHandler {
26    ports: Vec<u16>,
27}
28
29impl UdpNotifProtocolHandler {
30    pub fn new(ports: Vec<u16>) -> Self {
31        UdpNotifProtocolHandler { ports }
32    }
33}
34
35impl ProtocolHandler<UdpNotifPacket, UdpPacketCodec, UdpPacketCodecError>
36    for UdpNotifProtocolHandler
37{
38    fn decode(
39        &self,
40        flow_key: (IpAddr, u16, IpAddr, u16),
41        protocol: TransportProtocol,
42        packet_data: &[u8],
43        exporter_peers: &mut HashMap<(IpAddr, u16, IpAddr, u16), (UdpPacketCodec, BytesMut)>,
44    ) -> Option<Vec<DecodeOutcome<UdpNotifPacket, UdpPacketCodecError>>> {
45        let dst_port: u16 = flow_key.3;
46
47        if protocol == TransportProtocol::UDP && self.ports.contains(&dst_port) {
48            let (codec, buffer) = exporter_peers
49                .entry(flow_key)
50                .or_insert((UdpPacketCodec::default(), BytesMut::new()));
51            buffer.extend_from_slice(packet_data);
52
53            // because of implementation specification UDP-Notif exports maximum 1 message
54            // per packet payload
55            let mut results = Vec::new();
56            decode_buffer(buffer, codec, flow_key, &mut results);
57            if !results.is_empty() {
58                return Some(results);
59            }
60        }
61        None
62    }
63
64    fn serialize(
65        &self,
66        decode_outcome: DecodeOutcome<UdpNotifPacket, UdpPacketCodecError>,
67    ) -> Result<serde_json::Value, std::io::Error> {
68        match decode_outcome {
69            DecodeOutcome::Success(m) => {
70                let (flow_key, udp_notif_packet) = m;
71                let mut value = serde_json::to_value(&udp_notif_packet)
72                    .expect("Couldn't serialize UDP-Notif message to json");
73                // Convert when possible inner payload into human-readable format
74                match udp_notif_packet.media_type() {
75                    MediaType::YangDataJson => {
76                        let payload = serde_json::from_slice(&udp_notif_packet.payload())
77                            .expect("Couldn't deserialize JSON payload into a JSON object");
78                        if let serde_json::Value::Object(val) = &mut value {
79                            val.insert("payload".to_string(), payload);
80                        }
81                    }
82                    MediaType::YangDataXml => {
83                        let payload = udp_notif_packet.payload();
84                        let payload = std::str::from_utf8(&payload)
85                            .expect("Couldn't deserialize XML payload into an UTF-8 string");
86                        if let serde_json::Value::Object(val) = &mut value {
87                            val.insert(
88                                "payload".to_string(),
89                                serde_json::Value::String(payload.to_string()),
90                            );
91                        }
92                    }
93                    MediaType::YangDataCbor => {
94                        let payload: serde_json::Value = ciborium::de::from_reader(
95                            std::io::Cursor::new(udp_notif_packet.payload()),
96                        )
97                        .expect("Couldn't deserialize CBOR payload into a CBOR object");
98                        if let serde_json::Value::Object(val) = &mut value {
99                            val.insert("payload".to_string(), payload);
100                        }
101                    }
102                    _ => {}
103                }
104                serialize_success(flow_key, value)
105            }
106            DecodeOutcome::Error(m) => serialize_error(m),
107        }
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114    use bytes::Bytes;
115    use serde_json::json;
116    use std::net::Ipv4Addr;
117
118    #[test]
119    fn test_udp_notif_handler_decode_success() {
120        let handler = UdpNotifProtocolHandler::new(vec![1234]);
121        let flow_key = (
122            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
123            12345,
124            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
125            1234,
126        );
127        let packet_data = [
128            0x21, // version 1, no private space, Media type: 1 = YANG data JSON
129            0x0c, // Header length
130            0x00, 0x0e, // Message length
131            0x01, 0x00, 0x00, 0x01, // Publisher ID
132            0x02, 0x00, 0x00, 0x02, // Message ID
133            0xff, 0xff, // dummy payload
134        ];
135        let mut exporter_peers = HashMap::new();
136
137        let result = handler.decode(
138            flow_key,
139            TransportProtocol::UDP,
140            &packet_data,
141            &mut exporter_peers,
142        );
143
144        assert_eq!(
145            result,
146            Some(vec![DecodeOutcome::Success((
147                flow_key,
148                UdpNotifPacket::new(
149                    MediaType::YangDataJson,
150                    0x01000001,
151                    0x02000002,
152                    HashMap::new(),
153                    Bytes::from(&[0xffu8, 0xffu8][..]),
154                )
155            ))])
156        );
157        // Now we should have an empty buffer for this flow key
158        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
159    }
160
161    #[test]
162    fn test_udp_notif_handler_decode_fragmented_success() {
163        let handler = UdpNotifProtocolHandler::new(vec![4739]);
164        let flow_key = (
165            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
166            12345,
167            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
168            4739,
169        );
170        let packet_data1 = [
171            0x21, // version 1, no private space, Media type: 1 = YANG data JSON
172            0x0c, // Header length
173            0x00, 0x0e, // Message length
174            0x01, 0x00, 0x00, 0x01, // Publisher ID
175            0x02, 0x00, 0x00, 0x02, // Message ID
176        ];
177        let packet_data2 = [
178            0xff, 0xff, // dummy payload
179        ];
180        let mut exporter_peers = HashMap::new();
181
182        let result1 = handler.decode(
183            flow_key,
184            TransportProtocol::UDP,
185            &packet_data1,
186            &mut exporter_peers,
187        );
188        // UDP is datagram oriented, so the codec will wait for the full datagram.
189        // The test setup simulates fragmentation at a higher level.
190        assert!(result1.is_none());
191        // The buffer for this flow key should now contain the first part, so not empty
192        assert!(!exporter_peers.get(&flow_key).unwrap().1.is_empty());
193
194        let result2 = handler.decode(
195            flow_key,
196            TransportProtocol::UDP,
197            &packet_data2,
198            &mut exporter_peers,
199        );
200
201        assert_eq!(
202            result2,
203            Some(vec![DecodeOutcome::Success((
204                flow_key,
205                UdpNotifPacket::new(
206                    MediaType::YangDataJson,
207                    0x01000001,
208                    0x02000002,
209                    HashMap::new(),
210                    Bytes::from(&[0xffu8, 0xffu8][..]),
211                )
212            ))])
213        );
214        // Now we should have an empty buffer for this flow key
215        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
216    }
217
218    #[test]
219    fn test_udp_notif_handler_decode_multiple_messages_should_fail() {
220        let handler = UdpNotifProtocolHandler::new(vec![1234]);
221        let flow_key = (
222            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
223            12345,
224            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
225            1234,
226        );
227        // Two messages
228        let packet_data = [
229            0x21, // version 1, no private space, Media type: 1 = YANG data JSON
230            0x0c, // Header length
231            0x00, 0x0e, // Message length
232            0x01, 0x00, 0x00, 0x01, // Publisher ID
233            0x02, 0x00, 0x00, 0x02, // Message ID
234            0xff, 0xff, // dummy payload
235            0x21, // version 1, no private space, Media type: 1 = YANG data JSON
236            0x0c, // Header length
237            0x00, 0x0e, // Message length
238            0x01, 0x00, 0x00, 0x01, // Publisher ID
239            0x02, 0x00, 0x00, 0x02, // Message ID
240            0xff, 0xff, // dummy payload
241        ];
242        let mut exporter_peers = HashMap::new();
243
244        let result = handler.decode(
245            flow_key,
246            TransportProtocol::UDP,
247            &packet_data,
248            &mut exporter_peers,
249        );
250
251        assert_eq!(
252            result,
253            Some(vec![DecodeOutcome::Error(
254                UdpPacketCodecError::InvalidMessageLength(14)
255            )]),
256        );
257        // Now we should have an empty buffer for this flow key
258        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
259    }
260
261    #[test]
262    fn test_udp_notif_handler_decode_failure() {
263        let handler = UdpNotifProtocolHandler::new(vec![1234]);
264        let flow_key = (
265            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
266            12345,
267            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
268            1234,
269        );
270        let packet_data = [
271            0x21, // version 1, no private space, Media type: 1 = YANG data JSON
272            0x01, // Invalid Header length
273            0x00, 0x0e, // Message length
274            0x01, 0x00, 0x00, 0x01, // Publisher ID
275            0x02, 0x00, 0x00, 0x02, // Message ID
276            0xff, 0xff, // dummy payload
277        ];
278        let mut exporter_peers = HashMap::new();
279
280        let result = handler.decode(
281            flow_key,
282            TransportProtocol::UDP,
283            &packet_data,
284            &mut exporter_peers,
285        );
286
287        assert_eq!(
288            result,
289            Some(vec![DecodeOutcome::Error(
290                UdpPacketCodecError::InvalidHeaderLength(1)
291            )]),
292        );
293        // Now we should have an empty buffer for this flow key
294        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
295    }
296
297    #[test]
298    fn test_udp_notif_handler_decode_ignore_wrong_port() {
299        let handler = UdpNotifProtocolHandler::new(vec![1234]);
300        let flow_key = (
301            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
302            12345,
303            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
304            5678, // Wrong port
305        );
306        let packet_data = [0xff];
307        let mut exporter_peers = HashMap::new();
308
309        let result = handler.decode(
310            flow_key,
311            TransportProtocol::UDP,
312            &packet_data,
313            &mut exporter_peers,
314        );
315
316        assert!(result.is_none());
317    }
318
319    #[test]
320    fn test_udp_notif_handler_decode_ignore_wrong_protocol() {
321        let handler = UdpNotifProtocolHandler::new(vec![1234]);
322        let flow_key = (
323            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
324            12345,
325            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
326            1234,
327        );
328        let packet_data = [0xff];
329        let mut exporter_peers = HashMap::new();
330
331        let result = handler.decode(
332            flow_key,
333            TransportProtocol::TCP, // Wrong protocol
334            &packet_data,
335            &mut exporter_peers,
336        );
337
338        assert!(result.is_none());
339    }
340    #[test]
341    fn test_udp_notif_handler_serialize_success() {
342        let handler = UdpNotifProtocolHandler::new(vec![1234]);
343        let flow_key = (
344            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
345            12345,
346            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
347            1234,
348        );
349        let udp_notif_packet = UdpNotifPacket::new(
350            MediaType::Unknown(0xee),
351            0x01000001,
352            0x02000002,
353            HashMap::new(),
354            Bytes::from(&[0xffu8, 0xffu8][..]),
355        );
356        let outcome = DecodeOutcome::Success((flow_key, udp_notif_packet));
357
358        let result = handler.serialize(outcome);
359
360        assert!(result.is_ok());
361        let json = result.unwrap();
362        let expected = json!({
363          "source_address": "10.0.0.1:12345",
364          "destination_address": "10.0.0.2:1234",
365          "info": {
366            "media_type": {
367              "Unknown": 238
368            },
369            "message_id": 33554434,
370            "options": {},
371            "payload": [
372              255,
373              255
374            ],
375            "publisher_id": 16777217
376          }
377        });
378        assert_eq!(json, expected);
379    }
380
381    #[test]
382    fn test_udp_notif_handler_serialize_error() {
383        let handler = UdpNotifProtocolHandler::new(vec![1234]);
384        let error = UdpPacketCodecError::InvalidMessageLength(10);
385        let outcome = DecodeOutcome::Error(error);
386
387        let result = handler.serialize(outcome);
388
389        assert!(result.is_ok());
390        let json = result.unwrap();
391        let expected = json!({
392            "InvalidMessageLength": 10
393        });
394        assert_eq!(json, expected);
395    }
396
397    #[test]
398    fn test_udp_notif_handler_serialize_json_payload() {
399        let handler = UdpNotifProtocolHandler::new(vec![1234]);
400        let flow_key = (
401            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
402            12345,
403            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
404            1234,
405        );
406        let json_payload = json!({"a": "b"});
407        let udp_notif_packet = UdpNotifPacket::new(
408            MediaType::YangDataJson,
409            0x01000001,
410            0x02000002,
411            HashMap::new(),
412            Bytes::from(serde_json::to_vec(&json_payload).unwrap()),
413        );
414        let outcome = DecodeOutcome::Success((flow_key, udp_notif_packet));
415
416        let result = handler.serialize(outcome);
417
418        assert!(result.is_ok());
419        let json = result.unwrap();
420        let expected = json!({
421          "source_address": "10.0.0.1:12345",
422          "destination_address": "10.0.0.2:1234",
423          "info": {
424            "media_type": "YangDataJson",
425            "message_id": 33554434,
426            "options": {},
427            "payload": {
428              "a": "b"
429            },
430            "publisher_id": 16777217
431          }
432        });
433        assert_eq!(json, expected);
434    }
435
436    #[test]
437    fn test_udp_notif_handler_serialize_xml_payload() {
438        let handler = UdpNotifProtocolHandler::new(vec![1234]);
439        let flow_key = (
440            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
441            12345,
442            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
443            1234,
444        );
445        let xml_payload = r#"<a b="c"/>"#;
446        let udp_notif_packet = UdpNotifPacket::new(
447            MediaType::YangDataXml,
448            0x01000001,
449            0x02000002,
450            HashMap::new(),
451            Bytes::from(xml_payload),
452        );
453        let outcome = DecodeOutcome::Success((flow_key, udp_notif_packet));
454
455        let result = handler.serialize(outcome);
456
457        assert!(result.is_ok());
458        let json = result.unwrap();
459        let expected = json!({
460          "source_address": "10.0.0.1:12345",
461          "destination_address": "10.0.0.2:1234",
462          "info": {
463            "media_type": "YangDataXml",
464            "message_id": 33554434,
465            "options": {},
466            "payload": "<a b=\"c\"/>",
467            "publisher_id": 16777217
468          }
469        });
470        assert_eq!(json, expected);
471    }
472
473    #[test]
474    fn test_udp_notif_handler_serialize_cbor_payload() {
475        let handler = UdpNotifProtocolHandler::new(vec![1234]);
476        let flow_key = (
477            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
478            12345,
479            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
480            1234,
481        );
482        let cbor_payload_map = json!({"a": "b"});
483        let mut cbor_payload = Vec::new();
484        ciborium::ser::into_writer(&cbor_payload_map, &mut cbor_payload).unwrap();
485
486        let udp_notif_packet = UdpNotifPacket::new(
487            MediaType::YangDataCbor,
488            0x01000001,
489            0x02000002,
490            HashMap::new(),
491            Bytes::from(cbor_payload),
492        );
493        let outcome = DecodeOutcome::Success((flow_key, udp_notif_packet));
494
495        let result = handler.serialize(outcome);
496
497        assert!(result.is_ok());
498        let json = result.unwrap();
499        let expected = json!({
500          "source_address": "10.0.0.1:12345",
501          "destination_address": "10.0.0.2:1234",
502          "info": {
503            "media_type": "YangDataCbor",
504            "message_id": 33554434,
505            "options": {},
506            "payload": {
507              "a": "b"
508            },
509            "publisher_id": 16777217
510          }
511        });
512        assert_eq!(json, expected);
513    }
514}