1use std::collections::{HashMap, HashSet};
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use crate::api::DataPlaneServiceServer;
9use display_error_chain::ErrorChainExt;
10use parking_lot::RwLock;
11use slim_config::component::configuration::Configuration;
12use slim_config::grpc::client::ClientConfig;
13use slim_config::grpc::server::ServerConfig;
14use tokio::sync::mpsc::{self, Sender};
15use tokio::task::JoinHandle;
16use tokio_stream::wrappers::ReceiverStream;
17use tokio_stream::{Stream, StreamExt};
18use tokio_util::sync::CancellationToken;
19
20use tonic::{Request, Response, Status};
21use tracing::{Instrument, debug, error, info};
22
23#[cfg(feature = "otel_tracing")]
24use crate::otel_tracing;
25
26use crate::api::ProtoMessage;
27use crate::api::ProtoPublishType as PublishType;
28use crate::api::ProtoSubscribeType as SubscribeType;
29use crate::api::ProtoSubscriptionAckType as SubscriptionAckType;
30use crate::api::ProtoUnsubscribeType as UnsubscribeType;
31use crate::api::proto::dataplane::v1::Message;
32use crate::api::{
33 LinkNegotiationPayload, ProtoLink, ProtoLinkMessageType as LinkType, ProtoLinkType,
34};
35use semver;
36
37use crate::api::proto::dataplane::v1::data_plane_service_client::DataPlaneServiceClient;
38use crate::api::proto::dataplane::v1::data_plane_service_server::DataPlaneService;
39use crate::connection::{Channel, Connection, Type as ConnectionType};
40use crate::errors::{DataPathError, MessageContext};
41use crate::forwarder::Forwarder;
42use crate::messages::Name;
43use crate::messages::utils::SlimHeaderFlags;
44use crate::recovery::RecoveryTable;
45use crate::tables::connection_table::ConnectionTable;
46use crate::tables::remote_subscription_table::SubscriptionInfo;
47use crate::tables::subscription_table::SubscriptionTableImpl;
48
49fn local_version() -> &'static str {
50 slim_version::version()
51}
52
53#[derive(Debug)]
54struct MessageProcessorInternal {
55 forwarder: Forwarder<Connection>,
57
58 drain_signal: parking_lot::RwLock<Option<drain::Signal>>,
60
61 drain_watch: parking_lot::RwLock<Option<drain::Watch>>,
63
64 tx_control_plane: RwLock<Option<Sender<Result<Message, Status>>>>,
66
67 recovery_table: RecoveryTable,
69
70 sub_ack_manager: crate::subscription_ack::RemoteSubAckManager,
72
73 service_id: String,
75}
76
77#[derive(Debug, Clone)]
78pub struct MessageProcessor {
79 internal: Arc<MessageProcessorInternal>,
80}
81
82impl Default for MessageProcessor {
83 fn default() -> Self {
84 Self::new_with_service_id(String::new())
85 }
86}
87
88impl MessageProcessor {
89 pub fn new_with_service_id(service_id: String) -> Self {
90 Self::new_with_options(service_id, None)
91 }
92
93 pub fn new_with_options(service_id: String, recovery_ttl: Option<std::time::Duration>) -> Self {
94 let (signal, watch) = drain::channel();
95 let recovery_table = match recovery_ttl {
96 Some(ttl) => RecoveryTable::new(ttl),
97 None => RecoveryTable::default(),
98 };
99 let internal = MessageProcessorInternal {
100 forwarder: Forwarder::new(),
101 drain_signal: RwLock::new(Some(signal)),
102 drain_watch: RwLock::new(Some(watch)),
103 tx_control_plane: RwLock::new(None),
104 recovery_table,
105 sub_ack_manager: crate::subscription_ack::RemoteSubAckManager::new(),
106 service_id,
107 };
108 Self {
109 internal: Arc::new(internal),
110 }
111 }
112
113 pub fn new() -> Self {
114 Self::default()
115 }
116
117 pub async fn run_server(
120 &self,
121 config: &ServerConfig,
122 ) -> Result<CancellationToken, DataPathError> {
123 debug!(%config, "starting dataplane server");
124 let watch = self.get_drain_watch()?;
125 let svc = Arc::new(self.clone());
127 let res = config
128 .run_server(&[DataPlaneServiceServer::from_arc(svc)], watch)
129 .await?;
130
131 Ok(res)
132 }
133
134 pub async fn shutdown(&self) -> Result<(), DataPathError> {
135 let signal = self
137 .internal
138 .drain_signal
139 .write()
140 .take()
141 .ok_or(DataPathError::AlreadyClosedError)?;
142
143 self.internal.drain_watch.write().take();
145
146 signal.drain().await;
148
149 Ok(())
150 }
151
152 fn set_tx_control_plane(&self, tx: Sender<Result<Message, Status>>) {
153 let mut tx_guard = self.internal.tx_control_plane.write();
154 *tx_guard = Some(tx);
155 }
156
157 fn get_tx_control_plane(&self) -> Option<Sender<Result<Message, Status>>> {
158 let tx_guard = self.internal.tx_control_plane.read();
159 tx_guard.clone()
160 }
161
162 fn forwarder(&self) -> &Forwarder<Connection> {
163 &self.internal.forwarder
164 }
165
166 pub(crate) fn remove_sub_ack(&self, subscription_id: u64) {
167 self.internal.sub_ack_manager.remove(subscription_id);
168 }
169
170 fn get_drain_watch(&self) -> Result<drain::Watch, DataPathError> {
171 self.internal
172 .drain_watch
173 .read()
174 .clone()
175 .ok_or(DataPathError::AlreadyClosedError)
176 }
177
178 async fn restore_remote_subscriptions(
188 &self,
189 remote_subs: &HashSet<SubscriptionInfo>,
190 conn_index: u64,
191 restore_tracking: bool,
192 ) {
193 for r in remote_subs {
194 let sub_msg = Message::builder()
195 .source(r.source().clone())
196 .destination(r.name().clone())
197 .identity(r.source_identity())
198 .build_subscribe()
199 .unwrap();
200 if let Err(e) = self.send_msg(sub_msg, conn_index).await {
201 error!(
202 error = %e.chain(), %conn_index,
203 "error restoring subscription on remote node",
204 );
205 } else if restore_tracking {
206 self.forwarder().on_forwarded_subscription(
207 r.source().clone(),
208 r.name().clone(),
209 r.source_identity().clone(),
210 conn_index,
211 true,
212 r.subscription_id(),
213 );
214 }
215 }
216 }
217
218 async fn try_to_connect(
219 &self,
220 client_config: ClientConfig,
221 local: Option<SocketAddr>,
222 remote: Option<SocketAddr>,
223 existing_conn_index: Option<u64>,
224 ) -> Result<(JoinHandle<()>, u64), DataPathError> {
225 client_config.validate()?;
226 let mut watch = std::pin::pin!(self.get_drain_watch()?.signaled());
227
228 let channel = tokio::select! {
229 _ = &mut watch => {
230 return Err(DataPathError::ShuttingDownError);
231 }
232 res = client_config.to_channel() => {
233 res?
234 }
235 };
236
237 let mut client = DataPlaneServiceClient::new(channel);
238 let (tx, rx) = mpsc::channel(128);
239
240 let stream = client
241 .open_channel(Request::new(ReceiverStream::new(rx)))
242 .await?;
243
244 let link_id = client_config.link_id.clone();
248
249 let cancellation_token = CancellationToken::new();
250 let connection = Connection::new(ConnectionType::Remote, Channel::Client(tx))
251 .with_local_addr(local)
252 .with_remote_addr(remote)
253 .with_config_data(Some(client_config.clone()))
254 .with_cancellation_token(Some(cancellation_token.clone()))
255 .with_link_id(link_id.clone());
256
257 debug!(
258 remote = ?connection.remote_addr(),
259 local = ?connection.local_addr(),
260 "new connection initiated locally",
261 );
262
263 let conn_index = self
265 .forwarder()
266 .on_connection_established(connection, existing_conn_index)
267 .ok_or(DataPathError::ConnectionTableAddError)?;
268
269 debug!(
270 %conn_index,
271 is_local = false,
272 "new connection index",
273 );
274
275 let handle = self.process_stream(
277 stream.into_inner(),
278 conn_index,
279 Some(client_config.clone()),
280 cancellation_token,
281 false,
282 false,
283 )?;
284
285 let negotiation_msg =
288 ProtoMessage::builder().build_link_negotiation(&link_id, local_version(), false);
289 if let Err(e) = self.send_msg(negotiation_msg, conn_index).await {
290 debug!(
291 %conn_index,
292 error = %e.chain(),
293 "failed to send link negotiation (remote may be an older SLIM instance)",
294 );
295 }
296
297 Ok((handle, conn_index))
298 }
299
300 pub async fn connect(
301 &self,
302 client_config: ClientConfig,
303 local: Option<SocketAddr>,
304 remote: Option<SocketAddr>,
305 ) -> Result<(JoinHandle<()>, u64), DataPathError> {
306 self.try_to_connect(client_config, local, remote, None)
307 .await
308 }
309
310 pub fn disconnect(&self, conn: u64) -> Result<ClientConfig, DataPathError> {
311 let connection = match self.forwarder().get_connection(conn) {
312 Some(c) => c,
313 None => {
314 error!(%conn, "error handling disconnect: connection unknown");
315 return Err(DataPathError::DisconnectionError(conn));
316 }
317 };
318
319 let token = match connection.cancellation_token() {
320 Some(t) => t,
321 None => {
322 error!(%conn, "error handling disconnect: missing cancellation token");
323 return Err(DataPathError::DisconnectionError(conn));
324 }
325 };
326
327 token.cancel();
329
330 connection
331 .config_data()
332 .cloned()
333 .ok_or(DataPathError::DisconnectionError(conn))
334 }
335
336 #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id))]
337 pub fn register_local_connection(
338 &self,
339 from_control_plane: bool,
340 ) -> Result<
341 (
342 u64,
343 tokio::sync::mpsc::Sender<Result<Message, Status>>,
344 tokio::sync::mpsc::Receiver<Result<Message, Status>>,
345 ),
346 DataPathError,
347 > {
348 let (tx1, rx1) = mpsc::channel(512);
350
351 debug!("establishing new local app connection");
352
353 let (tx2, rx2) = mpsc::channel(512);
355
356 if from_control_plane && self.get_tx_control_plane().is_none() {
359 self.set_tx_control_plane(tx2.clone());
360 }
361
362 let cancellation_token = CancellationToken::new();
364 let connection = Connection::new(ConnectionType::Local, Channel::Server(tx2))
365 .with_cancellation_token(Some(cancellation_token.clone()));
366
367 let conn_id = self
369 .forwarder()
370 .on_connection_established(connection, None)
371 .unwrap();
372
373 debug!(%conn_id, "local connection established");
374 info!(telemetry = true, counter.num_active_connections = 1);
375
376 self.process_stream(
378 ReceiverStream::new(rx1),
379 conn_id,
380 None,
381 cancellation_token,
382 true,
383 from_control_plane,
384 )?;
385
386 Ok((conn_id, tx1, rx2))
388 }
389
390 pub async fn send_msg(
391 &self,
392 #[cfg(feature = "otel_tracing")] mut msg: Message,
393 #[cfg(not(feature = "otel_tracing"))] msg: Message,
394 out_conn: u64,
395 ) -> Result<(), DataPathError> {
396 #[cfg(feature = "otel_tracing")]
397 otel_tracing::prepare_outbound_msg(
398 &mut msg,
399 "send_message",
400 &self.internal.service_id,
401 otel_tracing::SpanTarget::Connection(out_conn),
402 );
403 self.send_msg_raw(msg, out_conn).await
404 }
405
406 async fn send_msg_raw(&self, mut msg: Message, out_conn: u64) -> Result<(), DataPathError> {
407 let connection = self.forwarder().get_connection(out_conn);
408 match connection {
409 Some(conn) => {
410 msg.clear_slim_header();
411 match conn.channel() {
412 Channel::Server(s) => {
413 s.send(Ok(msg))
414 .await
415 .map_err(|e| DataPathError::MessageProcessingError {
416 source: Box::new(DataPathError::ConnectionNotFound(out_conn)),
417 msg: Box::new(e.0.unwrap_or_default()),
418 })
419 }
420 Channel::Client(s) => {
421 s.send(msg)
422 .await
423 .map_err(|e| DataPathError::MessageProcessingError {
424 source: Box::new(DataPathError::ConnectionNotFound(out_conn)),
425 msg: Box::new(e.0),
426 })
427 }
428 }
429 }
430 None => Err(DataPathError::ConnectionNotFound(out_conn)),
431 }
432 }
433
434 async fn match_and_forward_msg(
435 &self,
436 #[cfg(feature = "otel_tracing")] mut msg: Message,
437 #[cfg(not(feature = "otel_tracing"))] msg: Message,
438 name: Name,
439 in_connection: u64,
440 fanout: u32,
441 ) -> Result<(), DataPathError> {
442 debug!(
443 %name,
444 %fanout,
445 "match and forward message"
446 );
447
448 if let Some(val) = msg.get_forward_to() {
451 debug!(conn = %val, "forwarding message to connection");
452 return self.send_msg(msg, val).await;
453 }
454
455 match self
456 .forwarder()
457 .on_publish_msg_match(name, in_connection, fanout)
458 {
459 Ok(out_vec) => {
460 let len = out_vec.len();
461 if len == 1 {
463 return self.send_msg(msg, out_vec[0]).await;
464 }
465
466 #[cfg(feature = "otel_tracing")]
467 otel_tracing::prepare_fanout_msg(
468 &mut msg,
469 "send_message",
470 &self.internal.service_id,
471 len as u32,
472 );
473
474 let mut i = 0usize;
475 while i < len - 1 {
476 self.send_msg_raw(msg.clone(), out_vec[i]).await?;
477 i += 1;
478 }
479 self.send_msg_raw(msg, out_vec[i]).await?;
480 Ok(())
481 }
482 Err(e) => Err(DataPathError::MessageProcessingError {
483 source: Box::new(e),
484 msg: Box::new(msg),
485 }),
486 }
487 }
488
489 async fn handle_link_message(
494 &self,
495 link: ProtoLink,
496 conn_index: u64,
497 is_local: bool,
498 ) -> Result<(), DataPathError> {
499 if is_local {
500 debug!(%conn_index, "ignoring link message received on local connection");
501 return Ok(());
502 }
503 match link.link_type {
504 Some(ProtoLinkType::LinkNegotiation(payload)) => {
505 self.handle_link_negotiation(&payload, conn_index).await
506 }
507 None => {
508 debug!(%conn_index, "received link message with unset link_type");
509 Ok(())
510 }
511 }
512 }
513
514 async fn handle_link_negotiation(
524 &self,
525 payload: &LinkNegotiationPayload,
526 in_connection: u64,
527 ) -> Result<(), DataPathError> {
528 let link_id = &payload.link_id;
529 let remote_version = &payload.slim_version;
530
531 debug!(
532 %in_connection,
533 %link_id,
534 %remote_version,
535 is_reply = payload.is_reply,
536 "received link negotiation",
537 );
538
539 let Some(conn) = self.forwarder().get_connection(in_connection) else {
540 debug!(%in_connection, "ignoring link negotiation request received on unknown connection");
541 return Ok(());
542 };
543
544 match (conn.is_outgoing(), payload.is_reply) {
546 (true, false) => {
547 debug!(%in_connection, "ignoring link negotiation request received on outgoing connection");
548 return Ok(());
549 }
550 (false, true) => {
551 debug!(%in_connection, "ignoring link negotiation reply received on incoming connection");
552 return Ok(());
553 }
554 _ => {}
555 }
556
557 let version = match semver::Version::parse(remote_version) {
559 Ok(v) => v,
560 Err(e) => {
561 debug!(%in_connection, %remote_version, error = %e, "ignoring link negotiation with unparsable remote SLIM version");
562 return Ok(());
563 }
564 };
565
566 if payload.is_reply {
567 if !conn.complete_negotiation_as_client(link_id, version) {
570 debug!(%in_connection, %link_id, "ignoring link negotiation reply");
571 }
572 } else {
573 if !conn.complete_negotiation_as_server(link_id, version) {
576 debug!(%in_connection, %link_id, "ignoring link negotiation request");
577 return Ok(());
578 }
579
580 if let Some(entry) = self.internal.recovery_table.take(link_id) {
583 info!(%in_connection, %link_id, "recovering routes for reconnected peer");
584
585 for (name, sub_ids) in &entry.local_subs {
589 for &subscription_id in sub_ids {
590 if let Err(e) = self.forwarder().on_subscription_msg(
591 name.clone(),
592 in_connection,
593 false,
594 true,
595 subscription_id,
596 ) {
597 error!(
598 error = %e.chain(), %in_connection,
599 "error re-adding local subscription during recovery",
600 );
601 }
602 }
603 }
604
605 self.restore_remote_subscriptions(&entry.remote_subs, in_connection, true)
609 .await;
610 }
611
612 let reply =
614 ProtoMessage::builder().build_link_negotiation(link_id, local_version(), true);
615 if let Err(e) = self.send_msg(reply, in_connection).await {
616 debug!(
617 %in_connection,
618 error = %e.chain(),
619 "failed to send link negotiation reply",
620 );
621 }
622 }
623
624 Ok(())
625 }
626
627 async fn process_publish(&self, msg: Message, in_connection: u64) -> Result<(), DataPathError> {
628 debug!(
629 %in_connection,
630 ?msg,
631 "received publication"
632 );
633
634 info!(
636 telemetry = true,
637 monotonic_counter.num_messages_by_type = 1,
638 method = "publish"
639 );
640 let header = msg.get_slim_header();
644
645 let dst = header.get_dst();
646
647 let fanout = msg.get_fanout();
650
651 self.match_and_forward_msg(msg, dst, in_connection, fanout)
652 .await
653 }
654
655 pub(crate) async fn send_subscription_ack(
656 &self,
657 in_connection: u64,
658 subscription_id: u64,
659 result: &Result<(), DataPathError>,
660 ) {
661 let (success, error_msg) = match result {
662 Ok(()) => (true, String::new()),
663 Err(e) => (false, e.to_string()),
664 };
665
666 let ack_msg =
667 Message::builder().build_subscription_ack(subscription_id, success, error_msg);
668
669 if let Err(e) = self.send_msg(ack_msg, in_connection).await {
670 error!(error = %e.chain(), "failed to send subscription ack");
671 }
672 }
673
674 async fn process_subscription_update_and_forward(
675 &self,
676 msg: Message,
677 conn: u64,
678 forward: Option<u64>,
679 add: bool,
680 subscription_id: u64,
681 ) -> Result<(), DataPathError> {
682 let dst = msg.get_dst();
683
684 let connection = if let Some(c) = self.forwarder().get_connection(conn) {
686 c
687 } else {
688 return Err(DataPathError::MessageProcessingError {
689 source: Box::new(DataPathError::ConnectionNotFound(conn)),
690 msg: Box::new(msg),
691 });
692 };
693
694 debug!(
695 %conn,
696 %dst,
697 is_local = connection.is_local_connection(),
698 "processing {}subscription",
699 if add { "" } else { "un" }
700 );
701
702 self.forwarder().on_subscription_msg(
703 dst.clone(),
704 conn,
705 connection.is_local_connection(),
706 add,
707 subscription_id,
708 )?;
709
710 match forward {
711 None => Ok(()),
712 Some(out_conn) => {
713 debug!(
714 %out_conn,
715 "forwarding {}subscription to connection",
716 if add { "" } else { "un" }
717 );
718
719 let source = msg.get_source();
720 let identity = msg.get_identity();
721
722 self.send_msg(msg, out_conn).await.map(|_| {
723 self.forwarder().on_forwarded_subscription(
724 source,
725 dst,
726 identity,
727 out_conn,
728 add,
729 subscription_id,
730 );
731 })
732 }
733 }
734 }
735
736 async fn process_subscription(
740 &self,
741 msg: Message,
742 in_connection: u64,
743 add: bool,
744 ) -> Result<(), DataPathError> {
745 debug!(
746 %in_connection,
747 ?msg,
748 "received {}subscription",
749 if add { "" } else { "un" }
750 );
751
752 info!(
754 telemetry = true,
755 monotonic_counter.num_messages_by_type = 1,
756 message_type = { if add { "subscribe" } else { "unsubscribe" } }
757 );
758 let subscription_id = msg.get_subscription_id();
761
762 debug!(?subscription_id, "received subscription id");
763
764 let header = msg.get_slim_header();
766
767 let (in_conn, recv_from, forward) = header.get_connections();
769 let in_conn = recv_from.unwrap_or(in_conn);
770
771 let forward = forward.filter(|&out| {
774 self.forwarder()
775 .get_connection(out)
776 .map(|c| !c.is_local_connection())
777 .unwrap_or(true)
778 });
779
780 let use_remote_ack = forward
784 .and_then(|out| self.forwarder().get_connection(out))
785 .map(|c| crate::subscription_ack::supports(&c))
786 .unwrap_or(false);
787
788 if forward.is_some() && !use_remote_ack {
789 debug!(
790 forward_to = forward,
791 "subscription: remote ack not available, link negotiation may not have completed yet"
792 );
793 }
794
795 let Some(connection) = self.forwarder().get_connection(in_conn) else {
797 if let Some(id) = subscription_id {
798 debug!(%in_conn, "connection not found, sending error ack");
799 self.send_subscription_ack(
800 in_connection,
801 id,
802 &Err(DataPathError::ConnectionNotFound(in_conn)),
803 )
804 .await;
805 }
806 return Err(DataPathError::MessageProcessingError {
807 source: Box::new(DataPathError::ConnectionNotFound(in_conn)),
808 msg: Box::new(msg),
809 });
810 };
811
812 if recv_from.is_some() && connection.is_local_connection() {
814 if let Some(id) = subscription_id {
815 debug!(%in_conn, "subscription looped back to local connection, acking ok");
816 self.send_subscription_ack(in_connection, id, &Ok(())).await;
817 }
818 return Ok(());
819 }
820
821 debug!(use_remote_ack, dst = %msg.get_dst(), forward_to = forward, "subscription: ack path decision");
822
823 let sub_id = subscription_id.unwrap_or(0);
824
825 let rx = self.internal.sub_ack_manager.register(sub_id);
828
829 let result = self
831 .process_subscription_update_and_forward(msg.clone(), in_conn, forward, add, sub_id)
832 .await;
833
834 if use_remote_ack && result.is_ok() {
838 let out_conn = forward.unwrap();
839
840 tokio::spawn(crate::subscription_ack::retry_loop(
841 self.clone(),
842 sub_id,
843 msg,
844 out_conn,
845 in_connection,
846 subscription_id,
847 rx,
848 ));
849
850 return Ok(());
851 }
852
853 if let Some(id) = subscription_id {
855 debug!(%in_connection, ok = result.is_ok(), "sending immediate subscription ack");
856 self.send_subscription_ack(in_connection, id, &result).await;
857 }
858
859 result
860 }
861
862 pub async fn process_message(
863 &self,
864 msg: Message,
865 in_connection: u64,
866 is_local: bool,
867 ) -> Result<(), DataPathError> {
868 match msg.message_type {
869 Some(SubscribeType(_)) => self.process_subscription(msg, in_connection, true).await,
870 Some(UnsubscribeType(_)) => self.process_subscription(msg, in_connection, false).await,
871 Some(PublishType(_)) => self.process_publish(msg, in_connection).await,
872 Some(LinkType(link)) => {
873 self.handle_link_message(link, in_connection, is_local)
874 .await
875 }
876 Some(SubscriptionAckType(ack)) => {
877 let result = if ack.success {
878 Ok(())
879 } else {
880 Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
881 };
882 self.internal
883 .sub_ack_manager
884 .resolve(ack.subscription_id, result);
885 Ok(())
886 }
887 None => unreachable!(
888 "message type not set; validate() must be called before process_message"
889 ),
890 }
891 }
892
893 async fn handle_new_message(
894 &self,
895 conn_index: u64,
896 is_local: bool,
897 mut msg: Message,
898 ) -> Result<(), DataPathError> {
899 debug!(%conn_index, "received message from connection");
900 info!(
901 telemetry = true,
902 monotonic_counter.num_processed_messages = 1
903 );
904
905 if let Err(err) = msg.validate() {
907 info!(
908 telemetry = true,
909 monotonic_counter.num_messages_by_type = 1,
910 message_type = "none"
911 );
912
913 let ret_err = DataPathError::MessageProcessingError {
914 source: Box::new(err.into()),
915 msg: Box::new(msg),
916 };
917
918 return Err(ret_err);
919 }
920
921 if !msg.is_link() && !msg.is_subscription_ack() {
923 msg.set_incoming_conn(Some(conn_index));
925
926 #[cfg(feature = "otel_tracing")]
927 otel_tracing::prepare_inbound_msg(
928 &mut msg,
929 "process_local",
930 &self.internal.service_id,
931 conn_index,
932 is_local,
933 );
934 }
935
936 match self.process_message(msg, conn_index, is_local).await {
937 Ok(_) => Ok(()),
938 Err(e) => {
939 info!(
941 telemetry = true,
942 monotonic_counter.num_message_process_errors = 1
943 );
944 Err(e)
948 }
949 }
950 }
951
952 #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id, conn_index))]
953 async fn send_error_to_local_app(&self, conn_index: u64, err: DataPathError) {
954 debug!(%conn_index, "sending error to local application");
955 let connection = self.forwarder().get_connection(conn_index);
956 match connection {
957 Some(conn) => {
958 debug!("try to notify the error to the local application");
959 if let Channel::Server(tx) = conn.channel() {
960 let session_ctx = match &err {
962 DataPathError::MessageProcessingError { msg, .. } => {
963 MessageContext::from_msg(msg)
964 }
965 _ => None,
966 };
967
968 let payload = crate::errors::ErrorPayload::new(err.to_string(), session_ctx);
970 let error_message = payload.to_json_string();
971
972 let status = Status::new(tonic::Code::Internal, error_message);
974
975 if tx.send(Err(status)).await.is_err() {
976 debug!(error = %err.chain(), "unable to notify the error to the local app");
977 }
978 }
979 }
980 None => {
981 error!(
982 "error sending error to local app: connection {:?} not found",
983 conn_index
984 );
985 }
986 }
987 }
988
989 #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id, conn_index))]
990 async fn reconnect(
991 &self,
992 client_conf: ClientConfig,
993 conn_index: u64,
994 cancellation_token: &CancellationToken,
995 ) -> bool {
996 info!("connection lost with remote endpoint, attempting to reconnect");
997
998 let remote_subscriptions = self
1003 .forwarder()
1004 .get_subscriptions_forwarded_on_connection(conn_index);
1005
1006 tokio::select! {
1007 _ = cancellation_token.cancelled() => {
1008 debug!("cancellation token signaled, stopping reconnection process");
1009 false
1010 }
1011 res = self.try_to_connect(client_conf, None, None, Some(conn_index)) => {
1012 match res {
1013 Ok(_) => {
1014 info!("connection re-established successfully");
1015 self.restore_remote_subscriptions(
1020 &remote_subscriptions,
1021 conn_index,
1022 false,
1023 )
1024 .await;
1025 true
1026 }
1027 Err(e) => {
1028 error!(error = %e.chain(), "unable to reconnect to remote node");
1029 false
1030 }
1031 }
1032 }
1033 }
1034 }
1035
1036 async fn notify_control_plane_subscriptions_lost(
1042 tx_cp: Option<Sender<Result<Message, Status>>>,
1043 local_subs: HashMap<Name, HashSet<u64>>,
1044 conn_index: u64,
1045 ) {
1046 let Some(tx) = tx_cp else { return };
1047 for local_sub in local_subs.into_keys() {
1048 debug!(
1049 %local_sub,
1050 "notify control plane about lost subscription",
1051 );
1052 let msg = Message::builder()
1053 .source(local_sub.clone())
1054 .destination(local_sub.clone())
1055 .flags(SlimHeaderFlags::default().with_recv_from(conn_index))
1056 .build_unsubscribe()
1057 .unwrap();
1058 if let Err(e) = tx.send(Ok(msg)).await {
1059 debug!(
1060 %local_sub,
1061 error = %e.chain(),
1062 "failed to send unsubscribe to control plane",
1063 );
1064 }
1065 }
1066 }
1067
1068 fn process_stream(
1069 &self,
1070 mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
1071 conn_index: u64,
1072 client_config: Option<ClientConfig>,
1073 cancellation_token: CancellationToken,
1074 is_local: bool,
1075 from_control_plane: bool,
1076 ) -> Result<JoinHandle<()>, DataPathError> {
1077 let self_clone = self.clone();
1079 let token_clone = cancellation_token.clone();
1080 let client_conf_clone = client_config.clone();
1081 let tx_cp: Option<Sender<Result<Message, Status>>> = self.get_tx_control_plane();
1082 let watch = self.get_drain_watch()?;
1083 let span = tracing::info_span!(
1084 "process_stream",
1085 service_id = %self.internal.service_id,
1086 %conn_index,
1087 is_local,
1088 );
1089 let handle = tokio::spawn(async move {
1090 let mut try_to_reconnect = true;
1091
1092 let mut watch = std::pin::pin!(watch.signaled());
1093 loop {
1094 tokio::select! {
1095 next = stream.next() => {
1096 match next {
1097 Some(result) => {
1098 match result {
1099 Ok(msg) => {
1100 if !is_local && !from_control_plane && let Some(txcp) = &tx_cp {
1106 match msg.get_type() {
1107 PublishType(_) | LinkType(_) | SubscriptionAckType(_) => {}
1108 _ => {
1109 let _ = txcp.send(Ok(msg.clone())).await;
1112 }
1113 }
1114 }
1115
1116 if let Err(e) = self_clone.handle_new_message(conn_index, is_local, msg).await {
1117 debug!(%conn_index, error = %e.chain(), "error processing incoming message");
1118 if is_local {
1120 self_clone.send_error_to_local_app(conn_index, e).await;
1122 }
1123 }
1124 }
1125 Err(e) => {
1126 if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
1127 if io_err.kind() == std::io::ErrorKind::BrokenPipe {
1128 info!(%conn_index, "connection closed by peer");
1129 }
1130 } else {
1131 error!(error = %e.chain(), "error receiving messages");
1132 }
1133 break;
1134 }
1135 }
1136 }
1137 None => {
1138 debug!(%conn_index, "end of stream");
1139 break;
1140 }
1141 }
1142 }
1143 _ = &mut watch => {
1144 info!(%conn_index, "shutting down stream on drain");
1145 try_to_reconnect = false;
1146 break;
1147 }
1148 _ = token_clone.cancelled() => {
1149 info!(%conn_index, "shutting down stream on cancellation token");
1150 try_to_reconnect = false;
1151 break;
1152 }
1153 }
1154 }
1155
1156 drop(stream);
1160
1161 let is_client_connection = client_conf_clone.is_some();
1164 let mut connected = false;
1165
1166 if try_to_reconnect && let Some(config) = client_conf_clone {
1167 connected = self_clone.reconnect(config, conn_index, &token_clone)
1171 .instrument(tracing::Span::none())
1172 .await;
1173 } else {
1174 debug!(%conn_index, "close connection")
1175 }
1176
1177 if !connected {
1178 let link_id = if !is_local && !is_client_connection {
1181 self_clone
1182 .forwarder()
1183 .get_connection(conn_index)
1184 .and_then(|c| c.link_id())
1185 } else {
1186 None
1187 };
1188
1189 let (local_subs, remote_subs) = self_clone
1191 .forwarder()
1192 .on_connection_drop(conn_index, is_local);
1193
1194 let recovery_enabled =
1195 !self_clone.internal.recovery_table.ttl().is_zero();
1196
1197 if let Some(lid) = link_id.filter(|_| recovery_enabled) {
1198 info!(
1202 %conn_index, %lid,
1203 "connection lost, storing recovery state (TTL: {:?})",
1204 self_clone.internal.recovery_table.ttl(),
1205 );
1206 self_clone
1207 .internal
1208 .recovery_table
1209 .store(lid.clone(), local_subs, remote_subs);
1210
1211 if let Ok(drain) = self_clone.get_drain_watch() {
1213 let tx_cp_ttl = tx_cp;
1214 let mp = self_clone.clone();
1215 self_clone.internal.recovery_table.spawn_ttl_task(
1216 lid,
1217 drain,
1218 move |entry| async move {
1219 info!("recovery window expired, notifying control plane");
1220 let unreachable = entry
1226 .local_subs
1227 .into_iter()
1228 .filter(|(name, _)| {
1229 mp.forwarder()
1230 .on_publish_msg_match(name.clone(), u64::MAX, u32::MAX)
1231 .is_err()
1232 })
1233 .collect();
1234 MessageProcessor::notify_control_plane_subscriptions_lost(
1235 tx_cp_ttl,
1236 unreachable,
1237 conn_index,
1238 )
1239 .await;
1240 },
1241 );
1242 }
1243 } else {
1244 if !is_local {
1247 MessageProcessor::notify_control_plane_subscriptions_lost(
1248 tx_cp, local_subs, conn_index,
1249 )
1250 .await;
1251 }
1252 }
1253
1254 info!(telemetry = true, counter.num_active_connections = -1);
1255 }
1256 }.instrument(span));
1257
1258 Ok(handle)
1259 }
1260
1261 fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1262 let mut err: &(dyn std::error::Error + 'static) = err_status;
1263
1264 loop {
1265 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1266 return Some(io_err);
1267 }
1268
1269 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1272 && let Some(io_err) = h2_err.get_io()
1273 {
1274 return Some(io_err);
1275 }
1276
1277 err = err.source()?;
1278 }
1279 }
1280
1281 pub fn subscription_table(&self) -> &SubscriptionTableImpl {
1282 &self.internal.forwarder.subscription_table
1283 }
1284
1285 pub fn connection_table(&self) -> &ConnectionTable<Connection> {
1286 &self.internal.forwarder.connection_table
1287 }
1288}
1289
1290#[tonic::async_trait]
1291impl DataPlaneService for MessageProcessor {
1292 type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
1293
1294 async fn open_channel(
1295 &self,
1296 request: Request<tonic::Streaming<Message>>,
1297 ) -> Result<Response<Self::OpenChannelStream>, Status> {
1298 let remote_addr = request.remote_addr();
1299 let local_addr = request.local_addr();
1300
1301 let stream = request.into_inner();
1302 let (tx, rx) = mpsc::channel(128);
1303
1304 let connection = Connection::new(ConnectionType::Remote, Channel::Server(tx))
1305 .with_remote_addr(remote_addr)
1306 .with_local_addr(local_addr);
1307
1308 debug!(
1309 remote = ?connection.remote_addr(),
1310 local = ?connection.local_addr(),
1311 "new connection received from remote",
1312 );
1313 info!(telemetry = true, counter.num_active_connections = 1);
1314
1315 let conn_index = self
1317 .forwarder()
1318 .on_connection_established(connection, None)
1319 .unwrap();
1320
1321 self.process_stream(
1322 stream,
1323 conn_index,
1324 None,
1325 CancellationToken::new(),
1326 false,
1327 false,
1328 )
1329 .map_err(|e| {
1330 error!(error = %e.chain(), "error starting new processing stream");
1331 Status::unavailable(format!("error processing stream: {:?}", e))
1332 })?;
1333
1334 let out_stream = ReceiverStream::new(rx);
1335 Ok(Response::new(
1336 Box::pin(out_stream) as Self::OpenChannelStream
1337 ))
1338 }
1339}
1340
1341#[cfg(test)]
1342mod tests {
1343 use slim_config::grpc::client::is_valid_uuid_v4;
1344 use std::time::Duration;
1345
1346 use super::*;
1347 use crate::api::ProtoSubscriptionAck;
1348 use crate::tables::remote_subscription_table::SubscriptionInfo;
1349 use tonic::Status;
1350
1351 async fn assert_failed_subscription_ack_is_sent(add: bool) {
1352 let processor = MessageProcessor::new();
1353 let (in_connection, _tx, mut rx) = processor
1354 .register_local_connection(false)
1355 .expect("failed to create local connection");
1356
1357 let source = Name::from_strings(["org", "ns", "source"]).with_id(1);
1358 let destination = Name::from_strings(["org", "ns", "destination"]).with_id(2);
1359 let ack_id: u64 = if add { 1 } else { 2 };
1360 let invalid_connection = u64::MAX - 1;
1361
1362 let builder = Message::builder()
1363 .source(source.clone())
1364 .destination(destination.clone())
1365 .incoming_conn(invalid_connection)
1366 .subscription_id(ack_id);
1367
1368 let msg = if add {
1369 builder.build_subscribe().unwrap()
1370 } else {
1371 builder.build_unsubscribe().unwrap()
1372 };
1373
1374 let result = processor
1375 .process_subscription(msg, in_connection, add)
1376 .await;
1377 assert!(matches!(
1378 result,
1379 Err(DataPathError::MessageProcessingError { .. })
1380 ));
1381
1382 let ack_msg = tokio::time::timeout(Duration::from_secs(1), rx.recv())
1383 .await
1384 .expect("timeout waiting for ack")
1385 .expect("ack channel closed")
1386 .expect("failed to receive ack message");
1387
1388 assert!(matches!(ack_msg.get_type(), SubscriptionAckType(_)));
1389 let ack = ack_msg.get_subscription_ack();
1390 assert_eq!(ack.subscription_id, ack_id);
1391 assert!(!ack.success, "failed ack should have success=false");
1392 assert!(
1393 !ack.error.is_empty(),
1394 "failed ack should include an error message"
1395 );
1396 }
1397
1398 #[tokio::test]
1399 async fn test_process_subscription_sends_failed_ack_on_subscribe_error() {
1400 assert_failed_subscription_ack_is_sent(true).await;
1401 }
1402
1403 #[tokio::test]
1404 async fn test_process_subscription_sends_failed_ack_on_unsubscribe_error() {
1405 assert_failed_subscription_ack_is_sent(false).await;
1406 }
1407
/// A freshly generated v4 UUID, rendered as a string, must be accepted.
#[test]
fn test_is_valid_uuid_v4_accepts_v4() {
    let generated = uuid::Uuid::new_v4().to_string();
    let accepted = is_valid_uuid_v4(&generated);
    assert!(accepted);
}
1413
/// Strings that are not UUIDs at all (including the empty string) are rejected.
#[test]
fn test_is_valid_uuid_v4_rejects_non_uuid_string() {
    for candidate in ["not-a-uuid", ""] {
        assert!(!is_valid_uuid_v4(candidate));
    }
}
1419
/// A well-formed UUID whose version nibble is 1 must not pass the v4 check.
#[test]
fn test_is_valid_uuid_v4_rejects_non_v4_uuid() {
    let version_one = "00000000-0000-1000-8000-000000000000";
    assert!(!is_valid_uuid_v4(version_one));
}
1425
1426 #[tokio::test]
1429 async fn test_handle_link_message_is_local_ignored() {
1430 let processor = MessageProcessor::new();
1431 let link = ProtoLink { link_type: None };
1432 assert!(processor.handle_link_message(link, 0, true).await.is_ok());
1433 }
1434
1435 #[tokio::test]
1436 async fn test_handle_link_message_none_link_type_ignored() {
1437 let processor = MessageProcessor::new();
1438 let link = ProtoLink { link_type: None };
1439 assert!(processor.handle_link_message(link, 0, false).await.is_ok());
1440 }
1441
1442 fn make_server_conn(
1445 processor: &MessageProcessor,
1446 ) -> (u64, tokio::sync::mpsc::Receiver<Result<Message, Status>>) {
1447 let (tx, rx) = mpsc::channel(16);
1448 let conn = Connection::new(ConnectionType::Remote, Channel::Server(tx));
1449 let conn_id = processor
1450 .forwarder()
1451 .on_connection_established(conn, None)
1452 .unwrap();
1453 (conn_id, rx)
1454 }
1455
1456 fn make_client_conn(
1457 processor: &MessageProcessor,
1458 ) -> (u64, tokio::sync::mpsc::Receiver<Message>) {
1459 let (tx, rx) = mpsc::channel(16);
1460 let conn = Connection::new(ConnectionType::Remote, Channel::Client(tx));
1461 let conn_id = processor
1462 .forwarder()
1463 .on_connection_established(conn, None)
1464 .unwrap();
1465 (conn_id, rx)
1466 }
1467
1468 #[tokio::test]
1469 async fn test_handle_link_negotiation_unknown_connection_ignored() {
1470 let processor = MessageProcessor::new();
1471 let payload = LinkNegotiationPayload {
1472 link_id: uuid::Uuid::new_v4().to_string(),
1473 slim_version: "1.0.0".into(),
1474 is_reply: false,
1475 };
1476 assert!(
1477 processor
1478 .handle_link_negotiation(&payload, u64::MAX)
1479 .await
1480 .is_ok()
1481 );
1482 }
1483
1484 #[tokio::test]
1485 async fn test_handle_link_negotiation_role_outgoing_receives_request_ignored() {
1486 let processor = MessageProcessor::new();
1487 let (conn_id, _rx) = make_client_conn(&processor);
1488 let payload = LinkNegotiationPayload {
1489 link_id: uuid::Uuid::new_v4().to_string(),
1490 slim_version: "1.0.0".into(),
1491 is_reply: false, };
1493 assert!(
1494 processor
1495 .handle_link_negotiation(&payload, conn_id)
1496 .await
1497 .is_ok()
1498 );
1499 assert!(
1500 processor
1501 .forwarder()
1502 .get_connection(conn_id)
1503 .unwrap()
1504 .remote_slim_version()
1505 .is_none()
1506 );
1507 }
1508
1509 #[tokio::test]
1510 async fn test_handle_link_negotiation_role_incoming_receives_reply_ignored() {
1511 let processor = MessageProcessor::new();
1512 let (conn_id, _rx) = make_server_conn(&processor);
1513 let payload = LinkNegotiationPayload {
1514 link_id: uuid::Uuid::new_v4().to_string(),
1515 slim_version: "1.0.0".into(),
1516 is_reply: true, };
1518 assert!(
1519 processor
1520 .handle_link_negotiation(&payload, conn_id)
1521 .await
1522 .is_ok()
1523 );
1524 assert!(
1525 processor
1526 .forwarder()
1527 .get_connection(conn_id)
1528 .unwrap()
1529 .remote_slim_version()
1530 .is_none()
1531 );
1532 }
1533
1534 #[tokio::test]
1535 async fn test_handle_link_negotiation_unparsable_version_ignored() {
1536 let processor = MessageProcessor::new();
1537 let (conn_id, _rx) = make_server_conn(&processor);
1538 let payload = LinkNegotiationPayload {
1539 link_id: uuid::Uuid::new_v4().to_string(),
1540 slim_version: "not-semver".into(),
1541 is_reply: false,
1542 };
1543 assert!(
1544 processor
1545 .handle_link_negotiation(&payload, conn_id)
1546 .await
1547 .is_ok()
1548 );
1549 assert!(
1550 processor
1551 .forwarder()
1552 .get_connection(conn_id)
1553 .unwrap()
1554 .remote_slim_version()
1555 .is_none()
1556 );
1557 }
1558
1559 #[tokio::test]
1560 async fn test_handle_link_negotiation_server_invalid_uuid_ignored() {
1561 let processor = MessageProcessor::new();
1562 let (conn_id, _rx) = make_server_conn(&processor);
1563 let payload = LinkNegotiationPayload {
1564 link_id: "not-a-uuid".into(),
1565 slim_version: "1.0.0".into(),
1566 is_reply: false,
1567 };
1568 assert!(
1569 processor
1570 .handle_link_negotiation(&payload, conn_id)
1571 .await
1572 .is_ok()
1573 );
1574 assert!(
1575 processor
1576 .forwarder()
1577 .get_connection(conn_id)
1578 .unwrap()
1579 .remote_slim_version()
1580 .is_none()
1581 );
1582 }
1583
1584 #[tokio::test]
1585 async fn test_handle_link_negotiation_server_happy_path() {
1586 let processor = MessageProcessor::new();
1587 let (conn_id, mut rx) = make_server_conn(&processor);
1588 let link_id = uuid::Uuid::new_v4().to_string();
1589 let payload = LinkNegotiationPayload {
1590 link_id: link_id.clone(),
1591 slim_version: "1.2.3".into(),
1592 is_reply: false,
1593 };
1594 assert!(
1595 processor
1596 .handle_link_negotiation(&payload, conn_id)
1597 .await
1598 .is_ok()
1599 );
1600 let conn = processor.forwarder().get_connection(conn_id).unwrap();
1601 assert_eq!(conn.link_id(), Some(link_id));
1602 assert_eq!(
1603 conn.remote_slim_version(),
1604 Some(semver::Version::parse("1.2.3").unwrap())
1605 );
1606 let reply = rx.try_recv().expect("reply should be sent").unwrap();
1608 assert!(reply.is_link());
1609 }
1610
1611 #[tokio::test]
1612 async fn test_handle_link_negotiation_server_replay_protection() {
1613 let processor = MessageProcessor::new();
1614 let (conn_id, mut rx) = make_server_conn(&processor);
1615 let link_id = uuid::Uuid::new_v4().to_string();
1616 let payload = LinkNegotiationPayload {
1617 link_id: link_id.clone(),
1618 slim_version: "1.0.0".into(),
1619 is_reply: false,
1620 };
1621 assert!(
1623 processor
1624 .handle_link_negotiation(&payload, conn_id)
1625 .await
1626 .is_ok()
1627 );
1628 assert!(rx.try_recv().is_ok());
1629 assert!(
1631 processor
1632 .handle_link_negotiation(&payload, conn_id)
1633 .await
1634 .is_ok()
1635 );
1636 assert!(rx.try_recv().is_err());
1637 }
1638
1639 #[tokio::test]
1640 async fn test_handle_link_negotiation_client_happy_path() {
1641 let processor = MessageProcessor::new();
1642 let (conn_id, _rx) = make_client_conn(&processor);
1643 let link_id = uuid::Uuid::new_v4().to_string();
1644 let conn = processor.forwarder().get_connection(conn_id).unwrap();
1645 conn.set_link_id(link_id.clone());
1646 let payload = LinkNegotiationPayload {
1647 link_id: link_id.clone(),
1648 slim_version: "2.0.0".into(),
1649 is_reply: true,
1650 };
1651 assert!(
1652 processor
1653 .handle_link_negotiation(&payload, conn_id)
1654 .await
1655 .is_ok()
1656 );
1657 assert_eq!(
1658 conn.remote_slim_version(),
1659 Some(semver::Version::parse("2.0.0").unwrap())
1660 );
1661 }
1662
1663 #[tokio::test]
1664 async fn test_handle_link_negotiation_client_link_id_mismatch_ignored() {
1665 let processor = MessageProcessor::new();
1666 let (conn_id, _rx) = make_client_conn(&processor);
1667 let conn = processor.forwarder().get_connection(conn_id).unwrap();
1668 conn.set_link_id("correct-id".to_string());
1669 let payload = LinkNegotiationPayload {
1670 link_id: "wrong-id".into(),
1671 slim_version: "1.0.0".into(),
1672 is_reply: true,
1673 };
1674 assert!(
1675 processor
1676 .handle_link_negotiation(&payload, conn_id)
1677 .await
1678 .is_ok()
1679 );
1680 assert!(conn.remote_slim_version().is_none());
1681 }
1682
1683 #[tokio::test]
1684 async fn test_handle_link_negotiation_client_replay_protection() {
1685 let processor = MessageProcessor::new();
1686 let (conn_id, _rx) = make_client_conn(&processor);
1687 let link_id = uuid::Uuid::new_v4().to_string();
1688 let conn = processor.forwarder().get_connection(conn_id).unwrap();
1689 conn.set_link_id(link_id.clone());
1690 let payload = LinkNegotiationPayload {
1691 link_id: link_id.clone(),
1692 slim_version: "1.0.0".into(),
1693 is_reply: true,
1694 };
1695 assert!(
1697 processor
1698 .handle_link_negotiation(&payload, conn_id)
1699 .await
1700 .is_ok()
1701 );
1702 let stored = conn.remote_slim_version();
1703 assert!(stored.is_some());
1704 assert!(
1706 processor
1707 .handle_link_negotiation(&payload, conn_id)
1708 .await
1709 .is_ok()
1710 );
1711 assert_eq!(conn.remote_slim_version(), stored);
1712 }
1713
1714 fn negotiate_conn(processor: &MessageProcessor, conn_id: u64, version: &str) {
1719 let c = processor.forwarder().get_connection(conn_id).unwrap();
1720 c.complete_negotiation_as_server(
1721 &uuid::Uuid::new_v4().to_string(),
1722 semver::Version::parse(version).unwrap(),
1723 );
1724 }
1725
1726 #[tokio::test]
1727 async fn test_process_subscription_remote_ack_path_success() {
1728 let processor = MessageProcessor::new();
1731 let (local_conn, _tx_local, mut rx_local) = processor
1732 .register_local_connection(false)
1733 .expect("failed to create local connection");
1734
1735 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1736 negotiate_conn(&processor, remote_conn, "1.2.0");
1737
1738 let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1739 let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1740 let upstream_ack_id: u64 = 100;
1741
1742 let sub_msg = Message::builder()
1744 .source(source.clone())
1745 .destination(destination.clone())
1746 .incoming_conn(local_conn)
1747 .forward_to(remote_conn)
1748 .subscription_id(upstream_ack_id)
1749 .build_subscribe()
1750 .unwrap();
1751
1752 let result = processor
1754 .process_subscription(sub_msg, local_conn, true)
1755 .await;
1756 assert!(result.is_ok());
1757
1758 let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1761 .await
1762 .expect("timeout waiting for forwarded subscribe")
1763 .expect("forwarded subscribe channel closed")
1764 .unwrap();
1765 assert!(matches!(forwarded.get_type(), SubscribeType(_)));
1766
1767 let forwarded_sub_id = forwarded
1769 .get_subscription_id()
1770 .expect("forwarded subscribe must carry the same subscription_id");
1771 assert_eq!(
1772 forwarded_sub_id, upstream_ack_id,
1773 "subscription_id must not change when forwarding"
1774 );
1775
1776 let ack = ProtoSubscriptionAck {
1778 subscription_id: upstream_ack_id,
1779 success: true,
1780 error: String::new(),
1781 };
1782 processor.internal.sub_ack_manager.resolve(
1783 ack.subscription_id,
1784 if ack.success {
1785 Ok(())
1786 } else {
1787 Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
1788 },
1789 );
1790
1791 let upstream_ack = tokio::time::timeout(Duration::from_secs(2), rx_local.recv())
1793 .await
1794 .expect("timeout waiting for upstream ack")
1795 .expect("upstream ack channel closed")
1796 .expect("upstream ack should be Ok");
1797
1798 assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1799 let ack_inner = upstream_ack.get_subscription_ack();
1800 assert_eq!(ack_inner.subscription_id, upstream_ack_id);
1801 assert!(ack_inner.success);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_process_subscription_remote_ack_path_old_node_immediate_ack() {
1806 let processor = MessageProcessor::new();
1808 let (local_conn, _tx_local, mut rx_local) = processor
1809 .register_local_connection(false)
1810 .expect("failed to create local connection");
1811
1812 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1813 negotiate_conn(&processor, remote_conn, "1.1.0");
1814
1815 let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1816 let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1817 let upstream_ack_id: u64 = 101;
1818
1819 let sub_msg = Message::builder()
1820 .source(source.clone())
1821 .destination(destination.clone())
1822 .incoming_conn(local_conn)
1823 .forward_to(remote_conn)
1824 .subscription_id(upstream_ack_id)
1825 .build_subscribe()
1826 .unwrap();
1827
1828 processor
1829 .process_subscription(sub_msg, local_conn, true)
1830 .await
1831 .unwrap();
1832
1833 let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1837 .await
1838 .expect("timeout waiting for forwarded subscribe")
1839 .expect("channel closed")
1840 .unwrap();
1841 assert!(matches!(forwarded.get_type(), SubscribeType(_)));
1842 let forwarded_sub_id = forwarded
1843 .get_subscription_id()
1844 .expect("forwarded subscribe must carry the subscription_id");
1845 assert_eq!(
1846 forwarded_sub_id, upstream_ack_id,
1847 "subscription_id must not change when forwarding"
1848 );
1849
1850 let upstream_ack = tokio::time::timeout(Duration::from_secs(1), rx_local.recv())
1852 .await
1853 .expect("timeout waiting for upstream ack")
1854 .expect("channel closed")
1855 .expect("upstream ack must be Ok");
1856
1857 assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1858 let ack = upstream_ack.get_subscription_ack();
1859 assert_eq!(ack.subscription_id, upstream_ack_id);
1860 assert!(ack.success);
1861 }
1862
1863 #[tokio::test]
1864 async fn test_process_subscription_remote_ack_error_forwarded_upstream() {
1865 let processor = MessageProcessor::new();
1867 let (local_conn, _tx_local, mut rx_local) = processor
1868 .register_local_connection(false)
1869 .expect("failed to create local connection");
1870
1871 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1872 negotiate_conn(&processor, remote_conn, "1.2.0");
1873
1874 let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1875 let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1876 let upstream_ack_id: u64 = 102;
1877
1878 let sub_msg = Message::builder()
1879 .source(source.clone())
1880 .destination(destination.clone())
1881 .incoming_conn(local_conn)
1882 .forward_to(remote_conn)
1883 .subscription_id(upstream_ack_id)
1884 .build_subscribe()
1885 .unwrap();
1886
1887 processor
1888 .process_subscription(sub_msg, local_conn, true)
1889 .await
1890 .unwrap();
1891
1892 let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1893 .await
1894 .expect("timeout")
1895 .expect("channel closed")
1896 .unwrap();
1897
1898 let forwarded_sub_id = forwarded
1899 .get_subscription_id()
1900 .expect("forwarded subscribe must carry the same subscription_id");
1901 assert_eq!(
1902 forwarded_sub_id, upstream_ack_id,
1903 "subscription_id must not change when forwarding"
1904 );
1905
1906 let ack = ProtoSubscriptionAck {
1908 subscription_id: upstream_ack_id,
1909 success: false,
1910 error: "remote error".to_string(),
1911 };
1912 processor.internal.sub_ack_manager.resolve(
1913 ack.subscription_id,
1914 if ack.success {
1915 Ok(())
1916 } else {
1917 Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
1918 },
1919 );
1920
1921 let upstream_ack = tokio::time::timeout(Duration::from_secs(2), rx_local.recv())
1922 .await
1923 .expect("timeout")
1924 .expect("channel closed")
1925 .expect("must be Ok");
1926
1927 assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1928 let ack_inner = upstream_ack.get_subscription_ack();
1929 assert_eq!(ack_inner.subscription_id, upstream_ack_id);
1930 assert!(!ack_inner.success);
1931 assert!(!ack_inner.error.is_empty());
1932 }
1933
1934 fn make_test_subscribe(sub_id: u64) -> Message {
1937 let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1938 let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1939 Message::builder()
1940 .source(source)
1941 .destination(destination)
1942 .subscription_id(sub_id)
1943 .build_subscribe()
1944 .unwrap()
1945 }
1946
1947 #[tokio::test(start_paused = true)]
1948 async fn test_retry_loop_ack_received_before_timeout() {
1949 let processor = MessageProcessor::new();
1951 let (local_conn, _tx_local, mut rx_local) = processor
1952 .register_local_connection(false)
1953 .expect("failed to create local connection");
1954 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1955
1956 let sub_id: u64 = 1000;
1957 let msg = make_test_subscribe(sub_id);
1958 let rx = processor.internal.sub_ack_manager.register(sub_id);
1959
1960 let proc_clone = processor.clone();
1961 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
1962 proc_clone,
1963 sub_id,
1964 msg,
1965 remote_conn,
1966 local_conn,
1967 Some(sub_id),
1968 rx,
1969 ));
1970
1971 processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
1973
1974 handle.await.unwrap();
1975
1976 assert!(
1978 rx_remote.try_recv().is_err(),
1979 "no retry send expected when ack arrives before timeout"
1980 );
1981
1982 let ack = rx_local
1984 .try_recv()
1985 .expect("upstream ack should have been sent")
1986 .unwrap();
1987 assert!(ack.get_subscription_ack().success);
1988 }
1989
1990 #[tokio::test(start_paused = true)]
1991 async fn test_retry_loop_timeout_then_retry_send_then_ack() {
1992 let processor = MessageProcessor::new();
1994 let (local_conn, _tx_local, mut rx_local) = processor
1995 .register_local_connection(false)
1996 .expect("failed to create local connection");
1997 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1998
1999 let sub_id: u64 = 1001;
2000 let msg = make_test_subscribe(sub_id);
2001 let rx = processor.internal.sub_ack_manager.register(sub_id);
2002
2003 let proc_clone = processor.clone();
2004 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2005 proc_clone,
2006 sub_id,
2007 msg,
2008 remote_conn,
2009 local_conn,
2010 Some(sub_id),
2011 rx,
2012 ));
2013
2014 tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2016
2017 let retried = rx_remote
2019 .try_recv()
2020 .expect("retry send expected after first timeout")
2021 .unwrap();
2022 assert!(retried.get_subscription_id().is_some());
2023
2024 processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
2026
2027 handle.await.unwrap();
2028
2029 let ack = rx_local
2031 .try_recv()
2032 .expect("upstream ack should have been sent")
2033 .unwrap();
2034 assert!(ack.get_subscription_ack().success);
2035 }
2036
2037 #[tokio::test(start_paused = true)]
2038 async fn test_retry_loop_retry_send_fails() {
2039 let processor = MessageProcessor::new();
2042 let (local_conn, _tx_local, mut rx_local) = processor
2043 .register_local_connection(false)
2044 .expect("failed to create local connection");
2045 let (remote_conn, _rx_remote) = make_server_conn(&processor);
2046
2047 let sub_id: u64 = 1002;
2048 let msg = make_test_subscribe(sub_id);
2049 let rx = processor.internal.sub_ack_manager.register(sub_id);
2050
2051 processor.connection_table().remove(remote_conn);
2053
2054 let proc_clone = processor.clone();
2055 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2056 proc_clone,
2057 sub_id,
2058 msg,
2059 remote_conn,
2060 local_conn,
2061 Some(sub_id),
2062 rx,
2063 ));
2064
2065 tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2067
2068 handle.await.unwrap();
2069
2070 let ack = rx_local
2072 .try_recv()
2073 .expect("upstream ack should have been sent")
2074 .unwrap();
2075 assert!(!ack.get_subscription_ack().success);
2076 }
2077
2078 #[tokio::test(start_paused = true)]
2079 async fn test_retry_loop_all_retries_exhausted() {
2080 let processor = MessageProcessor::new();
2082 let (local_conn, _tx_local, mut rx_local) = processor
2083 .register_local_connection(false)
2084 .expect("failed to create local connection");
2085 let (remote_conn, mut rx_remote) = make_server_conn(&processor);
2086
2087 let sub_id: u64 = 1003;
2088 let msg = make_test_subscribe(sub_id);
2089 let rx = processor.internal.sub_ack_manager.register(sub_id);
2090
2091 let proc_clone = processor.clone();
2092 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2093 proc_clone,
2094 sub_id,
2095 msg,
2096 remote_conn,
2097 local_conn,
2098 Some(sub_id),
2099 rx,
2100 ));
2101
2102 for _ in 0..=crate::subscription_ack::MAX_RETRIES {
2104 tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2105 }
2106
2107 handle.await.unwrap();
2108
2109 let mut retry_count = 0;
2112 while rx_remote.try_recv().is_ok() {
2113 retry_count += 1;
2114 }
2115 assert_eq!(
2116 retry_count,
2117 crate::subscription_ack::MAX_RETRIES as usize,
2118 "expected {} retry sends",
2119 crate::subscription_ack::MAX_RETRIES,
2120 );
2121
2122 let ack = rx_local
2124 .try_recv()
2125 .expect("upstream ack should have been sent")
2126 .unwrap();
2127 let ack_inner = ack.get_subscription_ack();
2128 assert!(
2129 !ack_inner.success,
2130 "ack must indicate failure after exhausting retries"
2131 );
2132 assert!(!ack_inner.error.is_empty());
2133 }
2134
2135 #[tokio::test(start_paused = true)]
2136 async fn test_retry_loop_no_upstream_subscription_id() {
2137 let processor = MessageProcessor::new();
2139 let (_local_conn, _tx_local, mut rx_local) = processor
2140 .register_local_connection(false)
2141 .expect("failed to create local connection");
2142 let (remote_conn, _rx_remote) = make_server_conn(&processor);
2143
2144 let sub_id: u64 = 1004;
2145 let msg = make_test_subscribe(sub_id);
2146 let rx = processor.internal.sub_ack_manager.register(sub_id);
2147
2148 let proc_clone = processor.clone();
2149 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2150 proc_clone,
2151 sub_id,
2152 msg,
2153 remote_conn,
2154 0, None,
2156 rx,
2157 ));
2158
2159 processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
2161
2162 handle.await.unwrap();
2163
2164 assert!(
2166 rx_local.try_recv().is_err(),
2167 "no upstream ack when upstream_subscription_id is None"
2168 );
2169 }
2170
2171 #[tokio::test(start_paused = true)]
2172 async fn test_retry_loop_sender_dropped() {
2173 let processor = MessageProcessor::new();
2176 let (local_conn, _tx_local, mut rx_local) = processor
2177 .register_local_connection(false)
2178 .expect("failed to create local connection");
2179 let (remote_conn, _rx_remote) = make_server_conn(&processor);
2180
2181 let sub_id: u64 = 1005;
2182 let msg = make_test_subscribe(sub_id);
2183 let rx = processor.internal.sub_ack_manager.register(sub_id);
2184
2185 processor.internal.sub_ack_manager.remove(sub_id);
2187
2188 let proc_clone = processor.clone();
2189 let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2190 proc_clone,
2191 sub_id,
2192 msg,
2193 remote_conn,
2194 local_conn,
2195 Some(sub_id),
2196 rx,
2197 ));
2198
2199 handle.await.unwrap();
2200
2201 let ack = rx_local
2203 .try_recv()
2204 .expect("upstream ack should have been sent")
2205 .unwrap();
2206 assert!(!ack.get_subscription_ack().success);
2207 }
2208
/// A TTL passed to `new_with_options` is reported by the recovery table.
#[test]
fn test_new_with_options_custom_ttl() {
    let ttl = Duration::from_secs(5);
    let processor = MessageProcessor::new_with_options("svc".into(), Some(ttl));
    assert_eq!(processor.internal.recovery_table.ttl(), ttl);
}
2220
/// Passing `None` as TTL results in a recovery-table TTL of 30 seconds.
#[test]
fn test_new_with_options_none_uses_default() {
    let expected = Duration::from_secs(30);
    let processor = MessageProcessor::new_with_options("svc".into(), None);
    assert_eq!(processor.internal.recovery_table.ttl(), expected);
}
2229
/// A zero TTL passed to `new_with_options` is kept as zero.
#[test]
fn test_new_with_options_zero_ttl() {
    let processor = MessageProcessor::new_with_options("svc".into(), Some(Duration::ZERO));
    let ttl = processor.internal.recovery_table.ttl();
    assert!(ttl.is_zero());
}
2235
2236 #[tokio::test]
2239 async fn test_notify_cp_subs_lost_sends_unsubscribes() {
2240 let (tx, mut rx) = mpsc::channel::<Result<Message, Status>>(16);
2241 let mut subs = HashMap::new();
2242 let name = Name::from_strings(["org", "default", "svc"]);
2243 subs.insert(name.clone(), HashSet::from([1u64, 2u64]));
2244
2245 MessageProcessor::notify_control_plane_subscriptions_lost(Some(tx), subs, 42).await;
2246
2247 let msg = rx.recv().await.unwrap().unwrap();
2248 assert!(matches!(msg.get_type(), UnsubscribeType(_)));
2249 assert_eq!(msg.get_source(), name.clone());
2250 }
2251
2252 #[tokio::test]
2253 async fn test_notify_cp_subs_lost_no_tx_is_noop() {
2254 let subs = HashMap::from([(
2255 Name::from_strings(["org", "default", "svc"]),
2256 HashSet::from([1u64]),
2257 )]);
2258 MessageProcessor::notify_control_plane_subscriptions_lost(None, subs, 1).await;
2260 }
2261
2262 #[tokio::test]
2263 async fn test_notify_cp_subs_lost_empty_subs() {
2264 let (tx, mut rx) = mpsc::channel::<Result<Message, Status>>(16);
2265 MessageProcessor::notify_control_plane_subscriptions_lost(Some(tx), HashMap::new(), 1)
2266 .await;
2267 assert!(rx.try_recv().is_err());
2269 }
2270
2271 #[tokio::test]
2274 async fn test_link_negotiation_server_triggers_route_recovery() {
2275 let processor = MessageProcessor::new();
2276 let (conn_id, _rx) = make_server_conn(&processor);
2277
2278 let link_id = uuid::Uuid::new_v4().to_string();
2279 let sub_name = Name::from_strings(["org", "default", "recovered"]);
2280
2281 let mut local_subs = HashMap::new();
2283 local_subs.insert(sub_name.clone(), HashSet::from([99u64]));
2284 processor
2285 .internal
2286 .recovery_table
2287 .store(link_id.clone(), local_subs, HashSet::new());
2288
2289 let payload = LinkNegotiationPayload {
2291 link_id: link_id.clone(),
2292 slim_version: "1.0.0".into(),
2293 is_reply: false,
2294 };
2295 processor
2296 .handle_link_negotiation(&payload, conn_id)
2297 .await
2298 .unwrap();
2299
2300 let result = processor
2302 .forwarder()
2303 .on_publish_msg_match(sub_name, u64::MAX, 1);
2304 assert!(result.is_ok(), "recovered subscription should be routable");
2305 assert_eq!(result.unwrap(), vec![conn_id]);
2306 }
2307
2308 #[tokio::test]
2309 async fn test_link_negotiation_server_recovery_restores_remote_subs() {
2310 let processor = MessageProcessor::new();
2311 let (conn_id, mut rx) = make_server_conn(&processor);
2312
2313 let link_id = uuid::Uuid::new_v4().to_string();
2314 let source = Name::from_strings(["org", "default", "src"]);
2315 let dest = Name::from_strings(["org", "default", "dst"]);
2316
2317 let remote_sub =
2318 SubscriptionInfo::new(source.clone(), dest.clone(), "identity".into(), conn_id, 42);
2319
2320 processor.internal.recovery_table.store(
2322 link_id.clone(),
2323 HashMap::new(),
2324 HashSet::from([remote_sub]),
2325 );
2326
2327 let payload = LinkNegotiationPayload {
2328 link_id: link_id.clone(),
2329 slim_version: "1.0.0".into(),
2330 is_reply: false,
2331 };
2332 processor
2333 .handle_link_negotiation(&payload, conn_id)
2334 .await
2335 .unwrap();
2336
2337 let sub_msg = rx.recv().await.unwrap().unwrap();
2339 assert!(matches!(sub_msg.get_type(), SubscribeType(_)));
2340 let reply = rx.recv().await.unwrap().unwrap();
2341 assert!(reply.is_link());
2342 }
2343
2344 #[tokio::test]
2345 async fn test_link_negotiation_server_no_recovery_entry() {
2346 let processor = MessageProcessor::new();
2347 let (conn_id, mut rx) = make_server_conn(&processor);
2348
2349 let link_id = uuid::Uuid::new_v4().to_string();
2350 let payload = LinkNegotiationPayload {
2352 link_id: link_id.clone(),
2353 slim_version: "1.0.0".into(),
2354 is_reply: false,
2355 };
2356 processor
2357 .handle_link_negotiation(&payload, conn_id)
2358 .await
2359 .unwrap();
2360
2361 let reply = rx.try_recv().unwrap().unwrap();
2363 assert!(reply.is_link());
2364 assert!(rx.try_recv().is_err());
2365 }
2366
2367 #[tokio::test]
2370 async fn test_restore_remote_subscriptions_with_tracking() {
2371 let processor = MessageProcessor::new();
2372 let (conn_id, mut rx) = make_server_conn(&processor);
2373
2374 let source = Name::from_strings(["org", "default", "src"]);
2375 let dest = Name::from_strings(["org", "default", "dst"]);
2376 let sub = SubscriptionInfo::new(source.clone(), dest.clone(), "id1".into(), conn_id, 7);
2377 let subs = HashSet::from([sub]);
2378
2379 processor
2380 .restore_remote_subscriptions(&subs, conn_id, true)
2381 .await;
2382
2383 let msg = rx.recv().await.unwrap().unwrap();
2385 assert!(matches!(msg.get_type(), SubscribeType(_)));
2386
2387 let tracked = processor
2389 .forwarder()
2390 .get_subscriptions_forwarded_on_connection(conn_id);
2391 assert_eq!(tracked.len(), 1);
2392 }
2393
2394 #[tokio::test]
2395 async fn test_restore_remote_subscriptions_without_tracking() {
2396 let processor = MessageProcessor::new();
2397 let (conn_id, mut rx) = make_server_conn(&processor);
2398
2399 let source = Name::from_strings(["org", "default", "src"]);
2400 let dest = Name::from_strings(["org", "default", "dst"]);
2401 let sub = SubscriptionInfo::new(source.clone(), dest.clone(), "id1".into(), conn_id, 7);
2402 let subs = HashSet::from([sub]);
2403
2404 processor
2405 .restore_remote_subscriptions(&subs, conn_id, false)
2406 .await;
2407
2408 let msg = rx.recv().await.unwrap().unwrap();
2410 assert!(matches!(msg.get_type(), SubscribeType(_)));
2411
2412 let tracked = processor
2414 .forwarder()
2415 .get_subscriptions_forwarded_on_connection(conn_id);
2416 assert!(tracked.is_empty());
2417 }
2418}