1use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use agp_config::grpc::client::ClientConfig;
9use agp_tracing::utils::INSTANCE_ID;
10use opentelemetry::propagation::{Extractor, Injector};
11use opentelemetry::trace::TraceContextExt;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tokio_stream::{Stream, StreamExt};
15use tokio_util::sync::CancellationToken;
16use tonic::codegen::{Body, StdError};
17use tonic::{Request, Response, Status};
18use tracing::{debug, error, info, trace};
19use tracing_opentelemetry::OpenTelemetrySpanExt;
20
21use crate::connection::{Channel, Connection, Type as ConnectionType};
22use crate::errors::DataPathError;
23use crate::forwarder::Forwarder;
24use crate::messages::utils::{
25 add_incoming_connection, create_publication, create_subscription, get_agent_id, get_fanout,
26 get_name, get_source, process_name, MetadataType,
27};
28use crate::messages::AgentClass;
29use crate::pubsub::proto::pubsub::v1::message::MessageType;
30use crate::pubsub::proto::pubsub::v1::message::MessageType::Publish as PublishType;
31use crate::pubsub::proto::pubsub::v1::message::MessageType::Subscribe as SubscribeType;
32use crate::pubsub::proto::pubsub::v1::message::MessageType::Unsubscribe as UnsubscribeType;
33use crate::pubsub::proto::pubsub::v1::pub_sub_service_client::PubSubServiceClient;
34use crate::pubsub::proto::pubsub::v1::{pub_sub_service_server::PubSubService, Message};
35
36struct MetadataExtractor<'a>(&'a std::collections::HashMap<String, String>);
38
39impl Extractor for MetadataExtractor<'_> {
40 fn get(&self, key: &str) -> Option<&str> {
41 self.0.get(key).map(|s| s.as_str())
42 }
43
44 fn keys(&self) -> Vec<&str> {
45 self.0.keys().map(|s| s.as_str()).collect()
46 }
47}
48
49struct MetadataInjector<'a>(&'a mut std::collections::HashMap<String, String>);
50
51impl Injector for MetadataInjector<'_> {
52 fn set(&mut self, key: &str, value: String) {
53 self.0.insert(key.to_string(), value);
54 }
55}
56
57fn extract_parent_context(msg: &Message) -> Option<opentelemetry::Context> {
59 let extractor = MetadataExtractor(&msg.metadata);
60 let parent_context =
61 opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor));
62
63 if parent_context.span().span_context().is_valid() {
64 Some(parent_context)
65 } else {
66 None
67 }
68}
69
70fn inject_current_context(msg: &mut Message) {
72 let cx = tracing::Span::current().context();
73 let mut injector = MetadataInjector(&mut msg.metadata);
74 opentelemetry::global::get_text_map_propagator(|propagator| {
75 propagator.inject_context(&cx, &mut injector)
76 });
77}
78
79fn message_type_to_str(message_type: &Option<MessageType>) -> &'static str {
80 match message_type {
81 Some(PublishType(_)) => "publish",
82 Some(SubscribeType(_)) => "subscribe",
83 Some(UnsubscribeType(_)) => "unsubscribe",
84 None => "unknown",
85 }
86}
87
88#[derive(Debug)]
89struct MessageProcessorInternal {
90 forwarder: Forwarder<Connection>,
91 drain_channel: drain::Watch,
92}
93
94#[derive(Debug, Clone)]
95pub struct MessageProcessor {
96 internal: Arc<MessageProcessorInternal>,
97}
98
99impl MessageProcessor {
100 pub fn new() -> (Self, drain::Signal) {
101 let (signal, watch) = drain::channel();
102 let forwarder = Forwarder::new();
103 let forwarder = MessageProcessorInternal {
104 forwarder,
105 drain_channel: watch,
106 };
107
108 (
109 Self {
110 internal: Arc::new(forwarder),
111 },
112 signal,
113 )
114 }
115
116 pub fn with_drain_channel(watch: drain::Watch) -> Self {
117 let forwarder = Forwarder::new();
118 let forwarder = MessageProcessorInternal {
119 forwarder,
120 drain_channel: watch,
121 };
122 Self {
123 internal: Arc::new(forwarder),
124 }
125 }
126
127 fn forwarder(&self) -> &Forwarder<Connection> {
128 &self.internal.forwarder
129 }
130
131 fn get_drain_watch(&self) -> drain::Watch {
132 self.internal.drain_channel.clone()
133 }
134
135 async fn try_to_connect<C>(
136 &self,
137 channel: C,
138 client_config: Option<ClientConfig>,
139 local: Option<SocketAddr>,
140 remote: Option<SocketAddr>,
141 existing_conn_index: Option<u64>,
142 max_retry: u32,
143 ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
144 where
145 C: tonic::client::GrpcService<tonic::body::BoxBody>,
146 C::Error: Into<StdError>,
147 C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
148 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
149 {
150 let mut client: PubSubServiceClient<C> = PubSubServiceClient::new(channel);
151 let mut i = 0;
152 while i < max_retry {
153 let (tx, rx) = mpsc::channel(128);
154 match client
155 .open_channel(Request::new(ReceiverStream::new(rx)))
156 .await
157 {
158 Ok(stream) => {
159 let cancellation_token = CancellationToken::new();
160 let connection = Connection::new(ConnectionType::Remote)
161 .with_local_addr(local)
162 .with_remote_addr(remote)
163 .with_channel(Channel::Client(tx))
164 .with_cancellation_token(Some(cancellation_token.clone()));
165
166 info!(
167 "new connection initiated locally: (remote: {:?} - local: {:?})",
168 connection.remote_addr(),
169 connection.local_addr()
170 );
171
172 let opt = self
174 .forwarder()
175 .on_connection_established(connection, existing_conn_index);
176 if opt.is_none() {
177 error!("error adding connection to the connection table");
178 return Err(DataPathError::ConnectionError(
179 "error adding connection to the connection tables".to_string(),
180 ));
181 }
182
183 let conn_index = opt.unwrap();
184 info!(
185 "new connection index = {:?}, is local {:?}",
186 conn_index, false
187 );
188
189 let ret = self.process_stream(
191 stream.into_inner(),
192 conn_index,
193 client_config,
194 cancellation_token,
195 false,
196 );
197 return Ok((ret, conn_index));
198 }
199 Err(e) => {
200 error!("connection error: {:?}.", e.to_string());
201 }
202 }
203 i += 1;
204
205 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
207 }
208
209 error!("unable to connect to the endpoint");
210 Err(DataPathError::ConnectionError(
211 "reached max connection retries".to_string(),
212 ))
213 }
214
215 pub async fn connect<C>(
216 &self,
217 channel: C,
218 client_config: Option<ClientConfig>,
219 local: Option<SocketAddr>,
220 remote: Option<SocketAddr>,
221 ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
222 where
223 C: tonic::client::GrpcService<tonic::body::BoxBody>,
224 C::Error: Into<StdError>,
225 C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
226 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
227 {
228 self.try_to_connect(channel, client_config, local, remote, None, 10)
229 .await
230 }
231
232 pub fn disconnect(&self, conn: u64) -> Result<(), DataPathError> {
233 match self.forwarder().get_connection(conn) {
234 None => {
235 error!("error handling disconnect: connection unknown");
236 return Err(DataPathError::DisconnectionError(
237 "connection not found".to_string(),
238 ));
239 }
240 Some(c) => {
241 match c.cancellation_token() {
242 None => {
243 error!("error handling disconnect: missing cancellation token");
244 }
245 Some(t) => {
246 t.cancel();
250 }
251 }
252 }
253 }
254
255 Ok(())
256 }
257
258 pub fn register_local_connection(
259 &self,
260 ) -> (
261 tokio::sync::mpsc::Sender<Result<Message, Status>>,
262 tokio::sync::mpsc::Receiver<Result<Message, Status>>,
263 ) {
264 let (tx1, rx1) = mpsc::channel(128);
266
267 info!("establishing new local app connection");
268
269 let (tx2, rx2) = mpsc::channel(128);
271
272 let connection = Connection::new(ConnectionType::Local).with_channel(Channel::Server(tx2));
274
275 let conn_id = self
277 .forwarder()
278 .on_connection_established(connection, None)
279 .unwrap();
280
281 debug!("local connection established with id: {:?}", conn_id);
282 info!(telemetry = true, counter.num_active_connections = 1);
283
284 self.process_stream(
286 ReceiverStream::new(rx1),
287 conn_id,
288 None,
289 CancellationToken::new(),
290 true,
291 );
292
293 (tx1, rx2)
295 }
296
297 pub async fn send_msg(
298 &self,
299 mut msg: Message,
300 out_conn: u64,
301 ) -> Result<(), Box<dyn std::error::Error>> {
302 let connection = self.forwarder().get_connection(out_conn);
303 match connection {
304 Some(conn) => {
305 if conn.is_local_connection() {
306 let span = tracing::span!(
308 tracing::Level::DEBUG,
309 "send_message_to_local",
310 instance_id = %INSTANCE_ID.as_str(),
311 connection_id = out_conn,
312 message_type = message_type_to_str(&msg.message_type),
313 telemetry = true
314 );
315 let _guard = span.enter();
316
317 inject_current_context(&mut msg);
318 } else {
319 let parent_context = extract_parent_context(&msg);
320
321 let span = tracing::span!(
323 tracing::Level::DEBUG,
324 "send_message_to_remote",
325 instance_id = %INSTANCE_ID.as_str(),
326 connection_id = out_conn,
327 message_type = message_type_to_str(&msg.message_type),
328 telemetry = true
329 );
330
331 if let Some(ctx) = parent_context {
332 span.set_parent(ctx);
333 }
334 let _guard = span.enter();
335
336 inject_current_context(&mut msg);
337 }
338
339 match conn.channel() {
340 Channel::Server(s) => s.send(Ok(msg)).await?,
341 Channel::Client(s) => s.send(msg).await?,
342 _ => error!("error reading channel"),
343 }
344 }
345 None => error!("connection {:?} not found", out_conn),
346 }
347 Ok(())
348 }
349
350 async fn match_and_forward_msg(
351 &self,
352 msg: Message,
353 class: AgentClass,
354 in_connection: u64,
355 fanout: u32,
356 agent_id: Option<u64>,
357 ) -> Result<(), DataPathError> {
358 debug!(
359 "match and forward message: class: {:?} - agent_id: {:?} - fanout: {:?}",
360 class, agent_id, fanout,
361 );
362
363 if fanout == 1 {
364 match self
365 .forwarder()
366 .on_publish_msg_match_one(class, agent_id, in_connection)
367 {
368 Ok(out) => match self.send_msg(msg, out).await {
369 Ok(_) => Ok(()),
370 Err(e) => {
371 error!("error sending a message {:?}", e);
372 Err(DataPathError::PublicationError(e.to_string()))
373 }
374 },
375 Err(e) => {
376 error!("error matching a message {:?}", e);
377 Err(DataPathError::PublicationError(e.to_string()))
378 }
379 }
380 } else {
381 match self
382 .forwarder()
383 .on_publish_msg_match_all(class, agent_id, in_connection)
384 {
385 Ok(out_set) => {
386 for out in out_set {
387 match self.send_msg(msg.clone(), out).await {
388 Ok(_) => {}
389 Err(e) => {
390 error!("error sending a message {:?}", e);
391 return Err(DataPathError::PublicationError(e.to_string()));
392 }
393 }
394 }
395 Ok(())
396 }
397 Err(e) => {
398 error!("error sending a message {:?}", e);
399 Err(DataPathError::PublicationError(e.to_string()))
400 }
401 }
402 }
403 }
404
405 async fn process_publish(
406 &self,
407 mut msg: Message,
408 in_connection: u64,
409 ) -> Result<(), DataPathError> {
410 let pubmsg = match &msg.message_type {
411 Some(PublishType(p)) => p,
412 _ => panic!("wrong message type"),
414 };
415
416 match process_name(&pubmsg.name) {
417 Ok(class) => {
418 let fanout = get_fanout(pubmsg);
419 let agent_id = get_agent_id(&pubmsg.name);
420
421 debug!(
422 "received publication from connection {}: {:?}",
423 in_connection, pubmsg
424 );
425
426 add_incoming_connection(&mut msg, in_connection);
428
429 return self
431 .match_and_forward_msg(msg, class, in_connection, fanout, agent_id)
432 .await;
433 }
434 Err(e) => {
435 error!("error processing publication message {:?}", e);
436 Err(DataPathError::PublicationError(e.to_string()))
437 }
438 }
439 }
440
441 fn process_command(&self, msg: &Message) -> Result<(MetadataType, u64), DataPathError> {
442 if !msg.metadata.is_empty() {
443 match msg.metadata.get(&MetadataType::ReceivedFrom.to_string()) {
444 None => {}
445 Some(out_str) => match out_str.parse::<u64>() {
446 Err(e) => {
447 error! {"error parsing the connection in command type ReceivedFrom: {:?}", e};
448 return Err(DataPathError::CommandError(e.to_string()));
449 }
450 Ok(out) => {
451 debug!(%out, "received subscription_from command, register subscription");
452 return Ok((MetadataType::ReceivedFrom, out));
453 }
454 },
455 }
456 match msg.metadata.get(&MetadataType::ForwardTo.to_string()) {
457 None => {}
458 Some(out_str) => match out_str.parse::<u64>() {
459 Err(e) => {
460 error! {"error parsing the connection in command type ForwardTo: {:?}", e};
461 return Err(DataPathError::CommandError(e.to_string()));
462 }
463 Ok(out) => {
464 debug!(%out, "received forward_to command, register subscription and forward");
465 return Ok((MetadataType::ForwardTo, out));
466 }
467 },
468 }
469 }
470 Ok((MetadataType::Unknown, 0))
471 }
472
473 async fn process_unsubscription(
474 &self,
475 mut msg: Message,
476 in_connection: u64,
477 ) -> Result<(), DataPathError> {
478 let unsubmsg = match &msg.message_type {
479 Some(UnsubscribeType(s)) => s,
480 _ => panic!("wrong message type"),
482 };
483
484 match process_name(&unsubmsg.name) {
485 Ok(class) => {
486 let command = self.process_command(&msg);
488 let mut conn = in_connection;
489 let mut forward = false;
490 let mut out_conn = in_connection;
492 match command {
493 Err(e) => {
494 return Err(e);
495 }
496 Ok(tuple) => match tuple.0 {
497 MetadataType::ReceivedFrom => {
498 conn = tuple.1;
499 }
500 MetadataType::ForwardTo => {
501 forward = true;
502 out_conn = tuple.1;
503 }
504 _ => {}
505 },
506 }
507 let connection = self.forwarder().get_connection(in_connection);
508 if connection.is_none() {
509 error!("incoming connection does not exists");
511 return Err(DataPathError::SubscriptionError(
512 "incoming connection does not exists".to_string(),
513 ));
514 }
515 let agent_id = get_agent_id(&unsubmsg.name);
516 match self.forwarder().on_unsubscription_msg(
517 class.clone(),
518 agent_id,
519 conn,
520 connection.unwrap().is_local_connection(),
521 ) {
522 Ok(_) => {}
523 Err(e) => {
524 return Err(DataPathError::UnsubscriptionError(e.to_string()));
525 }
526 }
527 if forward {
528 debug!("forward unsubscription to {:?}", out_conn);
529
530 msg.metadata.remove(&MetadataType::ForwardTo.to_string());
533 let source_class = match process_name(&unsubmsg.source) {
534 Ok(s) => s,
535 Err(e) => {
536 error!("error processing unsubscription source {:?}", e);
537 return Err(DataPathError::UnsubscriptionError(e.to_string()));
538 }
539 };
540 let source_id = get_agent_id(&unsubmsg.source);
541 match self.send_msg(msg, out_conn).await {
542 Ok(_) => {
543 self.forwarder().on_forwarded_unsubscription(
544 source_class,
545 source_id,
546 class,
547 agent_id,
548 out_conn,
549 );
550 }
551 Err(e) => {
552 error!("error sending a message {:?}", e);
553 return Err(DataPathError::UnsubscriptionError(e.to_string()));
554 }
555 };
556 }
557 Ok(())
558 }
559 Err(e) => {
560 error!("error processing unsubscription message {:?}", e);
561 Err(DataPathError::UnsubscriptionError(e.to_string()))
562 }
563 }
564 }
565
566 async fn process_subscription(
567 &self,
568 mut msg: Message,
569 in_connection: u64,
570 ) -> Result<(), DataPathError> {
571 let submsg = match &msg.message_type {
572 Some(SubscribeType(s)) => s,
573 _ => panic!("wrong message type"),
575 };
576
577 debug!(
578 "received subscription from connection {}: {:?}",
579 in_connection, submsg
580 );
581
582 match process_name(&submsg.name) {
583 Ok(class) => {
584 trace!("process command");
586 let command = self.process_command(&msg);
587 let mut conn = in_connection;
588 let mut forward = false;
589
590 let mut out_conn = in_connection;
592 match command {
593 Err(e) => {
594 return Err(e);
595 }
596 Ok(tuple) => match tuple.0 {
597 MetadataType::ReceivedFrom => {
598 conn = tuple.1;
599 trace!("received subscription_from command, register subscription with conn id {:?}", tuple.1);
600 }
601 MetadataType::ForwardTo => {
602 forward = true;
603 out_conn = tuple.1;
604 trace!("received forward_to command, register subscription and forward to conn id {:?}", out_conn);
605 }
606 _ => {}
607 },
608 }
609
610 let connection = self.forwarder().get_connection(conn);
611 if connection.is_none() {
612 error!("incoming connection does not exists");
614 return Err(DataPathError::SubscriptionError(
615 "incoming connection does not exists".to_string(),
616 ));
617 }
618 let agent_id = get_agent_id(&submsg.name);
619 match self.forwarder().on_subscription_msg(
620 class.clone(),
621 agent_id,
622 conn,
623 connection.unwrap().is_local_connection(),
624 ) {
625 Ok(_) => {}
626 Err(e) => {
627 return Err(DataPathError::SubscriptionError(e.to_string()));
628 }
629 }
630
631 if forward {
632 debug!("forward subscription to {:?}", out_conn);
633
634 msg.metadata.remove(&MetadataType::ForwardTo.to_string());
637 let source_class = match process_name(&submsg.source) {
638 Ok(s) => s,
639 Err(e) => {
640 error!("error processing unsubscription source {:?}", e);
641 return Err(DataPathError::SubscriptionError(e.to_string()));
642 }
643 };
644 let source_id = get_agent_id(&submsg.source);
645 match self.send_msg(msg, out_conn).await {
646 Ok(_) => {
647 self.forwarder().on_forwarded_subscription(
648 source_class,
649 source_id,
650 class,
651 agent_id,
652 out_conn,
653 );
654 }
655 Err(e) => {
656 error!("error sending a message {:?}", e);
657 return Err(DataPathError::UnsubscriptionError(e.to_string()));
658 }
659 };
660 }
661 Ok(())
662 }
663 Err(e) => {
664 error!("error processing subscription message {:?}", e);
665 Err(DataPathError::SubscriptionError(e.to_string()))
666 }
667 }
668 }
669
670 pub async fn process_message(
671 &self,
672 msg: Message,
673 in_connection: u64,
674 ) -> Result<(), DataPathError> {
675 match &msg.message_type {
676 None => {
677 error!(
678 "received message without message type from connection {}: {:?}",
679 in_connection, msg
680 );
681 info!(
682 telemetry = true,
683 monotonic_counter.num_messages_by_type = 1,
684 message_type = "none"
685 );
686 Err(DataPathError::UnknownMsgType("".to_string()))
687 }
688 Some(msg_type) => match msg_type {
689 SubscribeType(s) => {
690 debug!(
691 "received subscription from connection {}: {:?}",
692 in_connection, s
693 );
694 info!(
695 telemetry = true,
696 monotonic_counter.num_messages_by_type = 1,
697 message_type = "subscribe"
698 );
699 match self.process_subscription(msg, in_connection).await {
700 Err(e) => {
701 error! {"error processing subscription {:?}", e}
702 Err(e)
703 }
704 Ok(_) => Ok(()),
705 }
706 }
707 UnsubscribeType(u) => {
708 debug!(
709 "Received ubsubscription from client {}: {:?}",
710 in_connection, u
711 );
712 info!(
713 telemetry = true,
714 monotonic_counter.num_messages_by_type = 1,
715 message_type = "unsubscribe"
716 );
717 match self.process_unsubscription(msg, in_connection).await {
718 Err(e) => {
719 error! {"error processing unsubscription {:?}", e}
720 Err(e)
721 }
722 Ok(_) => Ok(()),
723 }
724 }
725 PublishType(p) => {
726 debug!("Received publish from client {}: {:?}", in_connection, p);
727 info!(
728 telemetry = true,
729 monotonic_counter.num_messages_by_type = 1,
730 method = "publish"
731 );
732 match self.process_publish(msg, in_connection).await {
733 Err(e) => {
734 error! {"error processing publication {:?}", e}
735 Err(e)
736 }
737 Ok(_) => Ok(()),
738 }
739 }
740 },
741 }
742 }
743
744 async fn handle_new_message(
745 &self,
746 conn_index: u64,
747 is_local: bool,
748 mut msg: Message,
749 ) -> Result<(), DataPathError> {
750 debug!(%conn_index, "Received message from connection");
751 info!(
752 telemetry = true,
753 monotonic_counter.num_processed_messages = 1
754 );
755
756 if is_local {
757 let span = tracing::span!(
760 tracing::Level::DEBUG,
761 "handle_local_message",
762 instance_id = %INSTANCE_ID.as_str(),
763 connection_id = conn_index,
764 message_type = message_type_to_str(&msg.message_type),
765 telemetry = true
766 );
767 let _guard = span.enter();
768
769 inject_current_context(&mut msg);
770 } else {
771 let parent_context = extract_parent_context(&msg);
774
775 let span = tracing::span!(
776 tracing::Level::DEBUG,
777 "handle_remote_message",
778 instance_id = %INSTANCE_ID.as_str(),
779 connection_id = conn_index,
780 message_type = message_type_to_str(&msg.message_type),
781 telemetry = true
782 );
783
784 if let Some(ctx) = parent_context {
785 span.set_parent(ctx);
786 }
787 let _guard = span.enter();
788
789 inject_current_context(&mut msg);
790 }
791
792 match self.process_message(msg, conn_index).await {
793 Ok(_) => Ok(()),
794 Err(e) => {
795 error!(
797 "error processing message from connection {:?}: {:?}",
798 conn_index, e
799 );
800 info!(
801 telemetry = true,
802 monotonic_counter.num_message_process_errors = 1
803 );
804 Err(DataPathError::ProcessingError(e.to_string()))
805 }
806 }
807 }
808
809 fn process_stream(
810 &self,
811 mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
812 conn_index: u64,
813 client_config: Option<ClientConfig>,
814 cancellation_token: CancellationToken,
815 is_local: bool,
816 ) -> tokio::task::JoinHandle<()> {
817 let self_clone = self.clone();
819 let token_clone = cancellation_token.clone();
820 let client_conf_clone = client_config.clone();
821 let handle = tokio::spawn(async move {
822 let mut try_to_reconnect = true;
823 loop {
824 tokio::select! {
825 next = stream.next() => {
826 match next {
827 Some(result) => {
828 match result {
829 Ok(msg) => {
830 let mut msg_source = None;
832 let mut msg_name = None;
833 if is_local {
834 msg_source = get_source(&msg);
835 msg_name = get_name(&msg);
836 }
837 if let Err(e) = self_clone.handle_new_message(conn_index, is_local, msg).await {
838 error!("error processing incoming messages {:?}", e);
839 if is_local {
841 let connection = self_clone.forwarder().get_connection(conn_index);
842 match connection {
843 Some(conn) => {
844 debug!("try to notify local application");
845 if msg_source.is_none() || msg_name.is_none() {
846 debug!("unable to notify the error to the remote end");
847 } else {
848 let dest = msg_name.unwrap();
850 let mut err_message = create_publication(
851 &msg_source.unwrap(),
852 &dest.agent_class,
853 Some(dest.agent_id),
854 HashMap::new(), 1, "",
855 Vec::new());
856
857 err_message.metadata.insert(MetadataType::Error.to_string(), e.to_string());
858 if let Channel::Server(tx) = conn.channel() {
859 if tx.send(Ok(err_message)).await.is_err() {
860 debug!("unable to notify the error to the local app");
861 }
862 }
863 }
864 }
865 None => {
866 error!("connection {:?} not found", conn_index);
867 }
868 }
869 }
870 }
871 }
872 Err(e) => {
873 if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
874 if io_err.kind() == std::io::ErrorKind::BrokenPipe {
875 info!("connection {:?} closed by peer", conn_index);
876 }
877 } else {
878 error!("error receiving messages {:?}", e);
879 }
880 break;
881 }
882 }
883 }
884 None => {
885 debug!(%conn_index, "end of stream");
886 break;
887 }
888 }
889 }
890 _ = self_clone.get_drain_watch().signaled() => {
891 info!("shutting down stream on drain: {}", conn_index);
892 try_to_reconnect = false;
893 break;
894 }
895 _ = token_clone.cancelled() => {
896 info!("shutting down stream cancellation token: {}", conn_index);
897 try_to_reconnect = false;
898 break;
899 }
900 }
901 }
902
903 let mut delete_connection = true;
904
905 if try_to_reconnect && client_conf_clone.is_some() {
906 let config = client_conf_clone.unwrap();
907 match config.to_channel() {
908 Err(e) => {
909 error!(
910 "cannot parse connection config, unable to reconnect {:?}",
911 e.to_string()
912 );
913 }
914 Ok(channel) => {
915 info!("connection lost with remote endpoint, try to reconnect");
916 let remote_subscriptions = self_clone
921 .forwarder()
922 .get_subscriptions_forwarded_on_connection(conn_index);
923
924 match self_clone
925 .try_to_connect(
926 channel,
927 Some(config),
928 None,
929 None,
930 Some(conn_index),
931 120,
932 )
933 .await
934 {
935 Ok(_) => {
936 info!("connection re-established");
937 delete_connection = false;
939 for r in remote_subscriptions.iter() {
940 let sub_msg = create_subscription(
941 r.source(),
942 &r.name().agent_class,
943 Some(r.name().agent_id),
944 HashMap::new(),
945 );
946 if self_clone.send_msg(sub_msg, conn_index).await.is_err() {
947 error!("error restoring subscription on remote node");
948 }
949 }
950 }
951 Err(e) => {
952 error!("unable to connect to remote node {:?}", e.to_string());
954 }
955 }
956 }
957 }
958 } else {
959 info!("close connection {}", conn_index)
960 }
961
962 if delete_connection {
963 self_clone
964 .forwarder()
965 .on_connection_drop(conn_index, is_local);
966
967 info!(telemetry = true, counter.num_active_connections = -1);
968 }
969 });
970
971 handle
972 }
973
974 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
975 let mut err: &(dyn std::error::Error + 'static) = err_status;
976
977 loop {
978 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
979 return Some(io_err);
980 }
981
982 if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
985 if let Some(io_err) = h2_err.get_io() {
986 return Some(io_err);
987 }
988 }
989
990 err = err.source()?;
991 }
992 }
993}
994
995#[tonic::async_trait]
996impl PubSubService for MessageProcessor {
997 type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
998
999 async fn open_channel(
1000 &self,
1001 request: Request<tonic::Streaming<Message>>,
1002 ) -> Result<Response<Self::OpenChannelStream>, Status> {
1003 let remote_addr = request.remote_addr();
1004 let local_addr = request.local_addr();
1005
1006 let stream = request.into_inner();
1007 let (tx, rx) = mpsc::channel(128);
1008
1009 let connection = Connection::new(ConnectionType::Remote)
1010 .with_remote_addr(remote_addr)
1011 .with_local_addr(local_addr)
1012 .with_channel(Channel::Server(tx));
1013
1014 info!(
1015 "new connection received from remote: (remote: {:?} - local: {:?})",
1016 connection.remote_addr(),
1017 connection.local_addr()
1018 );
1019 info!(telemetry = true, counter.num_active_connections = 1);
1020
1021 let conn_index = self
1023 .forwarder()
1024 .on_connection_established(connection, None)
1025 .unwrap();
1026
1027 self.process_stream(stream, conn_index, None, CancellationToken::new(), false);
1028
1029 let out_stream = ReceiverStream::new(rx);
1030 Ok(Response::new(
1031 Box::pin(out_stream) as Self::OpenChannelStream
1032 ))
1033 }
1034}