Skip to main content

netgauze_collector/
lib.rs

1// Copyright (C) 2024-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::config::{FlowConfig, NetconfConfig, PublisherEndpoint, UdpNotifConfig};
17use crate::flow::aggregation::AggregationActorHandle;
18use crate::flow::enrichment::FlowEnrichmentActorHandle;
19use crate::flow::renormalization::RenormalizationActorHandle;
20use crate::inputs::files::FilesActorHandle;
21use crate::inputs::flow_options::FlowOptionsActorHandle;
22use crate::inputs::kafka::KafkaConsumerActorHandle;
23use crate::publishers::http::{HttpPublisherActorHandle, Message};
24use crate::publishers::kafka_avro::KafkaAvroPublisherActorHandle;
25use crate::publishers::kafka_json::KafkaJsonPublisherActorHandle;
26use crate::publishers::kafka_yang::KafkaYangPublisherActorHandle;
27use crate::yang_push::enrichment::YangPushEnrichmentActorHandle;
28
29use futures_util::StreamExt;
30use futures_util::stream::FuturesUnordered;
31use netgauze_flow_pkt::FlowInfo;
32use netgauze_flow_service::FlowRequest;
33use netgauze_flow_service::flow_supervisor::FlowCollectorsSupervisorActorHandle;
34use netgauze_udp_notif_pkt::raw::MediaType;
35use netgauze_udp_notif_service::UdpNotifRequest;
36use netgauze_udp_notif_service::supervisor::UdpNotifSupervisorHandle;
37use netgauze_yang_push::ContentId;
38use netgauze_yang_push::cache::actor::CacheActorHandle;
39use netgauze_yang_push::cache::fetcher::NetconfYangLibraryFetcher;
40use netgauze_yang_push::cache::storage::SubscriptionInfo;
41use netgauze_yang_push::model::telemetry::TelemetryMessageWrapper;
42use netgauze_yang_push::validation::ValidationActorHandle;
43use std::net::IpAddr;
44use std::path::PathBuf;
45use std::str::Utf8Error;
46use std::sync::Arc;
47use std::time::Duration;
48use tracing::{info, warn};
49
// Collector configuration types (FlowConfig, UdpNotifConfig, NetconfConfig,
// PublisherEndpoint, ...).
pub mod config;
// Flow pipeline stages: aggregation, enrichment, renormalization actors.
pub mod flow;
// Enrichment input sources: files, flow options, Kafka consumers.
pub mod inputs;
// Output publishers: HTTP, Kafka Avro/JSON/YANG.
pub mod publishers;
// YANG-Push telemetry handling (enrichment actor).
pub mod yang_push;
55
/// Initializes the flow collection pipeline described by `flow_config`.
///
/// Spawns a [`FlowCollectorsSupervisorActorHandle`] that owns the flow
/// collectors, then — per configured publisher group and endpoint — wires up
/// the consuming actor chains:
///
/// * `Http` — an HTTP publisher per shard receiver.
/// * `FlowKafkaAvro` / `FlowKafkaJson` — per shard: enrichment ->
///   renormalization -> aggregation -> Kafka publisher, plus optional
///   enrichment input actors (flow options, files, Kafka consumers).
/// * `KafkaJson` — a raw Kafka JSON publisher per shard receiver.
/// * `TelemetryKafkaJson` / `TelemetryKafkaYang` — rejected with an error
///   (not supported for flow).
///
/// Runs until either the supervisor task or any spawned actor exits, then
/// shuts down the remaining actors (best-effort, with 1s timeouts on the
/// supervisor and HTTP publishers) and returns the first error, if any.
pub async fn init_flow_collection(
    flow_config: FlowConfig,
    meter: opentelemetry::metrics::Meter,
) -> anyhow::Result<()> {
    let supervisor_config = flow_config.supervisor_config();

    // The supervisor fans received flow packets out to its subscribers.
    let (supervisor_join_handle, supervisor_handle) =
        FlowCollectorsSupervisorActorHandle::new(supervisor_config, meter.clone()).await?;
    // Actor handles are retained per kind so each can be shut down
    // explicitly once the select! below fires.
    let mut http_handles = Vec::new();
    let mut enrichment_handles = Vec::new();
    let mut renormalization_handles = Vec::new();
    let mut agg_handles = Vec::new();
    let mut flow_options_input_handles = Vec::new();
    let mut files_input_handles = Vec::new();
    let mut kafka_input_handles = Vec::new();
    let mut kafka_avro_handles = Vec::new();
    let mut kafka_json_handles = Vec::new();
    // Join handles of every spawned actor; the first one to complete
    // triggers the shutdown branch of the select! below.
    let mut join_set = FuturesUnordered::new();

    for (group_name, publisher_config) in flow_config.publishers {
        info!("Starting publishers group '{group_name}'");

        // One receiver per shard when sharding is enabled, otherwise a
        // single subscription.
        let mut flow_recvs = Vec::new();
        if publisher_config.shards > 1 {
            (flow_recvs, _) = supervisor_handle
                .subscribe_shards(publisher_config.shards, publisher_config.buffer_size)
                .await?;
        } else {
            let (flow_recv, _) = supervisor_handle
                .subscribe(publisher_config.buffer_size)
                .await?;
            flow_recvs.push(flow_recv);
        }

        for (endpoint_name, endpoint) in publisher_config.endpoints {
            info!("Creating publisher '{endpoint_name}'");

            match &endpoint {
                PublisherEndpoint::Http(config) => {
                    // Wrap each FlowRequest into a single HTTP `insert`
                    // message (timestamp, peer source IP, writer id, payload).
                    let converter = |request: Arc<FlowRequest>, writer_id: String| {
                        let ret = Message::insert {
                            ts: format!("{}", chrono::Utc::now().format("%Y-%m-%d %H:%M:%S")),
                            peer_src: format!("{}", request.0.ip()),
                            writer_id,
                            payload: request.1.clone(),
                        };
                        vec![ret]
                    };
                    // One HTTP publisher per shard receiver.
                    for flow_recv in &flow_recvs {
                        let (http_join, http_handle) = HttpPublisherActorHandle::new(
                            endpoint_name.clone(),
                            config.clone(),
                            converter,
                            flow_recv.clone(),
                            meter.clone(),
                        )?;
                        join_set.push(http_join);
                        http_handles.push(http_handle);
                    }
                }
                PublisherEndpoint::FlowKafkaAvro(config) => {
                    // Per shard: enrichment -> renormalization ->
                    // aggregation -> Kafka Avro publisher.
                    for (shard_id, flow_recv) in flow_recvs.iter().enumerate() {
                        let (enrichment_join, enrichment_handle) = FlowEnrichmentActorHandle::new(
                            publisher_config.buffer_size,
                            flow_recv.clone(),
                            either::Left(meter.clone()),
                            shard_id,
                            config.writer_id.clone(),
                        );

                        // NOTE(review): the enrichment actor above is spawned
                        // unconditionally, but its join/handle are only
                        // retained — and a Kafka publisher only created —
                        // when `aggregation` is configured; otherwise the
                        // handles are dropped at the end of this iteration.
                        // Confirm this is intended.
                        if let Some(aggregation_config) = publisher_config.aggregation.as_ref() {
                            let (renormalization_join, renormalization_handle) =
                                RenormalizationActorHandle::new(
                                    publisher_config.buffer_size,
                                    enrichment_handle.subscribe(),
                                    either::Left(meter.clone()),
                                    shard_id,
                                );

                            let (agg_join, agg_handle) = AggregationActorHandle::new(
                                publisher_config.buffer_size,
                                aggregation_config.clone(),
                                renormalization_handle.subscribe(),
                                either::Left(meter.clone()),
                                shard_id,
                            );

                            let (kafka_join, kafka_handle) =
                                KafkaAvroPublisherActorHandle::from_config(
                                    config.clone(),
                                    agg_handle.subscribe(),
                                    either::Left(meter.clone()),
                                )
                                .await?;

                            join_set.push(enrichment_join);
                            join_set.push(renormalization_join);
                            join_set.push(agg_join);
                            join_set.push(kafka_join);
                            enrichment_handles.push(enrichment_handle);
                            renormalization_handles.push(renormalization_handle);
                            agg_handles.push(agg_handle);
                            kafka_avro_handles.push(kafka_handle);
                        }
                    }

                    // Extra subscription used only by the flow-options
                    // enrichment input below. NOTE(review): it is created
                    // even when no enrichment/flow-options input is
                    // configured — confirm this subscription is not leaked.
                    let (flow_recv, _) = supervisor_handle
                        .subscribe(publisher_config.buffer_size)
                        .await?;

                    // Optional enrichment input actors feeding the
                    // enrichment handles created above.
                    if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
                        if let Some(flow_options_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.flow_options.as_ref())
                        {
                            let (flow_options_join, flow_options_handle) =
                                FlowOptionsActorHandle::from_config(
                                    flow_options_config,
                                    flow_recv.clone(),
                                    enrichment_handles.clone(),
                                    either::Left(meter.clone()),
                                );
                            join_set.push(flow_options_join);
                            flow_options_input_handles.push(flow_options_handle);
                        }

                        if let Some(files_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.files.as_ref())
                        {
                            let (files_join, files_handle) = FilesActorHandle::from_config(
                                files_config.clone(),
                                enrichment_handles.clone(),
                                either::Left(meter.clone()),
                            );
                            join_set.push(files_join);
                            files_input_handles.push(files_handle);
                        }

                        if let Some(kafka_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.kafka.as_ref())
                        {
                            // One consumer actor per configured consumer.
                            for consumer_config in &kafka_config.consumers {
                                let (join_handle, actor_handle) =
                                    KafkaConsumerActorHandle::from_config(
                                        consumer_config,
                                        enrichment_handles.clone(),
                                        either::Left(meter.clone()),
                                    )?;
                                join_set.push(join_handle);
                                kafka_input_handles.push(actor_handle);
                            }
                        }
                    }
                }
                PublisherEndpoint::FlowKafkaJson(config) => {
                    // Mirrors the FlowKafkaAvro arm, with a JSON publisher
                    // (`serialize_flow`) as the sink instead of Avro.
                    for (shard_id, flow_recv) in flow_recvs.iter().enumerate() {
                        let (enrichment_join, enrichment_handle) = FlowEnrichmentActorHandle::new(
                            publisher_config.buffer_size,
                            flow_recv.clone(),
                            either::Left(meter.clone()),
                            shard_id,
                            config.writer_id.clone(),
                        );

                        // NOTE(review): as in the Avro arm, without an
                        // `aggregation` config the enrichment actor's
                        // join/handle are dropped and no publisher is
                        // created for this shard — confirm intended.
                        if let Some(aggregation_config) = publisher_config.aggregation.as_ref() {
                            let (renormalization_join, renormalization_handle) =
                                RenormalizationActorHandle::new(
                                    publisher_config.buffer_size,
                                    enrichment_handle.subscribe(),
                                    either::Left(meter.clone()),
                                    shard_id,
                                );

                            let (agg_join, agg_handle) = AggregationActorHandle::new(
                                publisher_config.buffer_size,
                                aggregation_config.clone(),
                                renormalization_handle.subscribe(),
                                either::Left(meter.clone()),
                                shard_id,
                            );

                            let (kafka_join, kafka_handle) =
                                KafkaJsonPublisherActorHandle::from_config(
                                    serialize_flow,
                                    config.clone(),
                                    agg_handle.subscribe(),
                                    either::Left(meter.clone()),
                                )?;

                            join_set.push(enrichment_join);
                            join_set.push(renormalization_join);
                            join_set.push(agg_join);
                            join_set.push(kafka_join);
                            enrichment_handles.push(enrichment_handle);
                            renormalization_handles.push(renormalization_handle);
                            agg_handles.push(agg_handle);
                            kafka_json_handles.push(kafka_handle);
                        }
                    }

                    // Extra subscription for the flow-options input; same
                    // NOTE(review) as in the Avro arm applies.
                    let (flow_recv, _) = supervisor_handle
                        .subscribe(publisher_config.buffer_size)
                        .await?;

                    if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
                        if let Some(flow_options_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.flow_options.as_ref())
                        {
                            let (flow_options_join, flow_options_handle) =
                                FlowOptionsActorHandle::from_config(
                                    flow_options_config,
                                    flow_recv.clone(),
                                    enrichment_handles.clone(),
                                    either::Left(meter.clone()),
                                );
                            join_set.push(flow_options_join);
                            flow_options_input_handles.push(flow_options_handle);
                        }

                        if let Some(files_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.files.as_ref())
                        {
                            let (files_join, files_handle) = FilesActorHandle::from_config(
                                files_config.clone(),
                                enrichment_handles.clone(),
                                either::Left(meter.clone()),
                            );
                            join_set.push(files_join);
                            files_input_handles.push(files_handle);
                        }

                        if let Some(kafka_config) = enrichment_config
                            .inputs
                            .as_ref()
                            .and_then(|i| i.kafka.as_ref())
                        {
                            for consumer_config in &kafka_config.consumers {
                                let (join_handle, actor_handle) =
                                    KafkaConsumerActorHandle::from_config(
                                        consumer_config,
                                        enrichment_handles.clone(),
                                        either::Left(meter.clone()),
                                    )?;
                                join_set.push(join_handle);
                                kafka_input_handles.push(actor_handle);
                            }
                        }
                    }
                }
                PublisherEndpoint::KafkaJson(config) => {
                    // Raw (non-enriched) JSON publisher straight off each
                    // shard receiver, serialized via `serialize_flow_req`.
                    for flow_recv in &flow_recvs {
                        let (join_handle, handle) = KafkaJsonPublisherActorHandle::from_config(
                            serialize_flow_req,
                            config.clone(),
                            flow_recv.clone(),
                            either::Left(meter.clone()),
                        )?;
                        join_set.push(join_handle);
                        kafka_json_handles.push(handle);
                    }
                }
                PublisherEndpoint::TelemetryKafkaJson(_) => {
                    return Err(anyhow::anyhow!(
                        "Telemetry KafkaJson publisher not yet supported for Flow"
                    ));
                }
                PublisherEndpoint::TelemetryKafkaYang(_) => {
                    return Err(anyhow::anyhow!(
                        "Telemetry KafkaYang publisher not supported for Flow"
                    ));
                }
            }
        }
    }
    // Run until either the supervisor or any spawned actor terminates,
    // then shut the rest down. HTTP publishers and the supervisor are
    // shut down with a 1s timeout; other shutdown errors are ignored.
    let ret = tokio::select! {
        supervisor_ret = supervisor_join_handle => {
            info!("Flow supervisor exited, shutting down all publishers");
            for handle in http_handles {
                let shutdown_result = tokio::time::timeout(std::time::Duration::from_secs(1), handle.shutdown()).await;
                if shutdown_result.is_err() {
                    warn!("Timeout shutting down flow http publisher {}", handle.name())
                }
                if let Ok(Err(err)) = shutdown_result {
                    warn!("Error in shutting down flow http publisher {}: {err}", handle.name())
                }
            }
            for handle in kafka_avro_handles {
                let _ = handle.shutdown().await;
            }
            for handle in kafka_json_handles {
                let _ = handle.shutdown().await;
            }
            match supervisor_ret {
                Ok(_) => Ok(()),
                Err(err) => Err(anyhow::anyhow!(err)),
            }
        },
        join_ret = join_set.next() => {
            warn!("Flow publisher exited, shutting down flow collection and publishers");
            let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
            for handle in enrichment_handles {
                let _ = handle.shutdown().await;
            }
            for handle in renormalization_handles {
                let _ = handle.shutdown().await;
            }
            for handle in agg_handles {
                let _ = handle.shutdown().await;
            }
            for handle in http_handles {
                let shutdown_result = tokio::time::timeout(std::time::Duration::from_secs(1), handle.shutdown()).await;
                if shutdown_result.is_err() {
                    warn!("Timeout shutting down flow http publisher {}", handle.name())
                }
                if let Ok(Err(err)) = shutdown_result {
                    warn!("Error in shutting down flow http publisher {}: {err}", handle.name())
                }
            }
            for handle in kafka_avro_handles {
                let _ = handle.shutdown().await;
            }
            for handle in kafka_json_handles {
                let _ = handle.shutdown().await;
            }
            for handle in flow_options_input_handles {
                let _ = handle.shutdown().await;
            }
            for handle in files_input_handles {
                let _ = handle.shutdown().await;
            }
            for handle in kafka_input_handles {
                let _ = handle.shutdown().await;
            }
            // `join_ret` is an Option<Result<Result<..>>>: None (empty set)
            // and Ok(Ok(_)) are clean exits; join errors and actor errors
            // are both surfaced as anyhow errors.
            match join_ret {
                None | Some(Ok(Ok(_))) => Ok(()),
                Some(Err(err)) => Err(anyhow::anyhow!(err)),
                Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
            }
        }
    };
    ret
}
407
408pub async fn init_udp_notif_collection(
409    udp_notif_config: UdpNotifConfig,
410    meter: opentelemetry::metrics::Meter,
411) -> anyhow::Result<()> {
412    let supervisor_config = udp_notif_config.supervisor_config();
413    let (supervisor_join_handle, supervisor_handle) =
414        UdpNotifSupervisorHandle::new(supervisor_config, meter.clone()).await;
415    let mut join_set = FuturesUnordered::new();
416    let mut validation_join_set = FuturesUnordered::new();
417    let mut validation_handles = Vec::new();
418    let mut enrichment_handles = Vec::new();
419    let mut files_input_handles = Vec::new();
420    let mut kafka_input_handles = Vec::new();
421    let mut kafka_json_handles = Vec::new();
422    let mut kafka_yang_handles = Vec::new();
423
424    // Only one schema cache is needed for all publishers
425    let cache_location: PathBuf = udp_notif_config.cache_location.into();
426    let netconf_fetcher = netconf_fetcher(&udp_notif_config.netconf)?;
427    let (_schema_join, schema_handle) = CacheActorHandle::new(
428        10000,
429        either::Right(cache_location),
430        netconf_fetcher,
431        Duration::from_mins(5),
432        either::Left(meter.clone()),
433    )?;
434
435    for (group_name, publisher_config) in udp_notif_config.publishers {
436        info!("Starting publishers group '{group_name}'");
437        let (udp_notif_recv, _) = supervisor_handle
438            .subscribe(publisher_config.buffer_size)
439            .await?;
440        for (endpoint_name, endpoint) in publisher_config.endpoints {
441            info!("Creating publisher '{endpoint_name}'");
442            match &endpoint {
443                PublisherEndpoint::Http(_) => {
444                    return Err(anyhow::anyhow!(
445                        "HTTP publisher not supported for UDP Notif"
446                    ));
447                }
448                PublisherEndpoint::KafkaJson(config) => {
449                    let hdl = KafkaJsonPublisherActorHandle::from_config(
450                        serialize_udp_notif,
451                        config.clone(),
452                        udp_notif_recv.clone(),
453                        either::Left(meter.clone()),
454                    );
455                    match hdl {
456                        Ok((kafka_join, kafka_handle)) => {
457                            join_set.push(kafka_join);
458                            kafka_json_handles.push(kafka_handle);
459                        }
460                        Err(err) => {
461                            return Err(anyhow::anyhow!(
462                                "Error creating KafkaJsonPublisherActorHandle: {err}"
463                            ));
464                        }
465                    }
466                }
467                PublisherEndpoint::FlowKafkaJson(_) => {
468                    return Err(anyhow::anyhow!(
469                        "Flow Kafka JSON publisher not supported for UDP Notif"
470                    ));
471                }
472                PublisherEndpoint::FlowKafkaAvro(_) => {
473                    return Err(anyhow::anyhow!(
474                        "Flow Kafka Avro publisher not supported for UDP Notif"
475                    ));
476                }
477                PublisherEndpoint::TelemetryKafkaJson(config) => {
478                    let (validated_tx, validated_rx) =
479                        async_channel::bounded(publisher_config.buffer_size);
480                    let (validation_join, validation_handle) = ValidationActorHandle::new(
481                        publisher_config.buffer_size,
482                        udp_notif_config.max_cached_packets_per_peer,
483                        udp_notif_config.max_cached_packets_per_subscription,
484                        udp_notif_recv.clone(),
485                        validated_tx,
486                        schema_handle.request_tx(),
487                        either::Left(meter.clone()),
488                    )?;
489                    validation_join_set.push(validation_join);
490                    validation_handles.push(validation_handle.clone());
491
492                    let (enrichment_join, enrichment_handle) = YangPushEnrichmentActorHandle::new(
493                        publisher_config.buffer_size,
494                        validated_rx,
495                        either::Left(meter.clone()),
496                        config.writer_id.clone(),
497                    );
498                    join_set.push(enrichment_join);
499                    enrichment_handles.push(enrichment_handle.clone());
500
501                    let hdl = KafkaJsonPublisherActorHandle::from_config(
502                        serialize_telemetry_json,
503                        config.clone(),
504                        enrichment_handle.subscribe(),
505                        either::Left(meter.clone()),
506                    );
507                    match hdl {
508                        Ok((kafka_join, kafka_handle)) => {
509                            join_set.push(kafka_join);
510                            kafka_json_handles.push(kafka_handle);
511                        }
512                        Err(err) => {
513                            return Err(anyhow::anyhow!(
514                                "Error creating KafkaJsonPublisherActorHandle: {err}"
515                            ));
516                        }
517                    }
518
519                    if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
520                        if let Some(files_config) = enrichment_config
521                            .inputs
522                            .as_ref()
523                            .and_then(|i| i.files.as_ref())
524                        {
525                            let (files_join, files_handle) = FilesActorHandle::from_config(
526                                files_config.clone(),
527                                enrichment_handles.clone(),
528                                either::Left(meter.clone()),
529                            );
530                            join_set.push(files_join);
531                            files_input_handles.push(files_handle);
532                        }
533
534                        if let Some(kafka_config) = enrichment_config
535                            .inputs
536                            .as_ref()
537                            .and_then(|i| i.kafka.as_ref())
538                        {
539                            for consumer_config in &kafka_config.consumers {
540                                let (join_handle, actor_handle) =
541                                    KafkaConsumerActorHandle::from_config(
542                                        consumer_config,
543                                        enrichment_handles.clone(),
544                                        either::Left(meter.clone()),
545                                    )?;
546                                join_set.push(join_handle);
547                                kafka_input_handles.push(actor_handle);
548                            }
549                        }
550                    }
551                }
552                PublisherEndpoint::TelemetryKafkaYang(config) => {
553                    let (validated_tx, validated_rx) =
554                        async_channel::bounded(publisher_config.buffer_size);
555                    let (validation_join, validation_handle) = ValidationActorHandle::new(
556                        publisher_config.buffer_size,
557                        udp_notif_config.max_cached_packets_per_peer,
558                        udp_notif_config.max_cached_packets_per_subscription,
559                        udp_notif_recv.clone(),
560                        validated_tx,
561                        schema_handle.request_tx(),
562                        either::Left(meter.clone()),
563                    )?;
564                    validation_join_set.push(validation_join);
565                    validation_handles.push(validation_handle.clone());
566
567                    let (enrichment_join, enrichment_handle) = YangPushEnrichmentActorHandle::new(
568                        publisher_config.buffer_size,
569                        validated_rx,
570                        either::Left(meter.clone()),
571                        config.writer_id.clone(),
572                    );
573                    join_set.push(enrichment_join);
574                    enrichment_handles.push(enrichment_handle.clone());
575
576                    let hdl = KafkaYangPublisherActorHandle::from_config(
577                        config.clone(),
578                        publisher_config
579                            .custom_yang_schemas
580                            .clone()
581                            .unwrap_or_default(),
582                        enrichment_handle.subscribe(),
583                        either::Left(meter.clone()),
584                        schema_handle.request_tx(),
585                    )
586                    .await;
587
588                    match hdl {
589                        Ok((kafka_join, kafka_handle)) => {
590                            join_set.push(kafka_join);
591                            kafka_yang_handles.push(kafka_handle);
592                        }
593                        Err(err) => {
594                            return Err(anyhow::anyhow!(
595                                "Error creating KafkaYangPublisherActorHandle: {err}"
596                            ));
597                        }
598                    }
599                    if let Some(enrichment_config) = publisher_config.enrichment.as_ref() {
600                        if let Some(files_config) = enrichment_config
601                            .inputs
602                            .as_ref()
603                            .and_then(|i| i.files.as_ref())
604                        {
605                            let (files_join, files_handle) = FilesActorHandle::from_config(
606                                files_config.clone(),
607                                enrichment_handles.clone(),
608                                either::Left(meter.clone()),
609                            );
610                            join_set.push(files_join);
611                            files_input_handles.push(files_handle);
612                        }
613
614                        if let Some(kafka_config) = enrichment_config
615                            .inputs
616                            .as_ref()
617                            .and_then(|i| i.kafka.as_ref())
618                        {
619                            for consumer_config in &kafka_config.consumers {
620                                let (join_handle, actor_handle) =
621                                    KafkaConsumerActorHandle::from_config(
622                                        consumer_config,
623                                        enrichment_handles.clone(),
624                                        either::Left(meter.clone()),
625                                    )?;
626                                join_set.push(join_handle);
627                                kafka_input_handles.push(actor_handle);
628                            }
629                        }
630                    }
631                }
632            }
633        }
634    }
635    let ret = tokio::select! {
636        supervisor_ret = supervisor_join_handle => {
637            info!("udp-notif supervisor exited, shutting down all publishers");
638            for handle in kafka_json_handles {
639                let _ = handle.shutdown().await;
640            }
641            for handle in kafka_yang_handles {
642                let _ = handle.shutdown().await;
643            }
644            match supervisor_ret {
645                Ok(_) => Ok(()),
646                Err(err) => Err(anyhow::anyhow!(err)),
647            }
648        },
649        join_ret = validation_join_set.next() => {
650            warn!("udp-notif http publisher exited, shutting down udp-notif collection and publishers");
651            let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
652            let _ = schema_handle.shutdown().await;
653            for handle in validation_handles {
654                let _ = handle.shutdown().await;
655            }
656            for handle in enrichment_handles {
657                let _ = handle.shutdown().await;
658            }
659            for handle in kafka_json_handles {
660                let _ = handle.shutdown().await;
661            }
662            for handle in kafka_yang_handles {
663                let _ = handle.shutdown().await;
664            }
665            for handle in files_input_handles {
666                let _ = handle.shutdown().await;
667            }
668            for handle in kafka_input_handles {
669                let _ = handle.shutdown().await;
670            }
671            match join_ret {
672                None | Some(Ok(Ok(_))) => Ok(()),
673                Some(Err(err)) => Err(anyhow::anyhow!(err)),
674                Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
675            }
676        },
677        join_ret = join_set.next() => {
678            warn!("udp-notif http publisher exited, shutting down udp-notif collection and publishers");
679            let _ = tokio::time::timeout(std::time::Duration::from_secs(1), supervisor_handle.shutdown()).await;
680            let _ = schema_handle.shutdown().await;
681            for handle in validation_handles {
682                let _ = handle.shutdown().await;
683            }
684            for handle in enrichment_handles {
685                let _ = handle.shutdown().await;
686            }
687            for handle in kafka_json_handles {
688                let _ = handle.shutdown().await;
689            }
690            for handle in kafka_yang_handles {
691                let _ = handle.shutdown().await;
692            }
693            for handle in files_input_handles {
694                let _ = handle.shutdown().await;
695            }
696            for handle in kafka_input_handles {
697                let _ = handle.shutdown().await;
698            }
699            match join_ret {
700                None | Some(Ok(Ok(_))) => Ok(()),
701                Some(Err(err)) => Err(anyhow::anyhow!(err)),
702                Some(Ok(Err(err))) => Err(anyhow::anyhow!(err)),
703            }
704        }
705    };
706    ret
707}
708
/// Errors raised while converting a udp-notif message into publishable JSON
/// (see `serialize_udp_notif`).
#[derive(Debug, strum_macros::Display)]
pub enum UdpNotifSerializationError {
    /// The message, or its JSON payload, failed serde_json (de)serialization.
    SerializationError(serde_json::Error),
    /// An XML payload was not valid UTF-8 text.
    Utf8Error(Utf8Error),
    /// A CBOR payload could not be decoded into a JSON value.
    CborError(ciborium::de::Error<std::io::Error>),
    /// The message declared a media type the serializer does not decode.
    UnsupportedMediaType(MediaType),
}
716
717impl std::error::Error for UdpNotifSerializationError {}
718
719impl From<serde_json::Error> for UdpNotifSerializationError {
720    fn from(err: serde_json::Error) -> Self {
721        UdpNotifSerializationError::SerializationError(err)
722    }
723}
724
725impl From<Utf8Error> for UdpNotifSerializationError {
726    fn from(err: Utf8Error) -> Self {
727        UdpNotifSerializationError::Utf8Error(err)
728    }
729}
730
731impl From<ciborium::de::Error<std::io::Error>> for UdpNotifSerializationError {
732    fn from(err: ciborium::de::Error<std::io::Error>) -> Self {
733        UdpNotifSerializationError::CborError(err)
734    }
735}
736
737fn serialize_udp_notif(
738    input: Arc<UdpNotifRequest>,
739    writer_id: String,
740) -> Result<(Option<serde_json::Value>, serde_json::Value), UdpNotifSerializationError> {
741    let (peer, msg) = input.as_ref();
742    let mut value = serde_json::to_value(msg)?;
743    if let serde_json::Value::Object(val) = &mut value {
744        // Add the writer ID to the message
745        val.insert(
746            "writer_id".to_string(),
747            serde_json::Value::String(writer_id.to_string()),
748        );
749        // Convert the inner payload into human-readable format when possible
750        let payload = msg.payload();
751        match msg.media_type() {
752            MediaType::YangDataJson => {
753                // Deserialize the payload into a JSON object
754                let payload = serde_json::from_slice(&payload)?;
755                val.insert("payload".to_string(), payload);
756            }
757            MediaType::YangDataXml => {
758                let payload = std::str::from_utf8(&payload)?;
759                val.insert(
760                    "payload".to_string(),
761                    serde_json::Value::String(payload.to_string()),
762                );
763            }
764            MediaType::YangDataCbor => {
765                let payload: serde_json::Value =
766                    ciborium::de::from_reader(std::io::Cursor::new(payload))?;
767                val.insert("payload".to_string(), payload);
768            }
769            media_type => {
770                return Err(UdpNotifSerializationError::UnsupportedMediaType(media_type));
771            }
772        }
773    }
774    Ok((
775        Some(serde_json::Value::String(peer.ip().to_string())),
776        value,
777    ))
778}
779
780fn serialize_telemetry_json(
781    input: (Option<ContentId>, SubscriptionInfo, TelemetryMessageWrapper),
782    _writer_id: String,
783) -> Result<(Option<serde_json::Value>, serde_json::Value), UdpNotifSerializationError> {
784    let tmw = input.2;
785    let ip = tmw.message().telemetry_message_metadata().export_address();
786    let value = serde_json::to_value(tmw)?;
787    let key = serde_json::Value::String(ip.to_string());
788    Ok((Some(key), value))
789}
790
/// Errors raised while converting flow messages into publishable JSON
/// (see `serialize_flow_req` / `serialize_flow`).
#[derive(Debug, strum_macros::Display)]
pub enum FlowSerializationError {
    /// serde_json serialization of a flow message failed.
    SerializationError(serde_json::Error),
    // NOTE(review): no constructor for Utf8Error is visible in this chunk —
    // presumably used by flow serialization elsewhere; confirm before removal.
    Utf8Error(Utf8Error),
}
796
797impl std::error::Error for FlowSerializationError {}
798
799impl From<serde_json::Error> for FlowSerializationError {
800    fn from(err: serde_json::Error) -> Self {
801        FlowSerializationError::SerializationError(err)
802    }
803}
804
805fn serialize_flow_req(
806    input: Arc<FlowRequest>,
807    _writer_id: String,
808) -> Result<(Option<serde_json::Value>, serde_json::Value), FlowSerializationError> {
809    let (peer, msg) = input.as_ref();
810    let value = serde_json::to_value(msg)?;
811    let key = serde_json::Value::String(peer.ip().to_string());
812    Ok((Some(key), value))
813}
814
815fn serialize_flow(
816    input: (IpAddr, FlowInfo),
817    _writer_id: String,
818) -> Result<(Option<serde_json::Value>, serde_json::Value), FlowSerializationError> {
819    let (ip, msg) = input;
820    let value = serde_json::to_value(msg)?;
821    let key = serde_json::Value::String(ip.to_string());
822    Ok((Some(key), value))
823}
824
825fn netconf_fetcher(config: &NetconfConfig) -> Result<NetconfYangLibraryFetcher, std::io::Error> {
826    let user = &config.username;
827    let private_key_path: PathBuf = (&config.private_key_path).into();
828
829    let private_key_string = match std::fs::read_to_string(&private_key_path) {
830        Ok(key) => key,
831        Err(err) => {
832            return Err(std::io::Error::other(format!(
833                "Failed to read private key: {err}"
834            )));
835        }
836    };
837    let private_key =
838        russh::keys::decode_secret_key(&private_key_string, config.password.as_deref())
839            .map_err(|err| std::io::Error::other(format!("Failed to decode private key: {err}")))?;
840    let ssh_config = russh::client::Config {
841        inactivity_timeout: Some(Duration::from_secs(60)),
842        ..<_>::default()
843    };
844    let fetcher = NetconfYangLibraryFetcher::new(
845        user.to_string(),
846        Arc::new(private_key),
847        ssh_config,
848        config.port,
849        Duration::from_secs(100),
850    );
851    Ok(fetcher)
852}
853
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use netgauze_udp_notif_pkt::raw::UdpNotifPacket;
    use std::collections::HashMap;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// An unrecognized media type must be rejected with
    /// `UnsupportedMediaType`, echoing back the offending media type.
    #[test]
    fn test_serialize_udp_notif_unknown_media_type() {
        let writer_id = String::from("writer_id");
        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let pkt = UdpNotifPacket::new(
            MediaType::Unknown(0xee),
            // publisher-id / message-id fixture values (arbitrary)
            0x01000001,
            0x02000002,
            HashMap::new(),
            Bytes::from(&[0xffu8, 0xffu8][..]),
        );

        let request = Arc::new((peer, pkt));
        let serialized = serialize_udp_notif(request.clone(), writer_id.clone());
        assert!(matches!(
            serialized,
            Err(UdpNotifSerializationError::UnsupportedMediaType(
                MediaType::Unknown(0xee)
            ))
        ));
    }

    /// JSON payloads are parsed and embedded as a JSON object; malformed JSON
    /// surfaces as `SerializationError`.
    #[test]
    fn test_serialize_udp_notif_json() {
        let writer_id = String::from("writer_id");
        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let pkt = UdpNotifPacket::new(
            MediaType::YangDataJson,
            0x01000001,
            0x02000002,
            HashMap::new(),
            Bytes::from(r#"{"id": 1}"#),
        );

        // Truncated JSON — must fail deserialization, not panic
        let pkt_invalid_json = UdpNotifPacket::new(
            MediaType::YangDataJson,
            0x01000001,
            0x02000002,
            HashMap::new(),
            Bytes::from(r#"{"id""#),
        );

        // 16777217 == 0x01000001 (publisher_id), 33554434 == 0x02000002 (message_id)
        let expected_value = serde_json::json!(
            {
                "media_type": "YangDataJson",
                "message_id": 33554434,
                "options": {},
                "payload": {"id": 1},
                "publisher_id": 16777217,
                "writer_id": "writer_id"
            }
        );
        let request_invalid = Arc::new((peer, pkt_invalid_json));
        let request_good = Arc::new((peer, pkt));
        let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
        let serialized =
            serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");

        assert!(matches!(
            result_invalid,
            Err(UdpNotifSerializationError::SerializationError(_))
        ));
        // Key is the peer IP; value is the full message with decoded payload
        assert_eq!(
            serialized,
            (
                Some(serde_json::Value::String(peer.ip().to_string())),
                expected_value
            )
        );
    }

    /// XML payloads are kept as verbatim strings after UTF-8 validation;
    /// invalid UTF-8 surfaces as `Utf8Error`.
    #[test]
    fn test_serialize_udp_notif_xml() {
        let writer_id = String::from("writer_id");
        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let pkt = UdpNotifPacket::new(
            MediaType::YangDataXml,
            0x01000001,
            0x02000002,
            HashMap::new(),
            Bytes::from("<id>1</id>"),
        );
        let pkt_invalid_utf8 = UdpNotifPacket::new(
            MediaType::YangDataXml,
            0x01000001,
            0x02000002,
            HashMap::new(),
            // A UTF-8 continuation byte (10xxxxxx) without a leading byte
            Bytes::from(vec![0x80]),
        );

        // 16777217 == 0x01000001 (publisher_id), 33554434 == 0x02000002 (message_id)
        let expected_value = serde_json::json!(
            {
                "media_type": "YangDataXml",
                "message_id": 33554434,
                "options": {},
                "payload": "<id>1</id>",
                "publisher_id": 16777217,
                "writer_id": "writer_id"
            }
        );

        let request_invalid = Arc::new((peer, pkt_invalid_utf8));
        let request_good = Arc::new((peer, pkt));
        let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
        let serialized =
            serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");
        assert!(matches!(
            result_invalid,
            Err(UdpNotifSerializationError::Utf8Error(_))
        ));
        assert_eq!(
            serialized,
            (
                Some(serde_json::Value::String(peer.ip().to_string())),
                expected_value
            )
        );
    }

    /// CBOR payloads are transcoded to JSON; truncated CBOR surfaces as
    /// `CborError`.
    #[test]
    fn test_serialize_udp_notif_cbor() {
        let writer_id = String::from("writer_id");
        let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        // Build a valid CBOR payload equivalent to {"id": 1}
        let mut cursor = std::io::Cursor::new(vec![]);
        ciborium::ser::into_writer(&serde_json::json!({"id": 1}), &mut cursor)
            .expect("failed to serialize cbor");
        let payload = cursor.into_inner();
        let pkt = UdpNotifPacket::new(
            MediaType::YangDataCbor,
            0x01000001,
            0x02000002,
            HashMap::new(),
            Bytes::from(payload),
        );
        let pkt_invalid = UdpNotifPacket::new(
            MediaType::YangDataCbor,
            0x01000001,
            0x02000002,
            HashMap::new(),
            // Array of length 3, but only contains 2 elements
            Bytes::from(vec![0x83, 0x01, 0x02]),
        );

        // 16777217 == 0x01000001 (publisher_id), 33554434 == 0x02000002 (message_id)
        let expected_value = serde_json::json!(
            {
                "media_type": "YangDataCbor",
                "message_id": 33554434,
                "options": {},
                "payload": {"id": 1},
                "publisher_id": 16777217,
                "writer_id": "writer_id"
            }
        );

        let request_invalid = Arc::new((peer, pkt_invalid));
        let request_good = Arc::new((peer, pkt));
        let result_invalid = serialize_udp_notif(request_invalid, writer_id.clone());
        let serialized =
            serialize_udp_notif(request_good, writer_id.clone()).expect("failed to serialize json");
        assert!(matches!(
            result_invalid,
            Err(UdpNotifSerializationError::CborError(_))
        ));
        assert_eq!(
            serialized,
            (
                Some(serde_json::Value::String(peer.ip().to_string())),
                expected_value
            )
        );
    }
}