1use crate::config::{FlowConfig, NetconfConfig, PublisherEndpoint, UdpNotifConfig};
17use crate::flow::aggregation::AggregationActorHandle;
18use crate::flow::enrichment::FlowEnrichmentActorHandle;
19use crate::flow::renormalization::RenormalizationActorHandle;
20use crate::inputs::files::FilesActorHandle;
21use crate::inputs::flow_options::FlowOptionsActorHandle;
22use crate::inputs::kafka::KafkaConsumerActorHandle;
23use crate::publishers::http::{HttpPublisherActorHandle, Message};
24use crate::publishers::kafka_avro::KafkaAvroPublisherActorHandle;
25use crate::publishers::kafka_json::KafkaJsonPublisherActorHandle;
26use crate::publishers::kafka_yang::KafkaYangPublisherActorHandle;
27use crate::yang_push::enrichment::YangPushEnrichmentActorHandle;
28
29use futures_util::StreamExt;
30use futures_util::stream::FuturesUnordered;
31use netgauze_flow_pkt::FlowInfo;
32use netgauze_flow_service::FlowRequest;
33use netgauze_flow_service::flow_supervisor::FlowCollectorsSupervisorActorHandle;
34use netgauze_udp_notif_pkt::raw::MediaType;
35use netgauze_udp_notif_service::UdpNotifRequest;
36use netgauze_udp_notif_service::supervisor::UdpNotifSupervisorHandle;
37use netgauze_yang_push::ContentId;
38use netgauze_yang_push::cache::actor::CacheActorHandle;
39use netgauze_yang_push::cache::fetcher::NetconfYangLibraryFetcher;
40use netgauze_yang_push::cache::storage::SubscriptionInfo;
41use netgauze_yang_push::model::telemetry::TelemetryMessageWrapper;
42use netgauze_yang_push::validation::ValidationActorHandle;
43use std::net::IpAddr;
44use std::path::PathBuf;
45use std::str::Utf8Error;
46use std::sync::Arc;
47use std::time::Duration;
48use tracing::{info, warn};
49
50pub mod config;
51pub mod flow;
52pub mod inputs;
53pub mod publishers;
54pub mod yang_push;
55
56pub async fn init_flow_collection(
57 flow_config: FlowConfig,
58 meter: opentelemetry::metrics::Meter,
59) -> anyhow::Result<()> {
60 let supervisor_config = flow_config.supervisor_config();
61
62 let (supervisor_join_handle, supervisor_handle) =
63 FlowCollectorsSupervisorActorHandle::new(supervisor_config, meter.clone()).await?;
64 let mut http_handles = Vec::new();
65 let mut enrichment_handles = Vec::new();
66 let mut renormalization_handles = Vec::new();
67 let mut agg_handles = Vec::new();
68 let mut flow_options_input_handles = Vec::new();
69 let mut files_input_handles = Vec::new();
70 let mut kafka_input_handles = Vec::new();
71 let mut kafka_avro_handles = Vec::new();
72 let mut kafka_json_handles = Vec::new();
73 let mut join_set = FuturesUnordered::new();
74
75 for (group_name, publisher_config) in flow_config.publishers {
76 info!("Starting publishers group '{group_name}'");
77
78 let mut flow_recvs = Vec::new();
79 if publisher_config.shards > 1 {
80 (flow_recvs, _) = supervisor_handle
81 .subscribe_shards(publisher_config.shards, publisher_config.buffer_size)
82 .await?;
83 } else {
84 let (flow_recv, _) = supervisor_handle
85 .subscribe(publisher_config.buffer_size)
86 .await?;
87 flow_recvs.push(flow_recv);
88 }
89
90 for (endpoint_name, endpoint) in publisher_config.endpoints {
91 info!("Creating publisher '{endpoint_name}'");
92
93 match &endpoint {
94 PublisherEndpoint::Http(config) => {
95 let converter = |request: Arc<FlowRequest>, writer_id: String| {
96 let ret = Message::insert {
97 ts: format!("{}", chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")),
98 peer_src: format!("{}", request.0.ip()),
99 writer_id,
100 payload: request.1.clone(),
101 };
102 vec![ret]
103 };
104 for flow_recv in &flow_recvs {
105 let (http_join, http_handle) = HttpPublisherActorHandle::new(
106 endpoint_name.clone(),
107 config.clone(),
108 converter,
109 flow_recv.clone(),
110 meter.clone(),
111 )?;
112 join_set.push(http_join);
113 http_handles.push(http_handle);
114 }
115 }
116 PublisherEndpoint::FlowKafkaAvro(config) => {
117 for (shard_id, flow_recv) in flow_recvs.iter().enumerate() {
118 let (enrichment_join, enrichment_handle) = FlowEnrichmentActorHandle::new(
119 publisher_config.buffer_size,
120 flow_recv.clone(),
121 either::Left(meter.clone()),
122 shard_id,
123 config.writer_id.clone(),
124 );
125
126 if let Some(aggregation_config) = publisher_config.aggregation.as_ref() {
127 let (renormalization_join, renormalization_handle) =
128 RenormalizationActorHandle::new(
129 publisher_config.buffer_size,
130 enrichment_handle.subscribe(),
131 either::Left(meter.clone()),
132 shard_id,
133 );
134
135 let (agg_join, agg_handle) = AggregationActorHandle::new(
136 publisher_config.buffer_size,
137 aggregation_config.clone(),
138 renormalization_handle.subscribe(),
139 either::Left(meter.clone()),
140 shard_id,
141 );
142
143 let (kafka_join, kafka_handle) =
144 KafkaAvroPublisherActorHandle::from_config(
145 config.clone(),
146 agg_handle.subscribe(),
147 either::Left(meter.clone()),
148 )
149 .await?;
150
151 join_set.push(enrichment_join);
152 join_set.push(renormalization_join);
153 join_set.push(agg_join);
154 join_set.push(kafka_join);
155 enrichment_handles.push(enrichment_handle);
156 renormalization_handles.push(renormalization_handle);
157 agg_handles.push(agg_handle);
158 kafka_avro_handles.push(kafka_handle);
159 }
160 }
161
162 let (flow_recv, _) = supervisor_handle
163 .subscribe(publisher_config.buffer_size)
164 .await?;
165
166 if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
167 if let Some(flow_options_config) = enrichment_config
168 .inputs
169 .as_ref()
170 .and_then(|i| i.flow_options.as_ref())
171 {
172 let (flow_options_join, flow_options_handle) =
173 FlowOptionsActorHandle::from_config(
174 flow_options_config,
175 flow_recv.clone(),
176 enrichment_handles.clone(),
177 either::Left(meter.clone()),
178 );
179 join_set.push(flow_options_join);
180 flow_options_input_handles.push(flow_options_handle);
181 }
182
183 if let Some(files_config) = enrichment_config
184 .inputs
185 .as_ref()
186 .and_then(|i| i.files.as_ref())
187 {
188 let (files_join, files_handle) = FilesActorHandle::from_config(
189 files_config.clone(),
190 enrichment_handles.clone(),
191 either::Left(meter.clone()),
192 );
193 join_set.push(files_join);
194 files_input_handles.push(files_handle);
195 }
196
197 if let Some(kafka_config) = enrichment_config
198 .inputs
199 .as_ref()
200 .and_then(|i| i.kafka.as_ref())
201 {
202 for consumer_config in &kafka_config.consumers {
203 let (join_handle, actor_handle) =
204 KafkaConsumerActorHandle::from_config(
205 consumer_config,
206 enrichment_handles.clone(),
207 either::Left(meter.clone()),
208 )?;
209 join_set.push(join_handle);
210 kafka_input_handles.push(actor_handle);
211 }
212 }
213 }
214 }
215 PublisherEndpoint::FlowKafkaJson(config) => {
216 for (shard_id, flow_recv) in flow_recvs.iter().enumerate() {
217 let (enrichment_join, enrichment_handle) = FlowEnrichmentActorHandle::new(
218 publisher_config.buffer_size,
219 flow_recv.clone(),
220 either::Left(meter.clone()),
221 shard_id,
222 config.writer_id.clone(),
223 );
224
225 if let Some(aggregation_config) = publisher_config.aggregation.as_ref() {
226 let (renormalization_join, renormalization_handle) =
227 RenormalizationActorHandle::new(
228 publisher_config.buffer_size,
229 enrichment_handle.subscribe(),
230 either::Left(meter.clone()),
231 shard_id,
232 );
233
234 let (agg_join, agg_handle) = AggregationActorHandle::new(
235 publisher_config.buffer_size,
236 aggregation_config.clone(),
237 renormalization_handle.subscribe(),
238 either::Left(meter.clone()),
239 shard_id,
240 );
241
242 let (kafka_join, kafka_handle) =
243 KafkaJsonPublisherActorHandle::from_config(
244 serialize_flow,
245 config.clone(),
246 agg_handle.subscribe(),
247 either::Left(meter.clone()),
248 )?;
249
250 join_set.push(enrichment_join);
251 join_set.push(renormalization_join);
252 join_set.push(agg_join);
253 join_set.push(kafka_join);
254 enrichment_handles.push(enrichment_handle);
255 renormalization_handles.push(renormalization_handle);
256 agg_handles.push(agg_handle);
257 kafka_json_handles.push(kafka_handle);
258 }
259 }
260
261 let (flow_recv, _) = supervisor_handle
262 .subscribe(publisher_config.buffer_size)
263 .await?;
264
265 if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
266 if let Some(flow_options_config) = enrichment_config
267 .inputs
268 .as_ref()
269 .and_then(|i| i.flow_options.as_ref())
270 {
271 let (flow_options_join, flow_options_handle) =
272 FlowOptionsActorHandle::from_config(
273 flow_options_config,
274 flow_recv.clone(),
275 enrichment_handles.clone(),
276 either::Left(meter.clone()),
277 );
278 join_set.push(flow_options_join);
279 flow_options_input_handles.push(flow_options_handle);
280 }
281
282 if let Some(files_config) = enrichment_config
283 .inputs
284 .as_ref()
285 .and_then(|i| i.files.as_ref())
286 {
287 let (files_join, files_handle) = FilesActorHandle::from_config(
288 files_config.clone(),
289 enrichment_handles.clone(),
290 either::Left(meter.clone()),
291 );
292 join_set.push(files_join);
293 files_input_handles.push(files_handle);
294 }
295
296 if let Some(kafka_config) = enrichment_config
297 .inputs
298 .as_ref()
299 .and_then(|i| i.kafka.as_ref())
300 {
301 for consumer_config in &kafka_config.consumers {
302 let (join_handle, actor_handle) =
303 KafkaConsumerActorHandle::from_config(
304 consumer_config,
305 enrichment_handles.clone(),
306 either::Left(meter.clone()),
307 )?;
308 join_set.push(join_handle);
309 kafka_input_handles.push(actor_handle);
310 }
311 }
312 }
313 }
314 PublisherEndpoint::KafkaJson(config) => {
315 for flow_recv in &flow_recvs {
316 let (join_handle, handle) = KafkaJsonPublisherActorHandle::from_config(
317 serialize_flow_req,
318 config.clone(),
319 flow_recv.clone(),
320 either::Left(meter.clone()),
321 )?;
322 join_set.push(join_handle);
323 kafka_json_handles.push(handle);
324 }
325 }
326 PublisherEndpoint::TelemetryKafkaJson(_) => {
327 return Err(anyhow::anyhow!(
328 "Telemetry KafkaJson publisher not yet supported for Flow"
329 ));
330 }
331 PublisherEndpoint::TelemetryKafkaYang(_) => {
332 return Err(anyhow::anyhow!(
333 "Telemetry KafkaYang publisher not supported for Flow"
334 ));
335 }
336 }
337 }
338 }
339 let ret = tokio::select! {
340 supervisor_ret = supervisor_join_handle => {
341 info!("Flow supervisor exited, shutting down all publishers");
342 for handle in http_handles {
343 let shutdown_result = tokio::time::timeout(std::time::Duration::from_secs(1), handle.shutdown()).await;
344 if shutdown_result.is_err() {
345 warn!("Timeout shutting down flow http publisher {}", handle.name())
346 }
347 if let Ok(Err(err)) = shutdown_result {
348 warn!("Error in shutting down flow http publisher {}: {err}", handle.name())
349 }
350 }
351 for handle in kafka_avro_handles {
352 let _ = handle.shutdown().await;
353 }
354 for handle in kafka_json_handles {
355 let _ = handle.shutdown().await;
356 }
357 match supervisor_ret {
358 Ok(_) => Ok(()),
359 Err(err) => Err(anyhow::anyhow!(err)),
360 }
361 },
362 join_ret = join_set.next() => {
363 warn!("Flow publisher exited, shutting down flow collection and publishers");
364 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
365 for handle in enrichment_handles {
366 let _ = handle.shutdown().await;
367 }
368 for handle in renormalization_handles {
369 let _ = handle.shutdown().await;
370 }
371 for handle in agg_handles {
372 let _ = handle.shutdown().await;
373 }
374 for handle in http_handles {
375 let shutdown_result = tokio::time::timeout(std::time::Duration::from_secs(1), handle.shutdown()).await;
376 if shutdown_result.is_err() {
377 warn!("Timeout shutting down flow http publisher {}", handle.name())
378 }
379 if let Ok(Err(err)) = shutdown_result {
380 warn!("Error in shutting down flow http publisher {}: {err}", handle.name())
381 }
382 }
383 for handle in kafka_avro_handles {
384 let _ = handle.shutdown().await;
385 }
386 for handle in kafka_json_handles {
387 let _ = handle.shutdown().await;
388 }
389 for handle in flow_options_input_handles {
390 let _ = handle.shutdown().await;
391 }
392 for handle in files_input_handles {
393 let _ = handle.shutdown().await;
394 }
395 for handle in kafka_input_handles {
396 let _ = handle.shutdown().await;
397 }
398 match join_ret {
399 None | Some(Ok(Ok(_))) => Ok(()),
400 Some(Err(err)) => Err(anyhow::anyhow!(err)),
401 Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
402 }
403 }
404 };
405 ret
406}
407
408pub async fn init_udp_notif_collection(
409 udp_notif_config: UdpNotifConfig,
410 meter: opentelemetry::metrics::Meter,
411) -> anyhow::Result<()> {
412 let supervisor_config = udp_notif_config.supervisor_config();
413 let (supervisor_join_handle, supervisor_handle) =
414 UdpNotifSupervisorHandle::new(supervisor_config, meter.clone()).await;
415 let mut join_set = FuturesUnordered::new();
416 let mut validation_join_set = FuturesUnordered::new();
417 let mut validation_handles = Vec::new();
418 let mut enrichment_handles = Vec::new();
419 let mut files_input_handles = Vec::new();
420 let mut kafka_input_handles = Vec::new();
421 let mut kafka_json_handles = Vec::new();
422 let mut kafka_yang_handles = Vec::new();
423
424 let cache_location: PathBuf = udp_notif_config.cache_location.into();
426 let netconf_fetcher = netconf_fetcher(&udp_notif_config.netconf)?;
427 let (_schema_join, schema_handle) = CacheActorHandle::new(
428 10000,
429 either::Right(cache_location),
430 netconf_fetcher,
431 Duration::from_mins(5),
432 either::Left(meter.clone()),
433 )?;
434
435 for (group_name, publisher_config) in udp_notif_config.publishers {
436 info!("Starting publishers group '{group_name}'");
437 let (udp_notif_recv, _) = supervisor_handle
438 .subscribe(publisher_config.buffer_size)
439 .await?;
440 for (endpoint_name, endpoint) in publisher_config.endpoints {
441 info!("Creating publisher '{endpoint_name}'");
442 match &endpoint {
443 PublisherEndpoint::Http(_) => {
444 return Err(anyhow::anyhow!(
445 "HTTP publisher not supported for UDP Notif"
446 ));
447 }
448 PublisherEndpoint::KafkaJson(config) => {
449 let hdl = KafkaJsonPublisherActorHandle::from_config(
450 serialize_udp_notif,
451 config.clone(),
452 udp_notif_recv.clone(),
453 either::Left(meter.clone()),
454 );
455 match hdl {
456 Ok((kafka_join, kafka_handle)) => {
457 join_set.push(kafka_join);
458 kafka_json_handles.push(kafka_handle);
459 }
460 Err(err) => {
461 return Err(anyhow::anyhow!(
462 "Error creating KafkaJsonPublisherActorHandle: {err}"
463 ));
464 }
465 }
466 }
467 PublisherEndpoint::FlowKafkaJson(_) => {
468 return Err(anyhow::anyhow!(
469 "Flow Kafka JSON publisher not supported for UDP Notif"
470 ));
471 }
472 PublisherEndpoint::FlowKafkaAvro(_) => {
473 return Err(anyhow::anyhow!(
474 "Flow Kafka Avro publisher not supported for UDP Notif"
475 ));
476 }
477 PublisherEndpoint::TelemetryKafkaJson(config) => {
478 let (validated_tx, validated_rx) =
479 async_channel::bounded(publisher_config.buffer_size);
480 let (validation_join, validation_handle) = ValidationActorHandle::new(
481 publisher_config.buffer_size,
482 udp_notif_config.max_cached_packets_per_peer,
483 udp_notif_config.max_cached_packets_per_subscription,
484 udp_notif_recv.clone(),
485 validated_tx,
486 schema_handle.request_tx(),
487 either::Left(meter.clone()),
488 )?;
489 validation_join_set.push(validation_join);
490 validation_handles.push(validation_handle.clone());
491
492 let (enrichment_join, enrichment_handle) = YangPushEnrichmentActorHandle::new(
493 publisher_config.buffer_size,
494 validated_rx,
495 either::Left(meter.clone()),
496 config.writer_id.clone(),
497 );
498 join_set.push(enrichment_join);
499 enrichment_handles.push(enrichment_handle.clone());
500
501 let hdl = KafkaJsonPublisherActorHandle::from_config(
502 serialize_telemetry_json,
503 config.clone(),
504 enrichment_handle.subscribe(),
505 either::Left(meter.clone()),
506 );
507 match hdl {
508 Ok((kafka_join, kafka_handle)) => {
509 join_set.push(kafka_join);
510 kafka_json_handles.push(kafka_handle);
511 }
512 Err(err) => {
513 return Err(anyhow::anyhow!(
514 "Error creating KafkaJsonPublisherActorHandle: {err}"
515 ));
516 }
517 }
518
519 if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
520 if let Some(files_config) = enrichment_config
521 .inputs
522 .as_ref()
523 .and_then(|i| i.files.as_ref())
524 {
525 let (files_join, files_handle) = FilesActorHandle::from_config(
526 files_config.clone(),
527 enrichment_handles.clone(),
528 either::Left(meter.clone()),
529 );
530 join_set.push(files_join);
531 files_input_handles.push(files_handle);
532 }
533
534 if let Some(kafka_config) = enrichment_config
535 .inputs
536 .as_ref()
537 .and_then(|i| i.kafka.as_ref())
538 {
539 for consumer_config in &kafka_config.consumers {
540 let (join_handle, actor_handle) =
541 KafkaConsumerActorHandle::from_config(
542 consumer_config,
543 enrichment_handles.clone(),
544 either::Left(meter.clone()),
545 )?;
546 join_set.push(join_handle);
547 kafka_input_handles.push(actor_handle);
548 }
549 }
550 }
551 }
552 PublisherEndpoint::TelemetryKafkaYang(config) => {
553 let (validated_tx, validated_rx) =
554 async_channel::bounded(publisher_config.buffer_size);
555 let (validation_join, validation_handle) = ValidationActorHandle::new(
556 publisher_config.buffer_size,
557 udp_notif_config.max_cached_packets_per_peer,
558 udp_notif_config.max_cached_packets_per_subscription,
559 udp_notif_recv.clone(),
560 validated_tx,
561 schema_handle.request_tx(),
562 either::Left(meter.clone()),
563 )?;
564 validation_join_set.push(validation_join);
565 validation_handles.push(validation_handle.clone());
566
567 let (enrichment_join, enrichment_handle) = YangPushEnrichmentActorHandle::new(
568 publisher_config.buffer_size,
569 validated_rx,
570 either::Left(meter.clone()),
571 config.writer_id.clone(),
572 );
573 join_set.push(enrichment_join);
574 enrichment_handles.push(enrichment_handle.clone());
575
576 let hdl = KafkaYangPublisherActorHandle::from_config(
577 config.clone(),
578 publisher_config
579 .custom_yang_schemas
580 .clone()
581 .unwrap_or_default(),
582 enrichment_handle.subscribe(),
583 either::Left(meter.clone()),
584 schema_handle.request_tx(),
585 )
586 .await;
587
588 match hdl {
589 Ok((kafka_join, kafka_handle)) => {
590 join_set.push(kafka_join);
591 kafka_yang_handles.push(kafka_handle);
592 }
593 Err(err) => {
594 return Err(anyhow::anyhow!(
595 "Error creating KafkaYangPublisherActorHandle: {err}"
596 ));
597 }
598 }
599 if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
600 if let Some(files_config) = enrichment_config
601 .inputs
602 .as_ref()
603 .and_then(|i| i.files.as_ref())
604 {
605 let (files_join, files_handle) = FilesActorHandle::from_config(
606 files_config.clone(),
607 enrichment_handles.clone(),
608 either::Left(meter.clone()),
609 );
610 join_set.push(files_join);
611 files_input_handles.push(files_handle);
612 }
613
614 if let Some(kafka_config) = enrichment_config
615 .inputs
616 .as_ref()
617 .and_then(|i| i.kafka.as_ref())
618 {
619 for consumer_config in &kafka_config.consumers {
620 let (join_handle, actor_handle) =
621 KafkaConsumerActorHandle::from_config(
622 consumer_config,
623 enrichment_handles.clone(),
624 either::Left(meter.clone()),
625 )?;
626 join_set.push(join_handle);
627 kafka_input_handles.push(actor_handle);
628 }
629 }
630 }
631 }
632 }
633 }
634 }
635 let ret = tokio::select! {
636 supervisor_ret = supervisor_join_handle => {
637 info!("udp-notif supervisor exited, shutting down all publishers");
638 for handle in kafka_json_handles {
639 let _ = handle.shutdown().await;
640 }
641 for handle in kafka_yang_handles {
642 let _ = handle.shutdown().await;
643 }
644 match supervisor_ret {
645 Ok(_) => Ok(()),
646 Err(err) => Err(anyhow::anyhow!(err)),
647 }
648 },
649 join_ret = validation_join_set.next() => {
650 warn!("udp-notif http publisher exited, shutting down udp-notif collection and publishers");
651 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
652 let _ = schema_handle.shutdown().await;
653 for handle in validation_handles {
654 let _ = handle.shutdown().await;
655 }
656 for handle in enrichment_handles {
657 let _ = handle.shutdown().await;
658 }
659 for handle in kafka_json_handles {
660 let _ = handle.shutdown().await;
661 }
662 for handle in kafka_yang_handles {
663 let _ = handle.shutdown().await;
664 }
665 for handle in files_input_handles {
666 let _ = handle.shutdown().await;
667 }
668 for handle in kafka_input_handles {
669 let _ = handle.shutdown().await;
670 }
671 match join_ret {
672 None | Some(Ok(Ok(_))) => Ok(()),
673 Some(Err(err)) => Err(anyhow::anyhow!(err)),
674 Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
675 }
676 },
677 join_ret = join_set.next() => {
678 warn!("udp-notif http publisher exited, shutting down udp-notif collection and publishers");
679 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
680 let _ = schema_handle.shutdown().await;
681 for handle in validation_handles {
682 let _ = handle.shutdown().await;
683 }
684 for handle in enrichment_handles {
685 let _ = handle.shutdown().await;
686 }
687 for handle in kafka_json_handles {
688 let _ = handle.shutdown().await;
689 }
690 for handle in kafka_yang_handles {
691 let _ = handle.shutdown().await;
692 }
693 for handle in files_input_handles {
694 let _ = handle.shutdown().await;
695 }
696 for handle in kafka_input_handles {
697 let _ = handle.shutdown().await;
698 }
699 match join_ret {
700 None | Some(Ok(Ok(_))) => Ok(()),
701 Some(Err(err)) => Err(anyhow::anyhow!(err)),
702 Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
703 }
704 }
705 };
706 ret
707}
708
709#[derive(Debug, strum_macros::Display)]
710pub enum UdpNotifSerializationError {
711 SerializationError(serde_json::Error),
712 Utf8Error(Utf8Error),
713 CborError(ciborium::de::Error<std::io::Error>),
714 UnsupportedMediaType(MediaType),
715}
716
717impl std::error::Error for UdpNotifSerializationError {}
718
719impl From<serde_json::Error> for UdpNotifSerializationError {
720 fn from(err: serde_json::Error) -> Self {
721 UdpNotifSerializationError::SerializationError(err)
722 }
723}
724
725impl From<Utf8Error> for UdpNotifSerializationError {
726 fn from(err: Utf8Error) -> Self {
727 UdpNotifSerializationError::Utf8Error(err)
728 }
729}
730
731impl From<ciborium::de::Error<std::io::Error>> for UdpNotifSerializationError {
732 fn from(err: ciborium::de::Error<std::io::Error>) -> Self {
733 UdpNotifSerializationError::CborError(err)
734 }
735}
736
737fn serialize_udp_notif(
738 input: Arc<UdpNotifRequest>,
739 writer_id: String,
740) -> Result<(Option<serde_json::Value>, serde_json::Value), UdpNotifSerializationError> {
741 let (peer, msg) = input.as_ref();
742 let mut value = serde_json::to_value(msg)?;
743 if let serde_json::Value::Object(val) = &mut value {
744 val.insert(
746 "writer_id".to_string(),
747 serde_json::Value::String(writer_id.to_string()),
748 );
749 let payload = msg.payload();
751 match msg.media_type() {
752 MediaType::YangDataJson => {
753 let payload = serde_json::from_slice(&payload)?;
755 val.insert("payload".to_string(), payload);
756 }
757 MediaType::YangDataXml => {
758 let payload = std::str::from_utf8(&payload)?;
759 val.insert(
760 "payload".to_string(),
761 serde_json::Value::String(payload.to_string()),
762 );
763 }
764 MediaType::YangDataCbor => {
765 let payload: serde_json::Value =
766 ciborium::de::from_reader(std::io::Cursor::new(payload))?;
767 val.insert("payload".to_string(), payload);
768 }
769 media_type => {
770 return Err(UdpNotifSerializationError::UnsupportedMediaType(media_type));
771 }
772 }
773 }
774 Ok((
775 Some(serde_json::Value::String(peer.ip().to_string())),
776 value,
777 ))
778}
779
780fn serialize_telemetry_json(
781 input: (Option<ContentId>, SubscriptionInfo, TelemetryMessageWrapper),
782 _writer_id: String,
783) -> Result<(Option<serde_json::Value>, serde_json::Value), UdpNotifSerializationError> {
784 let tmw = input.2;
785 let ip = tmw.message().telemetry_message_metadata().export_address();
786 let value = serde_json::to_value(tmw)?;
787 let key = serde_json::Value::String(ip.to_string());
788 Ok((Some(key), value))
789}
790
791#[derive(Debug, strum_macros::Display)]
792pub enum FlowSerializationError {
793 SerializationError(serde_json::Error),
794 Utf8Error(Utf8Error),
795}
796
797impl std::error::Error for FlowSerializationError {}
798
799impl From<serde_json::Error> for FlowSerializationError {
800 fn from(err: serde_json::Error) -> Self {
801 FlowSerializationError::SerializationError(err)
802 }
803}
804
805fn serialize_flow_req(
806 input: Arc<FlowRequest>,
807 _writer_id: String,
808) -> Result<(Option<serde_json::Value>, serde_json::Value), FlowSerializationError> {
809 let (peer, msg) = input.as_ref();
810 let value = serde_json::to_value(msg)?;
811 let key = serde_json::Value::String(peer.ip().to_string());
812 Ok((Some(key), value))
813}
814
815fn serialize_flow(
816 input: (IpAddr, FlowInfo),
817 _writer_id: String,
818) -> Result<(Option<serde_json::Value>, serde_json::Value), FlowSerializationError> {
819 let (ip, msg) = input;
820 let value = serde_json::to_value(msg)?;
821 let key = serde_json::Value::String(ip.to_string());
822 Ok((Some(key), value))
823}
824
825fn netconf_fetcher(config: &NetconfConfig) -> Result<NetconfYangLibraryFetcher, std::io::Error> {
826 let user = &config.username;
827 let private_key_path: PathBuf = (&config.private_key_path).into();
828
829 let private_key_string = match std::fs::read_to_string(&private_key_path) {
830 Ok(key) => key,
831 Err(err) => {
832 return Err(std::io::Error::other(format!(
833 "Failed to read private key: {err}"
834 )));
835 }
836 };
837 let private_key =
838 russh::keys::decode_secret_key(&private_key_string, config.password.as_deref())
839 .map_err(|err| std::io::Error::other(format!("Failed to decode private key: {err}")))?;
840 let ssh_config = russh::client::Config {
841 inactivity_timeout: Some(Duration::from_secs(60)),
842 ..<_>::default()
843 };
844 let fetcher = NetconfYangLibraryFetcher::new(
845 user.to_string(),
846 Arc::new(private_key),
847 ssh_config,
848 config.port,
849 Duration::from_secs(100),
850 );
851 Ok(fetcher)
852}
853
854#[cfg(test)]
855mod tests {
856 use super::*;
857 use bytes::Bytes;
858 use netgauze_udp_notif_pkt::raw::UdpNotifPacket;
859 use std::collections::HashMap;
860 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
861
862 #[test]
863 fn test_serialize_udp_notif_unknown_media_type() {
864 let writer_id = String::from("writer_id");
865 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
866 let pkt = UdpNotifPacket::new(
867 MediaType::Unknown(0xee),
868 0x01000001,
869 0x02000002,
870 HashMap::new(),
871 Bytes::from(&[0xffu8, 0xffu8][..]),
872 );
873
874 let request = Arc::new((peer, pkt));
875 let serialized = serialize_udp_notif(request.clone(), writer_id.clone());
876 assert!(matches!(
877 serialized,
878 Err(UdpNotifSerializationError::UnsupportedMediaType(
879 MediaType::Unknown(0xee)
880 ))
881 ));
882 }
883
884 #[test]
885 fn test_serialize_udp_notif_json() {
886 let writer_id = String::from("writer_id");
887 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
888 let pkt = UdpNotifPacket::new(
889 MediaType::YangDataJson,
890 0x01000001,
891 0x02000002,
892 HashMap::new(),
893 Bytes::from(r#"{"id": 1}"#),
894 );
895
896 let pkt_invalid_json = UdpNotifPacket::new(
897 MediaType::YangDataJson,
898 0x01000001,
899 0x02000002,
900 HashMap::new(),
901 Bytes::from(r#"{"id""#),
902 );
903
904 let expected_value = serde_json::json!(
905 {
906 "media_type": "YangDataJson",
907 "message_id": 33554434,
908 "options": {},
909 "payload": {"id": 1},
910 "publisher_id": 16777217,
911 "writer_id": "writer_id"
912 }
913 );
914 let request_invalid = Arc::new((peer, pkt_invalid_json));
915 let request_good = Arc::new((peer, pkt));
916 let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
917 let serialized =
918 serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");
919
920 assert!(matches!(
921 result_invalid,
922 Err(UdpNotifSerializationError::SerializationError(_))
923 ));
924 assert_eq!(
925 serialized,
926 (
927 Some(serde_json::Value::String(peer.ip().to_string())),
928 expected_value
929 )
930 );
931 }
932
933 #[test]
934 fn test_serialize_udp_notif_xml() {
935 let writer_id = String::from("writer_id");
936 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
937 let pkt = UdpNotifPacket::new(
938 MediaType::YangDataXml,
939 0x01000001,
940 0x02000002,
941 HashMap::new(),
942 Bytes::from("<id>1</id>"),
943 );
944 let pkt_invalid_utf8 = UdpNotifPacket::new(
945 MediaType::YangDataXml,
946 0x01000001,
947 0x02000002,
948 HashMap::new(),
949 Bytes::from(vec![0x80]),
951 );
952
953 let expected_value = serde_json::json!(
954 {
955 "media_type": "YangDataXml",
956 "message_id": 33554434,
957 "options": {},
958 "payload": "<id>1</id>",
959 "publisher_id": 16777217,
960 "writer_id": "writer_id"
961 }
962 );
963
964 let request_invalid = Arc::new((peer, pkt_invalid_utf8));
965 let request_good = Arc::new((peer, pkt));
966 let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
967 let serialized =
968 serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");
969 assert!(matches!(
970 result_invalid,
971 Err(UdpNotifSerializationError::Utf8Error(_))
972 ));
973 assert_eq!(
974 serialized,
975 (
976 Some(serde_json::Value::String(peer.ip().to_string())),
977 expected_value
978 )
979 );
980 }
981
982 #[test]
983 fn test_serialize_udp_notif_cbor() {
984 let writer_id = String::from("writer_id");
985 let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
986 let mut cursor = std::io::Cursor::new(vec![]);
987 ciborium::ser::into_writer(&serde_json::json!({"id": 1}), &mut cursor)
988 .expect("failed to serialize cbor");
989 let payload = cursor.into_inner();
990 let pkt = UdpNotifPacket::new(
991 MediaType::YangDataCbor,
992 0x01000001,
993 0x02000002,
994 HashMap::new(),
995 Bytes::from(payload),
996 );
997 let pkt_invalid = UdpNotifPacket::new(
998 MediaType::YangDataCbor,
999 0x01000001,
1000 0x02000002,
1001 HashMap::new(),
1002 Bytes::from(vec![0x83, 0x01, 0x02]),
1004 );
1005
1006 let expected_value = serde_json::json!(
1007 {
1008 "media_type": "YangDataCbor",
1009 "message_id": 33554434,
1010 "options": {},
1011 "payload": {"id": 1},
1012 "publisher_id": 16777217,
1013 "writer_id": "writer_id"
1014 }
1015 );
1016
1017 let request_invalid = Arc::new((peer, pkt_invalid));
1018 let request_good = Arc::new((peer, pkt));
1019 let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
1020 let serialized =
1021 serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");
1022 assert!(matches!(
1023 result_invalid,
1024 Err(UdpNotifSerializationError::CborError(_))
1025 ));
1026 assert_eq!(
1027 serialized,
1028 (
1029 Some(serde_json::Value::String(peer.ip().to_string())),
1030 expected_value
1031 )
1032 );
1033 }
1034}