// agp_datapath/message_processing.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use agp_config::grpc::client::ClientConfig;
9use agp_tracing::utils::INSTANCE_ID;
10use opentelemetry::propagation::{Extractor, Injector};
11use opentelemetry::trace::TraceContextExt;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tokio_stream::{Stream, StreamExt};
15use tokio_util::sync::CancellationToken;
16use tonic::codegen::{Body, StdError};
17use tonic::{Request, Response, Status};
18use tracing::{debug, error, info, trace};
19use tracing_opentelemetry::OpenTelemetrySpanExt;
20
21use crate::connection::{Channel, Connection, Type as ConnectionType};
22use crate::errors::DataPathError;
23use crate::forwarder::Forwarder;
24use crate::messages::utils::{
25    add_incoming_connection, create_publication, create_subscription, get_agent_id, get_fanout,
26    get_name, get_source, process_name, MetadataType,
27};
28use crate::messages::AgentClass;
29use crate::pubsub::proto::pubsub::v1::message::MessageType;
30use crate::pubsub::proto::pubsub::v1::message::MessageType::Publish as PublishType;
31use crate::pubsub::proto::pubsub::v1::message::MessageType::Subscribe as SubscribeType;
32use crate::pubsub::proto::pubsub::v1::message::MessageType::Unsubscribe as UnsubscribeType;
33use crate::pubsub::proto::pubsub::v1::pub_sub_service_client::PubSubServiceClient;
34use crate::pubsub::proto::pubsub::v1::{pub_sub_service_server::PubSubService, Message};
35
36// Implementation based on: https://docs.rs/opentelemetry-tonic/latest/src/opentelemetry_tonic/lib.rs.html#1-134
37struct MetadataExtractor<'a>(&'a std::collections::HashMap<String, String>);
38
39impl Extractor for MetadataExtractor<'_> {
40    fn get(&self, key: &str) -> Option<&str> {
41        self.0.get(key).map(|s| s.as_str())
42    }
43
44    fn keys(&self) -> Vec<&str> {
45        self.0.keys().map(|s| s.as_str()).collect()
46    }
47}
48
49struct MetadataInjector<'a>(&'a mut std::collections::HashMap<String, String>);
50
51impl Injector for MetadataInjector<'_> {
52    fn set(&mut self, key: &str, value: String) {
53        self.0.insert(key.to_string(), value);
54    }
55}
56
57// Helper function to extract the parent OpenTelemetry context from metadata
58fn extract_parent_context(msg: &Message) -> Option<opentelemetry::Context> {
59    let extractor = MetadataExtractor(&msg.metadata);
60    let parent_context =
61        opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor));
62
63    if parent_context.span().span_context().is_valid() {
64        Some(parent_context)
65    } else {
66        None
67    }
68}
69
70// Helper function to inject the current OpenTelemetry context into metadata
71fn inject_current_context(msg: &mut Message) {
72    let cx = tracing::Span::current().context();
73    let mut injector = MetadataInjector(&mut msg.metadata);
74    opentelemetry::global::get_text_map_propagator(|propagator| {
75        propagator.inject_context(&cx, &mut injector)
76    });
77}
78
79fn message_type_to_str(message_type: &Option<MessageType>) -> &'static str {
80    match message_type {
81        Some(PublishType(_)) => "publish",
82        Some(SubscribeType(_)) => "subscribe",
83        Some(UnsubscribeType(_)) => "unsubscribe",
84        None => "unknown",
85    }
86}
87
88#[derive(Debug)]
89struct MessageProcessorInternal {
90    forwarder: Forwarder<Connection>,
91    drain_channel: drain::Watch,
92}
93
94#[derive(Debug, Clone)]
95pub struct MessageProcessor {
96    internal: Arc<MessageProcessorInternal>,
97}
98
99impl MessageProcessor {
100    pub fn new() -> (Self, drain::Signal) {
101        let (signal, watch) = drain::channel();
102        let forwarder = Forwarder::new();
103        let forwarder = MessageProcessorInternal {
104            forwarder,
105            drain_channel: watch,
106        };
107
108        (
109            Self {
110                internal: Arc::new(forwarder),
111            },
112            signal,
113        )
114    }
115
116    pub fn with_drain_channel(watch: drain::Watch) -> Self {
117        let forwarder = Forwarder::new();
118        let forwarder = MessageProcessorInternal {
119            forwarder,
120            drain_channel: watch,
121        };
122        Self {
123            internal: Arc::new(forwarder),
124        }
125    }
126
127    fn forwarder(&self) -> &Forwarder<Connection> {
128        &self.internal.forwarder
129    }
130
131    fn get_drain_watch(&self) -> drain::Watch {
132        self.internal.drain_channel.clone()
133    }
134
135    async fn try_to_connect<C>(
136        &self,
137        channel: C,
138        client_config: Option<ClientConfig>,
139        local: Option<SocketAddr>,
140        remote: Option<SocketAddr>,
141        existing_conn_index: Option<u64>,
142        max_retry: u32,
143    ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
144    where
145        C: tonic::client::GrpcService<tonic::body::BoxBody>,
146        C::Error: Into<StdError>,
147        C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
148        <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
149    {
150        let mut client: PubSubServiceClient<C> = PubSubServiceClient::new(channel);
151        let mut i = 0;
152        while i < max_retry {
153            let (tx, rx) = mpsc::channel(128);
154            match client
155                .open_channel(Request::new(ReceiverStream::new(rx)))
156                .await
157            {
158                Ok(stream) => {
159                    let cancellation_token = CancellationToken::new();
160                    let connection = Connection::new(ConnectionType::Remote)
161                        .with_local_addr(local)
162                        .with_remote_addr(remote)
163                        .with_channel(Channel::Client(tx))
164                        .with_cancellation_token(Some(cancellation_token.clone()));
165
166                    info!(
167                        "new connection initiated locally: (remote: {:?} - local: {:?})",
168                        connection.remote_addr(),
169                        connection.local_addr()
170                    );
171
172                    // insert connection into connection table
173                    let opt = self
174                        .forwarder()
175                        .on_connection_established(connection, existing_conn_index);
176                    if opt.is_none() {
177                        error!("error adding connection to the connection table");
178                        return Err(DataPathError::ConnectionError(
179                            "error adding connection to the connection tables".to_string(),
180                        ));
181                    }
182
183                    let conn_index = opt.unwrap();
184                    info!(
185                        "new connection index = {:?}, is local {:?}",
186                        conn_index, false
187                    );
188
189                    // Start loop to process messages
190                    let ret = self.process_stream(
191                        stream.into_inner(),
192                        conn_index,
193                        client_config,
194                        cancellation_token,
195                        false,
196                    );
197                    return Ok((ret, conn_index));
198                }
199                Err(e) => {
200                    error!("connection error: {:?}.", e.to_string());
201                }
202            }
203            i += 1;
204
205            // sleep 1 sec between each connection retry
206            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
207        }
208
209        error!("unable to connect to the endpoint");
210        Err(DataPathError::ConnectionError(
211            "reached max connection retries".to_string(),
212        ))
213    }
214
215    pub async fn connect<C>(
216        &self,
217        channel: C,
218        client_config: Option<ClientConfig>,
219        local: Option<SocketAddr>,
220        remote: Option<SocketAddr>,
221    ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
222    where
223        C: tonic::client::GrpcService<tonic::body::BoxBody>,
224        C::Error: Into<StdError>,
225        C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
226        <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
227    {
228        self.try_to_connect(channel, client_config, local, remote, None, 10)
229            .await
230    }
231
232    pub fn disconnect(&self, conn: u64) -> Result<(), DataPathError> {
233        match self.forwarder().get_connection(conn) {
234            None => {
235                error!("error handling disconnect: connection unknown");
236                return Err(DataPathError::DisconnectionError(
237                    "connection not found".to_string(),
238                ));
239            }
240            Some(c) => {
241                match c.cancellation_token() {
242                    None => {
243                        error!("error handling disconnect: missing cancellation token");
244                    }
245                    Some(t) => {
246                        // here token cancel will stop the receiving loop on
247                        // conn and this will cause the delition of the state
248                        // for this connection
249                        t.cancel();
250                    }
251                }
252            }
253        }
254
255        Ok(())
256    }
257
258    pub fn register_local_connection(
259        &self,
260    ) -> (
261        tokio::sync::mpsc::Sender<Result<Message, Status>>,
262        tokio::sync::mpsc::Receiver<Result<Message, Status>>,
263    ) {
264        // create a pair tx, rx to be able to send messages with the standard processing loop
265        let (tx1, rx1) = mpsc::channel(128);
266
267        info!("establishing new local app connection");
268
269        // create a pair tx, rx to be able to receive messages and insert it into the connection table
270        let (tx2, rx2) = mpsc::channel(128);
271
272        // create a connection
273        let connection = Connection::new(ConnectionType::Local).with_channel(Channel::Server(tx2));
274
275        // add it to the connection table
276        let conn_id = self
277            .forwarder()
278            .on_connection_established(connection, None)
279            .unwrap();
280
281        debug!("local connection established with id: {:?}", conn_id);
282        info!(telemetry = true, counter.num_active_connections = 1);
283
284        // this loop will process messages from the local app
285        self.process_stream(
286            ReceiverStream::new(rx1),
287            conn_id,
288            None,
289            CancellationToken::new(),
290            true,
291        );
292
293        // return the handles to be used to send and receive messages
294        (tx1, rx2)
295    }
296
297    pub async fn send_msg(
298        &self,
299        mut msg: Message,
300        out_conn: u64,
301    ) -> Result<(), Box<dyn std::error::Error>> {
302        let connection = self.forwarder().get_connection(out_conn);
303        match connection {
304            Some(conn) => {
305                if conn.is_local_connection() {
306                    // [local gateway] -[send_msg]-> [destination]
307                    let span = tracing::span!(
308                        tracing::Level::DEBUG,
309                        "send_message_to_local",
310                        instance_id = %INSTANCE_ID.as_str(),
311                        connection_id = out_conn,
312                        message_type = message_type_to_str(&msg.message_type),
313                        telemetry = true
314                    );
315                    let _guard = span.enter();
316
317                    inject_current_context(&mut msg);
318                } else {
319                    let parent_context = extract_parent_context(&msg);
320
321                    // [source] -[send_msg]-> [remote gateway]
322                    let span = tracing::span!(
323                        tracing::Level::DEBUG,
324                        "send_message_to_remote",
325                        instance_id = %INSTANCE_ID.as_str(),
326                        connection_id = out_conn,
327                        message_type = message_type_to_str(&msg.message_type),
328                        telemetry = true
329                    );
330
331                    if let Some(ctx) = parent_context {
332                        span.set_parent(ctx);
333                    }
334                    let _guard = span.enter();
335
336                    inject_current_context(&mut msg);
337                }
338
339                match conn.channel() {
340                    Channel::Server(s) => s.send(Ok(msg)).await?,
341                    Channel::Client(s) => s.send(msg).await?,
342                    _ => error!("error reading channel"),
343                }
344            }
345            None => error!("connection {:?} not found", out_conn),
346        }
347        Ok(())
348    }
349
350    async fn match_and_forward_msg(
351        &self,
352        msg: Message,
353        class: AgentClass,
354        in_connection: u64,
355        fanout: u32,
356        agent_id: Option<u64>,
357    ) -> Result<(), DataPathError> {
358        debug!(
359            "match and forward message: class: {:?} - agent_id: {:?} - fanout: {:?}",
360            class, agent_id, fanout,
361        );
362
363        if fanout == 1 {
364            match self
365                .forwarder()
366                .on_publish_msg_match_one(class, agent_id, in_connection)
367            {
368                Ok(out) => match self.send_msg(msg, out).await {
369                    Ok(_) => Ok(()),
370                    Err(e) => {
371                        error!("error sending a message {:?}", e);
372                        Err(DataPathError::PublicationError(e.to_string()))
373                    }
374                },
375                Err(e) => {
376                    error!("error matching a message {:?}", e);
377                    Err(DataPathError::PublicationError(e.to_string()))
378                }
379            }
380        } else {
381            match self
382                .forwarder()
383                .on_publish_msg_match_all(class, agent_id, in_connection)
384            {
385                Ok(out_set) => {
386                    for out in out_set {
387                        match self.send_msg(msg.clone(), out).await {
388                            Ok(_) => {}
389                            Err(e) => {
390                                error!("error sending a message {:?}", e);
391                                return Err(DataPathError::PublicationError(e.to_string()));
392                            }
393                        }
394                    }
395                    Ok(())
396                }
397                Err(e) => {
398                    error!("error sending a message {:?}", e);
399                    Err(DataPathError::PublicationError(e.to_string()))
400                }
401            }
402        }
403    }
404
405    async fn process_publish(
406        &self,
407        mut msg: Message,
408        in_connection: u64,
409    ) -> Result<(), DataPathError> {
410        let pubmsg = match &msg.message_type {
411            Some(PublishType(p)) => p,
412            // this should never happen
413            _ => panic!("wrong message type"),
414        };
415
416        match process_name(&pubmsg.name) {
417            Ok(class) => {
418                let fanout = get_fanout(pubmsg);
419                let agent_id = get_agent_id(&pubmsg.name);
420
421                debug!(
422                    "received publication from connection {}: {:?}",
423                    in_connection, pubmsg
424                );
425
426                // add incoming connection to the metadata
427                add_incoming_connection(&mut msg, in_connection);
428
429                // if we get valid class also the name is valid so we can safely unwrap
430                return self
431                    .match_and_forward_msg(msg, class, in_connection, fanout, agent_id)
432                    .await;
433            }
434            Err(e) => {
435                error!("error processing publication message {:?}", e);
436                Err(DataPathError::PublicationError(e.to_string()))
437            }
438        }
439    }
440
441    fn process_command(&self, msg: &Message) -> Result<(MetadataType, u64), DataPathError> {
442        if !msg.metadata.is_empty() {
443            match msg.metadata.get(&MetadataType::ReceivedFrom.to_string()) {
444                None => {}
445                Some(out_str) => match out_str.parse::<u64>() {
446                    Err(e) => {
447                        error! {"error parsing the connection in command type ReceivedFrom: {:?}", e};
448                        return Err(DataPathError::CommandError(e.to_string()));
449                    }
450                    Ok(out) => {
451                        debug!(%out, "received subscription_from command, register subscription");
452                        return Ok((MetadataType::ReceivedFrom, out));
453                    }
454                },
455            }
456            match msg.metadata.get(&MetadataType::ForwardTo.to_string()) {
457                None => {}
458                Some(out_str) => match out_str.parse::<u64>() {
459                    Err(e) => {
460                        error! {"error parsing the connection in command type ForwardTo: {:?}", e};
461                        return Err(DataPathError::CommandError(e.to_string()));
462                    }
463                    Ok(out) => {
464                        debug!(%out, "received forward_to command, register subscription and forward");
465                        return Ok((MetadataType::ForwardTo, out));
466                    }
467                },
468            }
469        }
470        Ok((MetadataType::Unknown, 0))
471    }
472
473    async fn process_unsubscription(
474        &self,
475        mut msg: Message,
476        in_connection: u64,
477    ) -> Result<(), DataPathError> {
478        let unsubmsg = match &msg.message_type {
479            Some(UnsubscribeType(s)) => s,
480            // this should never happen
481            _ => panic!("wrong message type"),
482        };
483
484        match process_name(&unsubmsg.name) {
485            Ok(class) => {
486                // process command
487                let command = self.process_command(&msg);
488                let mut conn = in_connection;
489                let mut forward = false;
490                // only used if the subscription needs to be forwarded
491                let mut out_conn = in_connection;
492                match command {
493                    Err(e) => {
494                        return Err(e);
495                    }
496                    Ok(tuple) => match tuple.0 {
497                        MetadataType::ReceivedFrom => {
498                            conn = tuple.1;
499                        }
500                        MetadataType::ForwardTo => {
501                            forward = true;
502                            out_conn = tuple.1;
503                        }
504                        _ => {}
505                    },
506                }
507                let connection = self.forwarder().get_connection(in_connection);
508                if connection.is_none() {
509                    // this should never happen
510                    error!("incoming connection does not exists");
511                    return Err(DataPathError::SubscriptionError(
512                        "incoming connection does not exists".to_string(),
513                    ));
514                }
515                let agent_id = get_agent_id(&unsubmsg.name);
516                match self.forwarder().on_unsubscription_msg(
517                    class.clone(),
518                    agent_id,
519                    conn,
520                    connection.unwrap().is_local_connection(),
521                ) {
522                    Ok(_) => {}
523                    Err(e) => {
524                        return Err(DataPathError::UnsubscriptionError(e.to_string()));
525                    }
526                }
527                if forward {
528                    debug!("forward unsubscription to {:?}", out_conn);
529
530                    // NOTE(msardara): this is temporary and will be removed once
531                    // the new packet formast is in place
532                    msg.metadata.remove(&MetadataType::ForwardTo.to_string());
533                    let source_class = match process_name(&unsubmsg.source) {
534                        Ok(s) => s,
535                        Err(e) => {
536                            error!("error processing unsubscription source {:?}", e);
537                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
538                        }
539                    };
540                    let source_id = get_agent_id(&unsubmsg.source);
541                    match self.send_msg(msg, out_conn).await {
542                        Ok(_) => {
543                            self.forwarder().on_forwarded_unsubscription(
544                                source_class,
545                                source_id,
546                                class,
547                                agent_id,
548                                out_conn,
549                            );
550                        }
551                        Err(e) => {
552                            error!("error sending a message {:?}", e);
553                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
554                        }
555                    };
556                }
557                Ok(())
558            }
559            Err(e) => {
560                error!("error processing unsubscription message {:?}", e);
561                Err(DataPathError::UnsubscriptionError(e.to_string()))
562            }
563        }
564    }
565
566    async fn process_subscription(
567        &self,
568        mut msg: Message,
569        in_connection: u64,
570    ) -> Result<(), DataPathError> {
571        let submsg = match &msg.message_type {
572            Some(SubscribeType(s)) => s,
573            // this should never happen
574            _ => panic!("wrong message type"),
575        };
576
577        debug!(
578            "received subscription from connection {}: {:?}",
579            in_connection, submsg
580        );
581
582        match process_name(&submsg.name) {
583            Ok(class) => {
584                // process command
585                trace!("process command");
586                let command = self.process_command(&msg);
587                let mut conn = in_connection;
588                let mut forward = false;
589
590                // only used if the subscription needs to be forwarded
591                let mut out_conn = in_connection;
592                match command {
593                    Err(e) => {
594                        return Err(e);
595                    }
596                    Ok(tuple) => match tuple.0 {
597                        MetadataType::ReceivedFrom => {
598                            conn = tuple.1;
599                            trace!("received subscription_from command, register subscription with conn id {:?}", tuple.1);
600                        }
601                        MetadataType::ForwardTo => {
602                            forward = true;
603                            out_conn = tuple.1;
604                            trace!("received forward_to command, register subscription and forward to conn id {:?}", out_conn);
605                        }
606                        _ => {}
607                    },
608                }
609
610                let connection = self.forwarder().get_connection(conn);
611                if connection.is_none() {
612                    // this should never happen
613                    error!("incoming connection does not exists");
614                    return Err(DataPathError::SubscriptionError(
615                        "incoming connection does not exists".to_string(),
616                    ));
617                }
618                let agent_id = get_agent_id(&submsg.name);
619                match self.forwarder().on_subscription_msg(
620                    class.clone(),
621                    agent_id,
622                    conn,
623                    connection.unwrap().is_local_connection(),
624                ) {
625                    Ok(_) => {}
626                    Err(e) => {
627                        return Err(DataPathError::SubscriptionError(e.to_string()));
628                    }
629                }
630
631                if forward {
632                    debug!("forward subscription to {:?}", out_conn);
633
634                    // NOTE(msardara): this is temporary and will be removed once
635                    // the new packet formast is in place
636                    msg.metadata.remove(&MetadataType::ForwardTo.to_string());
637                    let source_class = match process_name(&submsg.source) {
638                        Ok(s) => s,
639                        Err(e) => {
640                            error!("error processing unsubscription source {:?}", e);
641                            return Err(DataPathError::SubscriptionError(e.to_string()));
642                        }
643                    };
644                    let source_id = get_agent_id(&submsg.source);
645                    match self.send_msg(msg, out_conn).await {
646                        Ok(_) => {
647                            self.forwarder().on_forwarded_subscription(
648                                source_class,
649                                source_id,
650                                class,
651                                agent_id,
652                                out_conn,
653                            );
654                        }
655                        Err(e) => {
656                            error!("error sending a message {:?}", e);
657                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
658                        }
659                    };
660                }
661                Ok(())
662            }
663            Err(e) => {
664                error!("error processing subscription message {:?}", e);
665                Err(DataPathError::SubscriptionError(e.to_string()))
666            }
667        }
668    }
669
670    pub async fn process_message(
671        &self,
672        msg: Message,
673        in_connection: u64,
674    ) -> Result<(), DataPathError> {
675        match &msg.message_type {
676            None => {
677                error!(
678                    "received message without message type from connection {}: {:?}",
679                    in_connection, msg
680                );
681                info!(
682                    telemetry = true,
683                    monotonic_counter.num_messages_by_type = 1,
684                    message_type = "none"
685                );
686                Err(DataPathError::UnknownMsgType("".to_string()))
687            }
688            Some(msg_type) => match msg_type {
689                SubscribeType(s) => {
690                    debug!(
691                        "received subscription from connection {}: {:?}",
692                        in_connection, s
693                    );
694                    info!(
695                        telemetry = true,
696                        monotonic_counter.num_messages_by_type = 1,
697                        message_type = "subscribe"
698                    );
699                    match self.process_subscription(msg, in_connection).await {
700                        Err(e) => {
701                            error! {"error processing subscription {:?}", e}
702                            Err(e)
703                        }
704                        Ok(_) => Ok(()),
705                    }
706                }
707                UnsubscribeType(u) => {
708                    debug!(
709                        "Received ubsubscription from client {}: {:?}",
710                        in_connection, u
711                    );
712                    info!(
713                        telemetry = true,
714                        monotonic_counter.num_messages_by_type = 1,
715                        message_type = "unsubscribe"
716                    );
717                    match self.process_unsubscription(msg, in_connection).await {
718                        Err(e) => {
719                            error! {"error processing unsubscription {:?}", e}
720                            Err(e)
721                        }
722                        Ok(_) => Ok(()),
723                    }
724                }
725                PublishType(p) => {
726                    debug!("Received publish from client {}: {:?}", in_connection, p);
727                    info!(
728                        telemetry = true,
729                        monotonic_counter.num_messages_by_type = 1,
730                        method = "publish"
731                    );
732                    match self.process_publish(msg, in_connection).await {
733                        Err(e) => {
734                            error! {"error processing publication {:?}", e}
735                            Err(e)
736                        }
737                        Ok(_) => Ok(()),
738                    }
739                }
740            },
741        }
742    }
743
744    async fn handle_new_message(
745        &self,
746        conn_index: u64,
747        is_local: bool,
748        mut msg: Message,
749    ) -> Result<(), DataPathError> {
750        debug!(%conn_index, "Received message from connection");
751        info!(
752            telemetry = true,
753            monotonic_counter.num_processed_messages = 1
754        );
755
756        if is_local {
757            // handling the message from the local gw
758            // [local gateway] -[handle_new_message]-> [destination]
759            let span = tracing::span!(
760                tracing::Level::DEBUG,
761                "handle_local_message",
762                instance_id = %INSTANCE_ID.as_str(),
763                connection_id = conn_index,
764                message_type = message_type_to_str(&msg.message_type),
765                telemetry = true
766            );
767            let _guard = span.enter();
768
769            inject_current_context(&mut msg);
770        } else {
771            // handling the message on the remote gateway
772            // [source] -[handle_new_message]-> [remote gateway]
773            let parent_context = extract_parent_context(&msg);
774
775            let span = tracing::span!(
776                tracing::Level::DEBUG,
777                "handle_remote_message",
778                instance_id = %INSTANCE_ID.as_str(),
779                connection_id = conn_index,
780                message_type = message_type_to_str(&msg.message_type),
781                telemetry = true
782            );
783
784            if let Some(ctx) = parent_context {
785                span.set_parent(ctx);
786            }
787            let _guard = span.enter();
788
789            inject_current_context(&mut msg);
790        }
791
792        match self.process_message(msg, conn_index).await {
793            Ok(_) => Ok(()),
794            Err(e) => {
795                // drop message and log
796                error!(
797                    "error processing message from connection {:?}: {:?}",
798                    conn_index, e
799                );
800                info!(
801                    telemetry = true,
802                    monotonic_counter.num_message_process_errors = 1
803                );
804                Err(DataPathError::ProcessingError(e.to_string()))
805            }
806        }
807    }
808
809    fn process_stream(
810        &self,
811        mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
812        conn_index: u64,
813        client_config: Option<ClientConfig>,
814        cancellation_token: CancellationToken,
815        is_local: bool,
816    ) -> tokio::task::JoinHandle<()> {
817        // Clone self to be able to move it into the spawned task
818        let self_clone = self.clone();
819        let token_clone = cancellation_token.clone();
820        let client_conf_clone = client_config.clone();
821        let handle = tokio::spawn(async move {
822            let mut try_to_reconnect = true;
823            loop {
824                tokio::select! {
825                    next = stream.next() => {
826                        match next {
827                            Some(result) => {
828                                match result {
829                                    Ok(msg) => {
830                                        // save message source to use in case of error
831                                        let mut msg_source = None;
832                                        let mut msg_name = None;
833                                        if is_local {
834                                            msg_source = get_source(&msg);
835                                            msg_name = get_name(&msg);
836                                        }
837                                        if let Err(e) = self_clone.handle_new_message(conn_index, is_local, msg).await {
838                                            error!("error processing incoming messages {:?}", e);
839                                            // If the message is coming from a local app, notify it
840                                            if is_local {
841                                                let connection = self_clone.forwarder().get_connection(conn_index);
842                                                match connection {
843                                                    Some(conn) => {
844                                                        debug!("try to notify local application");
845                                                        if msg_source.is_none() || msg_name.is_none() {
846                                                            debug!("unable to notify the error to the remote end");
847                                                        } else {
848                                                            // keep the same message format for the error
849                                                            let dest = msg_name.unwrap();
850                                                            let mut err_message = create_publication(
851                                                                &msg_source.unwrap(),
852                                                                &dest.agent_class,
853                                                                Some(dest.agent_id),
854                                                                HashMap::new(), 1, "",
855                                                                Vec::new());
856
857                                                            err_message.metadata.insert(MetadataType::Error.to_string(), e.to_string());
858                                                            if let Channel::Server(tx) = conn.channel() {
859                                                                if tx.send(Ok(err_message)).await.is_err() {
860                                                                    debug!("unable to notify the error to the local app");
861                                                                }
862                                                            }
863                                                        }
864                                                    }
865                                                    None => {
866                                                        error!("connection {:?} not found", conn_index);
867                                                    }
868                                                }
869                                            }
870                                        }
871                                    }
872                                    Err(e) => {
873                                        if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
874                                            if io_err.kind() == std::io::ErrorKind::BrokenPipe {
875                                                info!("connection {:?} closed by peer", conn_index);
876                                            }
877                                        } else {
878                                            error!("error receiving messages {:?}", e);
879                                        }
880                                        break;
881                                    }
882                                }
883                            }
884                            None => {
885                                debug!(%conn_index, "end of stream");
886                                break;
887                            }
888                        }
889                    }
890                    _ = self_clone.get_drain_watch().signaled() => {
891                        info!("shutting down stream on drain: {}", conn_index);
892                        try_to_reconnect = false;
893                        break;
894                    }
895                    _ = token_clone.cancelled() => {
896                        info!("shutting down stream cancellation token: {}", conn_index);
897                        try_to_reconnect = false;
898                        break;
899                    }
900                }
901            }
902
903            let mut delete_connection = true;
904
905            if try_to_reconnect && client_conf_clone.is_some() {
906                let config = client_conf_clone.unwrap();
907                match config.to_channel() {
908                    Err(e) => {
909                        error!(
910                            "cannot parse connection config, unable to reconnect {:?}",
911                            e.to_string()
912                        );
913                    }
914                    Ok(channel) => {
915                        info!("connection lost with remote endpoint, try to reconnect");
916                        // These are the subscriptions that we forwarded to the remote gateway on
917                        // this connection. It is necessary to restore them to keep receive the messages
918                        // The connections on the local subscription table (created using the set_route command) are still there and will be removed
919                        // only if the reconnection process will fail.
920                        let remote_subscriptions = self_clone
921                            .forwarder()
922                            .get_subscriptions_forwarded_on_connection(conn_index);
923
924                        match self_clone
925                            .try_to_connect(
926                                channel,
927                                Some(config),
928                                None,
929                                None,
930                                Some(conn_index),
931                                120,
932                            )
933                            .await
934                        {
935                            Ok(_) => {
936                                info!("connection re-established");
937                                // the subscription table should be ok already
938                                delete_connection = false;
939                                for r in remote_subscriptions.iter() {
940                                    let sub_msg = create_subscription(
941                                        r.source(),
942                                        &r.name().agent_class,
943                                        Some(r.name().agent_id),
944                                        HashMap::new(),
945                                    );
946                                    if self_clone.send_msg(sub_msg, conn_index).await.is_err() {
947                                        error!("error restoring subscription on remote node");
948                                    }
949                                }
950                            }
951                            Err(e) => {
952                                // TODO: notify the app that the connection is not working anymore
953                                error!("unable to connect to remote node {:?}", e.to_string());
954                            }
955                        }
956                    }
957                }
958            } else {
959                info!("close connection {}", conn_index)
960            }
961
962            if delete_connection {
963                self_clone
964                    .forwarder()
965                    .on_connection_drop(conn_index, is_local);
966
967                info!(telemetry = true, counter.num_active_connections = -1);
968            }
969        });
970
971        handle
972    }
973
974    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
975        let mut err: &(dyn std::error::Error + 'static) = err_status;
976
977        loop {
978            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
979                return Some(io_err);
980            }
981
982            // h2::Error do not expose std::io::Error with `source()`
983            // https://github.com/hyperium/h2/pull/462
984            if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
985                if let Some(io_err) = h2_err.get_io() {
986                    return Some(io_err);
987                }
988            }
989
990            err = err.source()?;
991        }
992    }
993}
994
995#[tonic::async_trait]
996impl PubSubService for MessageProcessor {
997    type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
998
999    async fn open_channel(
1000        &self,
1001        request: Request<tonic::Streaming<Message>>,
1002    ) -> Result<Response<Self::OpenChannelStream>, Status> {
1003        let remote_addr = request.remote_addr();
1004        let local_addr = request.local_addr();
1005
1006        let stream = request.into_inner();
1007        let (tx, rx) = mpsc::channel(128);
1008
1009        let connection = Connection::new(ConnectionType::Remote)
1010            .with_remote_addr(remote_addr)
1011            .with_local_addr(local_addr)
1012            .with_channel(Channel::Server(tx));
1013
1014        info!(
1015            "new connection received from remote: (remote: {:?} - local: {:?})",
1016            connection.remote_addr(),
1017            connection.local_addr()
1018        );
1019        info!(telemetry = true, counter.num_active_connections = 1);
1020
1021        // insert connection into connection table
1022        let conn_index = self
1023            .forwarder()
1024            .on_connection_established(connection, None)
1025            .unwrap();
1026
1027        self.process_stream(stream, conn_index, None, CancellationToken::new(), false);
1028
1029        let out_stream = ReceiverStream::new(rx);
1030        Ok(Response::new(
1031            Box::pin(out_stream) as Self::OpenChannelStream
1032        ))
1033    }
1034}