1use super::{decode_buffer, serialize_error, serialize_success};
17use crate::protocol_handler::{DecodeOutcome, ProtocolHandler};
18use bytes::BytesMut;
19use netgauze_pcap_reader::TransportProtocol;
20use netgauze_udp_notif_pkt::codec::{UdpPacketCodec, UdpPacketCodecError};
21use netgauze_udp_notif_pkt::raw::{MediaType, UdpNotifPacket};
22use std::collections::HashMap;
23use std::net::IpAddr;
24
/// Protocol handler for UDP-Notif messages carried in captured UDP packets.
///
/// A packet is only considered when its destination port is one of `ports`;
/// see the `ProtocolHandler` implementation for the decoding rules.
#[derive(Debug, Clone)]
pub struct UdpNotifProtocolHandler {
    /// Destination UDP ports whose traffic is treated as UDP-Notif.
    ports: Vec<u16>,
}

impl UdpNotifProtocolHandler {
    /// Creates a handler that decodes UDP packets addressed to any of `ports`.
    pub fn new(ports: Vec<u16>) -> Self {
        Self { ports }
    }
}
34
35impl ProtocolHandler<UdpNotifPacket, UdpPacketCodec, UdpPacketCodecError>
36 for UdpNotifProtocolHandler
37{
38 fn decode(
39 &self,
40 flow_key: (IpAddr, u16, IpAddr, u16),
41 protocol: TransportProtocol,
42 packet_data: &[u8],
43 exporter_peers: &mut HashMap<(IpAddr, u16, IpAddr, u16), (UdpPacketCodec, BytesMut)>,
44 ) -> Option<Vec<DecodeOutcome<UdpNotifPacket, UdpPacketCodecError>>> {
45 let dst_port: u16 = flow_key.3;
46
47 if protocol == TransportProtocol::UDP && self.ports.contains(&dst_port) {
48 let (codec, buffer) = exporter_peers
49 .entry(flow_key)
50 .or_insert((UdpPacketCodec::default(), BytesMut::new()));
51 buffer.extend_from_slice(packet_data);
52
53 let mut results = Vec::new();
56 decode_buffer(buffer, codec, flow_key, &mut results);
57 if !results.is_empty() {
58 return Some(results);
59 }
60 }
61 None
62 }
63
64 fn serialize(
65 &self,
66 decode_outcome: DecodeOutcome<UdpNotifPacket, UdpPacketCodecError>,
67 ) -> Result<serde_json::Value, std::io::Error> {
68 match decode_outcome {
69 DecodeOutcome::Success(m) => {
70 let (flow_key, udp_notif_packet) = m;
71 let mut value = serde_json::to_value(&udp_notif_packet)
72 .expect("Couldn't serialize UDP-Notif message to json");
73 match udp_notif_packet.media_type() {
75 MediaType::YangDataJson => {
76 let payload = serde_json::from_slice(&udp_notif_packet.payload())
77 .expect("Couldn't deserialize JSON payload into a JSON object");
78 if let serde_json::Value::Object(val) = &mut value {
79 val.insert("payload".to_string(), payload);
80 }
81 }
82 MediaType::YangDataXml => {
83 let payload = udp_notif_packet.payload();
84 let payload = std::str::from_utf8(&payload)
85 .expect("Couldn't deserialize XML payload into an UTF-8 string");
86 if let serde_json::Value::Object(val) = &mut value {
87 val.insert(
88 "payload".to_string(),
89 serde_json::Value::String(payload.to_string()),
90 );
91 }
92 }
93 MediaType::YangDataCbor => {
94 let payload: serde_json::Value = ciborium::de::from_reader(
95 std::io::Cursor::new(udp_notif_packet.payload()),
96 )
97 .expect("Couldn't deserialize CBOR payload into a CBOR object");
98 if let serde_json::Value::Object(val) = &mut value {
99 val.insert("payload".to_string(), payload);
100 }
101 }
102 _ => {}
103 }
104 serialize_success(flow_key, value)
105 }
106 DecodeOutcome::Error(m) => serialize_error(m),
107 }
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use serde_json::json;
    use std::net::Ipv4Addr;

    /// Port most tests configure the handler with.
    const NOTIF_PORT: u16 = 1234;

    /// Message the decoder accepts: a 12-byte header followed by the payload `ff ff`.
    const VALID_MESSAGE: [u8; 14] = [
        0x21, 0x0c, 0x00, 0x0e, 0x01, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x02, 0xff, 0xff,
    ];

    /// Flow key 10.0.0.1:12345 -> 10.0.0.2:`dst_port` used throughout the tests.
    fn flow_to(dst_port: u16) -> (IpAddr, u16, IpAddr, u16) {
        (
            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
            12345,
            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
            dst_port,
        )
    }

    /// Packet with the fixed ids used by every test and the given media type and payload.
    fn sample_packet(media_type: MediaType, payload: Bytes) -> UdpNotifPacket {
        UdpNotifPacket::new(media_type, 0x01000001, 0x02000002, HashMap::new(), payload)
    }

    /// Packet that decoding `VALID_MESSAGE` is expected to yield.
    fn decoded_valid_message() -> UdpNotifPacket {
        sample_packet(MediaType::YangDataJson, Bytes::from_static(&[0xff, 0xff]))
    }

    /// Serializes a successfully decoded sample packet and returns the JSON document.
    fn serialize_sample(media_type: MediaType, payload: Bytes) -> serde_json::Value {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let outcome =
            DecodeOutcome::Success((flow_to(NOTIF_PORT), sample_packet(media_type, payload)));
        handler
            .serialize(outcome)
            .expect("serializing a decoded packet should succeed")
    }

    /// Expected JSON document for a sample packet, parameterized on the varying fields.
    fn expected_document(
        media_type: serde_json::Value,
        payload: serde_json::Value,
    ) -> serde_json::Value {
        json!({
            "source_address": "10.0.0.1:12345",
            "destination_address": "10.0.0.2:1234",
            "info": {
                "media_type": media_type,
                "message_id": 33554434,
                "options": {},
                "payload": payload,
                "publisher_id": 16777217
            }
        })
    }

    #[test]
    fn test_udp_notif_handler_decode_success() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let flow_key = flow_to(NOTIF_PORT);
        let mut exporter_peers = HashMap::new();

        let result = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            &VALID_MESSAGE,
            &mut exporter_peers,
        );

        assert_eq!(
            result,
            Some(vec![DecodeOutcome::Success((
                flow_key,
                decoded_valid_message()
            ))])
        );
        // The whole message was consumed from the flow buffer.
        assert!(exporter_peers[&flow_key].1.is_empty());
    }

    #[test]
    fn test_udp_notif_handler_decode_fragmented_success() {
        let handler = UdpNotifProtocolHandler::new(vec![4739]);
        let flow_key = flow_to(4739);
        // First fragment is the header alone, second fragment is the payload.
        let (header, payload) = VALID_MESSAGE.split_at(12);
        let mut exporter_peers = HashMap::new();

        let first = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            header,
            &mut exporter_peers,
        );
        assert!(first.is_none());
        // The partial message stays buffered until the rest arrives.
        assert!(!exporter_peers[&flow_key].1.is_empty());

        let second = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            payload,
            &mut exporter_peers,
        );

        assert_eq!(
            second,
            Some(vec![DecodeOutcome::Success((
                flow_key,
                decoded_valid_message()
            ))])
        );
        assert!(exporter_peers[&flow_key].1.is_empty());
    }

    #[test]
    fn test_udp_notif_handler_decode_multiple_messages_should_fail() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let flow_key = flow_to(NOTIF_PORT);
        // Two back-to-back copies of the valid message in a single packet.
        let packet_data = [VALID_MESSAGE, VALID_MESSAGE].concat();
        let mut exporter_peers = HashMap::new();

        let result = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            &packet_data,
            &mut exporter_peers,
        );

        assert_eq!(
            result,
            Some(vec![DecodeOutcome::Error(
                UdpPacketCodecError::InvalidMessageLength(14)
            )]),
        );
        assert!(exporter_peers[&flow_key].1.is_empty());
    }

    #[test]
    fn test_udp_notif_handler_decode_failure() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let flow_key = flow_to(NOTIF_PORT);
        // Same as the valid message except for the second byte (0x0c -> 0x01).
        let mut packet_data = VALID_MESSAGE;
        packet_data[1] = 0x01;
        let mut exporter_peers = HashMap::new();

        let result = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            &packet_data,
            &mut exporter_peers,
        );

        assert_eq!(
            result,
            Some(vec![DecodeOutcome::Error(
                UdpPacketCodecError::InvalidHeaderLength(1)
            )]),
        );
        assert!(exporter_peers[&flow_key].1.is_empty());
    }

    #[test]
    fn test_udp_notif_handler_decode_ignore_wrong_port() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        // Destination port is not one of the configured ports.
        let flow_key = flow_to(5678);
        let mut exporter_peers = HashMap::new();

        let result = handler.decode(
            flow_key,
            TransportProtocol::UDP,
            &[0xff],
            &mut exporter_peers,
        );

        assert!(result.is_none());
    }

    #[test]
    fn test_udp_notif_handler_decode_ignore_wrong_protocol() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let flow_key = flow_to(NOTIF_PORT);
        let mut exporter_peers = HashMap::new();

        // Right port, but the transport is TCP rather than UDP.
        let result = handler.decode(
            flow_key,
            TransportProtocol::TCP,
            &[0xff],
            &mut exporter_peers,
        );

        assert!(result.is_none());
    }

    #[test]
    fn test_udp_notif_handler_serialize_success() {
        let actual = serialize_sample(
            MediaType::Unknown(0xee),
            Bytes::from_static(&[0xff, 0xff]),
        );

        // An unknown media type keeps the payload as raw byte values.
        let expected = expected_document(json!({ "Unknown": 238 }), json!([255, 255]));
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_udp_notif_handler_serialize_error() {
        let handler = UdpNotifProtocolHandler::new(vec![NOTIF_PORT]);
        let outcome = DecodeOutcome::Error(UdpPacketCodecError::InvalidMessageLength(10));

        let actual = handler
            .serialize(outcome)
            .expect("serializing a decode error should succeed");

        let expected = json!({
            "InvalidMessageLength": 10
        });
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_udp_notif_handler_serialize_json_payload() {
        let json_payload = json!({"a": "b"});
        let encoded = serde_json::to_vec(&json_payload).unwrap();

        let actual = serialize_sample(MediaType::YangDataJson, Bytes::from(encoded));

        let expected = expected_document(json!("YangDataJson"), json!({ "a": "b" }));
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_udp_notif_handler_serialize_xml_payload() {
        let xml_payload = r#"<a b="c"/>"#;

        let actual = serialize_sample(MediaType::YangDataXml, Bytes::from(xml_payload));

        let expected = expected_document(json!("YangDataXml"), json!("<a b=\"c\"/>"));
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_udp_notif_handler_serialize_cbor_payload() {
        let cbor_payload_map = json!({"a": "b"});
        let mut encoded = Vec::new();
        ciborium::ser::into_writer(&cbor_payload_map, &mut encoded).unwrap();

        let actual = serialize_sample(MediaType::YangDataCbor, Bytes::from(encoded));

        let expected = expected_document(json!("YangDataCbor"), json!({ "a": "b" }));
        assert_eq!(actual, expected);
    }
}