1use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use agp_config::grpc::client::ClientConfig;
9use tokio::sync::mpsc;
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_stream::{Stream, StreamExt};
12use tokio_util::sync::CancellationToken;
13use tonic::codegen::{Body, StdError};
14use tonic::{Request, Response, Status};
15use tracing::{debug, error, info, trace};
16
17use crate::connection::{Channel, Connection, Type as ConnectionType};
18use crate::errors::DataPathError;
19use crate::forwarder::Forwarder;
20use crate::messages::utils::{
21 add_incoming_connection, create_subscription, get_agent_id, get_fanout, process_name,
22 CommandType,
23};
24use crate::messages::AgentClass;
25use crate::pubsub::proto::pubsub::v1::message::MessageType::Publish as PublishType;
26use crate::pubsub::proto::pubsub::v1::message::MessageType::Subscribe as SubscribeType;
27use crate::pubsub::proto::pubsub::v1::message::MessageType::Unsubscribe as UnsubscribeType;
28use crate::pubsub::proto::pubsub::v1::pub_sub_service_client::PubSubServiceClient;
29use crate::pubsub::proto::pubsub::v1::{pub_sub_service_server::PubSubService, Message};
30
/// State shared by every clone of [`MessageProcessor`].
#[derive(Debug)]
struct MessageProcessorInternal {
    /// Connection table and subscription matcher used to route messages.
    forwarder: Forwarder<Connection>,
    /// Drain watch that every stream task waits on to detect shutdown.
    drain_channel: drain::Watch,
}
36
/// Handle to the message-processing state.
///
/// The state lives behind an [`Arc`], so cloning the processor only clones
/// the pointer; all clones operate on the same forwarder and drain watch.
#[derive(Debug, Clone)]
pub struct MessageProcessor {
    /// Shared forwarder and drain watch.
    internal: Arc<MessageProcessorInternal>,
}
41
42impl MessageProcessor {
43 pub fn new() -> (Self, drain::Signal) {
44 let (signal, watch) = drain::channel();
45 let forwarder = Forwarder::new();
46 let forwarder = MessageProcessorInternal {
47 forwarder,
48 drain_channel: watch,
49 };
50
51 (
52 Self {
53 internal: Arc::new(forwarder),
54 },
55 signal,
56 )
57 }
58
59 pub fn with_drain_channel(watch: drain::Watch) -> Self {
60 let forwarder = Forwarder::new();
61 let forwarder = MessageProcessorInternal {
62 forwarder,
63 drain_channel: watch,
64 };
65 Self {
66 internal: Arc::new(forwarder),
67 }
68 }
69
70 fn forwarder(&self) -> &Forwarder<Connection> {
71 &self.internal.forwarder
72 }
73
74 fn get_drain_watch(&self) -> drain::Watch {
75 self.internal.drain_channel.clone()
76 }
77
78 async fn try_to_connect<C>(
79 &self,
80 channel: C,
81 client_config: Option<ClientConfig>,
82 local: Option<SocketAddr>,
83 remote: Option<SocketAddr>,
84 existing_conn_index: Option<u64>,
85 max_retry: u32,
86 ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
87 where
88 C: tonic::client::GrpcService<tonic::body::BoxBody>,
89 C::Error: Into<StdError>,
90 C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
91 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
92 {
93 let mut client: PubSubServiceClient<C> = PubSubServiceClient::new(channel);
94 let mut i = 0;
95 while i < max_retry {
96 let (tx, rx) = mpsc::channel(128);
97 match client
98 .open_channel(Request::new(ReceiverStream::new(rx)))
99 .await
100 {
101 Ok(stream) => {
102 let cancellation_token = CancellationToken::new();
103 let connection = Connection::new(ConnectionType::Remote)
104 .with_local_addr(local)
105 .with_remote_addr(remote)
106 .with_channel(Channel::Client(tx))
107 .with_cancellation_token(Some(cancellation_token.clone()));
108
109 info!(
110 "new connection initiated locally: (remote: {:?} - local: {:?})",
111 connection.remote_addr(),
112 connection.local_addr()
113 );
114
115 let opt = self
117 .forwarder()
118 .on_connection_established(connection, existing_conn_index);
119 if opt.is_none() {
120 error!("error adding connection to the connection table");
121 return Err(DataPathError::ConnectionError(
122 "error adding connection to the connection tables".to_string(),
123 ));
124 }
125
126 let conn_index = opt.unwrap();
127 info!(
128 "new connection index = {:?}, is local {:?}",
129 conn_index, false
130 );
131
132 let ret = self.process_stream(
134 stream.into_inner(),
135 conn_index,
136 client_config,
137 cancellation_token,
138 false,
139 );
140 return Ok((ret, conn_index));
141 }
142 Err(e) => {
143 error!("connection error: {:?}.", e.to_string());
144 }
145 }
146 i += 1;
147
148 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
150 }
151
152 error!("unable to connect to the endpoint");
153 Err(DataPathError::ConnectionError(
154 "reached max connection retries".to_string(),
155 ))
156 }
157
158 pub async fn connect<C>(
159 &self,
160 channel: C,
161 client_config: Option<ClientConfig>,
162 local: Option<SocketAddr>,
163 remote: Option<SocketAddr>,
164 ) -> Result<(tokio::task::JoinHandle<()>, u64), DataPathError>
165 where
166 C: tonic::client::GrpcService<tonic::body::BoxBody>,
167 C::Error: Into<StdError>,
168 C::ResponseBody: Body<Data = bytes::Bytes> + std::marker::Send + 'static,
169 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
170 {
171 self.try_to_connect(channel, client_config, local, remote, None, 10)
172 .await
173 }
174
175 pub fn disconnect(&self, conn: u64) -> Result<(), DataPathError> {
176 match self.forwarder().get_connection(conn) {
177 None => {
178 error!("error handling disconnect: connection unknown");
179 return Err(DataPathError::DisconnectionError(
180 "connection not found".to_string(),
181 ));
182 }
183 Some(c) => {
184 match c.cancellation_token() {
185 None => {
186 error!("error handling disconnect: missing cancellation token");
187 }
188 Some(t) => {
189 t.cancel();
193 }
194 }
195 }
196 }
197
198 Ok(())
199 }
200
201 pub fn register_local_connection(
202 &self,
203 ) -> (
204 tokio::sync::mpsc::Sender<Result<Message, Status>>,
205 tokio::sync::mpsc::Receiver<Result<Message, Status>>,
206 ) {
207 let (tx1, rx1) = mpsc::channel(128);
209
210 info!("establishing new local app connection");
211
212 let (tx2, rx2) = mpsc::channel(128);
214
215 let connection = Connection::new(ConnectionType::Local).with_channel(Channel::Server(tx2));
217
218 let conn_id = self
220 .forwarder()
221 .on_connection_established(connection, None)
222 .unwrap();
223
224 debug!("local connection established with id: {:?}", conn_id);
225 info!(telemetry = true, counter.num_active_connections = 1);
226
227 self.process_stream(
229 ReceiverStream::new(rx1),
230 conn_id,
231 None,
232 CancellationToken::new(),
233 true,
234 );
235
236 (tx1, rx2)
238 }
239
240 pub async fn send_msg(
241 &self,
242 msg: Message,
243 out_conn: u64,
244 ) -> Result<(), Box<dyn std::error::Error>> {
245 let connection = self.forwarder().get_connection(out_conn);
246 match connection {
247 Some(conn) => match conn.channel() {
248 Channel::Server(s) => s.send(Ok(msg)).await?,
249 Channel::Client(s) => s.send(msg).await?,
250 _ => error!("error reading channel"),
251 },
252 None => error!("connection {:?} not found", out_conn),
253 }
254 Ok(())
255 }
256
257 async fn match_and_forward_msg(
258 &self,
259 msg: Message,
260 class: AgentClass,
261 in_connection: u64,
262 fanout: u32,
263 agent_id: Option<u64>,
264 ) -> Result<(), DataPathError> {
265 debug!(
266 "match and forward message: class: {:?} - agent_id: {:?} - fanout: {:?}",
267 class, agent_id, fanout,
268 );
269
270 if fanout == 1 {
271 match self
272 .forwarder()
273 .on_publish_msg_match_one(class, agent_id, in_connection)
274 {
275 Ok(out) => match self.send_msg(msg, out).await {
276 Ok(_) => Ok(()),
277 Err(e) => {
278 error!("error sending a message {:?}", e);
279 Err(DataPathError::PublicationError(e.to_string()))
280 }
281 },
282 Err(e) => {
283 error!("error matching a message {:?}", e);
284 Err(DataPathError::PublicationError(e.to_string()))
285 }
286 }
287 } else {
288 match self
289 .forwarder()
290 .on_publish_msg_match_all(class, agent_id, in_connection)
291 {
292 Ok(out_set) => {
293 for out in out_set {
294 match self.send_msg(msg.clone(), out).await {
295 Ok(_) => {}
296 Err(e) => {
297 error!("error sending a message {:?}", e);
298 return Err(DataPathError::PublicationError(e.to_string()));
299 }
300 }
301 }
302 Ok(())
303 }
304 Err(e) => {
305 error!("error sending a message {:?}", e);
306 Err(DataPathError::PublicationError(e.to_string()))
307 }
308 }
309 }
310 }
311
312 async fn process_publish(
313 &self,
314 mut msg: Message,
315 in_connection: u64,
316 ) -> Result<(), DataPathError> {
317 let pubmsg = match &msg.message_type {
318 Some(PublishType(p)) => p,
319 _ => panic!("wrong message type"),
321 };
322
323 match process_name(&pubmsg.name) {
324 Ok(class) => {
325 let fanout = get_fanout(pubmsg);
326 let agent_id = get_agent_id(&pubmsg.name);
327
328 debug!(
329 "received publication from connection {}: {:?}",
330 in_connection, pubmsg
331 );
332
333 add_incoming_connection(&mut msg, in_connection);
335
336 return self
338 .match_and_forward_msg(msg, class, in_connection, fanout, agent_id)
339 .await;
340 }
341 Err(e) => {
342 error!("error processing publication message {:?}", e);
343 Err(DataPathError::PublicationError(e.to_string()))
344 }
345 }
346 }
347
348 fn process_command(&self, msg: &Message) -> Result<(CommandType, u64), DataPathError> {
349 if !msg.metadata.is_empty() {
350 match msg.metadata.get(&CommandType::ReceivedFrom.to_string()) {
351 None => {}
352 Some(out_str) => match out_str.parse::<u64>() {
353 Err(e) => {
354 error! {"error parsing the connection in command type ReceivedFrom: {:?}", e};
355 return Err(DataPathError::CommandError(e.to_string()));
356 }
357 Ok(out) => {
358 debug!(%out, "received subscription_from command, register subscription");
359 return Ok((CommandType::ReceivedFrom, out));
360 }
361 },
362 }
363 match msg.metadata.get(&CommandType::ForwardTo.to_string()) {
364 None => {}
365 Some(out_str) => match out_str.parse::<u64>() {
366 Err(e) => {
367 error! {"error parsing the connection in command type ForwardTo: {:?}", e};
368 return Err(DataPathError::CommandError(e.to_string()));
369 }
370 Ok(out) => {
371 debug!(%out, "received forward_to command, register subscription and forward");
372 return Ok((CommandType::ForwardTo, out));
373 }
374 },
375 }
376 }
377 Ok((CommandType::Unknown, 0))
378 }
379
380 async fn process_unsubscription(
381 &self,
382 mut msg: Message,
383 in_connection: u64,
384 ) -> Result<(), DataPathError> {
385 let unsubmsg = match &msg.message_type {
386 Some(UnsubscribeType(s)) => s,
387 _ => panic!("wrong message type"),
389 };
390
391 match process_name(&unsubmsg.name) {
392 Ok(class) => {
393 let command = self.process_command(&msg);
395 let mut conn = in_connection;
396 let mut forward = false;
397 let mut out_conn = in_connection;
399 match command {
400 Err(e) => {
401 return Err(e);
402 }
403 Ok(tuple) => match tuple.0 {
404 CommandType::ReceivedFrom => {
405 conn = tuple.1;
406 }
407 CommandType::ForwardTo => {
408 forward = true;
409 out_conn = tuple.1;
410 }
411 _ => {}
412 },
413 }
414 let connection = self.forwarder().get_connection(in_connection);
415 if connection.is_none() {
416 error!("incoming connection does not exists");
418 return Err(DataPathError::SubscriptionError(
419 "incoming connection does not exists".to_string(),
420 ));
421 }
422 let agent_id = get_agent_id(&unsubmsg.name);
423 match self.forwarder().on_unsubscription_msg(
424 class.clone(),
425 agent_id,
426 conn,
427 connection.unwrap().is_local_connection(),
428 ) {
429 Ok(_) => {}
430 Err(e) => {
431 return Err(DataPathError::UnsubscriptionError(e.to_string()));
432 }
433 }
434 if forward {
435 debug!("forward unsubscription to {:?}", out_conn);
436 msg.metadata.clear();
437 let source_class = match process_name(&unsubmsg.source) {
438 Ok(s) => s,
439 Err(e) => {
440 error!("error processing unsubscription source {:?}", e);
441 return Err(DataPathError::UnsubscriptionError(e.to_string()));
442 }
443 };
444 let source_id = get_agent_id(&unsubmsg.source);
445 match self.send_msg(msg, out_conn).await {
446 Ok(_) => {
447 self.forwarder().on_forwarded_unsubscription(
448 source_class,
449 source_id,
450 class,
451 agent_id,
452 out_conn,
453 );
454 }
455 Err(e) => {
456 error!("error sending a message {:?}", e);
457 return Err(DataPathError::UnsubscriptionError(e.to_string()));
458 }
459 };
460 }
461 Ok(())
462 }
463 Err(e) => {
464 error!("error processing unsubscription message {:?}", e);
465 Err(DataPathError::UnsubscriptionError(e.to_string()))
466 }
467 }
468 }
469
470 async fn process_subscription(
471 &self,
472 mut msg: Message,
473 in_connection: u64,
474 ) -> Result<(), DataPathError> {
475 let submsg = match &msg.message_type {
476 Some(SubscribeType(s)) => s,
477 _ => panic!("wrong message type"),
479 };
480
481 debug!(
482 "received subscription from connection {}: {:?}",
483 in_connection, submsg
484 );
485
486 match process_name(&submsg.name) {
487 Ok(class) => {
488 trace!("process command");
490 let command = self.process_command(&msg);
491 let mut conn = in_connection;
492 let mut forward = false;
493
494 let mut out_conn = in_connection;
496 match command {
497 Err(e) => {
498 return Err(e);
499 }
500 Ok(tuple) => match tuple.0 {
501 CommandType::ReceivedFrom => {
502 conn = tuple.1;
503 trace!("received subscription_from command, register subscription with conn id {:?}", tuple.1);
504 }
505 CommandType::ForwardTo => {
506 forward = true;
507 out_conn = tuple.1;
508 trace!("received forward_to command, register subscription and forward to conn id {:?}", out_conn);
509 }
510 _ => {}
511 },
512 }
513
514 let connection = self.forwarder().get_connection(conn);
515 if connection.is_none() {
516 error!("incoming connection does not exists");
518 return Err(DataPathError::SubscriptionError(
519 "incoming connection does not exists".to_string(),
520 ));
521 }
522 let agent_id = get_agent_id(&submsg.name);
523 match self.forwarder().on_subscription_msg(
524 class.clone(),
525 agent_id,
526 conn,
527 connection.unwrap().is_local_connection(),
528 ) {
529 Ok(_) => {}
530 Err(e) => {
531 return Err(DataPathError::SubscriptionError(e.to_string()));
532 }
533 }
534
535 if forward {
536 debug!("forward subscription to {:?}", out_conn);
537 msg.metadata.clear();
538 let source_class = match process_name(&submsg.source) {
539 Ok(s) => s,
540 Err(e) => {
541 error!("error processing unsubscription source {:?}", e);
542 return Err(DataPathError::SubscriptionError(e.to_string()));
543 }
544 };
545 let source_id = get_agent_id(&submsg.source);
546 match self.send_msg(msg, out_conn).await {
547 Ok(_) => {
548 self.forwarder().on_forwarded_subscription(
549 source_class,
550 source_id,
551 class,
552 agent_id,
553 out_conn,
554 );
555 }
556 Err(e) => {
557 error!("error sending a message {:?}", e);
558 return Err(DataPathError::UnsubscriptionError(e.to_string()));
559 }
560 };
561 }
562 Ok(())
563 }
564 Err(e) => {
565 error!("error processing subscription message {:?}", e);
566 Err(DataPathError::SubscriptionError(e.to_string()))
567 }
568 }
569 }
570
571 pub async fn process_message(
572 &self,
573 msg: Message,
574 in_connection: u64,
575 ) -> Result<(), DataPathError> {
576 match &msg.message_type {
577 None => {
578 error!(
579 "received message without message type from connection {}: {:?}",
580 in_connection, msg
581 );
582 info!(
583 telemetry = true,
584 monotonic_counter.num_messages_by_type = 1,
585 message_type = "none"
586 );
587 Err(DataPathError::UnknownMsgType("".to_string()))
588 }
589 Some(msg_type) => match msg_type {
590 SubscribeType(s) => {
591 debug!(
592 "received subscription from connection {}: {:?}",
593 in_connection, s
594 );
595 info!(
596 telemetry = true,
597 monotonic_counter.num_messages_by_type = 1,
598 message_type = "subscribe"
599 );
600 match self.process_subscription(msg, in_connection).await {
601 Err(e) => {
602 error! {"error processing subscription {:?}", e}
603 Err(e)
604 }
605 Ok(_) => Ok(()),
606 }
607 }
608 UnsubscribeType(u) => {
609 debug!(
610 "Received ubsubscription from client {}: {:?}",
611 in_connection, u
612 );
613 info!(
614 telemetry = true,
615 monotonic_counter.num_messages_by_type = 1,
616 message_type = "unsubscribe"
617 );
618 match self.process_unsubscription(msg, in_connection).await {
619 Err(e) => {
620 error! {"error processing unsubscription {:?}", e}
621 Err(e)
622 }
623 Ok(_) => Ok(()),
624 }
625 }
626 PublishType(p) => {
627 debug!("Received publish from client {}: {:?}", in_connection, p);
628 info!(
629 telemetry = true,
630 monotonic_counter.num_messages_by_type = 1,
631 method = "publish"
632 );
633 match self.process_publish(msg, in_connection).await {
634 Err(e) => {
635 error! {"error processing publication {:?}", e}
636 Err(e)
637 }
638 Ok(_) => Ok(()),
639 }
640 }
641 },
642 }
643 }
644
645 async fn handle_new_message(
646 &self,
647 conn_index: u64,
648 result: Result<Message, Status>,
649 ) -> Result<(), DataPathError> {
650 debug!(%conn_index, "Received message from connection");
651 info!(
652 telemetry = true,
653 monotonic_counter.num_processed_messages = 1
654 );
655
656 match result {
657 Ok(msg) => {
658 match self.process_message(msg, conn_index).await {
659 Ok(_) => Ok(()),
660 Err(e) => {
661 error!(
663 "error processing message from connection {:?}: {:?}",
664 conn_index, e
665 );
666 info!(
667 telemetry = true,
668 monotonic_counter.num_message_process_errors = 1
669 );
670 Ok(())
671 }
672 }
673 }
674 Err(e) => {
675 if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
676 if io_err.kind() == std::io::ErrorKind::BrokenPipe {
677 info!("Connection {:?} closed by peer", conn_index);
678 return Err(DataPathError::StreamError(e.to_string()));
679 }
680 }
681 error!("error receiving messages {:?}", e);
682 let connection = self.forwarder().get_connection(conn_index);
683 match connection {
684 Some(conn) => {
685 match conn.channel() {
686 Channel::Server(tx) => tx
687 .send(Err(e))
688 .await
689 .map_err(|e| DataPathError::MessageSendError(e.to_string())),
690 _ => Err(DataPathError::WrongChannelType), }
692 }
693 None => {
694 error!("connection {:?} not found", conn_index);
695 Err(DataPathError::ConnectionNotFound(conn_index.to_string()))
696 }
697 }
698 }
699 }
700 }
701
702 #[tracing::instrument(fields(telemetry = true), skip(stream))]
703 fn process_stream(
704 &self,
705 mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
706 conn_index: u64,
707 client_config: Option<ClientConfig>,
708 cancellation_token: CancellationToken,
709 is_local: bool,
710 ) -> tokio::task::JoinHandle<()> {
711 let self_clone = self.clone();
713 let token_clone = cancellation_token.clone();
714 let client_conf_clone = client_config.clone();
715 let handle = tokio::spawn(async move {
716 let mut try_to_reconnect = true;
717 loop {
718 tokio::select! {
719 res = stream.next() => {
720 match res {
721 Some(msg) => {
722 if let Err(e) = self_clone.handle_new_message(conn_index, msg).await {
723 error!("error handling stream {:?}", e);
724 break;
725 }
726 }
727 None => {
728 info!(%conn_index, "end of stream");
729 break;
730 }
731 }
732 }
733 _ = self_clone.get_drain_watch().signaled() => {
734 info!("shutting down stream on drain: {}", conn_index);
735 try_to_reconnect = false;
736 break;
737 }
738 _ = token_clone.cancelled() => {
739 info!("shutting down stream cancellation token: {}", conn_index);
740 try_to_reconnect = false;
741 break;
742 }
743 }
744 }
745
746 let mut delete_connection = true;
747
748 if try_to_reconnect && client_conf_clone.is_some() {
749 let config = client_conf_clone.unwrap();
750 match config.to_channel() {
751 Err(e) => {
752 error!(
753 "cannot parse connection config, unable to reconnect {:?}",
754 e.to_string()
755 );
756 }
757 Ok(channel) => {
758 info!("connection lost with remote endpoint, try to reconnect");
759 let remote_subscriptions = self_clone
764 .forwarder()
765 .get_subscriptions_forwarded_on_connection(conn_index);
766
767 match self_clone
768 .try_to_connect(
769 channel,
770 Some(config),
771 None,
772 None,
773 Some(conn_index),
774 120,
775 )
776 .await
777 {
778 Ok(_) => {
779 info!("connection re-established");
780 delete_connection = false;
782 for r in remote_subscriptions.iter() {
783 let sub_msg = create_subscription(
786 r.source(),
787 &r.name().agent_class,
788 Some(r.name().agent_id),
789 HashMap::new(),
790 );
791 if self_clone.send_msg(sub_msg, conn_index).await.is_err() {
792 error!("error restoring subscription on remote node");
793 }
794 }
795 }
796 Err(e) => {
797 error!("unable to connect to remote node {:?}", e.to_string());
799 }
800 }
801 }
802 }
803 } else {
804 info!(
805 "connection lost on connection {}, do not try to reconnect",
806 conn_index
807 )
808 }
809
810 if delete_connection {
811 self_clone
812 .forwarder()
813 .on_connection_drop(conn_index, is_local);
814
815 info!(telemetry = true, counter.num_active_connections = -1);
816 }
817 });
818
819 handle
820 }
821
822 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
823 let mut err: &(dyn std::error::Error + 'static) = err_status;
824
825 loop {
826 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
827 return Some(io_err);
828 }
829
830 if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
833 if let Some(io_err) = h2_err.get_io() {
834 return Some(io_err);
835 }
836 }
837
838 err = err.source()?;
839 }
840 }
841}
842
843#[tonic::async_trait]
844impl PubSubService for MessageProcessor {
845 type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
846
847 #[tracing::instrument(fields(telemetry = true))]
848 async fn open_channel(
849 &self,
850 request: Request<tonic::Streaming<Message>>,
851 ) -> Result<Response<Self::OpenChannelStream>, Status> {
852 let remote_addr = request.remote_addr();
853 let local_addr = request.local_addr();
854
855 let stream = request.into_inner();
856 let (tx, rx) = mpsc::channel(128);
857
858 let connection = Connection::new(ConnectionType::Remote)
859 .with_remote_addr(remote_addr)
860 .with_local_addr(local_addr)
861 .with_channel(Channel::Server(tx));
862
863 info!(
864 "new connection received from remote: (remote: {:?} - local: {:?})",
865 connection.remote_addr(),
866 connection.local_addr()
867 );
868 info!(telemetry = true, counter.num_active_connections = 1);
869
870 let conn_index = self
872 .forwarder()
873 .on_connection_established(connection, None)
874 .unwrap();
875
876 self.process_stream(stream, conn_index, None, CancellationToken::new(), false);
877
878 let out_stream = ReceiverStream::new(rx);
879 Ok(Response::new(
880 Box::pin(out_stream) as Self::OpenChannelStream
881 ))
882 }
883}