agp_datapath/
message_processing.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use agp_config::grpc::client::ClientConfig;
9use tokio::sync::mpsc;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::{Stream, StreamExt};
12use tokio_util::sync::CancellationToken;
13use tonic::codegen::{Body, StdError};
14use tonic::{Request, Response, Status};
15use tracing::{debug, error, info, trace};
16
17use crate::connection::{Channel, Connection, Type as ConnectionType};
18use crate::errors::DataPathError;
19use crate::forwarder::Forwarder;
20use crate::messages::utils::{
21    add_incoming_connection, create_subscription, get_agent_id, get_fanout, process_name,
22    CommandType,
23};
24use crate::messages::AgentClass;
25use crate::pubsub::proto::pubsub::v1::message::MessageType::Publish as PublishType;
26use crate::pubsub::proto::pubsub::v1::message::MessageType::Subscribe as SubscribeType;
27use crate::pubsub::proto::pubsub::v1::message::MessageType::Unsubscribe as UnsubscribeType;
28use crate::pubsub::proto::pubsub::v1::pub_sub_service_client::PubSubServiceClient;
29use crate::pubsub::proto::pubsub::v1::{pub_sub_service_server::PubSubService, Message};
30
/// State shared by every clone of a `MessageProcessor`.
#[derive(Debug)]
struct MessageProcessorInternal {
    /// Connection table and matching logic used to route messages.
    forwarder: Forwarder<Connection>,
    /// Drain watch observed by the stream-processing tasks to stop on shutdown.
    drain_channel: drain::Watch,
}
36
/// Handle to the message-processing state.
///
/// Cloning only clones the inner `Arc`, so all clones operate on the same
/// forwarder and drain watch.
#[derive(Debug, Clone)]
pub struct MessageProcessor {
    /// Shared state, reference counted so the handle can be moved into tasks.
    internal: Arc<MessageProcessorInternal>,
}
41
42impl MessageProcessor {
43    pub fn new() -> (Self, drain::Signal) {
44        let (signal, watch) = drain::channel();
45        let forwarder = Forwarder::new();
46        let forwarder = MessageProcessorInternal {
47            forwarder,
48            drain_channel: watch,
49        };
50
51        (
52            Self {
53                internal: Arc::new(forwarder),
54            },
55            signal,
56        )
57    }
58
59    pub fn with_drain_channel(watch: drain::Watch) -> Self {
60        let forwarder = Forwarder::new();
61        let forwarder = MessageProcessorInternal {
62            forwarder,
63            drain_channel: watch,
64        };
65        Self {
66            internal: Arc::new(forwarder),
67        }
68    }
69
70    fn forwarder(&self) -> &Forwarder<Connection> {
71        &self.internal.forwarder
72    }
73
74    fn get_drain_watch(&self) -> drain::Watch {
75        self.internal.drain_channel.clone()
76    }
77
78    async fn try_to_connect<C>(
79        &self,
80        channel: C,
81        client_config: Option<ClientConfig>,
82        local: Option<SocketAddr>,
83        remote: Option<SocketAddr>,
84        existing_conn_index: Option<u64>,
85        max_retry: u32,
86    ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
87    where
88        C: tonic::client::GrpcService<tonic::body::BoxBody>,
89        C::Error: Into<StdError>,
90        C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
91        <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
92    {
93        let mut client: PubSubServiceClient<C> = PubSubServiceClient::new(channel);
94        let mut i = 0;
95        while i < max_retry {
96            let (tx, rx) = mpsc::channel(128);
97            match client
98                .open_channel(Request::new(ReceiverStream::new(rx)))
99                .await
100            {
101                Ok(stream) => {
102                    let cancellation_token = CancellationToken::new();
103                    let connection = Connection::new(ConnectionType::Remote)
104                        .with_local_addr(local)
105                        .with_remote_addr(remote)
106                        .with_channel(Channel::Client(tx))
107                        .with_cancellation_token(Some(cancellation_token.clone()));
108
109                    info!(
110                        "new connection initiated locally: (remote: {:?} - local: {:?})",
111                        connection.remote_addr(),
112                        connection.local_addr()
113                    );
114
115                    // insert connection into connection table
116                    let opt = self
117                        .forwarder()
118                        .on_connection_established(connection, existing_conn_index);
119                    if opt.is_none() {
120                        error!("error adding connection to the connection table");
121                        return Err(DataPathError::ConnectionError(
122                            "error adding connection to the connection tables".to_string(),
123                        ));
124                    }
125
126                    let conn_index = opt.unwrap();
127                    info!(
128                        "new connection index = {:?}, is local {:?}",
129                        conn_index, false
130                    );
131
132                    // Start loop to process messages
133                    let ret = self.process_stream(
134                        stream.into_inner(),
135                        conn_index,
136                        client_config,
137                        cancellation_token,
138                        false,
139                    );
140                    return Ok((ret, conn_index));
141                }
142                Err(e) => {
143                    error!("connection error: {:?}.", e.to_string());
144                }
145            }
146            i += 1;
147
148            // sleep 1 sec between each connection retry
149            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
150        }
151
152        error!("unable to connect to the endpoint");
153        Err(DataPathError::ConnectionError(
154            "reached max connection retries".to_string(),
155        ))
156    }
157
158    pub async fn connect<C>(
159        &self,
160        channel: C,
161        client_config: Option<ClientConfig>,
162        local: Option<SocketAddr>,
163        remote: Option<SocketAddr>,
164    ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
165    where
166        C: tonic::client::GrpcService<tonic::body::BoxBody>,
167        C::Error: Into<StdError>,
168        C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
169        <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
170    {
171        self.try_to_connect(channel, client_config, local, remote, None, 10)
172            .await
173    }
174
175    pub fn disconnect(&self, conn: u64) -> Result<(), DataPathError> {
176        match self.forwarder().get_connection(conn) {
177            None => {
178                error!("error handling disconnect: connection unknown");
179                return Err(DataPathError::DisconnectionError(
180                    "connection not found".to_string(),
181                ));
182            }
183            Some(c) => {
184                match c.cancellation_token() {
185                    None => {
186                        error!("error handling disconnect: missing cancellation token");
187                    }
188                    Some(t) => {
189                        // here token cancel will stop the receiving loop on
190                        // conn and this will cause the delition of the state
191                        // for this connection
192                        t.cancel();
193                    }
194                }
195            }
196        }
197
198        Ok(())
199    }
200
201    pub fn register_local_connection(
202        &self,
203    ) -> (
204        tokio::sync::mpsc::Sender<Result<Message, Status>>,
205        tokio::sync::mpsc::Receiver<Result<Message, Status>>,
206    ) {
207        // create a pair tx, rx to be able to send messages with the standard processing loop
208        let (tx1, rx1) = mpsc::channel(128);
209
210        info!("establishing new local app connection");
211
212        // create a pair tx, rx to be able to receive messages and insert it into the connection table
213        let (tx2, rx2) = mpsc::channel(128);
214
215        // create a connection
216        let connection = Connection::new(ConnectionType::Local).with_channel(Channel::Server(tx2));
217
218        // add it to the connection table
219        let conn_id = self
220            .forwarder()
221            .on_connection_established(connection, None)
222            .unwrap();
223
224        debug!("local connection established with id: {:?}", conn_id);
225        info!(telemetry = true, counter.num_active_connections = 1);
226
227        // this loop will process messages from the local app
228        self.process_stream(
229            ReceiverStream::new(rx1),
230            conn_id,
231            None,
232            CancellationToken::new(),
233            true,
234        );
235
236        // return the handles to be used to send and receive messages
237        (tx1, rx2)
238    }
239
240    pub async fn send_msg(
241        &self,
242        msg: Message,
243        out_conn: u64,
244    ) -> Result<(), Box<dyn std::error::Error>> {
245        let connection = self.forwarder().get_connection(out_conn);
246        match connection {
247            Some(conn) => match conn.channel() {
248                Channel::Server(s) => s.send(Ok(msg)).await?,
249                Channel::Client(s) => s.send(msg).await?,
250                _ => error!("error reading channel"),
251            },
252            None => error!("connection {:?} not found", out_conn),
253        }
254        Ok(())
255    }
256
257    async fn match_and_forward_msg(
258        &self,
259        msg: Message,
260        class: AgentClass,
261        in_connection: u64,
262        fanout: u32,
263        agent_id: Option<u64>,
264    ) -> Result<(), DataPathError> {
265        debug!(
266            "match and forward message: class: {:?} - agent_id: {:?} - fanout: {:?}",
267            class, agent_id, fanout,
268        );
269
270        if fanout == 1 {
271            match self
272                .forwarder()
273                .on_publish_msg_match_one(class, agent_id, in_connection)
274            {
275                Ok(out) => match self.send_msg(msg, out).await {
276                    Ok(_) => Ok(()),
277                    Err(e) => {
278                        error!("error sending a message {:?}", e);
279                        Err(DataPathError::PublicationError(e.to_string()))
280                    }
281                },
282                Err(e) => {
283                    error!("error matching a message {:?}", e);
284                    Err(DataPathError::PublicationError(e.to_string()))
285                }
286            }
287        } else {
288            match self
289                .forwarder()
290                .on_publish_msg_match_all(class, agent_id, in_connection)
291            {
292                Ok(out_set) => {
293                    for out in out_set {
294                        match self.send_msg(msg.clone(), out).await {
295                            Ok(_) => {}
296                            Err(e) => {
297                                error!("error sending a message {:?}", e);
298                                return Err(DataPathError::PublicationError(e.to_string()));
299                            }
300                        }
301                    }
302                    Ok(())
303                }
304                Err(e) => {
305                    error!("error sending a message {:?}", e);
306                    Err(DataPathError::PublicationError(e.to_string()))
307                }
308            }
309        }
310    }
311
312    async fn process_publish(
313        &self,
314        mut msg: Message,
315        in_connection: u64,
316    ) -> Result<(), DataPathError> {
317        let pubmsg = match &msg.message_type {
318            Some(PublishType(p)) => p,
319            // this should never happen
320            _ => panic!("wrong message type"),
321        };
322
323        match process_name(&pubmsg.name) {
324            Ok(class) => {
325                let fanout = get_fanout(pubmsg);
326                let agent_id = get_agent_id(&pubmsg.name);
327
328                debug!(
329                    "received publication from connection {}: {:?}",
330                    in_connection, pubmsg
331                );
332
333                // add incoming connection to the metadata
334                add_incoming_connection(&mut msg, in_connection);
335
336                // if we get valid class also the name is valid so we can safely unwrap
337                return self
338                    .match_and_forward_msg(msg, class, in_connection, fanout, agent_id)
339                    .await;
340            }
341            Err(e) => {
342                error!("error processing publication message {:?}", e);
343                Err(DataPathError::PublicationError(e.to_string()))
344            }
345        }
346    }
347
348    fn process_command(&self, msg: &Message) -> Result<(CommandType, u64), DataPathError> {
349        if !msg.metadata.is_empty() {
350            match msg.metadata.get(&CommandType::ReceivedFrom.to_string()) {
351                None => {}
352                Some(out_str) => match out_str.parse::<u64>() {
353                    Err(e) => {
354                        error! {"error parsing the connection in command type ReceivedFrom: {:?}", e};
355                        return Err(DataPathError::CommandError(e.to_string()));
356                    }
357                    Ok(out) => {
358                        debug!(%out, "received subscription_from command, register subscription");
359                        return Ok((CommandType::ReceivedFrom, out));
360                    }
361                },
362            }
363            match msg.metadata.get(&CommandType::ForwardTo.to_string()) {
364                None => {}
365                Some(out_str) => match out_str.parse::<u64>() {
366                    Err(e) => {
367                        error! {"error parsing the connection in command type ForwardTo: {:?}", e};
368                        return Err(DataPathError::CommandError(e.to_string()));
369                    }
370                    Ok(out) => {
371                        debug!(%out, "received forward_to command, register subscription and forward");
372                        return Ok((CommandType::ForwardTo, out));
373                    }
374                },
375            }
376        }
377        Ok((CommandType::Unknown, 0))
378    }
379
380    async fn process_unsubscription(
381        &self,
382        mut msg: Message,
383        in_connection: u64,
384    ) -> Result<(), DataPathError> {
385        let unsubmsg = match &msg.message_type {
386            Some(UnsubscribeType(s)) => s,
387            // this should never happen
388            _ => panic!("wrong message type"),
389        };
390
391        match process_name(&unsubmsg.name) {
392            Ok(class) => {
393                // process command
394                let command = self.process_command(&msg);
395                let mut conn = in_connection;
396                let mut forward = false;
397                // only used if the subscription needs to be forwarded
398                let mut out_conn = in_connection;
399                match command {
400                    Err(e) => {
401                        return Err(e);
402                    }
403                    Ok(tuple) => match tuple.0 {
404                        CommandType::ReceivedFrom => {
405                            conn = tuple.1;
406                        }
407                        CommandType::ForwardTo => {
408                            forward = true;
409                            out_conn = tuple.1;
410                        }
411                        _ => {}
412                    },
413                }
414                let connection = self.forwarder().get_connection(in_connection);
415                if connection.is_none() {
416                    // this should never happen
417                    error!("incoming connection does not exists");
418                    return Err(DataPathError::SubscriptionError(
419                        "incoming connection does not exists".to_string(),
420                    ));
421                }
422                let agent_id = get_agent_id(&unsubmsg.name);
423                match self.forwarder().on_unsubscription_msg(
424                    class.clone(),
425                    agent_id,
426                    conn,
427                    connection.unwrap().is_local_connection(),
428                ) {
429                    Ok(_) => {}
430                    Err(e) => {
431                        return Err(DataPathError::UnsubscriptionError(e.to_string()));
432                    }
433                }
434                if forward {
435                    debug!("forward unsubscription to {:?}", out_conn);
436                    msg.metadata.clear();
437                    let source_class = match process_name(&unsubmsg.source) {
438                        Ok(s) => s,
439                        Err(e) => {
440                            error!("error processing unsubscription source {:?}", e);
441                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
442                        }
443                    };
444                    let source_id = get_agent_id(&unsubmsg.source);
445                    match self.send_msg(msg, out_conn).await {
446                        Ok(_) => {
447                            self.forwarder().on_forwarded_unsubscription(
448                                source_class,
449                                source_id,
450                                class,
451                                agent_id,
452                                out_conn,
453                            );
454                        }
455                        Err(e) => {
456                            error!("error sending a message {:?}", e);
457                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
458                        }
459                    };
460                }
461                Ok(())
462            }
463            Err(e) => {
464                error!("error processing unsubscription message {:?}", e);
465                Err(DataPathError::UnsubscriptionError(e.to_string()))
466            }
467        }
468    }
469
470    async fn process_subscription(
471        &self,
472        mut msg: Message,
473        in_connection: u64,
474    ) -> Result<(), DataPathError> {
475        let submsg = match &msg.message_type {
476            Some(SubscribeType(s)) => s,
477            // this should never happen
478            _ => panic!("wrong message type"),
479        };
480
481        debug!(
482            "received subscription from connection {}: {:?}",
483            in_connection, submsg
484        );
485
486        match process_name(&submsg.name) {
487            Ok(class) => {
488                // process command
489                trace!("process command");
490                let command = self.process_command(&msg);
491                let mut conn = in_connection;
492                let mut forward = false;
493
494                // only used if the subscription needs to be forwarded
495                let mut out_conn = in_connection;
496                match command {
497                    Err(e) => {
498                        return Err(e);
499                    }
500                    Ok(tuple) => match tuple.0 {
501                        CommandType::ReceivedFrom => {
502                            conn = tuple.1;
503                            trace!("received subscription_from command, register subscription with conn id {:?}", tuple.1);
504                        }
505                        CommandType::ForwardTo => {
506                            forward = true;
507                            out_conn = tuple.1;
508                            trace!("received forward_to command, register subscription and forward to conn id {:?}", out_conn);
509                        }
510                        _ => {}
511                    },
512                }
513
514                let connection = self.forwarder().get_connection(conn);
515                if connection.is_none() {
516                    // this should never happen
517                    error!("incoming connection does not exists");
518                    return Err(DataPathError::SubscriptionError(
519                        "incoming connection does not exists".to_string(),
520                    ));
521                }
522                let agent_id = get_agent_id(&submsg.name);
523                match self.forwarder().on_subscription_msg(
524                    class.clone(),
525                    agent_id,
526                    conn,
527                    connection.unwrap().is_local_connection(),
528                ) {
529                    Ok(_) => {}
530                    Err(e) => {
531                        return Err(DataPathError::SubscriptionError(e.to_string()));
532                    }
533                }
534
535                if forward {
536                    debug!("forward subscription to {:?}", out_conn);
537                    msg.metadata.clear();
538                    let source_class = match process_name(&submsg.source) {
539                        Ok(s) => s,
540                        Err(e) => {
541                            error!("error processing unsubscription source {:?}", e);
542                            return Err(DataPathError::SubscriptionError(e.to_string()));
543                        }
544                    };
545                    let source_id = get_agent_id(&submsg.source);
546                    match self.send_msg(msg, out_conn).await {
547                        Ok(_) => {
548                            self.forwarder().on_forwarded_subscription(
549                                source_class,
550                                source_id,
551                                class,
552                                agent_id,
553                                out_conn,
554                            );
555                        }
556                        Err(e) => {
557                            error!("error sending a message {:?}", e);
558                            return Err(DataPathError::UnsubscriptionError(e.to_string()));
559                        }
560                    };
561                }
562                Ok(())
563            }
564            Err(e) => {
565                error!("error processing subscription message {:?}", e);
566                Err(DataPathError::SubscriptionError(e.to_string()))
567            }
568        }
569    }
570
571    pub async fn process_message(
572        &self,
573        msg: Message,
574        in_connection: u64,
575    ) -> Result<(), DataPathError> {
576        match &msg.message_type {
577            None => {
578                error!(
579                    "received message without message type from connection {}: {:?}",
580                    in_connection, msg
581                );
582                info!(
583                    telemetry = true,
584                    monotonic_counter.num_messages_by_type = 1,
585                    message_type = "none"
586                );
587                Err(DataPathError::UnknownMsgType("".to_string()))
588            }
589            Some(msg_type) => match msg_type {
590                SubscribeType(s) => {
591                    debug!(
592                        "received subscription from connection {}: {:?}",
593                        in_connection, s
594                    );
595                    info!(
596                        telemetry = true,
597                        monotonic_counter.num_messages_by_type = 1,
598                        message_type = "subscribe"
599                    );
600                    match self.process_subscription(msg, in_connection).await {
601                        Err(e) => {
602                            error! {"error processing subscription {:?}", e}
603                            Err(e)
604                        }
605                        Ok(_) => Ok(()),
606                    }
607                }
608                UnsubscribeType(u) => {
609                    debug!(
610                        "Received ubsubscription from client {}: {:?}",
611                        in_connection, u
612                    );
613                    info!(
614                        telemetry = true,
615                        monotonic_counter.num_messages_by_type = 1,
616                        message_type = "unsubscribe"
617                    );
618                    match self.process_unsubscription(msg, in_connection).await {
619                        Err(e) => {
620                            error! {"error processing unsubscription {:?}", e}
621                            Err(e)
622                        }
623                        Ok(_) => Ok(()),
624                    }
625                }
626                PublishType(p) => {
627                    debug!("Received publish from client {}: {:?}", in_connection, p);
628                    info!(
629                        telemetry = true,
630                        monotonic_counter.num_messages_by_type = 1,
631                        method = "publish"
632                    );
633                    match self.process_publish(msg, in_connection).await {
634                        Err(e) => {
635                            error! {"error processing publication {:?}", e}
636                            Err(e)
637                        }
638                        Ok(_) => Ok(()),
639                    }
640                }
641            },
642        }
643    }
644
645    async fn handle_new_message(
646        &self,
647        conn_index: u64,
648        result: Result<Message, Status>,
649    ) -> Result<(), DataPathError> {
650        debug!(%conn_index, "Received message from connection");
651        info!(
652            telemetry = true,
653            monotonic_counter.num_processed_messages = 1
654        );
655
656        match result {
657            Ok(msg) => {
658                match self.process_message(msg, conn_index).await {
659                    Ok(_) => Ok(()),
660                    Err(e) => {
661                        // drop message and log
662                        error!(
663                            "error processing message from connection {:?}: {:?}",
664                            conn_index, e
665                        );
666                        info!(
667                            telemetry = true,
668                            monotonic_counter.num_message_process_errors = 1
669                        );
670                        Ok(())
671                    }
672                }
673            }
674            Err(e) => {
675                if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
676                    if io_err.kind() == std::io::ErrorKind::BrokenPipe {
677                        info!("Connection {:?} closed by peer", conn_index);
678                        return Err(DataPathError::StreamError(e.to_string()));
679                    }
680                }
681                error!("error receiving messages {:?}", e);
682                let connection = self.forwarder().get_connection(conn_index);
683                match connection {
684                    Some(conn) => {
685                        match conn.channel() {
686                            Channel::Server(tx) => tx
687                                .send(Err(e))
688                                .await
689                                .map_err(|e| DataPathError::MessageSendError(e.to_string())),
690                            _ => Err(DataPathError::WrongChannelType), // error
691                        }
692                    }
693                    None => {
694                        error!("connection {:?} not found", conn_index);
695                        Err(DataPathError::ConnectionNotFound(conn_index.to_string()))
696                    }
697                }
698            }
699        }
700    }
701
702    #[tracing::instrument(fields(telemetry = true), skip(stream))]
703    fn process_stream(
704        &self,
705        mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
706        conn_index: u64,
707        client_config: Option<ClientConfig>,
708        cancellation_token: CancellationToken,
709        is_local: bool,
710    ) -> tokio::task::JoinHandle<()> {
711        // Clone self to be able to move it into the spawned task
712        let self_clone = self.clone();
713        let token_clone = cancellation_token.clone();
714        let client_conf_clone = client_config.clone();
715        let handle = tokio::spawn(async move {
716            let mut try_to_reconnect = true;
717            loop {
718                tokio::select! {
719                    res = stream.next() => {
720                        match res {
721                            Some(msg) => {
722                                if let Err(e) = self_clone.handle_new_message(conn_index, msg).await {
723                                    error!("error handling stream {:?}", e);
724                                    break;
725                                }
726                            }
727                            None => {
728                                info!(%conn_index, "end of stream");
729                                break;
730                            }
731                        }
732                    }
733                    _ = self_clone.get_drain_watch().signaled() => {
734                        info!("shutting down stream on drain: {}", conn_index);
735                        try_to_reconnect = false;
736                        break;
737                    }
738                    _ = token_clone.cancelled() => {
739                        info!("shutting down stream cancellation token: {}", conn_index);
740                        try_to_reconnect = false;
741                        break;
742                    }
743                }
744            }
745
746            let mut delete_connection = true;
747
748            if try_to_reconnect && client_conf_clone.is_some() {
749                let config = client_conf_clone.unwrap();
750                match config.to_channel() {
751                    Err(e) => {
752                        error!(
753                            "cannot parse connection config, unable to reconnect {:?}",
754                            e.to_string()
755                        );
756                    }
757                    Ok(channel) => {
758                        info!("connection lost with remote endpoint, try to reconnect");
759                        // These are the subscriptions that we forwarded to the remote gateway on
760                        // this connection. It is necessary to restore them to keep receive the messages
761                        // The connections on the local subscription table (created using the set_route command) are still there and will be removed
762                        // only if the reconnection process will fail.
763                        let remote_subscriptions = self_clone
764                            .forwarder()
765                            .get_subscriptions_forwarded_on_connection(conn_index);
766
767                        match self_clone
768                            .try_to_connect(
769                                channel,
770                                Some(config),
771                                None,
772                                None,
773                                Some(conn_index),
774                                120,
775                            )
776                            .await
777                        {
778                            Ok(_) => {
779                                info!("connection re-established");
780                                // the subscription table should be ok already
781                                delete_connection = false;
782                                for r in remote_subscriptions.iter() {
783                                    // TODO in remote subscription put also the source name of the subscription
784                                    // so that I can reuse it here
785                                    let sub_msg = create_subscription(
786                                        r.source(),
787                                        &r.name().agent_class,
788                                        Some(r.name().agent_id),
789                                        HashMap::new(),
790                                    );
791                                    if self_clone.send_msg(sub_msg, conn_index).await.is_err() {
792                                        error!("error restoring subscription on remote node");
793                                    }
794                                }
795                            }
796                            Err(e) => {
797                                // TODO: notify the app that the connection is not working anymore
798                                error!("unable to connect to remote node {:?}", e.to_string());
799                            }
800                        }
801                    }
802                }
803            } else {
804                info!(
805                    "connection lost on connection {}, do not try to reconnect",
806                    conn_index
807                )
808            }
809
810            if delete_connection {
811                self_clone
812                    .forwarder()
813                    .on_connection_drop(conn_index, is_local);
814
815                info!(telemetry = true, counter.num_active_connections = -1);
816            }
817        });
818
819        handle
820    }
821
822    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
823        let mut err: &(dyn std::error::Error + 'static) = err_status;
824
825        loop {
826            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
827                return Some(io_err);
828            }
829
830            // h2::Error do not expose std::io::Error with `source()`
831            // https://github.com/hyperium/h2/pull/462
832            if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
833                if let Some(io_err) = h2_err.get_io() {
834                    return Some(io_err);
835                }
836            }
837
838            err = err.source()?;
839        }
840    }
841}
842
843#[tonic::async_trait]
844impl PubSubService for MessageProcessor {
845    type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
846
847    #[tracing::instrument(fields(telemetry = true))]
848    async fn open_channel(
849        &self,
850        request: Request<tonic::Streaming<Message>>,
851    ) -> Result<Response<Self::OpenChannelStream>, Status> {
852        let remote_addr = request.remote_addr();
853        let local_addr = request.local_addr();
854
855        let stream = request.into_inner();
856        let (tx, rx) = mpsc::channel(128);
857
858        let connection = Connection::new(ConnectionType::Remote)
859            .with_remote_addr(remote_addr)
860            .with_local_addr(local_addr)
861            .with_channel(Channel::Server(tx));
862
863        info!(
864            "new connection received from remote: (remote: {:?} - local: {:?})",
865            connection.remote_addr(),
866            connection.local_addr()
867        );
868        info!(telemetry = true, counter.num_active_connections = 1);
869
870        // insert connection into connection table
871        let conn_index = self
872            .forwarder()
873            .on_connection_established(connection, None)
874            .unwrap();
875
876        self.process_stream(stream, conn_index, None, CancellationToken::new(), false);
877
878        let out_stream = ReceiverStream::new(rx);
879        Ok(Response::new(
880            Box::pin(out_stream) as Self::OpenChannelStream
881        ))
882    }
883}